ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

転職しました: メルペイ -> Ubie ~クセつよ組織を求めて~

f:id:syu_cream:20210804130053p:plain

表題の通りで、 2021年7月より前職の株式会社メルペイ(メルカリ)を退職して Ubie に転職しました。というわけでいわゆる転職エントリです。

TL;DR

  • メルカリ・メルペイに 3 年 10 ヶ月在籍しました
  • スキルや経験の幅を持たせたい、サービス初期の雰囲気をまた味わいたくて転職
  • Ubie に入社して SWE として働き出して一ヶ月経過しました

メルカリ・メルペイでやってたこと

メルカリには 2017 年 8 月に入社し、約半年間 SRE チームに所属していました。その後メルペイの立ち上げに合わせて異動して 2021 年 6 月末までデータエンジニアをやっていました。本当に数多くの同僚や関係者に恵まれておりました。お世話になった方々ありがとうございました。

メルカリ・メルペイで体験したことは枚挙に暇がなく、約 4 年で体験したことなのかと疑うほどに濃密なものでした。プロ集団の SRE チームの一員として働き、刺激を受けたこと。メルペイの立ち上げ初期メンバーに加わり、大規模な新サービスのローンチに関われたこと。マザーズ上場。社員数の急拡大(自分が入社した頃より最終的に約 3 倍になった!)と多様なバックグランドを持つメンバーが揃い出したこと。大量のデータを取り扱うデータ基盤の構築にリソースを振り切れたこと。

特にデータ基盤の構築は約 4 年の歳月の中で注力していた事であり、成果を Builerscon 2018 LT や ApacheCon NA 2019 で発表させて頂くにも至りました。

speakerdeck.com

speakerdeck.com

メルカリ・メルペイは先日発表があったメルカリShops や、メルコインも手がけており、現状に満足せずに Go Bold に挑戦し続る、刺激の強い環境です。

about.mercari.com

将来に向けた葛藤

そんな良い環境を手放してまで転職する理由としては、ひとえに自分のキャリアと将来を考えた上で別の道を歩むのが良いと判断したためです。

転職に至るまで以下のような葛藤を抱えていました。 - 自分の年齢と人生を考慮して今もっと挑戦すべきと感じた - 自分の立場がコンフォートゾーンになってきた - スタートアップの雰囲気をより感じたくなった それぞれ分解してみます。

自分の年齢と人生を考慮して今もっと挑戦すべきと感じた

今年で自分は 33 歳であり、今後のライフイベントも考慮するとどこまで自由に挑戦できるか疑問が浮かびます。今このとき、体力や情熱があり多少人生の時間の使い方を自由にできる時により挑戦すべきではないかと感じていました。

自分の立場がコンフォートゾーンになってきた

ありふれた話ですが、自分の立場がコンフォートゾーンになり、新しい刺激が少なく成長をあまり実感できなくなっていました。

メルカリ・メルペイでは主にデータ基盤の開発と運用をやっていました。こうしたいわゆる社内プラットフォームやプロダクト基盤などと呼ばれるシステムの担当分野においては、特定の技術的な問題と挑戦にフォーカスしやすい反面、ビジネス上のドメイン知識の獲得や多様な技術の理解が得難いというデメリットがあると考えます。また基盤刷新や新技術の導入で新しい知識を獲得する機会はあるものの、徐々に強くてニューゲームになり刺激が無くなってくるようにも感じます。自分の直近の実例を取ると、 Apache Airflow や Apache Beam, Apache Spark, Apache Flink を活用したデータ収集基盤の構築や BigQuery による出たウェアハウス提供に従事してきました。他方、連携するマイクロサービスの知識はそれほど身に付かず、 Android, iOS アプリや Web のフロントエンドについては更に距離のある話題になっていました。

前前職まで遡ると、自分のキャリアにおいてプラットフォーム的な役割に振った時間は 8 割ほどを占めています。この方向性を維持するにも別の道を模索するにも、より多様な経験を積んで能力や発想に幅を持たせるべきではないかと考えました。

スタートアップの雰囲気をより感じたくなった

自分がメルカリに入社した頃と比べて社員数は劇的に変化しました。組織は成熟して上場も果たし、仕事の仕方も自分の入社した頃と比較してこなれてきた印象がありました。それに際してどこか「昔はよかった」と感じてしまうシーンが増えてきました。(大抵は思い出を過剰に美化しているかもですが)

考えるべきこと、やるべき仕事が多量にある急成長中のスタートアップでは刺激も多く学べることがあるはずです。自分には今、再びそのカオスだけど喜びが多い環境に身をおくべきなのではと葛藤しました。

Ubie 入社

前述のような葛藤をかれこれ半年ほど続けて、最終的に 2021 年 7 月に医療系スタートアップである Ubie 株式会社に入社する運びになりました。

ubie.life

Ubie は現在ホットな医療という分野での活躍をねらい、また特色のあるというかクセが強い組織や文化を持っており、これが自分が抱える課題を緩和する材料になると感じたためです。

医療 X テクノロジーは挑戦しがいがある

代表の阿部の記事の通り、医療分野においてテクノロジーを組み合わせて解決できる課題があり、挑戦しがいがあります。

note.com

「テクノロジーで人々を適切な医療に案内する」ことで救える命、助けられる人々が居るはずです。もっと身近に言うなら将来の自分を救うことにもなりえます。俺もまだまだ若いし放っておけば直るっしょ、という油断をいつまでできるか分かったものではありません。このコロナ禍で、我々は誰しも当事者になりうるという危機感を植え付けられています。医療と将来の健康状態は今改めて我々の目の前に見える壁として立ちはだかっています。

そして課題があるということは、ビジネスチャンスにも満ちていると言えるでしょう。会社の成長に貢献して、対価としての報酬を得る。自分が事業にコミットしている実感が非常に出る状況が揃っていると考えます。

ラクラシー組織と頻繁な異動

Ubie ではホラクラシーという組織体系を採っています。詳細は以下にまとまっています。個人的に響いた点として、有機的に組織が変化して異動が頻繁に行われていること、個人が複数のロールに紐付き業務を遂行することが挙げられます。(実際、入社して二日目で自分のメンターが異動してチームから居なくなりました。ワロタ、そんなのってアリ!?)

note.com

ロールベースの働き方は、現在多様な経験を積んで引き出しを増やしたく感じている自分にとってマッチしそうです。 will/can/must がマッチしそうな環境に身を置き、別の道があれば別のロールを探すこともできそうです。これだけ言うと兼務に近いイメージになりますが、 Ubie のホラクラシーではロールを積極的に抜けていく想定でもあり、機動力高く動いていけそうです。

さて一風変わったそんな組織にジョインして一ヶ月。現在は「生活者とかかりつけ医をつなぐ」為のプロダクト開発のため、ソフトウェアエンジニアとして動き初めています。先日新サービスに関するプレスも打たれました。

prtimes.jp

関わる技術スタックとしてはフロントエンドで React(Next.js), TypeScript バックエンドでは Kotlin, Spring Boot, GraphQL などで、正直ほとんど触ったことない(特に TypeScript は入社まで触ったことが全くなかった)要素ばかりです。しかしこれもまた良い刺激でありコンフォートゾーンからの脱出が捗ると考えています。

まだまだバリバリのスタートアップである

自分が入社した時点で社員数は 100 名超くらいでした。その中でも Web 周りを触ってるソフトウェアエンジニアに限定すると 15 名ほどと少数精鋭部隊になっています。

twitter.com

ということは個々人やれることややるべきことは無限にあるわけです。成長できる要素しかないし、このフェーズの雰囲気を味わえることは非常に価値があります。

また会社の規模と事業のフェーズも手伝ってか率直かつ建設的なコミュニケーションが行われています。根回しやプライベートなチャットなど社内政治を避け、かつ同僚にラフにコミュニケーションを取る体制は、自分には快く感じるに加えてバリバリの成長途中感を覚えさせるものになっています。

おわりに

というわけで、

  • メルカリ・メルペイに 3 年 10 ヶ月在籍しましたスキルや経験の幅を持たせたい
  • サービス初期の雰囲気をまた味わいたくて転職
  • Ubie に入社して SWE として働き出して一ヶ月経過しました

というお話でした。今後ともよろしくお願いいたします。また本記事がなにかの参考になれば幸いです。

もし本記事を通して Ubie にご興味を持っていただけた方がいらっしゃいましたら、採用ページあるいは僕に直接ご連絡いただけると幸いです!あんまり興味を持った訳じゃなくて最近どうなの?って話でも振って頂けると喜びます。

先述の通り Ubie ではまだまだメンバーを募集しており、更にプロダクト開発を邁進していきたい状況です。SRE やデータエンジニアについては固有の JD があり、

recruit.ubie.life

recruit.ubie.life

Web 周り触ってるソフトウェアエンジニア(僕の今の本業はここ)についてはこちらになります。

recruit.ubie.life

技術書典9で「Apache Parquet ではじめる快適 データ分析」を出します

技術書典9 で「Apache Parquet ではじめる快適 データ分析」を出します。 もしよろしければお手にとっていただければ幸いです。 まあ今回はオンライン開催で電子書籍のみの配布なので、物理的にお手に取れないんですけどね〜!

本書は Apache Parquet についてつらつらと紹介記事を書いた内容になります。 また付録的に同サークルメンバー著「USB デバイスを作るのがツラい」というテーマの記事も掲載します。

データ分析業務にムッチャ関わる、ストレージコストを最適化したい、 BigQuery などのデータウェアハウスサービスを日常的につかう、なんとなく気になった、ような人々に効果的だと思います。 以上よろしくお願いいたします。

f:id:syu_cream:20200906114431j:plain

目次:

第1章 Apache Parquet ではじめる快適データ分析 5
1.1 はじめに .................................. 5 
レコード指向フォーマットとは? ..................... 6 
カラムナフォーマットとは? ....................... 8 
レコード指向とカラムナ、OLTPとOLAP ............... 10 
カラムナフォーマットの実装例 ...................... 11
1.2 ApacheParquetとはなにか........................ 11 
並列読み書き処理化しやすいバイナリレイアウト............. 12 
スキーマが自己記述的 ........................... 15 
シンプルで柔軟性のある型表現 ...................... 15 
ネストされたカラムや繰り返しされるカラムに対しても有効 . . . . . . . 16 
多様なエンコード方法 ........................... 26 
豊富な圧縮コーデックを選択可能 ..................... 31 
メタデータを駆使したクエリ最適化が可能 ................ 32
1.3 ApacheParquet実装例 .......................... 33 
parquet-mr................................. 33 
ApacheArrowC++実装......................... 34 
Goにおける実装例............................. 34
1.4 実際に使ってみる ............................. 35 
Parquetファイルを生成してみる ..................... 35 
ParquetファイルにAthenaからクエリしてみる . . . . . . . . . . . . 39
1.5 実際の運用................................. 42 
Parquet で実際どれくらいファイルサイズが削減されるのか? . . . . . 42 
RowGroupとPageのサイズのチューニング .............. 42 
長期ログ保存におけるコスト削減に寄与できる .............. 42 
ストリーム処理に組み込む難しさを考慮する ............... 43
 SELECT*に弱い............................. 44 
1.6 おわりに .................................. 45
付録 A
A.1 はじめに .................................. 46
A.2 USB通信プロトコル概要 ......................... 46
A.3 USBデバイスの設計方針 ......................... 48
A.4 USBデバイスが動くまで ......................... 49
A.5 USBの消費電力規格............................ 50
A.6 地獄のノイズ耐性試験 ........................... 50
トグルビット不一致 ............................ 51
安物のハブが...... ............................. 52
A.7 まとめ ................................... 52
あとがき
54
USB デバイスを作るのがツラい 46
@syu_cream .................................... 54 
@lunatic_star ................................... 54

Data Mesh の記事を読んだ

一年以上前の記事だけど、 https://martinfowler.com/ に "Data Mesh" をうたう記事があったので軽く読みました。

martinfowler.com

こちらに日本語で概要をまとめた記事もありご一読することをおすすめします。 僕の個人ブログを見るより確実で良い情報を得られるでしょう。

https://www.infoq.com/jp/news/2020/03/distributed-data-mesh/

以下では現行のぼくの業務と照らし合わせて、 Data Mesh について個人的解釈などを書いていきます。

Current status ...

二年くらい前に builderscon で "メルペイにおける、マイクロサービスに寄り添うログ収集基盤" みたいなタイトルで LT で発表したりしました。 当時、急速に開発されるマイクロサービス群と元から存在したモノリスなシステムに特化したデータ基盤が存在し、「マイクロサービス化したら分析等のためのデータどうなんの???」と漠然とした課題感はあるものの誰も答えを見出だせていない状況でした。

speakerdeck.com

そこから二年も経過すると弊データ基盤も色々とあり、上記に挙げた batch/streaming それぞれの要件に特化した仕組みを作ったり刷新したり、公開していないまた別のシステムを構築したりとかしていました。(その辺の最近の話も別途公開していければと思っています) この二年で発生した大きな変化としては、以下の辺りが挙げられるかと思います

  • ビジネスのスケールに対して自分が認知できる範囲が追いつかなくなった
  • マイクロサービスがむっちゃ増えた。俺は数えるのをやめた
  • データの要件も多種多様になった。種類によるところや性能要件など

この辺りの煽りを受けると、データ基盤もこのような変化に追従できなければ組織の中でのボトルネックになりかねないなという危機感を覚えています。

Data Mesh の話

という個人的振り返りをしつつ元記事の話題に移ります。

データ基盤前史

とにかく我々は「サイロ化」という言葉を好んで使い、打ち倒すべき敵みたいに扱います。 データのサイロ化もそのやり玉に挙がり、組織やシステム間でデータ連携ができずに分析基盤でうまく扱えない課題を指摘されることがあります。 これに対して、データレイクやデータウェアハウスみたいな一元的にデータが管理可能な入れ物を用意して、とりあえずそこにデータを突っ込む道路を舗装して分析業務を回すみたいな解が取られてきたと思います。

Data Mesh の記事ではこのような一元的なアーキテクチャを前世代的なものと位置付けています。 中央集権的なデータ基盤は全体最適化には良いけれど、個別の高度な要件を満たすのが難しくなります。 またデータ基盤はデータの producer / consumer のようなデータの流れに沿った上流・下流の構図を作りがちです。んで、 consumer が要件を満たしたい場合上流に遡りつつデータ基盤屋さんにも相談するような依存関係が生まれます。 さらにそうした構図が生まれると中流に位置するデータ基盤のチームは時として producer/consumer のドメイン知識を求められるかもしれません。その振る舞いを行えるメンバーがどれだけ確保できるでしょうか・・・。

個人的にはこうしたデータ基盤のモノリス化はなんら不思議ではないと思います。 BigQuery はじめとした便利なデータ基盤に使えるシステムが台頭してきてはいますが、データエンジニアリングの領域は未だ職人芸が求められる領域であり、それに特化したスペシャリストが基盤構築を行うのは自然かなと。 またデータ基盤構築にあたり、まずデータを一定数揃えないとバリューを出しにくいでしょうから producer に寄った最適化をして「とりあえずデータを集める」「データレイクに突っ込んでから後のことを考える」のは理にかなっていると考えます。 とはいえデータ基盤の利用者が増えて、 consumer のリクエストを聞き始めると苦しみが生まれ始めるとも考えられます。 自分の実体験としても、黙っててもデータ基盤がワークするケースというのは producer と consumer が同一のチームかあまり距離が遠くないチームのケースが多いような気もしています。

データをメッシュにする

この記事における前世代的なデータ基盤の課題の解決方法は、マイクロサービスアーキテクチャさながらモノリスの分解だと考えられます。

データメッシュの世界では一元的でモノリスなデータ基盤は存在せず、代わりに広く使われるデータインフラを見るチームと分散したデータ処理システムが存在します。 また明確な producer と consumer という立場を生じさせず、各ドメインチームがデータの管理も行い相互にコミュニケーションします。 分散することで前述のサイロ化問題が再熱しそうですが、横断的なデータガバナンスの仕組みやセルフサーブ可能なエコシステムを導入していきます。

データメッシュの思想は本質的には権限や責務の移譲と、データ基盤が真に基盤らしく振る舞うためのパラダイムシフトを起こすことだと考えます。 前者の思想はマイクロサービスアーキテクチャとよくなじみ、データの producer がマイクロサービスであるならばその延長でデータも扱えればいいだけでしょう。 データ基盤が基盤本来の仕事に集中するのも重要なことで、データの producer / consumer が増えるにつれ無限にドメイン知識が求められるなら組織のスケーラビリティは死んでいくし、同様の振る舞いができるメンバーを探すのが困難になってくると思います。 ぼくの所属する組織では Microservices Platform チームというマイクロサービスを支える基盤を構築するチームが存在し、マイクロサービスを開発運用するにあたり共通課題となる Kubernetes クラスタやデプロイパイプラインの提供を行っています。 これに近く各ドメインチームがデータにまつわる課題を解くための共通基盤を提供してセルフサーブ可能にして、しかし自身は課題を解く主役にはならないぐらいのバランスが求められるのかもしれません。

tech.mercari.com

そう理想は言ってもデータメッシュの世界観に沿うようなツールが無いとこの理想的世界に近づくことはかなわないでしょう。 データメッシュの記事では特に GCP のプロダクトについて、一元的なデータガバナンスなら Google Cloud DataCatalog が、バッチ・ストリーミング処理には統合的に扱えインフラがフルマネージドな Google Cloud Dataflow があると挙げています。 また筆者の経験ではデータメッシュの世界観でデータレイク的なポジションとして GCS を、ドメインごとに bucket を作成して利用して、データウェアハウスとして BigQuery を使うのもありかと考えます。 特に BigQuery は GCP プロジェクトが異なっていても参照する権限があれば JOIN することは可能であり、データメッシュのような論理的には分散したデータ基盤を実現するのにマッチするように感じます。

Data Mesh と俺

セルフサーブ可能な基盤を目指してなるべくデータ基盤がドメイン知識を抱え込まずコミュニケーションにおけるクリティカルパスにならないようにする思想は重要だと感じます。 前世代的な(と言われてしまった)データ基盤では producer/consumer のバリエーションも増えて、その間のコミュニケーションにデータ基盤が入ることでボトルネックを生むことになりかねません。 セルフサーブ可能であればある程度「勝手にやってくれ」といえる領域が増えてボトルネックが解消されてゆき、データ基盤チームはより基盤の作り込みに集中することができると思われます。 とはいえこれを最初期からゴールに据えるのも骨が折れる作業であると思うので、段階的に分散可能にしていくのが良いかもしれません。 最近では弊チームでもセルフサーブ・分散管理可能な設計にしつつ、枯れてくるまでは自チームで面倒を見るという思想で動くことが増えてきました。

データガバナンスやデータ処理の分散化そのものについてはやや懐疑的な部分があります。 前世代のデータ基盤でも十分多い数の producer が発生するはずで、データメッシュの話とは独立してデータガバナンス、メタデータ管理やリネージ追跡、クオリティチェックなどの課題を考えるべきでしょう。 もしかしたらデータ基盤チームがこれらの課題まで人手でカバーしているケースがあるかも知れませんが、それならなおのことデータメッシュの文脈に依らずエコシステムの作り込みをした方が良いように思えます。 またデータ処理もまた職人芸が試される領域でありあまり各ドメインチームに移譲しにくいような気もしています。 BigQuery などデータウェアハウスに格納してから SQL でなんとかする、みたいな汎用的なシナリオならいざ知らず、低遅延での処理が求められるとか重複除去したいとかリッチな要件が出てくるシナリオで各チームで対応するのが現実的なのかどうか。

また、いずれにせよ consumer のようなデータを使う側にある人々をどのようにケアするかは課題になると推測しています。 中央集権的なデータ基盤の有無に関わらず consumer が必要なデータを producer に準備してもらう枠組みは必要で、そのコミュニケーションや動機づけをどうすれば解決できるのか自分の中ではアイデアがありません。 そこを含めてデータガバナンスで頑張る!という話であるなら、まだ現実の課題に適用するまでに障壁がある気もしております。

Avro と BigQuery の load とうまく付き合いたい

Avro と BigQuery の読み込み

Apache Avro は BigQuery のデータ読み込みに対応したシリアライゼーションフォーマットであり、 Object Container Files フォーマットを採用することでスキーマが自己記述的になり読み込みに際して別途スキーマ情報を与えなくて済むメリットがあります。 また BigQuery としては Avro (を含めたいくつかの形式) では平行読み込みが可能とされ、それができない形式、たとえば gzip 圧縮された JSONL 形式などと比較して早く読み込めるようです。

cloud.google.com

加えて、実は並行読み込みが可能とされてかつ効率的な圧縮が期待できる Avro, Parquet, ORC の中にも読み込み処理において優劣があるようです。 Google BigQuery: The Definitive Guide によると "The most efficient expressive format is Avro" とあり、列志向で圧縮もかかる Avro が最も効率的であるとされています。 対して Parquet や ORC は行志向であり、これはこれで外部デーブル経由でファイルに直接クエリする分には効率的なものの、 BigQuery に読み込む際には全列読まなければならない分 Avro が有利なようです。

そんないい感じっぽい Avro の BigQuery へのデータ読み込み、本記事ではスキーマ周りについていくつか動作を確認してみようと思います。

BigQuery にさまざまなスキーマの Avro ファイルを読ませてみる

互換があるスキーマで追記する場合

まずは replace をせず単純に BigQuery のテーブルにレコードを追加して行こうと思います。 とりあえず適当に 1 カラムだけある空テーブルを作っておきます。

f:id:syu_cream:20200526231738p:plain

まずはこれにマッチする単純な Avro ファイルを作って load してみます。

$ cat user_v1.avsc
{
  "name": "User",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    }
  ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file user_v1.avsc --count 1 user_v1.avro 2>/dev/null
$ bq load --project_id syucream-dev --source_format AVRO syucream-dev:test_syucream.user user_v1.avro
Upload complete.
Waiting on bqjob_r750d0087bb28a293_00000172515bed9d_1 ... (1s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r7bab3c778b4c6a7d_00000172515c7520_1 ... (0s) Current status: DONE
+---------------------+
|         id          |
+---------------------+
| 7190660540979993749 |
+---------------------+

サクッとできました。追記もサクッとできます。

$ bq load --project_id syucream-dev --source_format AVRO syucream-dev:test_syucream.user user_v1.avro
Upload complete.
Waiting on bqjob_r26360ddf0526570_00000172515d711c_1 ... (1s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_rba99937746b1712_00000172515d8f6f_1 ... (0s) Current status: DONE
+---------------------+
|         id          |
+---------------------+
| 7190660540979993749 |
| 7190660540979993749 |
+---------------------+

このスキーマと互換のあるスキーマを持つ Avro ファイルの load も問題なくできます。

$ cat user_v1_1.avsc
{
  "name": "User",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file user_v1_1.avsc --count 1 user_v1_1.avro 2>/dev/null
$ bq load --project_id syucream-dev --source_format AVRO --schema_update_option ALLOW_FIELD_ADDITION syucream-dev:test_syucream.user user_v1_1.avro
Upload complete.
Waiting on bqjob_r616f9c494daf502a_00000172516056b9_1 ... (0s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r46005fd5ddeefbab_00000172516077d6_1 ... (0s) Current status: DONE
+----------------------+------+
|          id          | name |
+----------------------+------+
| -6779445778023123159 | NULL |
|  7190660540979993749 | NULL |
|  7190660540979993749 | NULL |
+----------------------+------+

BigQuery のスキーマ的には互換がある追記をする場合

今度は前述とは互換性がない、 name フィールドが nullable でなくなったスキーマを持つ Avro ファイルを load してみます。 これは成功しますが BigQuery のテーブルとしては name フィールドは nullable のままとなります。(まあ nullable から required の変更は許されていないですしね。。。)

$ cat user_v2.avsc
{
  "name": "User",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file user_v2.avsc --count 1 user_v2.avro 2>/dev/null
$ bq load --project_id syucream-dev --source_format AVRO --schema_update_option ALLOW_FIELD_ADDITION syucream-dev:test_syucream.user user_v2.avro
Upload complete.
Waiting on bqjob_r5a6eb8678879a075_0000017251620b01_1 ... (1s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r5105091904b8eed0_000001725162d619_1 ... (0s) Current status: DONE
+----------------------+-----------------+
|          id          |      name       |
+----------------------+-----------------+
|  3285309633976168209 | twbnsureievqwes |
|  7190660540979993749 | NULL            |
| -6779445778023123159 | NULL            |
|  7190660540979993749 | NULL            |
+----------------------+-----------------+

ここから逆行して最初に load した Avro ファイルを load しようとしても成功します。 これはやはり BigQuery のテーブル上では name フィールドは nullable であり、 name フィールドをそもそも持たないレコードの場合は null で埋めればいいからですね。

$ bq load --project_id syucream-dev --source_format AVRO syucream-dev:test_syucream.user user_v1.avro
Upload complete.
Waiting on bqjob_r3e8bbde894d35c3b_00000172516328ff_1 ... (0s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r7e87a10bad564007_00000172516369af_1 ... (0s) Current status: DONE
+----------------------+-----------------+
|          id          |      name       |
+----------------------+-----------------+
|  7190660540979993749 | NULL            |
|  3285309633976168209 | twbnsureievqwes |
|  7190660540979993749 | NULL            |
| -6779445778023123159 | NULL            |
|  7190660540979993749 | NULL            |
+----------------------+-----------------+

互換がない追記をする場合

今度はさらに BigQuery のテーブルとしても互換が取れないであろう変更をしてみます。 ここでは required となる age フィールドを追加してみます。 この場合、この Avro ファイル単体としては load できそうですが既存のレコードが age フィールドの値をもたないため load できません。

$ cat user_v3.avsc
{
  "name": "User",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "long"
    }
  ]
}
$ java -jar ~/tools/avro-tools-1.8.2.jar random --schema-file user_v3.avsc --count 1 user_v3.avro 2>/dev/null
$ bq load --project_id syucream-dev --source_format AVRO --schema_update_option ALLOW_FIELD_ADDITION syucream-dev:test_syucream.user user_v3.avro
Upload complete.
Waiting on bqjob_r6d0dccd4050d198_000001725165f2c9_1 ... (0s) Current status: DONE
BigQuery error in load operation: Error processing job 'syucream-dev:bqjob_r6d0dccd4050d198_000001725165f2c9_1': Provided Schema does not match Table
syucream-dev:test_syucream.user. Cannot add required fields to an existing schema. (field: age)

--replace する場合

BigQuery のデータ読み込みでは追記ではなくアトミックなテーブルの再生成も行えます。 この場合はテーブルとそのスキーマが作り直される都合、前述のスキーマの互換性を気にしなくてよくなります。

$ bq load --project_id syucream-dev --replace --source_format AVRO syucream-dev:test_syucream.user user_v1.avro
Upload complete.
Waiting on bqjob_r1441ed0e2b659303_000001725167e0af_1 ... (0s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r64a104d745d627a1_00000172516817e3_1 ... (0s) Current status: DONE
+---------------------+
|         id          |
+---------------------+
| 7190660540979993749 |
+---------------------+
$ bq load --project_id syucream-dev --replace --source_format AVRO syucream-dev:test_syucream.user user_v1_1.avro
Upload complete.
Waiting on bqjob_r1254a5673d1c4a55_0000017251684053_1 ... (0s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r542e92fd2eadaefe_0000017251685b41_1 ... (0s) Current status: DONE
+----------------------+------+
|          id          | name |
+----------------------+------+
| -6779445778023123159 | NULL |
+----------------------+------+
$ bq load --project_id syucream-dev --replace --source_format AVRO syucream-dev:test_syucream.user user_v2.avro
Upload complete.
Waiting on bqjob_r7f71a4c5acbb4a54_00000172516874f8_1 ... (0s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r26844a99b599657a_0000017251688ed0_1 ... (0s) Current status: DONE
+---------------------+-----------------+
|         id          |      name       |
+---------------------+-----------------+
| 3285309633976168209 | twbnsureievqwes |
+---------------------+-----------------+
$ bq load --project_id syucream-dev --replace --source_format AVRO syucream-dev:test_syucream.user user_v3.avro
Upload complete.
Waiting on bqjob_r3039d309e0eaecd0_000001725168a3bd_1 ... (1s) Current status: DONE
$ bq query --nouse_legacy_sql 'SELECT * FROM syucream-dev.test_syucream.user'
Waiting on bqjob_r3535c0ed97eae63f_000001725168c9b7_1 ... (0s) Current status: DONE
+---------------------+-----------+----------------------+
|         id          |   name    |         age          |
+---------------------+-----------+----------------------+
| 6571829868147110661 | gcypqmwby | -7543339857203188581 |
+---------------------+-----------+----------------------+

雑なまとめ

BigQuery による Avro ファイルのデータ読み込みは非常に協力で、 --replace によるテーブル更新によって楽で効果的な運用ができると思います。 BigQuery 上でスキーマがどうなるかを考えずに読み込みジョブを実行するだけで良くなるのはメリットが大きいでしょう。

ただし --replace ですべてのユースケースが叶えられるわけでもなく、巨大なデータソースから ETL を経て差分更新で同期したいだとかログをひたすら追記したい場合にはスキーマの更新問題がしばしばネックになると思われます。 ただその場合でも、BigQuery のテーブルのスキーマとして互換が取れる Avro ファイルであれば読み込み可能であることからスキーマ更新について考える負荷は減りそうです。 スキーマ更新が頻繁に発生しうるワークロードでは逆にこの特性を捉えた上でどうテーブル更新するかのワークフローを組むと良いのかもですね。

Cloud Dataflow の FlexTemplate は何者か

先月、さらりと Cloud Dataflow に FlexTemplate という新機能のベータ版がリリースされました。

cloud.google.com

残念ながらまだあまりドキュメントがなく、これを用いるとなにが嬉しいのかが掴みにくいところです。 本記事では FlexTemplate 周りを軽く試してどのような機能で何が嬉しいのか探ってゆきます。

従来のテンプレート

Cloud Dataflow はジョブの一部パラメータを実行時に置き換え可能にしたビルド済みのテンプレートを作ることができます。 また Google はこの機能を用いた公式テンプレートを何種類か用意してくれており、 GCP 上のリソースに対してコーディングなく ETL 処理をすることが可能になっています。

cloud.google.com

ちなみにこのテンプレートの機能なのですが、 FlexTemplate の登場の都合からかドキュメント上で "従来のテンプレート" という見出しになってしまったようです。。。

f:id:syu_cream:20200512002131p:plain

実行時にパラメータを置き換え可能にするには、その値を ValueProvider でラップしてあげる必要があります。 ValueProvider の実装としてテンプレートのステージング時に静的に値が決まる StaticValueProvider と実行時に決まる RuntimeValueProvider が存在しており、これによりパラメータを渡すタイミングを柔軟に選べます。 また Apache Beam の SDK 内の IO 関連ビルダークラスには ValueProvider を受け取って振る舞いを変えてくれるものが多々あります。

beam.apache.org

逆に言うとこの ValueProvider で賄えないような性質のパラメータは実行時に指定可能なパラメータとして扱えません。 これは分かりやすい部分で言うと Beam の SDK のインタフェース的に ValueProvider を受け取ってくれないような部分、もっと複雑な例で言うと内容によってパイプラインのグラフが変わるようなパラメータは扱えません(あるいは扱うのが困難)

FlexTemplate

FlexTemplate は、おそらくなのですが、与えられるパラメータに柔軟性を与えるものです。 ドキュメントだけでは正体が定かにはならないですが、 gcr.io/dataflow-templates-base/java11-template-launcher-base をベースとした Docker image を用いて Dataflow ジョブを実行するようになります。 テンプレートジョブの構築手順は「従来のテンプレート」に似ており FlexTemplate によるテンプレートの作成と、実行の 2 つのフェーズがあります。

前者では前述の Docker image を GCR に push して、かつその Docker image で解釈可能なパラメータを示した metadata.json を GCS に upload します。 Docker image の build & push は従来のテンプレートでは存在しなかった手順ですね。

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#creating_a_flex_template

後者では実際にパラメータを与えてジョブを実行します。

cloud.google.com

このジョブの実行時に興味深いことに、ジョブの launch 用に GCE インスタンスが開始され先程作成した Docker image が実行されることが見て取れます。 この Docker コンテナの動作がよくわからないところですが、内部的にはこのタイミングから Dataflow ジョブのステージングと実行を開始してくれるものと考えられます。

f:id:syu_cream:20200512004846p:plain

#cloud-configs
...
    ExecStart=/usr/bin/docker run -v /var/log/dataflow/template_launcher:/var/log/dataflow/template_launcher gcr.io/xxx/samples/dataflow/streaming-beam-sql --template-container-args='{"consoleLogsLocation":"gs://xxx/staging/template_launches/xxx/console_logs","environment":{"region":"us-central1","serviceAccountEmail":"xxx","stagingLocation":"gs://xxx/staging","tempLocation":"gs://xxx/tmp"},"jobId":"xxx","jobName":"streaming-beam-sql-20200512-004931","jobObjectLocation":"gs://xxx/staging/template_launches/xxx/job_object","operationResultLocation":"gs://xxx/staging/template_launches/xxx/operaton_result","parameters":{"inputSubscription":"test","outputTable":"foo:bar.baz","stagingLocation":"gs://xxx/staging","tempLocation":"gs://xxx/tmp"},"projectId":"xxx"}'
...

さてこの FlexTemplate で何が嬉しいのかというと、大きな差異として実行時パラメータを ValueProvider で包む必要が無くなった点があります。 FlexTemplate を利用したジョブのサンプルがいくつか公開されているのですが、この内の PipelineOptions の getInputSubscription() など、実行時に指定可能なパラメータでありつつも ValueProvider 型で扱われていないことが見てとれます。

github.com

これにより、従来のテンプレートでは課題になっていた ValueProvider を受け取ってくれないような値を実行時に変更したいだとか、パイプラインの構造が変わるようなパラメータを渡したいような部分に効果を発揮するものと思われます。 とは言ってもデメリットもあり、ジョブが Queue に入ってから Running の状態になるまで数分待たされる(ステージングのタイミングが遅延したのなら妥当に思えるが)だとか、今のところ Streaming Engine や FlexRS のサポートがないとか課題が存在します。

終わりに

軽く触れてみた限り FlexTemplate はシンプルな仕組みながら Dataflow のテンプレートに相当な柔軟性を持たせることができそうです。 FlexTemplate にどこまでを期待して、どのように従来のテンプレートと棲み分けすべきかのか(上位互換の位置付けなのか?)、今後のサポート具合など気になるところですね。

Schema Registry について書いていく: Confluent Schema Registry の Protocol Buffers & JSON Schema サポート

先日リリースされた Confluent Platform 5.5 より Protocol BuffersJSON Schema のサポートが入ったようです。

www.confluent.io

以前 5.4 を対象に、 Schema Registry を中心に色々記載してみましたが、今回は 5.5 で入ったこの差分を追跡してみます。

syucream.hatenablog.jp

Confluent Control Center 上でスキーマを管理する

定番の Confluent Control Center を使って、ブラウザ上で Schema Registry の設定周りを試してみましょう。 試しに protobuf でスキーマを登録してみた状態が以下のとおりです。 protobuf において .proto ファイルに記述していくスキーマを登録できるようになっています。

f:id:syu_cream:20200502220418p:plain

JSON Schema の場合は例えば以下のようになります。

f:id:syu_cream:20200502220857p:plain

互換性チェックの挙動を試してみる

これだけでは面白くないので protobuf のスキーマを更新していってみます。 まず uint32 age = 3; フィールド追加を行ってみます。フィールド追加は特に互換性を壊しません。というわけでスキーマの更新も無事に行なえます。

syntax = "proto3";
message value_protobuf {
  uint64 id = 1;
  string name = 2;
  uint32 age = 3;
}

今度は field number を変更してみます。早速さきほど追加したフィールドを uint32 age = 42; に書き換えてみました。 この変更もリスクはありそうですが、スキーマ更新は受け入れられます。

syntax = "proto3";
message value_protobuf {
  uint64 id = 1;
  string name = 2;
  uint32 age = 42;
}

さらにフィールドの削除やフィールド名の変更も受け入れられます。

syntax = "proto3";
message value_protobuf {
  uint64 id = 1;
  string fullname = 2;
}

これらの変更は互換性を壊すものではないのでしょうか? Confluent Schema Registry での protobuf の互換性チェックの仕様は以下の通りになっています。

docs.confluent.io

おおむね protobuf の wire format の解釈に影響を及ぼさず、デシリアライズは無事行えるような内容であれば互換性が維持されると見るようです。 フィールド名の変更に対する言及は明記されていませんが、これも wire format の解釈に影響しないからだと思われます。 ・・・ただ実際にはデシリアライズには成功するとしても後続のデータ処理が影響を受けないかどうかは保証されないでしょうし注意が必要です。 フィールドの追加や削除は予期せぬ値のデフォルト値化、フィールド名のリネームはフィールド名を意識した後続処理で問題が発生するかも知れません。

ここから明確に互換性が無い変更も行ってみます。このフィールドの型 uint32string にしてみます。 この場合は予想通り互換性が無いとされてスキーマ更新が拒否されます。

f:id:syu_cream:20200502222110p:plain

// 筆者はこの他にも JSON Schema のスキーマ変更も試してみたのですが、互換性チェックの挙動が読めず諦めてしまいました。

protobuf の扱い周りの実装を読んで見る

5.5 でサポートされたデータフォーマットのバリエーションと各フォーマットの扱いはどうなっているのでしょう。 せっかくなので前節で触れた protobuf 周りの実装を追ってみます。

protobuf 対応した Serializer/Deserializer

ドキュメントから、データフォーマット毎に Serializer, Deserializer の実装が異なるようです。まずはここをエントリポイントとしてみます。

docs.confluent.io

Avro と同様 AbstractKafkaProtobufSerializer, AbstractKafkaProtobufDeserializer に、 protobuf のデータである Message のサブクラスのオブジェクトのシリアライズ・デシリアライズ処理と schema registry とのやり取りの処理が実装されています。 ここで protobuf のスキーマ情報は公式の Descriptor としてではなくラップされた型として扱われます。また protobuf は import 文で他の .proto ファイルを読み込むことができるのですが、この依存関係を解決するロジックも持ち合わせています。

面白い点として Serializer は protobuf の、 protoc で生成された具体的なメッセージクラス (Avro でいう SpecificRecord みたいなもの)を扱い、 Deserializer では DynamicMessage (Avro でいう GenericRecord みたいなもの) を扱うことです。 producer は .proto の記述に従ったメッセージクラスが使えますが、それを consumer が持っている保証はないため妥当な扱いだと思われます。

複数データフォーマットをサポートしたスキーマの表現

5.4 では Schema Registry として考慮していたスキーマが Avro しかありえなかったので、 Schema Registry としては Avro の Schema クラスを扱うような実装になっています。 これが 5.5 では ParsedSchema インタフェースという一段抽象化された構造で扱われます。 protobuf のスキーマを表現する ProtobufSchema、 Avro の AvroSchema, JSON Schema の JsonSchema はそれぞれ ParsedSchema を実装しています。 個々の ParsedSchema 実装の中でスキーマ固有の処理、互換性チェックや protobuf で言う descriptor の扱いなどを担います。

おわりに

簡潔に追っただけですが、妥当な進化を遂げたような Confluent Platform 5.5 でした。 protobuf はコード事前生成を想定した利用シーンが多いですが、 Confluent Platform でのサポートなど利用シーンが増えてくると活用できる機会がさらに増えるかも知れません。 また今回複数データフォーマットのサポートが入った以外にも ksqldb という KSQL の進化系?のような機構が増えて Confluent Platform も進化を続けている印象を受けますね。

protobuf のシリアライズ済みバイナリを無理やり読む

Protocol Buffer wire format について

Protocol Buffer でシリアライズされた後のバイナリのレイアウトの仕様は wire format の仕様という形で独立してドキュメントが用意されています。 この wire format の仕様は見ればわかる通りそれほど記述量が多くなく、それでいて互換性を気にしつつ、かつ拡張の邪魔にならないような配慮がされています。

developers.google.com

wire format として特に重要な点は以下の通りだと個人的には考えます。

  • 整数型に対するデータサイズがなるべく小さくなるような工夫がされている
  • 複雑な型はバイト配列に押し込める。(多くのシリアライゼーションフォーマットと同じように)長さが指定されたバイト配列として表現
  • フィールドの順序が任意にできる。のでスキーマ更新において順序を意識しなくてよくなる。

wire format では各フィールドはフィールド番号と wire format 上での型で識別されます。 これらを元に、 .proto ファイルでどう記述されていたのかに対応してより具体的な型として解釈したりフィールドの名前を取得したりできるわけです。 例えば field number が 10 、 wire type が 2(バイト配列) であった場合に、 .proto では string name = 10; という対応する field number があるならフィールド名は name でありバイト配列は文字列として解釈できそうだと判断できます。

とはいえシリアライズ済みバイナリを頻繁に扱ってくると、ましてや複数のシステムでデータ交換をし出すと時には .proto や descriptor 、ライブラリなしに内容を確認したい時があるかも知れません。 特にデータ交換を行う場合システム間で持つスキーマが異なる場合にはこの希望は大きくなるかもです。 またパースエラーが起こる箇所を絞り込みたいなどの要望から、スキーマを用いた具体的なデータの解釈をなるべく遅らせたい場合にこの戦略は有効かも知れません。 そんなことを考え、 Go で無理やりシリアライズ済みバイナリを、それ単体の持つ情報だけで解釈してみる試みをしました。

Protocol Buffer without schema

というわけで Go で schema less で protobuf のデータを無理やり解釈するコードを書いてみました。

github.com

wire format のフィールドの解釈は protowire という既存のモジュールがあり、詳細な部分は概ねこれで行えます。 wire format の解釈の上で面倒くさい点は可変長になりうる varint の値や ZigZag encoding されうる signed integer あたりですがこのあたりは実装されてくれています。

godoc.org

バイト列の詳細な解釈は descriptor の情報が無いと正確な値は取れません。 ここでは雑に、思いつく範囲の解釈を全部試して wire format としては valid な値をすべて返すようにしています。 具体的には文字列とバイト列、ネストされたメッセージ、整数型の repeated な値などです。 そのほかにも Protocol Buffer としてはバイト列で map 型が表現できるのですが、これについては wire format やその他のドキュメントに詳細な仕様がなく(うまく見つけてなくてどこかに存在するかも?)一旦諦めています。 Go の実装的にはバイト列の中に更にネストされたメッセージに似てフィールド番号や wire type が指定されたフィールド列が存在して、フィールド番号が 1 であれば map の key 、 2 であれば value となるようです。

github.com

これを用いて以下のような .proto のメッセージを

syntax = "proto3";

message Example {
    enum Num {
        ZERO = 0;
        ONE = 1;
    }

    uint64 uint64_val = 1;
    string string_val = 2;
    fixed64 fixed64_val = 3;
    fixed32 fixed32_val = 4;
    Num enum_val = 5;
    Child child_val = 6;

    repeated uint64 r_uint64_val = 101;
    repeated string r_string_val = 102;
    repeated fixed64 r_fixed64_val = 103;
    repeated fixed32 r_fixed32_val = 104;
    repeated Num r_enum_val = 105;
    repeated Child r_child_val = 106;
}

message Child {
    uint64 v = 1;
}

こんな様な値を設定してシリアライズした時の

               msg := &protosl.Example{
                    Uint64Val:  1,
                    StringVal:  "testing",
                    Fixed64Val: 11,
                    Fixed32Val: 111,
                    EnumVal:    protosl.Example_ONE,
                    ChildVal: &protosl.Child{
                        V: 1,
                    },
                    RUint64Val: []uint64{2, 3},
                    RStringVal: []string{"aaa", "bbb"},
                    // RFixed64Val: []uint64{22, 33}, TODO repeated fixed isn't supported
                    // RFixed32Val: []uint32{222, 333}, TODO repeated fixed isn't supported
                    REnumVal: []protosl.Example_Num{protosl.Example_ZERO, protosl.Example_ONE},
                    RChildVal: []*protosl.Child{
                        {
                            V: 2,
                        },
                        {
                            V: 3,
                        },
                    },
                }

シリアライズ後のバイナリを食わせると、かなり冗長にはなりますが以下のように解釈できます。

$ echo -n "\x08\x01\x12\x07\x74\x65\x73\x74\x69\x6e\x67\x19\x0b\x00\x00\x00\x00\x00\x00\x00\x25\x6f\x00\x00\x00\x28\x01\x32\x02\x08\x01\xaa\x06\x02\x02\x03\xb2\x06\x03\x61\x61\x61\xb2\x06\x03\x62\x62\x62\xca\x06\x02\x00\x01\xd2\x06\x02\x08\x02\xd2\x06\x02\x08\x03" | protosl
{"1":1,"101":{"__bytes":"AgM=","__packed":[2,3],"__string":"\u0002\u0003"},"102":[{"__bytes":"YWFh","__packed":[97,97,97],"__string":"aaa"},{"__bytes":"YmJi","__packed":[98,98,98],"__string":"bbb"}],"105":{"__bytes":"AAE=","__packed":[0,1],"__string":"\u0000\u0001"},"106":[{"__bytes":"CAI=","__message":{"1":2},"__packed":[8,2],"__string":"\u0008\u0002"},{"__bytes":"CAM=","__message":{"1":3},"__packed":[8,3],"__string":"\u0008\u0003"}],"2":{"__bytes":"dGVzdGluZw==","__packed":[116,101,115,116,105,110,103],"__string":"testing"},"3":11,"4":111,"5":1,"6":{"__bytes":"CAE=","__message":{"1":1},"__packed":[8,1],"__string":"\u0008\u0001"}}

バイナリを直接確認できると何かしら便利、フィールドとの対応付けは後で行うので・・・という時に役のに立つかも知れません。