ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

Scala + Scio で Apache Beam あるいは Google Cloud Dataflow に入門する

Apache Beam データに対する ETL 処理を、様々なランタイムで同じコードで実行できるようにするものです。 これは Google Cloud Dataflow のモデルを元に OSS 化されたもので、バッチ処理とストリーム処理を(ほぼ)同じコードで実装できたり、 Hadoop や Spark 、 Dataflow など複数の実行環境に対応していたりします。 標準では JavaPython 向けの SDK が用意されていますが、前者はシンタックスが独特であり後者はまだ機能が乏しく、発展途上感が否めない部分があります。

本記事では Java SDK をラップしつつ機能追加をされた、 Spotify 製ライブラリ Scio を使って Apache Beam による ETL 処理の実装に入門してみようと思います。

そもそも標準の SDK はどんな感じ?

公式ページのドキュメント がなかなか分かりやすくまとまっております。 元々 GCP のサービスから端を欲しているお陰か、 GCP の他サービス、 GCS や BigQuery などとの連携が非常に容易です。 特に GCS に関しては呼び出すメソッド名は同じまま、パスの表現でローカルファイルを参照するか GCS を参照するかなど切り替えてくれるのが、なかなか抽象化されているように思えて好ましいです。 しかしながら変換処理になると、 PCollectionDoFnapply() するのをメソッドチェーンで繋いで・・・などなどちょっと独特のコードを書く必要があります。

この部分は Python SDK になると抽象度があがり、 MapFlatMapFilter などよくあるコレクション操作の関数が揃っていて便利です。 しかしながら Python SDK は機能がまだ不足しがちで、 Join 系のちょっとしたメソッドがなかったりストリーム処理に対応していないらしかったりするようです。 また、 Apache Beam を利用したコードも実行に時間がかかるため、型チェックで弾ける問題などは弾いて欲しいという欲求も湧き上がります。

そこで Scio

Scio は Java SDK の PCollection などをラップしてかつ抽象的なコレクション操作関数などを追加したライブラリです。 更に Type safe BigQuery I/O という機構や Cassandra, Elasticsearch などへの I/O などのアドオンも存在します。 詳しい情報については GitHub の Wiki が結構充実しています。

Scio 使用例

今回は例として Scala + Scio で BigQuery のレコードを読んでちょっとした処理をし、 BigQuery とついでに GCS に出力することを試してみます。 今回の例のために実装したコードなどは こちら

BigQuery への入出力に関しては Scio の Type safe BigQuery を使ってみます。 これらは @BigQueryType マクロでクエリやスキーマの定義をしておき、そのクラスのオブジェクトに対して変換処理をする形で実装します。 ちなみにこのマクロの実現のため Macro paradise を使っています。 マクロを使う場合は適宜 Macro paradise 利用のための設定を記載しておきましょう。

今回、マクロとクラスの定義については今回は以下のようにしてみました。 入力データは適当なパブリックデータを、 WHERE 句にはレコード数があまり多くなり過ぎないように適度に絞り込む形にしています。

...
  @BigQueryType.fromQuery(
    """
      |SELECT
      |  name, number
      |FROM
      |  [bigquery-public-data:usa_names.usa_1910_current]
      |WHERE
      |  year = 2015 AND gender = 'M'
    """.stripMargin)
  class USANames

  @BigQueryType.toTable
  case class InitialCount(initial: String, number: Long)
...

そして肝心の変換処理は以下のようになりました。 今回、人物の名前のイニシャルを取り出してその数をカウントするという処理を実装してみました。 Java SDK の時の apply() チェーンみたいなものはなく、非常にシンプルなコレクション操作風に実装できています。

    val transformed = extracted
      .flatMap(_.name match {
        case Some(n: String) => if (n.length >= 1) Seq(n.slice(0, 1)) else Nil
        case _ => Nil
      })
      .countByValue
      .map(kv => InitialCount(kv._1, kv._2))

ビルドツールに sbt を使っている場合は、以下のように runMain で実行できます。 (この時 --runner に DataflowRunner を指定する場合、 GCPAPI が有効になってなければ有効にせよなど色々例外が上がるかもです)

$ GOOGLE_APPLICATION_CREDENTIALS=/path/to/json_key sbt "runMain syucream.BqSample --project=<project> --runner=<runner>"

sbt を使って実行するのもだるいので、 sbt-pack を使って事前にビルドしたものを実行する例も示してみます。 ちなみに sbt-assembly は Scio 的には非推奨 なようです。。。

# build by using sbt-pack
$ GOOGLE_APPLICATION_CREDENTIALS=/path/to/json_key sbt pack

# run
$ GOOGLE_APPLICATION_CREDENTIALS=/path/to/json_key ./target/pack/bin/bq-sample --project=<project> --runner=<runner>"

runner に Dataflow を指定すると、 GCP の Dataflow の画面上でパイプラインを見ることができます。 今回の例の場合ですと処理のフローとしては BigQuery の入力が 1 つ、いくつかの変換処理を経て GCS と BigQuery の 2 出力を持つ、という形になります。 これがそれらしく図示されていて、どうやら途中でワーカが追加されオートスケールされていることが分かります。

f:id:syu_cream:20171020005438p:plain

おわりに

今回触った範囲ですと、 Scala + Scio で Apache Beam 、 Dataflow がかなり楽に動かせるように見受けられます。 もうちょっと複雑なパイプラインを書くとどうなるかはわかりませんが、今後に期待を寄せたいところであります。