POSIX message queue を Go のコードから利用するためのライブラリ posix_mq を作った
表題の通りです。
cgo を使って POSIX message queue の基本的な操作、 open/close と send/receive とその他細々とした機能を実装しています。 とは言っても、それほど複雑なことはしておらず、 POSIX の関数呼び出しを愚直に Go の func にラップしているだけなのですけどね。
なぜやったのか
入門 Kubernetes などを読むと、 Pod 内のコンテナ同士では SysV / POSIX の IPC namespace を共有している記述があります。
Pod 内の別コンテナへの通信となると、 sidecar パターンで Envoy を動かすようなネットワーキング用プロキシを介するのに利用したり fluentd などのロギングエージェントにログを送ったり、お決まりのパターンがあると思います。 そういった際に低コストで非言語依存なプロトコルが欲しくなることが多々あるように考えられます。
POSIX message queue は POSIX としての仕様も存在し、低コストな IPC 手段のひとつです。 これを選択肢のひとつとして用意しておくことは発生する課題に柔軟に対応するのに重要であると考えます。 手段は他にもあるし、同様の機能であれば SySV message queue やいっそ AMQP などをしゃべっても良い気もするのですが、手段を増やす意味でも今回のライブラリを開発してみた次第です。
動作例
sender / receiver の通信例
シンプルなsender と receiver であれば以下のコードで実装してやり取りできます。
- sender.go
package main import ( "fmt" "log" "time" "github.com/syucream/posix_mq/src/posix_mq" ) const maxTickNum = 10 func main() { oflag := posix_mq.O_WRONLY | posix_mq.O_CREAT mq, err := posix_mq.NewMessageQueue("/posix_mq_example", oflag, 0666, nil) if err != nil { log.Fatal(err) } defer mq.Close() count := 0 for { count++ mq.Send([]byte(fmt.Sprintf("Hello, World : %d\n", count)), 0) fmt.Println("Sent a new message") if count >= maxTickNum { break } time.Sleep(1 * time.Second) } }
- receiver.go
package main import ( "fmt" "log" "github.com/syucream/posix_mq/src/posix_mq" ) const maxTickNum = 10 func main() { oflag := posix_mq.O_RDONLY mq, err := posix_mq.NewMessageQueue("/posix_mq_example", oflag, 0666, nil) if err != nil { log.Fatal(err) } defer mq.Close() fmt.Println("Start receiving messages") count := 0 for { count++ msg, _, err := mq.Receive() if err != nil { log.Fatal(err) } fmt.Printf(string(msg)) if count >= maxTickNum { break } } }
Kubernetes の同一 Pod 上 container 通信例
折角なので上記の sender / receiver を Kubernetes の Pod に押し込んで通信させてみます。 あらかじめてきとうに sender / receiver 用の Docker イメージを作っておいてください。
Pod の定義なのですが、愚直に container の設定を羅列していくだけです。 IPC namespace は勝手に共有されるのでそれに関する設定や準備は必要ありません。
apiVersion: v1 kind: Pod metadata: name: posixmq-pod spec: containers: - name: posixmq-sender image: "posix_mq_sender" imagePullPolicy: IfNotPresent - name: posixmq-receiver image: "posix_mq_receiver" imagePullPolicy: IfNotPresent restartPolicy: Never
Pod の動作確認をさくっとしてみましょう。
$ kubectl apply -f example/kubernetes/pod-posixmq.yaml pod "posixmq-pod" created ... $ kubectl logs posixmq-pod -c posixmq-sender go run example/exec/sender.go Sent a new message Sent a new message Sent a new message Sent a new message Sent a new message Sent a new message Sent a new message Sent a new message Sent a new message Sent a new message $ kubectl logs posixmq-pod -c posixmq-receiver go run example/exec/receiver.go Start receiving messages Hello, World : 1 Hello, World : 2 Hello, World : 3 Hello, World : 4 Hello, World : 5 Hello, World : 6 Hello, World : 7 Hello, World : 8 Hello, World : 9 Hello, World : 10
この出力結果を見るに、 sender の送ったメッセージがちゃんと receiver に届いていそうです!
余談
POSIX の機能となると可搬性を期待してしまいますが、 POSIX message queue は darwin や windows では実装されていなかったりと意外に可搬性に欠けます。 対して SysV の message queue はこれに比べて可搬性が高く、より多くの環境でサポートされています。 (このあたりは Linuxプログラミングインタフェース にも記述されていますね!)
とは言っても本記事で書くようにあらかじめ環境が定められている Kubernetes クラスタ上で動かす場合は、それほど気にすることでも無いのかもしれません。 また、 SysV message queue の Go ラッパーライブラリは Shopify により実装されているのでこれを試すのもアリかもです。
ちなみに少し前のベンチマーク内容ですが、 POSIX message queue は IPC の手段として結構高パフォーマンスであるような調査結果もあります。