ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

Schema Registry について書いていく その1: Confluent Schema Registry

分散アプリケーション間のメッセージングやログ収集基盤において、しばしばスキーマの扱いは便利である反面頭を悩ませる種になります。

スキーマを厳密に定義して、 Protocol Buffers や Avro などのシリアライゼーションフォーマットを用いることで、メッセージのサイズの減少やメッセージの扱いの安全性を実行時より早く検出できる機会が増えます。 しかしスキーマの互換性に気をつけたり、メッセージをシリアライズ・デシリアライズするアプリケーション間でスキーマ情報をどう共有するかの課題も発生します。 この課題は、昨年発売された "データ指向アプリケーションデザイン" でも "4.1.4.3 そもそもライターのスキーマとは何なのか?" などで触れられています。

世の中にはこの課題を解決するための "スキーマレジストリ" と呼ばれるソフトウェア、サービスがいくつか存在します。 残念ながら日本語の資料があまり無いようにも感じますし、筆者の気が済むか飽きるまで様々なスキーマレジストリを探索してみようかと思います。 第一回は Confluent Schema Registry について調べてみます。 (残念ながら筆者は Confluent Schema Registry を実務で触れた経験がなく、なにか内容に誤りがある可能性があります)

Confluent Schema Registry とは

Confluent Schema Registry は、マネージドな Apache Kafka を提供する Confluent 社 が開発したスキーマレジストリです。

docs.confluent.io

以下のような機能を持っています。

Apache Avro については以前本ブログで触れましたし世の中にドキュメントが多数存在するのでここでは別段触れません。

スキーマは Subject という識別に用いる名前とバージョン番号で管理されます。 Subject をどう決めるかはいくつかのオプションがあり、デフォルトでは topic 名になっています。 topic 名を用いる場合はある topic に流れるレコードのスキーマは一つに限定され、複数の異なるスキーマのレコードを流すことは出来ません。 これを行いたい場合は他にレコード名もしくは topic 名とレコード名の組み合わせを Subject として用いることになります。

スキーマ情報の格納先がそれもまた Kafka であるのはちょっとユニークな点と言えそうです。 デフォルトではシングルパーティション_schemas topic にスキーマ情報を格納するようです。

docs.confluent.io

スキーマレジストリという機構の悩ましい点として、それ自体がメッセージバスと同等くらいの可用性を求められることが挙げられます。 メッセージバスが生きていてもスキーマレジストリが死んでいるなら、シリアライズする時に用いたスキーマを何処にも格納できず、デシリアライズする側のシステムに伝える術も失うためです。 Confluent Schema Registry ではスキーマレジストリREST API を提供する HTTP サーバの可用性の懸念は残りますが、ストレージは Kafka それ自体と運命共同体になるため考える課題が一つ減りそうです。

Confluent Schema Registry は更にスキーマの更新問題にも解決策をもらたします。 アプリケーションを稼働させていくうちにスキーマを更新したくなるシーンが登場するのは自然なことでしょう。 この際に Confluent Schema Registry は Kafka の Producer が作成した Avro のスキーマは解決してくれますが、 Consumer 側にも配慮する必要があります。 例えば Consumer が必要としているフィールドがある日 Producer が送信するレコードから無断で削除されたなら、 Consumer 側で何らかの問題が出るかもしれません。 この問題に配慮するため、 Confluent Schema Registry ではスキーマの互換性チェックの機構を設けています。 互換性には前方互換性、後方互換性、完全互換性の 3 タイプが設けられています。

docs.confluent.io

前方互換性は Producer が Consumer より先にスキーマを更新するパターンで求められる互換性です。 この場合フィールドの追加もしくは optional 、すなわちデフォルト値を持つフィールドの削除が許されています。 これらの変更が行われても Consumer は、自分が知らない追加フィールドは無視する、削除されたフィールドはデフォルト値で埋めることができます。

f:id:syu_cream:20200131004548p:plain

後方互換性は Consumer が Producer より先にスキーマを更新するパターンで求められる互換性です。 この場合は前述の逆で、 optional なフィールドの追加とフィールドの削除が許されています。 この互換性タイプが Confluent Schema Registry のデフォルトの互換性モードであり、現実世界で気を使われることが多いものであると思います。

f:id:syu_cream:20200131004611p:plain

完全互換性は Producer と Consumer どちらが先にスキーマを更新しても良いパターンで求められる互換性です。 この場合は制約が最も弱いようなスキーマ更新、つまり optional なフィールドの追加・削除が許されるようになります。 この互換性タイプは制約が弱いため扱いにも困るケースが出るかも知れませんが、 Confluent のプラットフォーム以外のミドルウェアやキャッシュ機構が存在したり処理するレコードの順序が保証されなかったりするケースに、互換性の問題で Consumer 側でエラーが発生するケースを低減できそうです。

f:id:syu_cream:20200131004629p:plain

Confluent Schema Registry はスキーマを更新しようとする際に、求められるタイプに従って互換性をチェックする機能を提供します。

REST API は実際に Confluent Schema Registry に対応した SerDe でスキーマ操作を行う際に使われる API にもなります。 手動で REST API を叩いてスキーマ登録を行うなどをしても良いでしょう。 この API の仕様は以下にまとまっています。

docs.confluent.io

管理用の WebUI については実際に触れてみた方が話が早そうです。次に触れてみましょう。

Confluent Schema Registry に実際に触れてみる

クイックスタート

公式で Docker イメージと docker-compose yml ファイルが提供されており、とりあえず動かす敷居が低めになっています。

docs.confluent.io

このドキュメントに沿って Docker コンテナを動かした後にブラウザ上で http://localhost:9021/ にアクセスしてみます。 上手く動いていれば以下のような管理画面を閲覧できるはずです。

f:id:syu_cream:20200129004253p:plain

Avro のスキーマを登録してみる

ここでは実験用に hello topic を作成して、この topic に流すレコードの valueスキーマを設定してみようと思います。

ここではバージョン 1 のスキーマとして以下を与えてみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

登録したスキーマを WebUI 上で確認できます。

f:id:syu_cream:20200131001440p:plain

一度スキーマを登録すると互換性モードを選べるようになります。 ここでは一つ前のバージョンのスキーマ後方互換性を保つモードである Backward に設定しておきます。

f:id:syu_cream:20200131001705p:plain

Avro のスキーマを更新してみる; 互換性がある場合

先程登録したスキーマを更新してみようと思います。 ここでは年齢を指定する想定で {"name": "age", "type": ["null", "long"], "default": null} という nullable なフィールドを追加してみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "long"],
      "default": null
    }
  ]
}

このスキーマ変更は optional なフィールド追加に相当して後方互換性チェックをパスすることができます。 無事 WebUI からも新しいバージョンのスキーマを閲覧することができています。 diff 表示も出来ていい感じです。

f:id:syu_cream:20200131002225p:plain

Avro のスキーマを更新してみる; 互換性がない場合

ここから後方互換性が無い変更をしてみます。 よく考えたらこのレコードをアプリケーション上で作成した時刻の timestamp 値を保持したくなってきました。 ということで optional でないフィールド {"name": "created_at", "type": "long", "logicalType": "timestamp-millis"} を追加してみます。

{
  "name": "Hello",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": ["null", "long"],
      "default": null
    },
    {
      "name": "created_at",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

しかしながら先述の通り後方互換性を保つには optional なフィールド追加は許されません。 この場合 Confluent Schema Registry が互換性が保てない旨のメッセージを出してスキーマの更新が拒否されます。

f:id:syu_cream:20200131002951p:plain

このように Confluent Schema Registry では、スキーマの管理と共に、求められる互換性のタイプに従ってそれを維持するようチェックをかけてくれます。 本当はスキーマ管理を .avsc ファイルなどで保存して CI で互換性チェックし、パスすればスキーマ更新するなど人間による直接の更新を避けるのが望ましいかもですが、この WebUI と互換性チェック機構で人間による直接管理もある程度運用に耐えるかもしれません。

余談; _schemas topic

WebUI 上では _schemas topic の様子も確認することができます。 log.cleanup.policydelete ではなく compact が設定されており、これにより時間経過等によるレコードの削除を抑制しています。

f:id:syu_cream:20200131010901p:plain

Confluent Schema Registry の実装を覗いてみる

Confluent Schema Registry のコードはGitHub で公開されています。折角なのでどんな実装になっているか少し覗いてみます。 ちなみにここでは網羅性などは全く考慮していません。筆者が気になるポイントを勝手にかいつまんでいます。

github.com

ここで紹介する内容は v5.4.0 時点のものとなります。

avro-serializer: Schema Registry と通信した結果でシリアライズ・デシリアライズを行う

この module に存在するのが、実際に producer, consumer を実装する際に properties に指定する serializer, deserializer になります。

シリアライズのコアの実装が AbstractKafkaAvroSerializerserializeImpl() に存在します。 自動登録する設定になっているなら Avro スキーマスキーマレジストリに登録し、その後 Kafka の topic に送信するメッセージをシリアライズします。 この時のバイナリレイアウトなのですが、 この Serializer 実装特有のフォーマットになっており Avro のレコードの手前に 8 バイトのマジックナンバースキーマの ID が埋め込まれています。 Avro のレコードのバイナリはおなじみ SpecificDatumWriter, ReflectDatumWriter, あるいは GenericDatumWriter で書き出します。

docs.confluent.io

これに対応してデシリアライズ側の実装は AbstractKafkaAvroDeserializer クラスに多くのロジックが持たれています。 まずは deserialize() ですがリーダ側のスキーマが渡せるようになっており、デシリアライズの結果をライター側とは異なる(しかし互換性はある)型に展開できるようになっています。 ライター側のスキーマは勿論スキーマレジストリ経由で解決できます。この辺りの実装は AbstractKafkaAvroDeserializer の内部クラスの DeserializationContext クラスで行われます。 このクラスに渡された、デシリアライズ対象のバイナリは、先頭 8 バイトにマジックナンバースキーマ ID が存在するはすです。 まずマジックナンバーの存在チェックを行った後にスキーマ ID を取り出し、この ID をスキーマレジストリに問い合わせて Avro のスキーマ情報を取得します。 その後はやはりおなじみ SpecificDatumReader, ReflectDatumReader, GenericDatumReaderJava のオブジェクトとして読み出します。

これで概ね Avro のレコードのシリアライズとデシリアライズの流れが掴めますが、スキーマの登録と取得がどのように成されているのでしょうか。 先述の Serializer, Deserializer の共通の親クラスとして AbstractKafkaAvroSerDe があり、このクラスがデフォルトでは CachedSchemaRegistryClient というスキーマレジストリに対するクライアントを保持しています。 このクライアントの実装がどうなっているのか、次はスキーマレジストリとの通信を行っている client module も覗いてみます。

client: Schema Registry のクライアント

SchemaRegistryClient インタフェースを実装したクラスが実際に Schema Registry と通信しています。 先述の CachedSchemaRegistryClient はその 1 実装であり、スキーマや ID, バージョン情報のキャッシュと RestService というクラスのメンバを持ちます。 そして RestService が実際に HTTP リクエストをスキーマレジストリREST API に送信する、例えばスキーマの登録時に /subjects/:subject/versions に POST リクエストを送る、というような格好になります。

core: Schema Registry のコア実装

もうひとつ、スキーマレジストリREST API サーバを提供する core モジュールも気になる存在です。 REST API の実装やスキーマ情報などの永続化をどのように行っているのでしょうか。

スキーマレジストリREST API サーバは Jetty + Jersey で実装されているようです。 API の各エンドポイントは JAX-RS の Resource として実装されており、例えば /subjects/{subject}/versions であれば SubjectVersionsResource が相当する、などの構成になっています。

スキーマなどの情報を永続化するストレージレイヤは KafkaStore クラスで提供されています。 スキーマの登録のため put() が呼ばれると、スキーマ情報を ProducerRecord に包んで _schemas topic に 送ります。 他方、スキーマの情報取得はというと KafkaStore 自体では直接は _schemas topic を consume せず、別スレッドで動作してローカルスキーマキャッシュを更新する KafkaStoreReaderThread が更新した InMemoryCache から取得する格好になります。 この動作についてはドキュメントでも言及されています。

docs.confluent.io

おわりに

Confluent Schema Registry について筆者の思うまま気になるところをざっくり紹介しました。 いかがでしたでしょうか。

次回は Apache Pulsar の持つ Schema Registry 機能について調べて行こうかなと思います。 なんとこちらのスキーマレジストリは Avro は勿論 Protocol Buffers もサポートしているようです! Protocol Buffers は各言語向けの自動生成されたライブラリを使う、 Avro で言う SpecificRecord のサブクラスに近い性質を持つ表現で扱われることが多いと思われますが、スキーマレジストリをどのように提供しているのでしょうか。

pulsar.apache.org

余談ですが本記事執筆中に Confluent Schema Registry でも Protocol Buffers をサポートする気配のある pull request を発見してしまいました。 近い将来 Avro 以外でシリアライズしたレコードが色んなミドルウェアでらくらくに扱える日が来るかも知れませんね。

github.com