Kekeの日記

エンジニア、読書なんでも

Apache BeamのJavaによるPipelineの書き方

image.png

はじめに

本記事はApache Beamを使ってPipelineを構築します。

次回、自分の得意なGolangで書くエントリを執筆します。

今回は他のApache Stormなどのストリーム処理システムでは一般的なJavaを使おうと思います。

注意

Apache Beam Quickstartでは、なぜかPythonGolangはSDKをインストールしたり環境構築から始まるのに対して、Javaは文字数カウントサンプルをやらさせます。

1. 文字数カウントチュートリアル

1.1 exampleのインストール

Apache Mavenがインストールされていることを前提とします。

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.6.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false

そしてディレクトリに移動します。

cd word-count-beam

そして、以下のようなディレクトリ構造になっています。

.
├── pom.xml
└── src

1.2 コードの内容

1.3 データのソース

src内のコードを見てみると、どうやらシェイクスピアの文献の文字数を数えているようです。

// This example reads a public data set consisting of the complete works of Shakespeare.
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

文献自体はgs://とあることからGoogle Cloud Storageにあるようです。 Apache Beamが元はGoogleのものだったので納得ですね。

Google Cloud Storageの公開URLは以下のようになります。

https://storage.googleapis.com/{プロジェクト名}/{バケット名}/{...バケット内の構造(パス)}

プロジェクト名やバケット名は以下で指定できるみたいです。 コメントがありました。

//   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
//   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");

そして読み込みます。

p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

これによって、シェイクスピアのファイルを一行ずつ読み込みます。 そして、戻り値にはPCollectionを返します。\

1.2.2 Trasform

以下のようなTransformがPipelineとして定義されてあります。

        .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Count.perElement())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (KV<String, Long> wordCount) ->
                        wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to("wordcounts"));

何をしているかというと、

まず、FlatMapElements変換をして、PCollection<String>に変換されます。

そして、Filterによって空文字を除去している。

次にCountで文字数をカウントしています。

そして最後にMapElementsで出力するためにMap要素にしています。

1.3 ローカルで実行してみる

以下のコマンドを実行して、生Beamとして実行します。

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner

ファイルがいくつも生成されているので、確認します。

ls count*

counts-00000-of-00004
counts-00001-of-00004
counts-00002-of-00004
counts-00003-of-00004

2. Apache Beamのプログラミングモデル

まず、はじめにBeamの重要なクラスを解説します。

  • Pipeline: BeamのPipelineそのものである。
  • PCollection: 分散データセットを表現する。bounded(有限)でもunbounded(ストリーム)の状態でも良い。
  • PTransform: PCollectionを入力に、データのtransformを行うものである。

3. 実際にPipelineを作成する

3.1 Pipelineオブジェクトを作成する

以下のようにoptionsを作成して、Pipelineを作成します。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

3.2 PCollectionを作成する

インメモリでも、テキストでもどちらでも可能です。

PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));

or

static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
}

3.3 Transformを作成する

Beamでは、いくつもの種類のTransformを作成できます。

以下のような構文で作ることができる。

[Output PCollection] = [Input PCollection].apply([Transform])

これを繰り替えることによって様々なTransformを定義できるのである。

image.png

また、同じPCollectionに対して処理をかけたいときは以下のように書きます。

[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])

image.png

また、コアtransformがサポートされている。

以下のようなコアの変換はサポートしている

  • ParDo: 一般的な並列処理である。HadoopのMapに当たる。
  • GroupByKey: key/valueのコレクションの処理をする。
  • CoGroupKey:
  • Combine: データをまとめる。
  • Flatten: 一般的なflattenをする。
  • Partition: 小さなコレクションに分割する。
  • SimpleFunction: Mapのようなものである。applyを@Overrideをしなければならず、MapElements.via(simpleFunction)をして一つずつ実行される。
  • DoFn: ParDoで実行される。ストリームを読み込んだりするのに使われる。

Beamで重要なことは、DoFnをたくさんextendして定義していくことになることになる。

3.3.1 ParDo

まず、クラスを定義する。ここで<String>は入力で、Integerは出力である。

static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(@Element String word, OutputReceiver<Integer> out) {
    // Use OutputReceiver.output to emit the output element.
    out.output(word.length());
  }
}

その中では、@ProcessElementで変換のメソッドを定義していくようになり、@ Elementで入力を定義する。

また、出力はOutputReceiverで渡す。

実際にapplyしてみる。

PCollection<Integer> wordLengths = words.apply(
    ParDo.of(new ComputeWordLengthFn()));

戻り値にはPCollectionが返ってくる。

3.3.2 GroupByKey

以下のようなコレクションがあるとする。

cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...

そのときに出力は以下のようになる。

cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...

3.3.3 CoGroupBy

ここではユースケースとして、

  • 同じユーザー名などのキーで、いくつかvalueを持つようなものである。

たとえば結果として以下のようなものである。

final List<String> formattedResults =
    Arrays.asList(
        "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
        "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
        "james; []; ['222-333-4444']",
        "julia; ['julia@example.com']; []");

3.3.4 Combine

コレクションを結合させるために使われる。

簡単な関数は以下のように定義できる。

public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}

3.3.5 Flatten

同じデータタイプを持つような複数のPCollectionを一つのPCollectionにマージする。

概念的には、以下のようなものである。

PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

3.3.6 Partition

これはFlattenと正反対の処理になる。

概念的な処理は以下のようになる。

PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
        public int partitionFor(Student student, int numPartitions) {
            return student.getPercentile()  // 0..99
                 * numPartitions / 100;
        }}));

// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);

3.3.7 ユーザー定義関数のBeam変換の制約

一般的には、以下の制限がある。

  • データはシリアライズ可能である。
  • データは並列処理可能である。

4. Built-in Trasfrom

今までは、プレーンテキストを読み込んでいました。

しかし、Apache Kafkaからのストリームだったり、Apache Avroでシリアライズされていたり、いろんなソースが想定できます。

Javaでは以下のようなソースに対応しています。

f:id:bobchan1915:20180824161506p:plain

まとめ

  • Apache Beamは流行っているし、かなりドキュメントも質がいい
  • PipelineとしてのアプリケーションクラスにTransformもexec関数も実装すると責任を明確にできる
  • Google発ってことで安心できる。
  • Apache Flinkに対応できたり、テストが簡単だったり、運用に使えそう