ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

POSIX message queue を Go のコードから利用するためのライブラリ posix_mq を作った

表題の通りです。

github.com

cgo を使って POSIX message queue の基本的な操作、 open/close と send/receive とその他細々とした機能を実装しています。 とは言っても、それほど複雑なことはしておらず、 POSIX の関数呼び出しを愚直に Go の func にラップしているだけなのですけどね。

なぜやったのか

入門 Kubernetes などを読むと、 Pod 内のコンテナ同士では SysV / POSIX の IPC namespace を共有している記述があります。

www.oreilly.co.jp

Pod 内の別コンテナへの通信となると、 sidecar パターンで Envoy を動かすようなネットワーキング用プロキシを介するのに利用したり fluentd などのロギングエージェントにログを送ったり、お決まりのパターンがあると思います。 そういった際に低コストで非言語依存なプロトコルが欲しくなることが多々あるように考えられます。

POSIX message queue は POSIX としての仕様も存在し、低コストな IPC 手段のひとつです。 これを選択肢のひとつとして用意しておくことは発生する課題に柔軟に対応するのに重要であると考えます。 手段は他にもあるし、同様の機能であれば SySV message queue やいっそ AMQP などをしゃべっても良い気もするのですが、手段を増やす意味でも今回のライブラリを開発してみた次第です。

動作例

sender / receiver の通信例

シンプルなsender と receiver であれば以下のコードで実装してやり取りできます。

  • sender.go
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/syucream/posix_mq/src/posix_mq"
)

const maxTickNum = 10

func main() {
    oflag := posix_mq.O_WRONLY | posix_mq.O_CREAT
    mq, err := posix_mq.NewMessageQueue("/posix_mq_example", oflag, 0666, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer mq.Close()

    count := 0
    for {
        count++
        mq.Send([]byte(fmt.Sprintf("Hello, World : %d\n", count)), 0)
        fmt.Println("Sent a new message")

        if count >= maxTickNum {
            break
        }

        time.Sleep(1 * time.Second)
    }
}
  • receiver.go
package main

import (
    "fmt"
    "log"

    "github.com/syucream/posix_mq/src/posix_mq"
)

const maxTickNum = 10

func main() {
    oflag := posix_mq.O_RDONLY
    mq, err := posix_mq.NewMessageQueue("/posix_mq_example", oflag, 0666, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer mq.Close()

    fmt.Println("Start receiving messages")

    count := 0
    for {
        count++

        msg, _, err := mq.Receive()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf(string(msg))

        if count >= maxTickNum {
            break
        }
    }
}

Kubernetes の同一 Pod 上 container 通信例

折角なので上記の sender / receiver を Kubernetes の Pod に押し込んで通信させてみます。 あらかじめてきとうに sender / receiver 用の Docker イメージを作っておいてください。

Pod の定義なのですが、愚直に container の設定を羅列していくだけです。 IPC namespace は勝手に共有されるのでそれに関する設定や準備は必要ありません。

apiVersion: v1
kind: Pod
metadata:
  name: posixmq-pod
spec:
  containers:
    - name: posixmq-sender
      image: "posix_mq_sender"
      imagePullPolicy: IfNotPresent
    - name: posixmq-receiver
      image: "posix_mq_receiver"
      imagePullPolicy: IfNotPresent
  restartPolicy: Never

Pod の動作確認をさくっとしてみましょう。

$ kubectl apply -f example/kubernetes/pod-posixmq.yaml
pod "posixmq-pod" created
...
$ kubectl logs posixmq-pod -c posixmq-sender
go run example/exec/sender.go
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
$ kubectl logs posixmq-pod -c posixmq-receiver
go run example/exec/receiver.go
Start receiving messages
Hello, World : 1
Hello, World : 2
Hello, World : 3
Hello, World : 4
Hello, World : 5
Hello, World : 6
Hello, World : 7
Hello, World : 8
Hello, World : 9
Hello, World : 10

この出力結果を見るに、 sender の送ったメッセージがちゃんと receiver に届いていそうです!

余談

POSIX の機能となると可搬性を期待してしまいますが、 POSIX message queue は darwinwindows では実装されていなかったりと意外に可搬性に欠けます。 対して SysV の message queue はこれに比べて可搬性が高く、より多くの環境でサポートされています。 (このあたりは Linuxプログラミングインタフェース にも記述されていますね!)

www.oreilly.co.jp

とは言っても本記事で書くようにあらかじめ環境が定められている Kubernetes クラスタ上で動かす場合は、それほど気にすることでも無いのかもしれません。 また、 SysV message queue の Go ラッパーライブラリは Shopify により実装されているのでこれを試すのもアリかもです。

github.com

ちなみに少し前のベンチマーク内容ですが、 POSIX message queue は IPC の手段として結構高パフォーマンスであるような調査結果もあります。

www.programering.com

技術書典5にてマイクロサービスとEnvoy、暗号通貨についての薄い本を配布します

明日 10/08 (月) は技術書典5 の日ですね!

techbookfest.org

当サークル「まいにちがきんようび。」もサークル参加して、新刊を配布する予定です!(既刊の配布予定はありません)

techbookfest.org

内容としては、マイクロサービスと Envoy Proxy を試してみた結果からの紹介記事をメインに、前回記事執筆者による bitcoin の仕様の闇の記事を付録に添えたものになります。

具体的には以下のような内容になります(目次から抜粋)

第1章 Envoy Proxy 入門
1.1 はじめに
1.2 マイクロサービスアーキテクチャ概要
1.2.1 モノリシックアーキテクチャ
1.2.2 マイクロサービスアーキテクチャ
1.3 マイクロサービス、そして Envoy と Istio
1.3.1 Envoy とは
1.3.2 Istio とは
1.4 Envoy 詳解
1.4.1 Envoy アーキテクチャ概要
1.4.2 Envoy のリソース抽象化
1.4.3 Envoy の特徴的な機能説明
1.4.4 nginx など従来のプロキシと何が違うのかについて
1.5 Envoy の試し方
1.5.1 Docker image をとりあえず動かす
1.5.2 複雑な構成を試してみる
1.6 おまけ
1.6.1 Istio における Envoy の組み込まれ方
1.6.2 Envoy ソースコードリーティング
1.7 まとめ
付録A 私が暗号通貨を嫌いになったわけ
A.1 はじめに
A.2 前提知識
A.2.1 トランザクションの構造と所有権の移転履歴
A.2.2 TXID とトランザクションデータ
A.2.3 コインベーストランザクションとマイニング報酬
A.2.4 マークルツリーとマークルルート
A.2.5 マイニングと Nonce
A.2.6 コインベーストランザクションとエクストラ Nonce
A.2.7 TXID の衝突確率と鳩ノ巣原理とバースデイパラドックス
A.3 TXID の衝突事例と BIP-30
A.4 BIP-30 から BIP-34 へ
A.5 BIP-34 以前のトランザクションとの衝突問題
A.6 BIP-30,34 と各種アルトコイン
A.7 実際の衝突発生確率
A.8 おわりに
あとがき

ぜひ会場でお目に止まるようであれば、手にとってみていただけると幸いです!

また当日会場に来ない、来れない、あるいは僕にとっては運良く完売してしまって購入できなくなったという時のために kindle 版も用意しております。 こちらも合わせてご検討いただければ幸いです。

https://www.amazon.co.jp/dp/B07HYC1HLN

Netflix のデータパイプラインを読み解きたい

Netflix はマイクロサービスアーキテクチャ界においてプロダクションで成功例を積んでいる、いわば大先輩だと思われます。 彼らは数多くのイベント登壇や techblog の記事、 GitHub 上による OSS の公開を行っており、それらからアーキテクチャやその変遷を垣間見ることができると考えています。

本記事では筆者が最近悩んでいる、マイクロサービス前提の世界でのログ収集基盤において、 Netflix の様々な事例を調べた結果をつらつら書いていこうと思います。 あらかじめ本記事は正確性を担保しておらず、あくまで筆者個人が調べることができた範囲での記述に留まることをお断りさせていただきます。

Suro: 分散データパイプライン

2015 年くらいにメンテが止まってしまったのですが、分散データパイプラインをうたう Suro というソフトウェアが存在しました。

Suro に関しては解説記事も書かれていて、イベントログを集約し後続の S3 、 Kafka 、その他イベント処理システムに転送する役割を担っていたようです。 またバッチとリアルタイム、両方の性質を持つデータを受け付けていたようです。

medium.com

Suro がメンテされなくなった経緯は筆者もよく分かっていませんが、後続のデータパイプラインの仕組みを見るにバッチとリアルタイムの両方の要望に答えるのが辛かったのか、あるいはデータの分析基盤の変更に合わせて作り直す決定をしたものかと推測しています。

Cassandra のデータ転送のバッチレイヤーと Ursula によるスピードレイヤー

Suro と入れ替える形かどうそうでないかは定かで無いですが、以下の資料によると Lambda Architecture に似たバッチレイヤーとスピードレイヤーの分離を行っているのが見て取れます。

https://qconsf.com/sf2016/system/files/presentation-slides/netflix-cloud_analytics.pdf

バッチレイヤーでは、 Netflix で広く使われている Cassandra をベースに SSTable をダンプして、それを aegisthus という Cassandra 向けバルクデータパイプラインソフトウェアを使って、 Netflix がデータウェアハウスに使っている S3 に転送していたようです。 また SSTable のダンプは上記資料では明示されていないのですが、 Priam というツールが公開されておりこれを使っているのではないかと推測しています。

ちなみにこの aegisthus も、 2017 年にメンテナンスモードに移行し今ではコミットされていないようです。 最後のコミットに対してされたコメント を見るに、 Cassandra のデータのデータウェアハウスへの転送は今後 Spark のジョブに移行することを検討しており、 2018 年 8 月時点ではまだ移行途中?でかつ新しいツールは OSS にはなっていないようです。

スピードレイヤーを支える Ursula というソフトウェアの存在も上記スライドでは触れられていますが、こちらは OSS になっている気配はありませんでした。

データパイプライン周辺技術

パイプライン以外にも、 Netflixビッグデータ処理ジョブのオーケストレーションツールである genie や、 メタデータ管理ソフトウェアである metacat など多くの小道具を揃えているようです。

余談

上記のうち Suro や aegisthus については、 "マイクロサービスアーキテクチャ" にも記述があり参考になる部分があるかも知れません。

www.oreilly.co.jp

ここで挙げたツールは Netflix が Cassandra を広く採用していることや、大規模なデータを持つがゆえの様々な課題、背景などもあって作ったのだと考えられます。 これらがそのままマッチして利用できる企業は多いわけではないと思われますが、これらの変遷や構成などが読み解けるものがあるのかも、などと思いました(小並感)

追記

今は Kafka を結構活用しているのかも。

株式会社メルカリに入社して1年が経過した

 ちょうど一年前の 2017年8月16日に株式会社メルカリに入社しました。キリがいいこともあり、ここらで個人的な振り返りをつらつら書いてみます。あらかじめ、はっきりいって個人の日記レベルの内容であることをお断りさせていただきます。

転職の経緯から入社まで

 2017 年 1 月頃から転職を考え初めていました。元の所属も Web サービスをなりわいとしており、それなりの愛着もあり良い経験をさせて頂いてはいたのですが、自分の能力向上を目指しつつより早いペースでプロダクトを提供していく体験を積みたく粛々と準備をしていました。 そんな中メルカリは、急成長しているサービスを提供しており外部への発信も多く、また所属エンジニアも強力な方々が多く自分にとっても良い経験になると踏んだため応募した次第でした。

 応募に際して僕の場合は特に知り合いのツテやエージェントを通さずに、採用ページの募集フォームにダイレクトアタックを仕掛けて選考に進みました。敢えて反省点をひねり出すならば、何とか所属するエンジニアにアプローチを仕掛けて雰囲気などを確かめるとより良かった気はします。採用ページや各種アウトプットからは見えない現場での葛藤やそこに根ざす文化を確認するのは、重要なことでしょうし。

入社後から今まで

 2017 年内はメルカリ SRE チームに所属して、ミドルウェア開発マンとして主にデータ収集基盤とその他細々としたコンポーネントの開発に携わっていました。 その後は 2018 年 2 月頃?(うろ覚え)に子会社メルペイに籍を移し、一部マイクロサービスの開発や、メルペイのためのデータ収集基盤を、マイクロサービスアーキテクチャに従った世界でのプロダクトでの課題も吸収しつつ考えるという仕事に従事し始めました。

 この間に起こった大きな変化として、個人的には前職が基本オンプレ前提のインフラ構成であったところからクラウドのリソースを多用する環境になったことと、会社的にはマイクロサービス化へ舵を切ったことです。 特に後者は今でも悩ましい事がいくつもあり、刺激の多い日々を過ごせているように感じています。

反省点と今後の目標

 ここ一年多くの刺激を受けつつ過ごせましたが、まだまだ自分が納得できるほど大きな成功に到達していないとも思えます。 元々、不確定な世界の中こつこつと少しずつ成果を積み上げ模索するのが好きであり得意であると考えているのですが、そのスタイルでは到達しにくい領域についても考えるべきかもという危機感も持ってきています。 そういった現状の自身の不足しているスキルやできる領域、やれる領域を明確にしていくのが今後の課題なのかなぁ、などと考えています。

おまけ: 仕事以外のここ一年の活動

ngx_mruby のノンブロッキング sleep

 メルカリ SRE チームに所属した際、そういえばちゃんと nginx を理解していないなと危機感を覚えつつ、勉強も兼ねて mruby スクリプトを動かすためのモジュール ngx_mruby にノンブロッキング sleep する仕組みを追加するパッチを書いてみていました。 このパッチは結局幾つかの問題にハマり、自分だけでは解決できず半ば放置していたのですが、 matsumotory さんはじめ GMO ペパボさんの方々に拾っていただき、 RubyKaigi 2018 で紹介されるまでに至りました。 ・・・諦めて単純に放置するでなく、もう少し能動的に相談しに行ったりとかすべきだったかも、など様々な反省があります。

rubykaigi.org

Go でいくつかツールを書いた

 すでに記事と化したものもあるのですが、 Go でいくつかツールを書きました。

syucream.hatenablog.jp

syucream.hatenablog.jp

自分の業務に直結するスキルが得られる以外にも、やっぱ何も考えなくてもシングルバイナリで使えるとツールとしては便利だと考えたため、ひたすら Go で書いてます。

で、

誰?

Cloud Spanner の DDL parser と DDL 変換ツールを作った

Cloud SpannerDDL parser の Go 実装と、 Spanner DDLMySQL DDL っぽいものに変換するツールを作りました。 本記事はこれらの紹介になります。

github.com

github.com

spar: Cloud Spanner DDL parser in Go

Spanner の DDLこんな感じ で、 SQL 方言に見えるもののカラムの型だの INTERLEAVE だの、仕様に対して独自の文法も結構な割合で持っているように見えます。 今回作った spar, DDL parser はこれらをパースして CREATE 文や ALTER 文などの構成要素をまとめて返してくれます。

リポジトリに同梱した DDL syntax checker のコードがシンプルな使用例としても有用です。 parser package の持つ Parse() に読みたい DDL の Reader を渡すとパースした結果を返してくれます。

package main

import (
    "io/ioutil"
    "log"
    "os"
    "strings"

    "github.com/syucream/spar/src/parser"
)

func main() {
    data, err := ioutil.ReadAll(os.Stdin)
    if err != nil {
        log.Fatal(err)
    }

    _, err = parser.Parse(strings.NewReader(string(data)))
    if err != nil {
        log.Fatal(err)
    }
}

パーサー部分は goyacc を使って生成しています。 また lexer 部分の一部は https://github.com/benbjohnson/sql-parser を参考にしています。

jackup: Jack up your DDL and translate between MySQL and Spanner

スパナときたら今度は別の工具も欲しくなってくると思われます。ということでジャッキを作りました。 jackup は標準入力か -f オプションで渡したパスのファイルの DDL を参照し、 spar を使ってパースした後、(現在は CREATE TABLE , CREATE INDEX しか読んでくれませんが) MySQL の同じような構造を持つ DDL を出力してくれます。 残念ながら一部の変換、特にカラムの長さに関する制限が Spanner のほうがゆるい部分があり、無理やり変換したり変換せず無視したりもしています。

このツールは Spanner を捨てて MySQL に以降したりなどクリティカルなユースケースに対応するつもりはなく、もう少しゆるく変換したらどうなりそうか確認したり、 MySQL 周りのエコシステム、例えば MySQL Workbench による ER 図生成に食わせたりしてみるために作ってます。 なお、このあたりの SQL 方言とその間の相互変換は、現状良いソリューションが無く今回自分は自前で実装したのですが、今後は Apache Calcite に淡い期待を寄せてみてもいいのかなと思っています。

Apache Calcite • Dynamic data management framework

技術書典4で配布した同人誌の原稿データを GitHub で公開しました

以下の記事でご紹介した、弊サークルの技術書典4の新刊を GitHub で public repo として公開しました。

syucream.hatenablog.jp

リポジトリはこちらになります。

github.com

ビルド済み pdf は無く、またその他配布した原稿にしか含まれないコンテンツなどあるかもしれませんが、ご了承ください。

技術書典4振り返り

いい機会なので振り返りをつらつら書きます

執筆環境について

  • Re:VIEW で原稿を記述
  • CircleCI で textlint 実行と pdf 自動ビルドするようにした
    • 入稿直前になって pdf がコレジャナイ感出るのマジ勘弁!なので
    • base の Docker Image に vvakame/review を使わせていただきました
  • 細かなレビューは GitHub の pullreq or Issue ベースで実施

進捗管理について

  • GitHub Issues の Milestone で、いくつかマイルストーンとその締切を設定
    • ゆるめの締切にした
    • 最初から早割入稿前提で管理してた。追い込み記事にこのバッファがあることで少し精神的に助かった
    • あまり予定通りにいかなかったのは大きな反省点
  • 多人数での執筆管理の経験が今回で初めてだったので、学びが多かった

本のテーマについて

  • テーマを 1 つに絞るか、開き直って雑誌形式にすべきだった
    • テーマが 2 つあるせいで、そのテーマ間に関連があるものだと誤解させてしまった

その他

  • メンバーのモチベーションが途中で低減してしまった
    • モチベ管理まですべきかは悩みどころ。同人誌であるなら、個々人が自分の記事の品質に納得が行っているならまぁいいのかな?
  • 表紙絵の担当が決まったのが後半だった
    • 早めに調整!!

そして技術書典5へ〜

弊サークル「まいにちがきんようび。」は技術書典5にも参加します!たぶんマイクロサービス周辺技術について執筆陣が好きな要素をひろってまとめた新刊を出すことになるかと思います(マイクロサービス関連以外にも、オマケとして記事を書くかもです) 引き続きよろしくお願いいたします!!

embulk-input-datastore を作った

TL;DR

Cloud Datastore からデータをぶっこ抜いてくるための embulk input plugin を作りました。

github.com

普通に embulk gem install embulk-input-datastore した上で input plugin としてご利用ください。 参照するエンティティ、プロパティを絞り込むために GQL を指定可能にしているのでご利用ください。 (逆にそれ以外の参照方法をサポートしていません。。)

in:
  type: datastore
  project_id: "your-gcppj-123"
  json_keyfile: credential.json
  gql: "SELECT * FROM myKind"
...

細かい話

実装言語

Kotlin で実装しています。その理由としては Java よりシンプルに記述可能で、かつそれなりに事例があるので採用してみた次第です。

qiita.com

テストは Spek を用いて RSpec 的なシンタックスでテストコードを記述し、アサーションライブラリとして kotlin-test を使うようにしました。 大した記述量では無いのですが、まぁまぁ様になっている気がします。

embulk の input plugin としての挙動

Cloud Datastore はいわゆる NoSQL データベースでありスキーマが固定されているわけではありません。 従ってエンティティによって同じプロパティ名で違う型のプロパティが存在したり、値がそもそも存在しなかったりします。

これらを embulk の(というか MessagePack の?)型にマッピングするのが面倒くさく、あまり良い対応方法も思いつかなかったので embulk-input-mongodb と同様単一の JSON フィールドに結果をまとめて出力するようにしました。 input plugin として値の読み出しは行うが、変換処理を行う場合は別の filter plugin などでなんとかせい、というスタンスです。

gradle-embulk-plugin の利用

gradle-embulk-plugin を便利に使わせていただきました!主に初期のテンプレートコード生成や gradle embulk_run による動作確認などでお世話になりました。 これから gradle でビルドしつつ embulk plugin 作るような方にはオススメしたいところです。

動作例

一応載っけてみます。 以下のような myKind カインドに対するエンティティたちが存在する場合に

f:id:syu_cream:20180626000814p:plain

こんな感じの設定ファイルを用意して

in:
  type: datastore
  project_id: "<my-project-id>"
  json_keyfile: credential.json
  gql: "SELECT * FROM myKind WHERE myProp >= 100 AND myProp < 200"

out:
  type: stdout

実行することで以下のような出力が得られました。

$ embulk run examples/datastore2stdout.yaml
2018-06-26 00:11:05.173 +0900: Embulk v0.9.7
2018-06-26 00:11:07.219 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-06-26 00:11:14.783 +0900 [INFO] (main): Gem's home and path are set by default: "/Users/ryo/.embulk/lib/gems"
2018-06-26 00:11:17.364 +0900 [INFO] (main): Started Embulk v0.9.7
2018-06-26 00:11:17.639 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-datastore (0.1.1)
2018-06-26 00:11:17.743 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-06-26 00:11:17.784 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
{"extra":{"defaultProperty":"hogefuga"}, "myProp":100}
{"myProp":150}
2018-06-26 00:11:19.708 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-06-26 00:11:19.714 +0900 [INFO] (main): Committed.
2018-06-26 00:11:19.715 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

おわりに

なんとなく動くところまで持っていけたので記事にしました。 Cloud Firestore も同じ要領で plugin 作れたりするのかなとぼんやり考えたりしました。