ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

Cloud Dataflow の FlexTemplate は何者か

先月、さらりと Cloud Dataflow に FlexTemplate という新機能のベータ版がリリースされました。

cloud.google.com

残念ながらまだあまりドキュメントがなく、これを用いるとなにが嬉しいのかが掴みにくいところです。 本記事では FlexTemplate 周りを軽く試してどのような機能で何が嬉しいのか探ってゆきます。

従来のテンプレート

Cloud Dataflow はジョブの一部パラメータを実行時に置き換え可能にしたビルド済みのテンプレートを作ることができます。 また Google はこの機能を用いた公式テンプレートを何種類か用意してくれており、 GCP 上のリソースに対してコーディングなく ETL 処理をすることが可能になっています。

cloud.google.com

ちなみにこのテンプレートの機能なのですが、 FlexTemplate の登場の都合からかドキュメント上で "従来のテンプレート" という見出しになってしまったようです。。。

f:id:syu_cream:20200512002131p:plain

実行時にパラメータを置き換え可能にするには、その値を ValueProvider でラップしてあげる必要があります。 ValueProvider の実装としてテンプレートのステージング時に静的に値が決まる StaticValueProvider と実行時に決まる RuntimeValueProvider が存在しており、これによりパラメータを渡すタイミングを柔軟に選べます。 また Apache Beam の SDK 内の IO 関連ビルダークラスには ValueProvider を受け取って振る舞いを変えてくれるものが多々あります。

beam.apache.org

逆に言うとこの ValueProvider で賄えないような性質のパラメータは実行時に指定可能なパラメータとして扱えません。 これは分かりやすい部分で言うと Beam の SDK のインタフェース的に ValueProvider を受け取ってくれないような部分、もっと複雑な例で言うと内容によってパイプラインのグラフが変わるようなパラメータは扱えません(あるいは扱うのが困難)

FlexTemplate

FlexTemplate は、おそらくなのですが、与えられるパラメータに柔軟性を与えるものです。 ドキュメントだけでは正体が定かにはならないですが、 gcr.io/dataflow-templates-base/java11-template-launcher-base をベースとした Docker image を用いて Dataflow ジョブを実行するようになります。 テンプレートジョブの構築手順は「従来のテンプレート」に似ており FlexTemplate によるテンプレートの作成と、実行の 2 つのフェーズがあります。

前者では前述の Docker image を GCR に push して、かつその Docker image で解釈可能なパラメータを示した metadata.json を GCS に upload します。 Docker image の build & push は従来のテンプレートでは存在しなかった手順ですね。

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#creating_a_flex_template

後者では実際にパラメータを与えてジョブを実行します。

cloud.google.com

このジョブの実行時に興味深いことに、ジョブの launch 用に GCE インスタンスが開始され先程作成した Docker image が実行されることが見て取れます。 この Docker コンテナの動作がよくわからないところですが、内部的にはこのタイミングから Dataflow ジョブのステージングと実行を開始してくれるものと考えられます。

f:id:syu_cream:20200512004846p:plain

#cloud-configs
...
    ExecStart=/usr/bin/docker run -v /var/log/dataflow/template_launcher:/var/log/dataflow/template_launcher gcr.io/xxx/samples/dataflow/streaming-beam-sql --template-container-args='{"consoleLogsLocation":"gs://xxx/staging/template_launches/xxx/console_logs","environment":{"region":"us-central1","serviceAccountEmail":"xxx","stagingLocation":"gs://xxx/staging","tempLocation":"gs://xxx/tmp"},"jobId":"xxx","jobName":"streaming-beam-sql-20200512-004931","jobObjectLocation":"gs://xxx/staging/template_launches/xxx/job_object","operationResultLocation":"gs://xxx/staging/template_launches/xxx/operaton_result","parameters":{"inputSubscription":"test","outputTable":"foo:bar.baz","stagingLocation":"gs://xxx/staging","tempLocation":"gs://xxx/tmp"},"projectId":"xxx"}'
...

さてこの FlexTemplate で何が嬉しいのかというと、大きな差異として実行時パラメータを ValueProvider で包む必要が無くなった点があります。 FlexTemplate を利用したジョブのサンプルがいくつか公開されているのですが、この内の PipelineOptions の getInputSubscription() など、実行時に指定可能なパラメータでありつつも ValueProvider 型で扱われていないことが見てとれます。

github.com

これにより、従来のテンプレートでは課題になっていた ValueProvider を受け取ってくれないような値を実行時に変更したいだとか、パイプラインの構造が変わるようなパラメータを渡したいような部分に効果を発揮するものと思われます。 とは言ってもデメリットもあり、ジョブが Queue に入ってから Running の状態になるまで数分待たされる(ステージングのタイミングが遅延したのなら妥当に思えるが)だとか、今のところ Streaming Engine や FlexRS のサポートがないとか課題が存在します。

終わりに

軽く触れてみた限り FlexTemplate はシンプルな仕組みながら Dataflow のテンプレートに相当な柔軟性を持たせることができそうです。 FlexTemplate にどこまでを期待して、どのように従来のテンプレートと棲み分けすべきかのか(上位互換の位置付けなのか?)、今後のサポート具合など気になるところですね。

Schema Registry について書いていく: Confluent Schema Registry の Protocol Buffers & JSON Schema サポート

先日リリースされた Confluent Platform 5.5 より Protocol BuffersJSON Schema のサポートが入ったようです。

www.confluent.io

以前 5.4 を対象に、 Schema Registry を中心に色々記載してみましたが、今回は 5.5 で入ったこの差分を追跡してみます。

syucream.hatenablog.jp

Confluent Control Center 上でスキーマを管理する

定番の Confluent Control Center を使って、ブラウザ上で Schema Registry の設定周りを試してみましょう。 試しに protobuf でスキーマを登録してみた状態が以下のとおりです。 protobuf において .proto ファイルに記述していくスキーマを登録できるようになっています。

f:id:syu_cream:20200502220418p:plain

JSON Schema の場合は例えば以下のようになります。

f:id:syu_cream:20200502220857p:plain

互換性チェックの挙動を試してみる

これだけでは面白くないので protobuf のスキーマを更新していってみます。 まず uint32 age = 3; フィールド追加を行ってみます。フィールド追加は特に互換性を壊しません。というわけでスキーマの更新も無事に行なえます。

syntax = "proto3";
message value_protobuf {
  uint64 id = 1;
  string name = 2;
  uint32 age = 3;
}

今度は field number を変更してみます。早速さきほど追加したフィールドを uint32 age = 42; に書き換えてみました。 この変更もリスクはありそうですが、スキーマ更新は受け入れられます。

syntax = "proto3";
message value_protobuf {
  uint64 id = 1;
  string name = 2;
  uint32 age = 42;
}

さらにフィールドの削除やフィールド名の変更も受け入れられます。

syntax = "proto3";
message value_protobuf {
  uint64 id = 1;
  string fullname = 2;
}

これらの変更は互換性を壊すものではないのでしょうか? Confluent Schema Registry での protobuf の互換性チェックの仕様は以下の通りになっています。

docs.confluent.io

おおむね protobuf の wire format の解釈に影響を及ぼさず、デシリアライズは無事行えるような内容であれば互換性が維持されると見るようです。 フィールド名の変更に対する言及は明記されていませんが、これも wire format の解釈に影響しないからだと思われます。 ・・・ただ実際にはデシリアライズには成功するとしても後続のデータ処理が影響を受けないかどうかは保証されないでしょうし注意が必要です。 フィールドの追加や削除は予期せぬ値のデフォルト値化、フィールド名のリネームはフィールド名を意識した後続処理で問題が発生するかも知れません。

ここから明確に互換性が無い変更も行ってみます。このフィールドの型 uint32string にしてみます。 この場合は予想通り互換性が無いとされてスキーマ更新が拒否されます。

f:id:syu_cream:20200502222110p:plain

// 筆者はこの他にも JSON Schema のスキーマ変更も試してみたのですが、互換性チェックの挙動が読めず諦めてしまいました。

protobuf の扱い周りの実装を読んで見る

5.5 でサポートされたデータフォーマットのバリエーションと各フォーマットの扱いはどうなっているのでしょう。 せっかくなので前節で触れた protobuf 周りの実装を追ってみます。

protobuf 対応した Serializer/Deserializer

ドキュメントから、データフォーマット毎に Serializer, Deserializer の実装が異なるようです。まずはここをエントリポイントとしてみます。

docs.confluent.io

Avro と同様 AbstractKafkaProtobufSerializer, AbstractKafkaProtobufDeserializer に、 protobuf のデータである Message のサブクラスのオブジェクトのシリアライズ・デシリアライズ処理と schema registry とのやり取りの処理が実装されています。 ここで protobuf のスキーマ情報は公式の Descriptor としてではなくラップされた型として扱われます。また protobuf は import 文で他の .proto ファイルを読み込むことができるのですが、この依存関係を解決するロジックも持ち合わせています。

面白い点として Serializer は protobuf の、 protoc で生成された具体的なメッセージクラス (Avro でいう SpecificRecord みたいなもの)を扱い、 Deserializer では DynamicMessage (Avro でいう GenericRecord みたいなもの) を扱うことです。 producer は .proto の記述に従ったメッセージクラスが使えますが、それを consumer が持っている保証はないため妥当な扱いだと思われます。

複数データフォーマットをサポートしたスキーマの表現

5.4 では Schema Registry として考慮していたスキーマが Avro しかありえなかったので、 Schema Registry としては Avro の Schema クラスを扱うような実装になっています。 これが 5.5 では ParsedSchema インタフェースという一段抽象化された構造で扱われます。 protobuf のスキーマを表現する ProtobufSchema、 Avro の AvroSchema, JSON Schema の JsonSchema はそれぞれ ParsedSchema を実装しています。 個々の ParsedSchema 実装の中でスキーマ固有の処理、互換性チェックや protobuf で言う descriptor の扱いなどを担います。

おわりに

簡潔に追っただけですが、妥当な進化を遂げたような Confluent Platform 5.5 でした。 protobuf はコード事前生成を想定した利用シーンが多いですが、 Confluent Platform でのサポートなど利用シーンが増えてくると活用できる機会がさらに増えるかも知れません。 また今回複数データフォーマットのサポートが入った以外にも ksqldb という KSQL の進化系?のような機構が増えて Confluent Platform も進化を続けている印象を受けますね。

protobuf のシリアライズ済みバイナリを無理やり読む

Protocol Buffer wire format について

Protocol Buffer でシリアライズされた後のバイナリのレイアウトの仕様は wire format の仕様という形で独立してドキュメントが用意されています。 この wire format の仕様は見ればわかる通りそれほど記述量が多くなく、それでいて互換性を気にしつつ、かつ拡張の邪魔にならないような配慮がされています。

developers.google.com

wire format として特に重要な点は以下の通りだと個人的には考えます。

  • 整数型に対するデータサイズがなるべく小さくなるような工夫がされている
  • 複雑な型はバイト配列に押し込める。(多くのシリアライゼーションフォーマットと同じように)長さが指定されたバイト配列として表現
  • フィールドの順序が任意にできる。のでスキーマ更新において順序を意識しなくてよくなる。

wire format では各フィールドはフィールド番号と wire format 上での型で識別されます。 これらを元に、 .proto ファイルでどう記述されていたのかに対応してより具体的な型として解釈したりフィールドの名前を取得したりできるわけです。 例えば field number が 10 、 wire type が 2(バイト配列) であった場合に、 .proto では string name = 10; という対応する field number があるならフィールド名は name でありバイト配列は文字列として解釈できそうだと判断できます。

とはいえシリアライズ済みバイナリを頻繁に扱ってくると、ましてや複数のシステムでデータ交換をし出すと時には .proto や descriptor 、ライブラリなしに内容を確認したい時があるかも知れません。 特にデータ交換を行う場合システム間で持つスキーマが異なる場合にはこの希望は大きくなるかもです。 またパースエラーが起こる箇所を絞り込みたいなどの要望から、スキーマを用いた具体的なデータの解釈をなるべく遅らせたい場合にこの戦略は有効かも知れません。 そんなことを考え、 Go で無理やりシリアライズ済みバイナリを、それ単体の持つ情報だけで解釈してみる試みをしました。

Protocol Buffer without schema

というわけで Go で schema less で protobuf のデータを無理やり解釈するコードを書いてみました。

github.com

wire format のフィールドの解釈は protowire という既存のモジュールがあり、詳細な部分は概ねこれで行えます。 wire format の解釈の上で面倒くさい点は可変長になりうる varint の値や ZigZag encoding されうる signed integer あたりですがこのあたりは実装されてくれています。

godoc.org

バイト列の詳細な解釈は descriptor の情報が無いと正確な値は取れません。 ここでは雑に、思いつく範囲の解釈を全部試して wire format としては valid な値をすべて返すようにしています。 具体的には文字列とバイト列、ネストされたメッセージ、整数型の repeated な値などです。 そのほかにも Protocol Buffer としてはバイト列で map 型が表現できるのですが、これについては wire format やその他のドキュメントに詳細な仕様がなく(うまく見つけてなくてどこかに存在するかも?)一旦諦めています。 Go の実装的にはバイト列の中に更にネストされたメッセージに似てフィールド番号や wire type が指定されたフィールド列が存在して、フィールド番号が 1 であれば map の key 、 2 であれば value となるようです。

github.com

これを用いて以下のような .proto のメッセージを

syntax = "proto3";

message Example {
    enum Num {
        ZERO = 0;
        ONE = 1;
    }

    uint64 uint64_val = 1;
    string string_val = 2;
    fixed64 fixed64_val = 3;
    fixed32 fixed32_val = 4;
    Num enum_val = 5;
    Child child_val = 6;

    repeated uint64 r_uint64_val = 101;
    repeated string r_string_val = 102;
    repeated fixed64 r_fixed64_val = 103;
    repeated fixed32 r_fixed32_val = 104;
    repeated Num r_enum_val = 105;
    repeated Child r_child_val = 106;
}

message Child {
    uint64 v = 1;
}

こんな様な値を設定してシリアライズした時の

               msg := &protosl.Example{
                    Uint64Val:  1,
                    StringVal:  "testing",
                    Fixed64Val: 11,
                    Fixed32Val: 111,
                    EnumVal:    protosl.Example_ONE,
                    ChildVal: &protosl.Child{
                        V: 1,
                    },
                    RUint64Val: []uint64{2, 3},
                    RStringVal: []string{"aaa", "bbb"},
                    // RFixed64Val: []uint64{22, 33}, TODO repeated fixed isn't supported
                    // RFixed32Val: []uint32{222, 333}, TODO repeated fixed isn't supported
                    REnumVal: []protosl.Example_Num{protosl.Example_ZERO, protosl.Example_ONE},
                    RChildVal: []*protosl.Child{
                        {
                            V: 2,
                        },
                        {
                            V: 3,
                        },
                    },
                }

シリアライズ後のバイナリを食わせると、かなり冗長にはなりますが以下のように解釈できます。

$ echo -n "\x08\x01\x12\x07\x74\x65\x73\x74\x69\x6e\x67\x19\x0b\x00\x00\x00\x00\x00\x00\x00\x25\x6f\x00\x00\x00\x28\x01\x32\x02\x08\x01\xaa\x06\x02\x02\x03\xb2\x06\x03\x61\x61\x61\xb2\x06\x03\x62\x62\x62\xca\x06\x02\x00\x01\xd2\x06\x02\x08\x02\xd2\x06\x02\x08\x03" | protosl
{"1":1,"101":{"__bytes":"AgM=","__packed":[2,3],"__string":"\u0002\u0003"},"102":[{"__bytes":"YWFh","__packed":[97,97,97],"__string":"aaa"},{"__bytes":"YmJi","__packed":[98,98,98],"__string":"bbb"}],"105":{"__bytes":"AAE=","__packed":[0,1],"__string":"\u0000\u0001"},"106":[{"__bytes":"CAI=","__message":{"1":2},"__packed":[8,2],"__string":"\u0008\u0002"},{"__bytes":"CAM=","__message":{"1":3},"__packed":[8,3],"__string":"\u0008\u0003"}],"2":{"__bytes":"dGVzdGluZw==","__packed":[116,101,115,116,105,110,103],"__string":"testing"},"3":11,"4":111,"5":1,"6":{"__bytes":"CAE=","__message":{"1":1},"__packed":[8,1],"__string":"\u0008\u0001"}}

バイナリを直接確認できると何かしら便利、フィールドとの対応付けは後で行うので・・・という時に役のに立つかも知れません。

原則 WFH 勤務が開始して二ヶ月が経過した

タイトルの通りで単なる日記なのですが、今の心境を赤裸々に綴っておくと後で振り返れると思いつらつら書きます。 なおこのエントリは個人の意見ですし、同じ会社同じチームでも受け取り方や課題感はだいぶ差異が出るんじゃないかなーと思っています。

二ヶ月前からこれまで

今年 2 月 19 日に新型コロナウイルス蔓延を受けて、勤め先が以下のようなプレスを出しました。

about.mercari.com

この段階で大規模にリモートワークに転換する企業はそう多くなく、このときは代表例としては GMO さん、ついでドワンゴさんあたりが原則リモートワークで業務をすることを宣言していた記憶があります。 東京都内のこの時の状況としては、少ないながらも感染事例が挙げられつつあり、会社として特に方針が無くともチームとして満員電車の通勤を避けたいななどと話していたりしました。 そういう情勢だったので会社として早めに方針が打ち出されたのはいち従業員としては行動方針を決めやすくもありありがたいものでした。

チーム的には Slack ベースのコミュニケーションを盛んにやっており、リモート勤務に必要な設備もチームメンバーがそれほど障壁なく行えたのは幸いでした。 WIAS という勤怠管理システムのお世話にもこの頃からなり始めました。

tech.mercari.com

さて原則 WFH はそれほど課題なく継続されて、一ヶ月が経過してお花見シーズンになる頃には世間的にも緩みが出てきて、チーム的にも週一くらいで出社して社会復帰してみよう(?)という雰囲気が出てきました。 その矢先に永寿総合病院における院内感染が発生して(通勤経路や生活圏的にこのあたりで僕は余計に危機感を覚えたのですが)、緩み始めたムードは一点します。 より強く WFH を求められるようになり、世間のニュースも仄暗いものばかりになってきます。 その傾向は緩むどころか悪化するばかりで、やがて緊急事態宣言に至り、本稿執筆時は多くの飲食店等が活動を休止して街はすっかり息をひそめています。

WFH を続けてみた所感

そんな具合で世間的には比較的早く原則 WFH の働き方に切り替えて二ヶ月経過した訳ですが。 WFH だから被った被害ややりにくさというのはそれほど無いと考えています。 ただしホワイトボードなどを使って集団でブレストしたり、ふらっと立ち話風に同僚に声かけて相談するという行動がしにくくなったのは損失ではあります。ただ致命的ではないとも。 また僕の場合は部屋が狭く(自宅の快適さより通勤の快適さに重きを置いていた)、働くのにあまり適さない自宅環境だったのも大きな課題でした。

今回の件で一番の問題は WFH で勤務することそれ自体でなく新型コロナウイルス蔓延の先行きが見えず、時には大きく状況が悪化することがあった部分だと思います。 働き方というか働くことというか、それ以前に世間を取り巻く状況が悪く、精神衛生に良くないのがもっとも辛い。

とりあえずこの先生きのこるには

直近試しているのはとにかく作業環境の改善です。 1K 限界一人暮らし部屋はそのままでいると漏れなく仕事場所がベッドルームと同一存在になります。 これに耐えられる人は良いのでしょうが、僕は耐えられませんでした。 この課題に対して理想的にはさっさと引っ越し!したいところですが、不要不急の外出を自粛するよう求められた我々にできることは限界があります。 というわけで妥協案としてクローゼットの中のものを外に追いやるか思い切って処分してスペースを構築して、クローゼット内部を作業場所にするようにしました。

このようなクローゼットの活用の仕方には「クロッフィス」と呼ばれることがあるようです(どれくらい知名度があるかは知らん)

weboo.link

今ではカーテンで簡易仕切りを設けたり造花で草を生やしたり最適化を進めています。

そして精神衛生に悪い状況であることは、もうしょうがない。 しばらくは今回の騒動が起こる前と同じようには働けないと、諦めることにしています。 逆にこの状況の変化で生産性が向上するのならそれはとてもすごいこと。だけどそうそう起こらないこと。 つけ加えるなら、メンタルが弱った自覚が出てるうちにさっさと有給を取るなりして休んでしまうのが良いとも思っています。ねてればなおる、しらんけど。

BigQuery SQL UDF の挙動を色々確認する

BigQuery ではユーザ定義関数(UDF) を作ることができる。 これを使って、よく使われる式や関数呼び出しの組み合わせを名前付けして再利用できる。

cloud.google.com

UDF は SQLJavaScript の二種類の言語による記述が可能で、後者は色々なトリッキーな利用例も世に出ているので知っている人も多いかも知れない。 例えば WebAssembly のコードを実行するとかである。

medium.com

本記事では 地味な方 素直な SQL での記述による UDF について色々挙動を確認してみる。

戻り値の型を推論してくれる

UDF は例えばシンプルなものだと以下のように記述できる。

CREATE TEMP FUNCTION
  zero()
  RETURNS INT64 AS (0);
SELECT
  zero()

ここで戻り値の型は省略することができる(勝手に解決してくれる)

CREATE TEMP FUNCTION
  zero()
  AS (0);
SELECT
  zero()

STRUCT 型の戻り値も推論してくれる。が、もちろん各フィールド名の情報はないのでダミーの値が補完されてしまう。

CREATE TEMP FUNCTION
  user()
  AS ((1, "taro"));
SELECT
  user()
[
  {
    "f0_": {
      "_field_1": "1",
      "_field_2": "taro"
    }
  }
]

RETURNS で型を明示するとそこに記述されたフィールド名を拾ってくれる。

CREATE TEMP FUNCTION
  user()
  RETURNS STRUCT<id INT64, name STRING>
  AS ((1, "taro"));
SELECT
  user()
[
  {
    "f0_": {
      "id": "1",
      "name": "taro"
    }
  }
]

UDF から UDF を呼ぶ際はすでに定義済みである必要がある

UDF から別の UDF を呼び出すこともできる。

CREATE TEMP FUNCTION
  callee() AS (42);
CREATE TEMP FUNCTION
  caller() AS (callee());
SELECT
  caller()

この定義順序を逆にするとクエリの実行が通らなくなる。 Function not found: callee; failed to parse CREATE [TEMP] FUNCTION statement at [5:16] というエラーが出てしまう。

CREATE TEMP FUNCTION
  caller() AS (callee());  -- <- Function not found :(
CREATE TEMP FUNCTION
  callee() AS (42);
SELECT
  caller()

UDF の再帰呼び出しをすることもできない。この例では Function not found: fib; failed to parse CREATE [TEMP] FUNCTION statement at [10:7] と怒られてしまう。

CREATE TEMP FUNCTION
  fib(n INT64) AS (
  IF
    ( n > 2,
      fib(n - 1) + fib(n - 2),
      1 ) );
SELECT
  fib(2)

だったら一度ダミーの UDF を定義してみよう!ということで一時 UDF じゃなく永続 UDF を、一度ダミーのものを定義したあと REPLACE してみる。 このクエリはそれ自体は valid と言われ実行可能である。

-- dummy
CREATE OR REPLACE FUNCTION
  udfs.fib(n INT64) AS (1);
 
CREATE OR REPLACE FUNCTION
  udfs.fib(n INT64) AS (
  IF
    ( n > 2,
      udfs.fib(n - 1) + udfs.fib(n - 2),
      1 ) );
SELECT
  udfs.fib(2)

が、これは実行してみると Query error: Too many nested views/persistent user-defined functions or possible circular reference of views/persistent user-defined functions referenced in query. Only 16 levels of nested views/persistent user-defined functions are allowed. at [11:1] と怒られてしまう。 UDF では 16 チェーンまでしか UDF 呼び出しをできない制限がありそれに引っかかっているように見えるエラーメッセージである。この例では udfs.fib(2) を呼び出してもそもそも再帰呼び出しされない気がするが・・・。

cloud.google.com

頑張ればループ処理を書ける

UDF では前述の通り再帰呼び出しできず、ループするような制御構文もサポートされていない。 が、 ARRAY 型の値と ARRAY 型関連関数を使うとループ、リストの各要素に対する繰り返し処理的なことができる。 ミソとなるのがみんな大好き UNNEST() でテーブルを手にいれることができることと、 ARRAY() の引数はサブクエリを取れる辺りにある!

CREATE TEMP FUNCTION
  array_twice(n INT64) AS (ARRAY(
    SELECT
      AS STRUCT (
      SELECT
        ARRAY_AGG(vv)
      FROM (
        SELECT
          v * 2  -- <- like a map function
        FROM
          UNNEST(GENERATE_ARRAY(1, n)) AS v) AS vv)));
SELECT
  array_twice(100)

テーブルを参照することもできる

上記より SELECT 文が打てることを確認できたので FROM 句に実テーブルを渡してみる。 このクエリも合法で実行することができる!

CREATE TEMP FUNCTION
  do() AS (ARRAY(
    SELECT
      AS STRUCT (
      SELECT
        ARRAY_AGG(vv)
      FROM (
        SELECT
          LENGTH(repo_name)  -- <- like a map function
        FROM
          `bigquery-public-data.github_repos.licenses` -- <- refer to a table
        LIMIT
          10) AS vv)));
SELECT
  do()

ドキュメントにテーブル参照に関する記述があるので、この挙動は一応想定されたもののはず。(もしかしたら他のまっとうな参照方法があるのかも)

一意の UDF とテーブル参照を合わせたクエリあたりの最大数 - 1,000。完全な展開後に、UDF ごとに一意のテーブルと UDF を合わせて 1,000 個まで参照できます。

cloud.google.com

Apache Parquet の Logical Types に関するメモ

去年末になりますが、 embulk-output-s3_parquet という Embulk Plugin にて Logical Type をサポートするためのパッチを書き、マージしていただきました。

github.com

個人的な主な目的は timestamp 型をサポートすることでした。 Athena や BigQuery から参照するにあたり timestamp 型として扱えるか否かで使い勝手は大きく変わります。 Logical Type としての扱いを期待するかを設定に書く必要はあるものの、割と簡素な設定で timestamp 型対応ができるようになったのではないかと思います。

さてこの Logical Type ですが、 2020 年 2 月現在 2 種類の仕様が混在してかつ古い仕様が事実上スタンダードになっている状態です。 自分は最初混乱したのですが、この不幸の連鎖を止めたいので、本記事に自分が把握した状態を記録しておきます。

古い仕様 "Converted Types"

ここまで一口に "Logical Types" といってしまったのですが、ドキュメント上は古い仕様は Converted Types, 新しい仕様が Logical Types と区別されます。 そして現在使われる Logical Types としてはこの Converted Types が一般的でしょう。

github.com

前提として、 Parquet がサポートする型 は以下のものになります。

  • BOOLEAN
  • INT32
  • INT64
  • INT96
  • FLOAT
  • DOUBLE
  • BYTE_ARRAY

Converted Types ではこれらの型に別解釈を与えることができます。 これは OriginalType に定義されています。 (このクラス名からか、 JIRA や GitHub 上のやり取りを見ると Converted Types のことを Original Types と呼ぶこともあるっぽい?ようです。 Java 実装に限った話かも)

  • MAP,
  • LIST,
  • UTF8,
  • MAP_KEY_VALUE,
  • ENUM,
  • DECIMAL,
  • DATE,
  • TIME_MILLIS,
  • TIME_MICROS,
  • TIMESTAMP_MILLIS,
  • TIMESTAMP_MICROS,
  • UINT_8,
  • UINT_16,
  • UINT_32,
  • UINT_64,
  • INT_8,
  • INT_16,
  • INT_32,
  • INT_64,
  • JSON,
  • BSON,
  • INTERVAL

embulk-output-s3_parquet でサポートする timestamp 型はここに現れる TIMESTAMP_MILLIS, TIMESTAMP_MICROS になります! また Parquet では文字列型がこの Converted TypesUTF8 OriginalType を使って表現されます。 そのため、Parquet の Converted Types は例えば Avro における Logical Types よりは多くのユーザが触れる機会がある概念かも知れません。

新しい仕様 "Logical Types"

新しい Logical Types は UUID やナノ秒精度 timestamp などさらにリッチな型をサポートしていたり、 Builder パターンでのスキーマ構築をサポートしていたりと様々な更新を含みます。 Converted Types との互換性も考慮されており、互換がある表現の場合は変換メソッドも提供されます。

これは Parquet 1.11.0 からサポートされているのですが、 maven central repository に上がったのが 2019 年 11 月でありまだ歴史が浅い機能といえます。 ミドルウェアの対応としても、 Hive でサポート開始がされ始めた ようですが Spark はまだ だったり普及するのにはまだ時間を要しそうです。 BigQuery などクラウドサービスにおいても、スケジュールは定かではないですが、サポートするにしてももうしばらく時間が必要かも知れません。

そんな訳で 2020 年 2 月現在、まだ多くの場合は Converted Types を利用することになると思うのですが、将来的に Logical Types に置き換えることや変換メソッドがあることを知っておくと良いでしょう。 またドキュメントは新しい Logical Types に関する記述の方が増えていくと思われ、現行2つの仕様がありドキュメントの記述と自分が必要としてい仕様がどちらのものか意識することがしばらく必要かも知れません。

Schema Registry について書いていく その1: Confluent Schema Registry

分散アプリケーション間のメッセージングやログ収集基盤において、しばしばスキーマの扱いは便利である反面頭を悩ませる種になります。

スキーマを厳密に定義して、 Protocol Buffers や Avro などのシリアライゼーションフォーマットを用いることで、メッセージのサイズの減少やメッセージの扱いの安全性を実行時より早く検出できる機会が増えます。 しかしスキーマの互換性に気をつけたり、メッセージをシリアライズ・デシリアライズするアプリケーション間でスキーマ情報をどう共有するかの課題も発生します。 この課題は、昨年発売された "データ指向アプリケーションデザイン" でも "4.1.4.3 そもそもライターのスキーマとは何なのか?" などで触れられています。

世の中にはこの課題を解決するための "スキーマレジストリ" と呼ばれるソフトウェア、サービスがいくつか存在します。 残念ながら日本語の資料があまり無いようにも感じますし、筆者の気が済むか飽きるまで様々なスキーマレジストリを探索してみようかと思います。 第一回は Confluent Schema Registry について調べてみます。 (残念ながら筆者は Confluent Schema Registry を実務で触れた経験がなく、なにか内容に誤りがある可能性があります)

Confluent Schema Registry とは

Confluent Schema Registry は、マネージドな Apache Kafka を提供する Confluent 社 が開発したスキーマレジストリです。

docs.confluent.io

以下のような機能を持っています。

Apache Avro については以前本ブログで触れましたし世の中にドキュメントが多数存在するのでここでは別段触れません。

スキーマは Subject という識別に用いる名前とバージョン番号で管理されます。 Subject をどう決めるかはいくつかのオプションがあり、デフォルトでは topic 名になっています。 topic 名を用いる場合はある topic に流れるレコードのスキーマは一つに限定され、複数の異なるスキーマのレコードを流すことは出来ません。 これを行いたい場合は他にレコード名もしくは topic 名とレコード名の組み合わせを Subject として用いることになります。

スキーマ情報の格納先がそれもまた Kafka であるのはちょっとユニークな点と言えそうです。 デフォルトではシングルパーティション_schemas topic にスキーマ情報を格納するようです。

docs.confluent.io

スキーマレジストリという機構の悩ましい点として、それ自体がメッセージバスと同等くらいの可用性を求められることが挙げられます。 メッセージバスが生きていてもスキーマレジストリが死んでいるなら、シリアライズする時に用いたスキーマを何処にも格納できず、デシリアライズする側のシステムに伝える術も失うためです。 Confluent Schema Registry ではスキーマレジストリREST API を提供する HTTP サーバの可用性の懸念は残りますが、ストレージは Kafka それ自体と運命共同体になるため考える課題が一つ減りそうです。

Confluent Schema Registry は更にスキーマの更新問題にも解決策をもらたします。 アプリケーションを稼働させていくうちにスキーマを更新したくなるシーンが登場するのは自然なことでしょう。 この際に Confluent Schema Registry は Kafka の Producer が作成した Avro のスキーマは解決してくれますが、 Consumer 側にも配慮する必要があります。 例えば Consumer が必要としているフィールドがある日 Producer が送信するレコードから無断で削除されたなら、 Consumer 側で何らかの問題が出るかもしれません。 この問題に配慮するため、 Confluent Schema Registry ではスキーマの互換性チェックの機構を設けています。 互換性には前方互換性、後方互換性、完全互換性の 3 タイプが設けられています。

docs.confluent.io

前方互換性は Producer が Consumer より先にスキーマを更新するパターンで求められる互換性です。 この場合フィールドの追加もしくは optional 、すなわちデフォルト値を持つフィールドの削除が許されています。 これらの変更が行われても Consumer は、自分が知らない追加フィールドは無視する、削除されたフィールドはデフォルト値で埋めることができます。

f:id:syu_cream:20200131004548p:plain

後方互換性は Consumer が Producer より先にスキーマを更新するパターンで求められる互換性です。 この場合は前述の逆で、 optional なフィールドの追加とフィールドの削除が許されています。 この互換性タイプが Confluent Schema Registry のデフォルトの互換性モードであり、現実世界で気を使われることが多いものであると思います。

f:id:syu_cream:20200131004611p:plain

完全互換性は Producer と Consumer どちらが先にスキーマを更新しても良いパターンで求められる互換性です。 この場合は制約が最も弱いようなスキーマ更新、つまり optional なフィールドの追加・削除が許されるようになります。 この互換性タイプは制約が弱いため扱いにも困るケースが出るかも知れませんが、 Confluent のプラットフォーム以外のミドルウェアやキャッシュ機構が存在したり処理するレコードの順序が保証されなかったりするケースに、互換性の問題で Consumer 側でエラーが発生するケースを低減できそうです。

f:id:syu_cream:20200131004629p:plain

Confluent Schema Registry はスキーマを更新しようとする際に、求められるタイプに従って互換性をチェックする機能を提供します。

REST API は実際に Confluent Schema Registry に対応した SerDe でスキーマ操作を行う際に使われる API にもなります。 手動で REST API を叩いてスキーマ登録を行うなどをしても良いでしょう。 この API の仕様は以下にまとまっています。

docs.confluent.io

管理用の WebUI については実際に触れてみた方が話が早そうです。次に触れてみましょう。

Confluent Schema Registry に実際に触れてみる

クイックスタート

公式で Docker イメージと docker-compose yml ファイルが提供されており、とりあえず動かす敷居が低めになっています。

docs.confluent.io

このドキュメントに沿って Docker コンテナを動かした後にブラウザ上で http://localhost:9021/ にアクセスしてみます。 上手く動いていれば以下のような管理画面を閲覧できるはずです。

f:id:syu_cream:20200129004253p:plain

Avro のスキーマを登録してみる

ここでは実験用に hello topic を作成して、この topic に流すレコードの valueスキーマを設定してみようと思います。

ここではバージョン 1 のスキーマとして以下を与えてみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

登録したスキーマを WebUI 上で確認できます。

f:id:syu_cream:20200131001440p:plain

一度スキーマを登録すると互換性モードを選べるようになります。 ここでは一つ前のバージョンのスキーマ後方互換性を保つモードである Backward に設定しておきます。

f:id:syu_cream:20200131001705p:plain

Avro のスキーマを更新してみる; 互換性がある場合

先程登録したスキーマを更新してみようと思います。 ここでは年齢を指定する想定で {"name": "age", "type": ["null", "long"], "default": null} という nullable なフィールドを追加してみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "long"],
      "default": null
    }
  ]
}

このスキーマ変更は optional なフィールド追加に相当して後方互換性チェックをパスすることができます。 無事 WebUI からも新しいバージョンのスキーマを閲覧することができています。 diff 表示も出来ていい感じです。

f:id:syu_cream:20200131002225p:plain

Avro のスキーマを更新してみる; 互換性がない場合

ここから後方互換性が無い変更をしてみます。 よく考えたらこのレコードをアプリケーション上で作成した時刻の timestamp 値を保持したくなってきました。 ということで optional でないフィールド {"name": "created_at", "type": "long", "logicalType": "timestamp-millis"} を追加してみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "long"],
      "default": null
    },
    {
      "name": "created_at",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

しかしながら先述の通り後方互換性を保つには optional なフィールド追加は許されません。 この場合 Confluent Schema Registry が互換性が保てない旨のメッセージを出してスキーマの更新が拒否されます。

f:id:syu_cream:20200131002951p:plain

このように Confluent Schema Registry では、スキーマの管理と共に、求められる互換性のタイプに従ってそれを維持するようチェックをかけてくれます。 本当はスキーマ管理を .avsc ファイルなどで保存して CI で互換性チェックし、パスすればスキーマ更新するなど人間による直接の更新を避けるのが望ましいかもですが、この WebUI と互換性チェック機構で人間による直接管理もある程度運用に耐えるかもしれません。

余談; _schemas topic

WebUI 上では _schemas topic の様子も確認することができます。 log.cleanup.policydelete ではなく compact が設定されており、これにより時間経過等によるレコードの削除を抑制しています。

f:id:syu_cream:20200131010901p:plain

Confluent Schema Registry の実装を覗いてみる

Confluent Schema Registry のコードはGitHub で公開されています。折角なのでどんな実装になっているか少し覗いてみます。 ちなみにここでは網羅性などは全く考慮していません。筆者が気になるポイントを勝手にかいつまんでいます。

github.com

ここで紹介する内容は v5.4.0 時点のものとなります。

avro-serializer: Schema Registry と通信した結果でシリアライズ・デシリアライズを行う

この module に存在するのが、実際に producer, consumer を実装する際に properties に指定する serializer, deserializer になります。

シリアライズのコアの実装が AbstractKafkaAvroSerializerserializeImpl() に存在します。 自動登録する設定になっているなら Avro スキーマスキーマレジストリに登録し、その後 Kafka の topic に送信するメッセージをシリアライズします。 この時のバイナリレイアウトなのですが、 この Serializer 実装特有のフォーマットになっており Avro のレコードの手前に 8 バイトのマジックナンバースキーマの ID が埋め込まれています。 Avro のレコードのバイナリはおなじみ SpecificDatumWriter, ReflectDatumWriter, あるいは GenericDatumWriter で書き出します。

docs.confluent.io

これに対応してデシリアライズ側の実装は AbstractKafkaAvroDeserializer クラスに多くのロジックが持たれています。 まずは deserialize() ですがリーダ側のスキーマが渡せるようになっており、デシリアライズの結果をライター側とは異なる(しかし互換性はある)型に展開できるようになっています。 ライター側のスキーマは勿論スキーマレジストリ経由で解決できます。この辺りの実装は AbstractKafkaAvroDeserializer の内部クラスの DeserializationContext クラスで行われます。 このクラスに渡された、デシリアライズ対象のバイナリは、先頭 8 バイトにマジックナンバースキーマ ID が存在するはすです。 まずマジックナンバーの存在チェックを行った後にスキーマ ID を取り出し、この ID をスキーマレジストリに問い合わせて Avro のスキーマ情報を取得します。 その後はやはりおなじみ SpecificDatumReader, ReflectDatumReader, GenericDatumReaderJava のオブジェクトとして読み出します。

これで概ね Avro のレコードのシリアライズとデシリアライズの流れが掴めますが、スキーマの登録と取得がどのように成されているのでしょうか。 先述の Serializer, Deserializer の共通の親クラスとして AbstractKafkaAvroSerDe があり、このクラスがデフォルトでは CachedSchemaRegistryClient というスキーマレジストリに対するクライアントを保持しています。 このクライアントの実装がどうなっているのか、次はスキーマレジストリとの通信を行っている client module も覗いてみます。

client: Schema Registry のクライアント

SchemaRegistryClient インタフェースを実装したクラスが実際に Schema Registry と通信しています。 先述の CachedSchemaRegistryClient はその 1 実装であり、スキーマや ID, バージョン情報のキャッシュと RestService というクラスのメンバを持ちます。 そして RestService が実際に HTTP リクエストをスキーマレジストリREST API に送信する、例えばスキーマの登録時に /subjects/:subject/versions に POST リクエストを送る、というような格好になります。

core: Schema Registry のコア実装

もうひとつ、スキーマレジストリREST API サーバを提供する core モジュールも気になる存在です。 REST API の実装やスキーマ情報などの永続化をどのように行っているのでしょうか。

スキーマレジストリREST API サーバは Jetty + Jersey で実装されているようです。 API の各エンドポイントは JAX-RS の Resource として実装されており、例えば /subjects/{subject}/versions であれば SubjectVersionsResource が相当する、などの構成になっています。

スキーマなどの情報を永続化するストレージレイヤは KafkaStore クラスで提供されています。 スキーマの登録のため put() が呼ばれると、スキーマ情報を ProducerRecord に包んで _schemas topic に 送ります。 他方、スキーマの情報取得はというと KafkaStore 自体では直接は _schemas topic を consume せず、別スレッドで動作してローカルスキーマキャッシュを更新する KafkaStoreReaderThread が更新した InMemoryCache から取得する格好になります。 この動作についてはドキュメントでも言及されています。

docs.confluent.io

おわりに

Confluent Schema Registry について筆者の思うまま気になるところをざっくり紹介しました。 いかがでしたでしょうか。

次回は Apache Pulsar の持つ Schema Registry 機能について調べて行こうかなと思います。 なんとこちらのスキーマレジストリは Avro は勿論 Protocol Buffers もサポートしているようです! Protocol Buffers は各言語向けの自動生成されたライブラリを使う、 Avro で言う SpecificRecord のサブクラスに近い性質を持つ表現で扱われることが多いと思われますが、スキーマレジストリをどのように提供しているのでしょうか。

pulsar.apache.org

余談ですが本記事執筆中に Confluent Schema Registry でも Protocol Buffers をサポートする気配のある pull request を発見してしまいました。 近い将来 Avro 以外でシリアライズしたレコードが色んなミドルウェアでらくらくに扱える日が来るかも知れませんね。

github.com