ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

embulk-input-datastore を作った

TL;DR

Cloud Datastore からデータをぶっこ抜いてくるための embulk input plugin を作りました。

github.com

普通に embulk gem install embulk-input-datastore した上で input plugin としてご利用ください。 参照するエンティティ、プロパティを絞り込むために GQL を指定可能にしているのでご利用ください。 (逆にそれ以外の参照方法をサポートしていません。。)

in:
  type: datastore
  project_id: "your-gcppj-123"
  json_keyfile: credential.json
  gql: "SELECT * FROM myKind"
...

細かい話

実装言語

Kotlin で実装しています。その理由としては Java よりシンプルに記述可能で、かつそれなりに事例があるので採用してみた次第です。

qiita.com

テストは Spek を用いて RSpec 的なシンタックスでテストコードを記述し、アサーションライブラリとして kotlin-test を使うようにしました。 大した記述量では無いのですが、まぁまぁ様になっている気がします。

embulk の input plugin としての挙動

Cloud Datastore はいわゆる NoSQL データベースでありスキーマが固定されているわけではありません。 従ってエンティティによって同じプロパティ名で違う型のプロパティが存在したり、値がそもそも存在しなかったりします。

これらを embulk の(というか MessagePack の?)型にマッピングするのが面倒くさく、あまり良い対応方法も思いつかなかったので embulk-input-mongodb と同様単一の JSON フィールドに結果をまとめて出力するようにしました。 input plugin として値の読み出しは行うが、変換処理を行う場合は別の filter plugin などでなんとかせい、というスタンスです。

gradle-embulk-plugin の利用

gradle-embulk-plugin を便利に使わせていただきました!主に初期のテンプレートコード生成や gradle embulk_run による動作確認などでお世話になりました。 これから gradle でビルドしつつ embulk plugin 作るような方にはオススメしたいところです。

動作例

一応載っけてみます。 以下のような myKind カインドに対するエンティティたちが存在する場合に

f:id:syu_cream:20180626000814p:plain

こんな感じの設定ファイルを用意して

in:
  type: datastore
  project_id: "<my-project-id>"
  json_keyfile: credential.json
  gql: "SELECT * FROM myKind WHERE myProp >= 100 AND myProp < 200"

out:
  type: stdout

実行することで以下のような出力が得られました。

$ embulk run examples/datastore2stdout.yaml
2018-06-26 00:11:05.173 +0900: Embulk v0.9.7
2018-06-26 00:11:07.219 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-06-26 00:11:14.783 +0900 [INFO] (main): Gem's home and path are set by default: "/Users/ryo/.embulk/lib/gems"
2018-06-26 00:11:17.364 +0900 [INFO] (main): Started Embulk v0.9.7
2018-06-26 00:11:17.639 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-datastore (0.1.1)
2018-06-26 00:11:17.743 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-06-26 00:11:17.784 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
{"extra":{"defaultProperty":"hogefuga"}, "myProp":100}
{"myProp":150}
2018-06-26 00:11:19.708 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-06-26 00:11:19.714 +0900 [INFO] (main): Committed.
2018-06-26 00:11:19.715 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

おわりに

なんとなく動くところまで持っていけたので記事にしました。 Cloud Firestore も同じ要領で plugin 作れたりするのかなとぼんやり考えたりしました。