ブログのしゅーくりーむ

技術的なメモとかライフログとか。

BigQuery SQL UDF の挙動を色々確認する

BigQuery ではユーザ定義関数(UDF) を作ることができる。 これを使って、よく使われる式や関数呼び出しの組み合わせを名前付けして再利用できる。

cloud.google.com

UDF は SQLJavaScript の二種類の言語による記述が可能で、後者は色々なトリッキーな利用例も世に出ているので知っている人も多いかも知れない。 例えば WebAssembly のコードを実行するとかである。

medium.com

本記事では 地味な方 素直な SQL での記述による UDF について色々挙動を確認してみる。

戻り値の型を推論してくれる

UDF は例えばシンプルなものだと以下のように記述できる。

CREATE TEMP FUNCTION
  zero()
  RETURNS INT64 AS (0);
SELECT
  zero()

ここで戻り値の型は省略することができる(勝手に解決してくれる)

CREATE TEMP FUNCTION
  zero()
  AS (0);
SELECT
  zero()

STRUCT 型の戻り値も推論してくれる。が、もちろん各フィールド名の情報はないのでダミーの値が補完されてしまう。

CREATE TEMP FUNCTION
  user()
  AS ((1, "taro"));
SELECT
  user()
[
  {
    "f0_": {
      "_field_1": "1",
      "_field_2": "taro"
    }
  }
]

RETURNS で型を明示するとそこに記述されたフィールド名を拾ってくれる。

CREATE TEMP FUNCTION
  user()
  RETURNS STRUCT<id INT64, name STRING>
  AS ((1, "taro"));
SELECT
  user()
[
  {
    "f0_": {
      "id": "1",
      "name": "taro"
    }
  }
]

UDF から UDF を呼ぶ際はすでに定義済みである必要がある

UDF から別の UDF を呼び出すこともできる。

CREATE TEMP FUNCTION
  callee() AS (42);
CREATE TEMP FUNCTION
  caller() AS (callee());
SELECT
  caller()

この定義順序を逆にするとクエリの実行が通らなくなる。 Function not found: callee; failed to parse CREATE [TEMP] FUNCTION statement at [5:16] というエラーが出てしまう。

CREATE TEMP FUNCTION
  caller() AS (callee());  -- <- Function not found :(
CREATE TEMP FUNCTION
  callee() AS (42);
SELECT
  caller()

UDF の再帰呼び出しをすることもできない。この例では Function not found: fib; failed to parse CREATE [TEMP] FUNCTION statement at [10:7] と怒られてしまう。

CREATE TEMP FUNCTION
  fib(n INT64) AS (
  IF
    ( n > 2,
      fib(n - 1) + fib(n - 2),
      1 ) );
SELECT
  fib(2)

だったら一度ダミーの UDF を定義してみよう!ということで一時 UDF じゃなく永続 UDF を、一度ダミーのものを定義したあと REPLACE してみる。 このクエリはそれ自体は valid と言われ実行可能である。

-- dummy
CREATE OR REPLACE FUNCTION
  udfs.fib(n INT64) AS (1);
 
CREATE OR REPLACE FUNCTION
  udfs.fib(n INT64) AS (
  IF
    ( n > 2,
      udfs.fib(n - 1) + udfs.fib(n - 2),
      1 ) );
SELECT
  udfs.fib(2)

が、これは実行してみると Query error: Too many nested views/persistent user-defined functions or possible circular reference of views/persistent user-defined functions referenced in query. Only 16 levels of nested views/persistent user-defined functions are allowed. at [11:1] と怒られてしまう。 UDF では 16 チェーンまでしか UDF 呼び出しをできない制限がありそれに引っかかっているように見えるエラーメッセージである。この例では udfs.fib(2) を呼び出してもそもそも再帰呼び出しされない気がするが・・・。

cloud.google.com

頑張ればループ処理を書ける

UDF では前述の通り再帰呼び出しできず、ループするような制御構文もサポートされていない。 が、 ARRAY 型の値と ARRAY 型関連関数を使うとループ、リストの各要素に対する繰り返し処理的なことができる。 ミソとなるのがみんな大好き UNNEST() でテーブルを手にいれることができることと、 ARRAY() の引数はサブクエリを取れる辺りにある!

CREATE TEMP FUNCTION
  array_twice(n INT64) AS (ARRAY(
    SELECT
      AS STRUCT (
      SELECT
        ARRAY_AGG(vv)
      FROM (
        SELECT
          v * 2  -- <- like a map function
        FROM
          UNNEST(GENERATE_ARRAY(1, n)) AS v) AS vv)));
SELECT
  array_twice(100)

テーブルを参照することもできる

上記より SELECT 文が打てることを確認できたので FROM 句に実テーブルを渡してみる。 このクエリも合法で実行することができる!

CREATE TEMP FUNCTION
  do() AS (ARRAY(
    SELECT
      AS STRUCT (
      SELECT
        ARRAY_AGG(vv)
      FROM (
        SELECT
          LENGTH(repo_name)  -- <- like a map function
        FROM
          `bigquery-public-data.github_repos.licenses` -- <- refer to a table
        LIMIT
          10) AS vv)));
SELECT
  do()

ドキュメントにテーブル参照に関する記述があるので、この挙動は一応想定されたもののはず。(もしかしたら他のまっとうな参照方法があるのかも)

一意の UDF とテーブル参照を合わせたクエリあたりの最大数 - 1,000。完全な展開後に、UDF ごとに一意のテーブルと UDF を合わせて 1,000 個まで参照できます。

cloud.google.com

Apache Parquet の Logical Types に関するメモ

去年末になりますが、 embulk-output-s3_parquet という Embulk Plugin にて Logical Type をサポートするためのパッチを書き、マージしていただきました。

github.com

個人的な主な目的は timestamp 型をサポートすることでした。 Athena や BigQuery から参照するにあたり timestamp 型として扱えるか否かで使い勝手は大きく変わります。 Logical Type としての扱いを期待するかを設定に書く必要はあるものの、割と簡素な設定で timestamp 型対応ができるようになったのではないかと思います。

さてこの Logical Type ですが、 2020 年 2 月現在 2 種類の仕様が混在してかつ古い仕様が事実上スタンダードになっている状態です。 自分は最初混乱したのですが、この不幸の連鎖を止めたいので、本記事に自分が把握した状態を記録しておきます。

古い仕様 "Converted Types"

ここまで一口に "Logical Types" といってしまったのですが、ドキュメント上は古い仕様は Converted Types, 新しい仕様が Logical Types と区別されます。 そして現在使われる Logical Types としてはこの Converted Types が一般的でしょう。

github.com

前提として、 Parquet がサポートする型 は以下のものになります。

  • BOOLEAN
  • INT32
  • INT64
  • INT96
  • FLOAT
  • DOUBLE
  • BYTE_ARRAY

Converted Types ではこれらの型に別解釈を与えることができます。 これは OriginalType に定義されています。 (このクラス名からか、 JIRA や GitHub 上のやり取りを見ると Converted Types のことを Original Types と呼ぶこともあるっぽい?ようです。 Java 実装に限った話かも)

  • MAP,
  • LIST,
  • UTF8,
  • MAP_KEY_VALUE,
  • ENUM,
  • DECIMAL,
  • DATE,
  • TIME_MILLIS,
  • TIME_MICROS,
  • TIMESTAMP_MILLIS,
  • TIMESTAMP_MICROS,
  • UINT_8,
  • UINT_16,
  • UINT_32,
  • UINT_64,
  • INT_8,
  • INT_16,
  • INT_32,
  • INT_64,
  • JSON,
  • BSON,
  • INTERVAL

embulk-output-s3_parquet でサポートする timestamp 型はここに現れる TIMESTAMP_MILLIS, TIMESTAMP_MICROS になります! また Parquet では文字列型がこの Converted TypesUTF8 OriginalType を使って表現されます。 そのため、Parquet の Converted Types は例えば Avro における Logical Types よりは多くのユーザが触れる機会がある概念かも知れません。

新しい仕様 "Logical Types"

新しい Logical Types は UUID やナノ秒精度 timestamp などさらにリッチな型をサポートしていたり、 Builder パターンでのスキーマ構築をサポートしていたりと様々な更新を含みます。 Converted Types との互換性も考慮されており、互換がある表現の場合は変換メソッドも提供されます。

これは Parquet 1.11.0 からサポートされているのですが、 maven central repository に上がったのが 2019 年 11 月でありまだ歴史が浅い機能といえます。 ミドルウェアの対応としても、 Hive でサポート開始がされ始めた ようですが Spark はまだ だったり普及するのにはまだ時間を要しそうです。 BigQuery などクラウドサービスにおいても、スケジュールは定かではないですが、サポートするにしてももうしばらく時間が必要かも知れません。

そんな訳で 2020 年 2 月現在、まだ多くの場合は Converted Types を利用することになると思うのですが、将来的に Logical Types に置き換えることや変換メソッドがあることを知っておくと良いでしょう。 またドキュメントは新しい Logical Types に関する記述の方が増えていくと思われ、現行2つの仕様がありドキュメントの記述と自分が必要としてい仕様がどちらのものか意識することがしばらく必要かも知れません。

Schema Registry について書いていく その1: Confluent Schema Registry

分散アプリケーション間のメッセージングやログ収集基盤において、しばしばスキーマの扱いは便利である反面頭を悩ませる種になります。

スキーマを厳密に定義して、 Protocol Buffers や Avro などのシリアライゼーションフォーマットを用いることで、メッセージのサイズの減少やメッセージの扱いの安全性を実行時より早く検出できる機会が増えます。 しかしスキーマの互換性に気をつけたり、メッセージをシリアライズ・デシリアライズするアプリケーション間でスキーマ情報をどう共有するかの課題も発生します。 この課題は、昨年発売された "データ指向アプリケーションデザイン" でも "4.1.4.3 そもそもライターのスキーマとは何なのか?" などで触れられています。

世の中にはこの課題を解決するための "スキーマレジストリ" と呼ばれるソフトウェア、サービスがいくつか存在します。 残念ながら日本語の資料があまり無いようにも感じますし、筆者の気が済むか飽きるまで様々なスキーマレジストリを探索してみようかと思います。 第一回は Confluent Schema Registry について調べてみます。 (残念ながら筆者は Confluent Schema Registry を実務で触れた経験がなく、なにか内容に誤りがある可能性があります)

Confluent Schema Registry とは

Confluent Schema Registry は、マネージドな Apache Kafka を提供する Confluent 社 が開発したスキーマレジストリです。

docs.confluent.io

以下のような機能を持っています。

Apache Avro については以前本ブログで触れましたし世の中にドキュメントが多数存在するのでここでは別段触れません。

スキーマは Subject という識別に用いる名前とバージョン番号で管理されます。 Subject をどう決めるかはいくつかのオプションがあり、デフォルトでは topic 名になっています。 topic 名を用いる場合はある topic に流れるレコードのスキーマは一つに限定され、複数の異なるスキーマのレコードを流すことは出来ません。 これを行いたい場合は他にレコード名もしくは topic 名とレコード名の組み合わせを Subject として用いることになります。

スキーマ情報の格納先がそれもまた Kafka であるのはちょっとユニークな点と言えそうです。 デフォルトではシングルパーティション_schemas topic にスキーマ情報を格納するようです。

docs.confluent.io

スキーマレジストリという機構の悩ましい点として、それ自体がメッセージバスと同等くらいの可用性を求められることが挙げられます。 メッセージバスが生きていてもスキーマレジストリが死んでいるなら、シリアライズする時に用いたスキーマを何処にも格納できず、デシリアライズする側のシステムに伝える術も失うためです。 Confluent Schema Registry ではスキーマレジストリREST API を提供する HTTP サーバの可用性の懸念は残りますが、ストレージは Kafka それ自体と運命共同体になるため考える課題が一つ減りそうです。

Confluent Schema Registry は更にスキーマの更新問題にも解決策をもらたします。 アプリケーションを稼働させていくうちにスキーマを更新したくなるシーンが登場するのは自然なことでしょう。 この際に Confluent Schema Registry は Kafka の Producer が作成した Avro のスキーマは解決してくれますが、 Consumer 側にも配慮する必要があります。 例えば Consumer が必要としているフィールドがある日 Producer が送信するレコードから無断で削除されたなら、 Consumer 側で何らかの問題が出るかもしれません。 この問題に配慮するため、 Confluent Schema Registry ではスキーマの互換性チェックの機構を設けています。 互換性には前方互換性、後方互換性、完全互換性の 3 タイプが設けられています。

docs.confluent.io

前方互換性は Producer が Consumer より先にスキーマを更新するパターンで求められる互換性です。 この場合フィールドの追加もしくは optional 、すなわちデフォルト値を持つフィールドの削除が許されています。 これらの変更が行われても Consumer は、自分が知らない追加フィールドは無視する、削除されたフィールドはデフォルト値で埋めることができます。

f:id:syu_cream:20200131004548p:plain

後方互換性は Consumer が Producer より先にスキーマを更新するパターンで求められる互換性です。 この場合は前述の逆で、 optional なフィールドの追加とフィールドの削除が許されています。 この互換性タイプが Confluent Schema Registry のデフォルトの互換性モードであり、現実世界で気を使われることが多いものであると思います。

f:id:syu_cream:20200131004611p:plain

完全互換性は Producer と Consumer どちらが先にスキーマを更新しても良いパターンで求められる互換性です。 この場合は制約が最も弱いようなスキーマ更新、つまり optional なフィールドの追加・削除が許されるようになります。 この互換性タイプは制約が弱いため扱いにも困るケースが出るかも知れませんが、 Confluent のプラットフォーム以外のミドルウェアやキャッシュ機構が存在したり処理するレコードの順序が保証されなかったりするケースに、互換性の問題で Consumer 側でエラーが発生するケースを低減できそうです。

f:id:syu_cream:20200131004629p:plain

Confluent Schema Registry はスキーマを更新しようとする際に、求められるタイプに従って互換性をチェックする機能を提供します。

REST API は実際に Confluent Schema Registry に対応した SerDe でスキーマ操作を行う際に使われる API にもなります。 手動で REST API を叩いてスキーマ登録を行うなどをしても良いでしょう。 この API の仕様は以下にまとまっています。

docs.confluent.io

管理用の WebUI については実際に触れてみた方が話が早そうです。次に触れてみましょう。

Confluent Schema Registry に実際に触れてみる

クイックスタート

公式で Docker イメージと docker-compose yml ファイルが提供されており、とりあえず動かす敷居が低めになっています。

docs.confluent.io

このドキュメントに沿って Docker コンテナを動かした後にブラウザ上で http://localhost:9021/ にアクセスしてみます。 上手く動いていれば以下のような管理画面を閲覧できるはずです。

f:id:syu_cream:20200129004253p:plain

Avro のスキーマを登録してみる

ここでは実験用に hello topic を作成して、この topic に流すレコードの valueスキーマを設定してみようと思います。

ここではバージョン 1 のスキーマとして以下を与えてみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

登録したスキーマを WebUI 上で確認できます。

f:id:syu_cream:20200131001440p:plain

一度スキーマを登録すると互換性モードを選べるようになります。 ここでは一つ前のバージョンのスキーマ後方互換性を保つモードである Backward に設定しておきます。

f:id:syu_cream:20200131001705p:plain

Avro のスキーマを更新してみる; 互換性がある場合

先程登録したスキーマを更新してみようと思います。 ここでは年齢を指定する想定で {"name": "age", "type": ["null", "long"], "default": null} という nullable なフィールドを追加してみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "long"],
      "default": null
    }
  ]
}

このスキーマ変更は optional なフィールド追加に相当して後方互換性チェックをパスすることができます。 無事 WebUI からも新しいバージョンのスキーマを閲覧することができています。 diff 表示も出来ていい感じです。

f:id:syu_cream:20200131002225p:plain

Avro のスキーマを更新してみる; 互換性がない場合

ここから後方互換性が無い変更をしてみます。 よく考えたらこのレコードをアプリケーション上で作成した時刻の timestamp 値を保持したくなってきました。 ということで optional でないフィールド {"name": "created_at", "type": "long", "logicalType": "timestamp-millis"} を追加してみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "long"],
      "default": null
    },
    {
      "name": "created_at",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

しかしながら先述の通り後方互換性を保つには optional なフィールド追加は許されません。 この場合 Confluent Schema Registry が互換性が保てない旨のメッセージを出してスキーマの更新が拒否されます。

f:id:syu_cream:20200131002951p:plain

このように Confluent Schema Registry では、スキーマの管理と共に、求められる互換性のタイプに従ってそれを維持するようチェックをかけてくれます。 本当はスキーマ管理を .avsc ファイルなどで保存して CI で互換性チェックし、パスすればスキーマ更新するなど人間による直接の更新を避けるのが望ましいかもですが、この WebUI と互換性チェック機構で人間による直接管理もある程度運用に耐えるかもしれません。

余談; _schemas topic

WebUI 上では _schemas topic の様子も確認することができます。 log.cleanup.policydelete ではなく compact が設定されており、これにより時間経過等によるレコードの削除を抑制しています。

f:id:syu_cream:20200131010901p:plain

Confluent Schema Registry の実装を覗いてみる

Confluent Schema Registry のコードはGitHub で公開されています。折角なのでどんな実装になっているか少し覗いてみます。 ちなみにここでは網羅性などは全く考慮していません。筆者が気になるポイントを勝手にかいつまんでいます。

github.com

ここで紹介する内容は v5.4.0 時点のものとなります。

avro-serializer: Schema Registry と通信した結果でシリアライズ・デシリアライズを行う

この module に存在するのが、実際に producer, consumer を実装する際に properties に指定する serializer, deserializer になります。

シリアライズのコアの実装が AbstractKafkaAvroSerializerserializeImpl() に存在します。 自動登録する設定になっているなら Avro スキーマスキーマレジストリに登録し、その後 Kafka の topic に送信するメッセージをシリアライズします。 この時のバイナリレイアウトなのですが、 この Serializer 実装特有のフォーマットになっており Avro のレコードの手前に 8 バイトのマジックナンバースキーマの ID が埋め込まれています。 Avro のレコードのバイナリはおなじみ SpecificDatumWriter, ReflectDatumWriter, あるいは GenericDatumWriter で書き出します。

docs.confluent.io

これに対応してデシリアライズ側の実装は AbstractKafkaAvroDeserializer クラスに多くのロジックが持たれています。 まずは deserialize() ですがリーダ側のスキーマが渡せるようになっており、デシリアライズの結果をライター側とは異なる(しかし互換性はある)型に展開できるようになっています。 ライター側のスキーマは勿論スキーマレジストリ経由で解決できます。この辺りの実装は AbstractKafkaAvroDeserializer の内部クラスの DeserializationContext クラスで行われます。 このクラスに渡された、デシリアライズ対象のバイナリは、先頭 8 バイトにマジックナンバースキーマ ID が存在するはすです。 まずマジックナンバーの存在チェックを行った後にスキーマ ID を取り出し、この ID をスキーマレジストリに問い合わせて Avro のスキーマ情報を取得します。 その後はやはりおなじみ SpecificDatumReader, ReflectDatumReader, GenericDatumReaderJava のオブジェクトとして読み出します。

これで概ね Avro のレコードのシリアライズとデシリアライズの流れが掴めますが、スキーマの登録と取得がどのように成されているのでしょうか。 先述の Serializer, Deserializer の共通の親クラスとして AbstractKafkaAvroSerDe があり、このクラスがデフォルトでは CachedSchemaRegistryClient というスキーマレジストリに対するクライアントを保持しています。 このクライアントの実装がどうなっているのか、次はスキーマレジストリとの通信を行っている client module も覗いてみます。

client: Schema Registry のクライアント

SchemaRegistryClient インタフェースを実装したクラスが実際に Schema Registry と通信しています。 先述の CachedSchemaRegistryClient はその 1 実装であり、スキーマや ID, バージョン情報のキャッシュと RestService というクラスのメンバを持ちます。 そして RestService が実際に HTTP リクエストをスキーマレジストリREST API に送信する、例えばスキーマの登録時に /subjects/:subject/versions に POST リクエストを送る、というような格好になります。

core: Schema Registry のコア実装

もうひとつ、スキーマレジストリREST API サーバを提供する core モジュールも気になる存在です。 REST API の実装やスキーマ情報などの永続化をどのように行っているのでしょうか。

スキーマレジストリREST API サーバは Jetty + Jersey で実装されているようです。 API の各エンドポイントは JAX-RS の Resource として実装されており、例えば /subjects/{subject}/versions であれば SubjectVersionsResource が相当する、などの構成になっています。

スキーマなどの情報を永続化するストレージレイヤは KafkaStore クラスで提供されています。 スキーマの登録のため put() が呼ばれると、スキーマ情報を ProducerRecord に包んで _schemas topic に 送ります。 他方、スキーマの情報取得はというと KafkaStore 自体では直接は _schemas topic を consume せず、別スレッドで動作してローカルスキーマキャッシュを更新する KafkaStoreReaderThread が更新した InMemoryCache から取得する格好になります。 この動作についてはドキュメントでも言及されています。

docs.confluent.io

おわりに

Confluent Schema Registry について筆者の思うまま気になるところをざっくり紹介しました。 いかがでしたでしょうか。

次回は Apache Pulsar の持つ Schema Registry 機能について調べて行こうかなと思います。 なんとこちらのスキーマレジストリは Avro は勿論 Protocol Buffers もサポートしているようです! Protocol Buffers は各言語向けの自動生成されたライブラリを使う、 Avro で言う SpecificRecord のサブクラスに近い性質を持つ表現で扱われることが多いと思われますが、スキーマレジストリをどのように提供しているのでしょうか。

pulsar.apache.org

余談ですが本記事執筆中に Confluent Schema Registry でも Protocol Buffers をサポートする気配のある pull request を発見してしまいました。 近い将来 Avro 以外でシリアライズしたレコードが色んなミドルウェアでらくらくに扱える日が来るかも知れませんね。

github.com

Apache Avro について知っていることを書いていく その2

Apache Avro について書き下していく記事その 2 です。 本記事では Avro で表現されるデータのプログラミング言語上の表現、特に Java を想定して SpecificDataGenericData について触れていきます。

Avro のデータ型とコード生成

前回の記事で触れた通り、 Avro では様々なリッチなデータ型を表現できます。 多くの場合一番外側のスコープでは record type を使って構造体を定義することになるかなと思います。 Avro スキーマは以下のように事前に JSON 文字列を埋め込むか .avsc ファイルで保存しておくのもアリですが、実行時に動的生成することもできます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

ここで定義したデータ型を実際にどう扱いましょうか? 例えば Protocol Buffers ではちょうど Avro で .avsc ファイルに記述するように、以下のようにおおむね .proto ファイルにメッセージ型を定義していくことになると思います。

syntax = "proto3";

package com.syucream.example;

message Hello {
  uint64 id = 1;
  string name = 2;
}

その後典型的には protoc と各言語に対応する protoc plugin を用いることで、定義したメッセージ型に従ったコードを生成することになります。

$ protoc --java_out=./ hello.proto
// Generated by the protocol buffer compiler.  DO NOT EDIT!
// source: hello.proto

package com.syucream.example;

public final class HelloOuterClass {
  private HelloOuterClass() {}
  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistryLite registry) {
  }

  public static void registerAllExtensions(
      com.google.protobuf.ExtensionRegistry registry) {
    registerAllExtensions(
        (com.google.protobuf.ExtensionRegistryLite) registry);
  }
  public interface HelloOrBuilder extends
      // @@protoc_insertion_point(interface_extends:com.syucream.example.Hello)
      com.google.protobuf.MessageOrBuilder {

    /**
     * <code>uint64 id = 1;</code>
     */
    long getId();

    /**
     * <code>string name = 2;</code>
     */
    java.lang.String getName();
   ...

つまりアプリケーションに組み込んで用いる際に事前にコード生成を求められます。

Avro ではコード生成はオプションです。 事前に生成したコードを用いることも、そうでない選択肢を取ることもできます。

基本的なデータ型表現セット GenericData

org.apache.avro.generic.GenericData は Avro のデータ型の Java での表現の基礎となるクラスなどが詰まったユーティリティです。 GenericData 以下に具体的な型に対応したクラスが存在します。 例えば record なら org.apache.avro.generic.GenericData.Record といった具合です。 以下では特にこの GenericData.Record を掘り下げましょう。

まずこのクラスは org.apache.avro.generic.GenericRecord interface を実装しているので、まぁ名前の通りなのですがつまり値を get したり put したりできます。 一見あまり情報量が無さそうなこの interface は、結構色々な Javaミドルウェアで頻出します。 またこの get() の戻り値は Object 型であり、実際どんな型なのか意識するのは get() を呼び出すコードを書く人の責務になります。

GenericData.Record オブジェクトを作るには、 Schema オブジェクトを渡して new します。 Schema オブジェクトは Schema.Parser を使って事前定義した JSON 文字列をパースして得るのもよし、動的に Schema.createRecord() などから生成してもよし、です。

この GenericData.Record ですが、フィールドの具体的な値は Object[] 型のメンバとして持ち、 get/put などをする時は Schema オブジェクトの情報からフィールドのインデックスを得てアクセスします。 これはちょうど、 Avro の仕様としてシリアライズした後のバイナリのレイアウトがどうなっているかがスキーマによって決定づけられる(他にレイアウト情報を持たずスキーマを信じる)のに対応しそうです。

GenericData を読み書きする GenericDatumReader/GenericDatumWriter

org.apache.avro.generic.GenericDatumReader を用いることでシリアライズされた状態のバイナリから GenericData 以下の Java のオブジェクトを取り出すことができます。 ジェネリクスのパラメータは、例えば record 型の値を参照するだけでいいなら前述の GenericRecord を指定するので良いでしょう。 逆に Java のオブジェクトをシリアライズしたい際は org.apache.avro.generic.GenericDatumWriter を使うことができます。 より具体的な使い方は公式のテストコードを見ると参考になります。

github.com

Avro のシリアライズ方法とバイナリレイアウトを決めているのはスキーマです。 というわけでもちろん、GenericDatumReader / GenericDatumWriter を用いる際にはスキーマ情報が求められます。

さて実は GenericDatumReader には二種類のスキーマ、引数名でいうと readerwriter を与えることができます。 名前の通り writerGenericDatumWriter を使ってシリアライズする側で用いられた側のスキーマreaderGenericDatumReader で読み出す側のスキーマです。 具体的で実用しそうなシナリオとしては、 writer が古いスキーマシリアライズする可能性のある環境下で reader としては新しいスキーマ(古いスキーマからの後方互換性がある)に従う型のオブジェクトとして扱いたい際に二種類のスキーマを与える場合がありそうです。

より安全で高速な表現セット SpecificData

さてこの GenericData 関連のツールですが、値の取り回しは Object 型ですることになるので逐一キャストしなければならず、安全な実装をできるかやキャストするコストが発生する点が気になります。 悪い言い方をすれば、シリアライズ・デシリアライズの操作を含めてスキーマに違反していないことをは保証された値でしかなく、 Java のコード上は例えば record 型なら単なる Map<String, Object> 型オブジェクトと言えそうです。

幸いなことに Avro では GenericData を使うのとは別に SpecificData サブクラスを使う道も存在します。 org.apache.avro.specific.SpecificDataGenericData のサブクラスです。 と言ってもこのクラス自体にはさほど機能は追加されていません。 重要なのはこれを継承した、自動生成された schema specific なクラスです。

SpecificData なクラスのコード生成

最初に述べた通り Avro には事前に .java ファイルのコードを生成しておく道もあります。 コード生成は avro-tools を通して行えます。試しにこの記事の冒頭に出てきた .avsc からコード生成してみます。

$ java -jar ~/tools/avro-tools-1.8.2.jar compile schema hello.avsc ./
Input files to compile:
  hello.avsc
log4j:WARN No appenders could be found for logger (AvroVelocityLogChute).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

得られる Java のコードは以下のようになります(一部抜粋)

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Hello extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -8266440640401408575L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Hello\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  ...

  @Deprecated public long id;
  @Deprecated public java.lang.CharSequence name;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public Hello() {}

  ...

  /**
   * Gets the value of the 'id' field.
   * @return The value of the 'id' field.
   */
  public java.lang.Long getId() {
    return id;
  }

  /**
   * Sets the value of the 'id' field.
   * @param value the value to set.
   */
  public void setId(java.lang.Long value) {
    this.id = value;
  }

  /**
   * Gets the value of the 'name' field.
   * @return The value of the 'name' field.
   */
  public java.lang.CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(java.lang.CharSequence value) {
    this.name = value;
  }

  ...

嬉しいことにフィールドの型に対応した getter/setter が手に入ります! GenericRecord interface のメソッドを使うよりこちらの方が安全で若干高速になるはずです。

見てきた通り SpecificData 系サブクラスはコード生成する都合コンパイル時にスキーマが決まっている必要があります。 またスキーマ定義が頻繁に変更され、その度にコード生成し直しまくるのもやや煩雑かと思われます。 あまり変わらない・変えたくないデータ型を定義する時は SpecificData からなるクラスを生成、動的に・頻繁に変わる可能性があるデータ型は GenericData として扱うのが良いかもしれません。

Java 意外の言語ではどうなる?

Java の例を見てきた通り Specific なテータ表現は Generic なもののサブセットとなっています。 そんな訳で多くのプログラミング言語では Generic なデータ表現はサポートされています。

が、 Specific によってはサポート状況がまちまちなようです。 筆者が認識した限りでは、 C++ で experimental でサポート? されていたり、 Go では gogen-avro を作っている人がいます。

飽きてきたので

本記事はこのあたりで終わります。 あとは筆者がよくハマる、複雑なスキーマを定義してみる話やシリアライズされた後のバイナリのレイアウトの話とか別記事で書くかもしれないです。

Apache Avro について知っていることを書いていく その1

Apache Avro になにかと縁があり、かつ普及しているテクノロジーの割に日本語の情報がそんなにない(個人の意見です、意外とあるかも)のでつらつら書いてみます。 整理はされておらずシーケンシャルに要素を並べています。 実装についてとくに言及がされていなければ、 Java の 1.8.2 について触れているものとします。

Apache Avro はデータフォーマットとエコシステムのひとつ

Apache AvroApache トップレベルプロジェクトのひとつでファイルフォーマットとその周辺エコシステムです。 比較される技術として Protocol Buffers や Message Pack が挙げられると思います。 Apache Avro はそれらの中でも主に Hadoop エコシステムなどビッグデータ絡みの文脈でよく登場するように思えます。

以下のディレクトリ構成の通り、公式でいくつかのプログラミング言語をサポートしているようです。

github.com

筆者はほとんど Java のパッケージしか使わず正確な比較も行ってないのですが、おそらく Java 向けの機能がサポート厚めだと思われます。 avro-toolsJava で書かれてたり avro-protobuf という Protocol Buffers との変換ライブラリが提供されていたりするので。 また非公式ですが以下のような他言語向けライブラリもあったりします。

github.com

Avro はスキーマの表現力が強い

Avro はスキーマに従いシリアライゼーションとでシリアライゼーションをするわけですが、このスキーマの表現力が割と強いです。 雑に列挙すると以下の通りです。

  • record 型(構造体)のサポート

    • エイリアスや doc 、 namespace の宣言もできる(オプション)
  • array 型(配列)のサポート

  • union 型(共用体)のサポート

    • null 型との union を取り、デフォルト値を null にすることで nullable な型を表現できる
  • map 型のサポート

  • enum 型のサポート
  • fixed 型という固定長バイト配列のサポート(使ったこと無いのでよくわからん)
  • デフォルト値が設定できる
  • logical types を使うことで型に別の解釈をもたせることができる

    • メジャーな使い方として long 型に timestamp-millis logical type をもたせるなど
    • timestamp の制度は micros も選べる

これらにより、かなりリッチなスキーマを記述することができるはずです。

Avro のスキーマを記述するのがつらい

表現力が高いこともあるのですが、基本的にスキーマJSON で記述することになる上か結構冗長でかつ人間にとって読みにくい内容になりがちです。 公式リポジトリに簡単な例があるのですが、簡単なものでもある程度事前知識を求められるでしょう。 さらに型をネストし始めたら地獄です。 筆者は特殊な訓練を繰り返すことにより最近手でスキーマを書けるようになってきた気がしますが、このスキーマの記載を同僚に求めるのは酷なものです。

github.com

この問題はかなり認知されているのか、公式が IDL を提供しています。 一見 Protocol Buffers を彷彿とさせる気がするシンタックスですね。

avro.apache.org

また、プログラムから動的にスキーマを記述、生成することもできます。 Java であれば Schema.createRecord() して List<Schema.Field> オブジェクトを埋めて・・・といった流れで実現できます。

Avro 自体の圧縮効率はそんな高くない?

int, long は zigzag encoding してくれますが、 bytes, string は length + バイナリなどかなりシンプルなシリアライズをします。 結果としてシリアライズされたあとのバイナリのサイズは Protocol Buffers や Message Pack と比較してそんなに潰れないと思います。 (実測した訳でなく、単にこれまでの経験則程度の話です) ただ Object Container Files フォーマットを取るとブロック単位に圧縮をかけてくれたりします。

Object Container Files フォーマットの便利さ

あなたがデータアナリストやデータサイエンティストなどのポジションで働いているなら、このフォーマットで Avro と対面することが多いかもしれません。 .avro 拡張子をよく取る、 Object Container Files フォーマットに従うレイアウトを取ったファイルです。

実体は well-known な Avro スキーマに従ってシリアライズされたファイルになります。 そのスキーマの中にはレコードのスキーマとコーデック、フィンガープリント情報などが含まれます、 ここのスキーマ情報により、ファイルを読むことで中のバイナリがどんなスキーマシリアライズされたかわかるので動的にデシリアライザを生成して読める状態になってるわけですね!すぐれもの!

このファイルフォーマットは BigQuery や Redshift でもサポートされており、 Hadoop エコシステムにあまり関わらずとも Avro のファイルをオブジェクトストレージに書き出しておけば、あとで DHW に統合して安全便利にクエリを投げられるわけですね!すぐれもの!

avro-tools が開発運用に便利

あまり言及されてない気がしますが、 avro-tools という Java 製のツールが便利です。 事前定義した Avro のスキーマを記載した.avsc ファイルから Specific クラスを自動生成するのはもちろん、 Object Container Files フォーマットのファイルから schema を読んで吐いてくれたりレコードを json に変換して出力してくれたりします。 特にレコードを json で出力してくれると、 jq などと組み合わせてフィルタしたり内容確認できたりして便利ですね。

mvnrepository.com

飽きてきたので

本記事ではこれくらいにしておきます。 気力があったり次回書きたいことがまとまってきたらまた書きます。

「データとML周辺エンジニアリングを考える会」という勉強会の第二回を開催しました

TL;DR

2019/07/19(金)に、ヤフー株式会社様コワーキングスペースの LODGE において、「データとML周辺エンジニアリングを考える会」という勉強会の第二回目を開催しました。

data-engineering.connpass.com

データエンジニアリングとサイエンス、アナリティクスにざっくり被るような少し広く曖昧なドメインで開始した勉強会です。

今回は前回より少々多めで、 40 人強の参加者が参加してくれました。 発表、 LT に合わせて懇親会も議論が弾み、主催のひとりとしては実現したかったことがある程度できたのではないかと考えています。

つらつら発表内容

せっかくなので覚えているうちに発表資料のリンク記載と超個人的な感想載せてみます。

15 分枠

GCPでStreamなデータパイプライン運用しはじめた by @shoe116

メルカリでのログ収集のためのパイプラインの構築の話です。 マイクロサービスアーキテクチャへの移行やビジネス・組織のスケールに合わせた試行錯誤の跡が伺えます。 というかわたしも業務で参加してるやつです。実際上記の試行錯誤をしています。

speakerdeck.com

行動ログ処理基盤の構築 by @hirosassa

サービスにおける行動ログの収集基盤の刷新の話です。 現行システムが pull 型で基盤側がサービスの内情を知ってしまう問題を、 push 型のアーキテクチャにしたあたりが今後の投資になりうるおもしろい点なのではと思います。 (基盤システムってどこまでサービスのことを知るか、責任分界をどこでするかしばしば悩ましくなりますよね)

speakerdeck.com

LT 枠

Google Cloud ML Engineに浸かってみる by @yudeayase

GCP の ML Engine の話です。 ML Engine, 便利そうではあるもののこの仕組みに特化してしまうのは良いのか?などと考えさせられました。

cloud.google.com

(資料アップロードなし?あとで調べる

PoC案件が多すぎてつらいので、パイプラインを使いまわすツールを入れた。 by @mori_kaz0429

繰り返し発生する PoC 案件で、似たようなクエリを投げたりすることが多い処理を共通化、再利用可能にする話です。 最後の方には Apache Airflow などワークフローエンジンを今後使ってみたい話も。

(資料アップロードなし?あとで調べる

Cloud Composer & Cloud Dataflow によるバッチETLの再構築 by @yuzutas0

Cloud Composer (Apache Airflow のマネージドサービス) を使って壊れかけのデータ同期の仕組みを立て直す話です。 データエンジニアとデータアナリスト、両方に対してヒアリングをかけつつ現状を鑑みて良いバランスのところを攻めるというマネジメントに近い側面もあれば、 Cloud Dataflow によるクレンジング処理に触れたりする話もあり盛りだくさんでした。

speakerdeck.com

DigdagでETL処理をする by @nakano_shota

今度は Digdag でワークフローを組んだ話です。 s3_touch というオペレーターを開発して、 s3_wait と組み合わせてプロジェクト間依存関係も対応できるようにしていて良い。 あまり深く触れられなかったけどリトライ処理や冪等性担保大事ですよね。もっとお話聞いてみたい気がします。

speakerdeck.com

Comet.ml で AutoML ライブラリ開発(仮) by @Y_oHr_N

Comet.ml 、初めて知ったのですがかなり良さそう! codecov でカバレッジを可視化するのと同じようにモデルの可視化をできるのは好感触!

speakerdeck.com

データ活用の際にハマってしまったログ・データスキーマ設計 by @yu-ya4

この手の苦労話、これこそこういう勉強会で話し合いたかったことな気がします。 テーブルの日付が何の時間を表すか問題、スキーマ更新にどう立ち向かうか、 STRING 型フィールドに JSON 突っ込むのどうなんだ話、 null の扱いと結構あるある話な気がするが・・・。 二番目の話題は個人的にも刺さるものがありました。簡易にアプリケーションからログを出力して、読み出す時に苦労するスキーマオンリードの戦略自体は間違ってはいないはずだし・・・。 触れられていなかった別の点として、 BigQuery はカラムナでデータを持ってくれているはずなのですが、 JSON を突っ込んで読む時にパースするではパフォーマンスが落ちる(課金が増える?)かもしれないなと思いました。

speakerdeck.com

今後に向けて

幸いなことに参加者の方々からもそれなりに好評を得られたようだし、自分としても知見を得たい気持ちもあるため、主催メンバーで話し合いつつ第三回を企画していきたいと考えています。 ジャストアイデアですが、初心者枠?というかこれからデータ基盤を作っていこうとする人たちが発表しやすい枠を設けるとかもあるとイベントの雰囲気変わるかなとか。 その時が来たらまたアナウンスしますので、ご興味ある方々いらっしゃいましたらぜひぜひ!!

技術書典5で配布した同人誌の原稿データを GitHub で公開しました

こちらで紹介した、僕が主催するサークルで技術書典5で配布した同人誌の原稿データを公開しました。

syucream.hatenablog.jp

リポジトリはこちらになります。

github.com

前回と同様、 Re:VIEW を使って記述しています。 epub / pdf ファイルが欲しい方はいい感じにビルドしてください。

もし多少でも気に入った方がいらっしゃれば、 Kindle 版を購入していただけると幸いです! 僕のジュース代の足しくらいにはなります。

www.amazon.co.jp