ブログ・ア・ラ・クレーム

技術的なメモとかライフログとか。

POSIX message queue を Go のコードから利用するためのライブラリ posix_mq を作った

表題の通りです。

github.com

cgo を使って POSIX message queue の基本的な操作、 open/close と send/receive とその他細々とした機能を実装しています。 とは言っても、それほど複雑なことはしておらず、 POSIX の関数呼び出しを愚直に Go の func にラップしているだけなのですけどね。

なぜやったのか

入門 Kubernetes などを読むと、 Pod 内のコンテナ同士では SysV / POSIX の IPC namespace を共有している記述があります。

www.oreilly.co.jp

Pod 内の別コンテナへの通信となると、 sidecar パターンで Envoy を動かすようなネットワーキング用プロキシを介するのに利用したり fluentd などのロギングエージェントにログを送ったり、お決まりのパターンがあると思います。 そういった際に低コストで非言語依存なプロトコルが欲しくなることが多々あるように考えられます。

POSIX message queue は POSIX としての仕様も存在し、低コストな IPC 手段のひとつです。 これを選択肢のひとつとして用意しておくことは発生する課題に柔軟に対応するのに重要であると考えます。 手段は他にもあるし、同様の機能であれば SySV message queue やいっそ AMQP などをしゃべっても良い気もするのですが、手段を増やす意味でも今回のライブラリを開発してみた次第です。

動作例

sender / receiver の通信例

シンプルなsender と receiver であれば以下のコードで実装してやり取りできます。

  • sender.go
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/syucream/posix_mq/src/posix_mq"
)

const maxTickNum = 10

func main() {
    oflag := posix_mq.O_WRONLY | posix_mq.O_CREAT
    mq, err := posix_mq.NewMessageQueue("/posix_mq_example", oflag, 0666, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer mq.Close()

    count := 0
    for {
        count++
        mq.Send([]byte(fmt.Sprintf("Hello, World : %d\n", count)), 0)
        fmt.Println("Sent a new message")

        if count >= maxTickNum {
            break
        }

        time.Sleep(1 * time.Second)
    }
}
  • receiver.go
package main

import (
    "fmt"
    "log"

    "github.com/syucream/posix_mq/src/posix_mq"
)

const maxTickNum = 10

func main() {
    oflag := posix_mq.O_RDONLY
    mq, err := posix_mq.NewMessageQueue("/posix_mq_example", oflag, 0666, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer mq.Close()

    fmt.Println("Start receiving messages")

    count := 0
    for {
        count++

        msg, _, err := mq.Receive()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf(string(msg))

        if count >= maxTickNum {
            break
        }
    }
}

Kubernetes の同一 Pod 上 container 通信例

折角なので上記の sender / receiver を Kubernetes の Pod に押し込んで通信させてみます。 あらかじめてきとうに sender / receiver 用の Docker イメージを作っておいてください。

Pod の定義なのですが、愚直に container の設定を羅列していくだけです。 IPC namespace は勝手に共有されるのでそれに関する設定や準備は必要ありません。

apiVersion: v1
kind: Pod
metadata:
  name: posixmq-pod
spec:
  containers:
    - name: posixmq-sender
      image: "posix_mq_sender"
      imagePullPolicy: IfNotPresent
    - name: posixmq-receiver
      image: "posix_mq_receiver"
      imagePullPolicy: IfNotPresent
  restartPolicy: Never

Pod の動作確認をさくっとしてみましょう。

$ kubectl apply -f example/kubernetes/pod-posixmq.yaml
pod "posixmq-pod" created
...
$ kubectl logs posixmq-pod -c posixmq-sender
go run example/exec/sender.go
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
Sent a new message
$ kubectl logs posixmq-pod -c posixmq-receiver
go run example/exec/receiver.go
Start receiving messages
Hello, World : 1
Hello, World : 2
Hello, World : 3
Hello, World : 4
Hello, World : 5
Hello, World : 6
Hello, World : 7
Hello, World : 8
Hello, World : 9
Hello, World : 10

この出力結果を見るに、 sender の送ったメッセージがちゃんと receiver に届いていそうです!

余談

POSIX の機能となると可搬性を期待してしまいますが、 POSIX message queue は darwinwindows では実装されていなかったりと意外に可搬性に欠けます。 対して SysV の message queue はこれに比べて可搬性が高く、より多くの環境でサポートされています。 (このあたりは Linuxプログラミングインタフェース にも記述されていますね!)

www.oreilly.co.jp

とは言っても本記事で書くようにあらかじめ環境が定められている Kubernetes クラスタ上で動かす場合は、それほど気にすることでも無いのかもしれません。 また、 SysV message queue の Go ラッパーライブラリは Shopify により実装されているのでこれを試すのもアリかもです。

github.com

ちなみに少し前のベンチマーク内容ですが、 POSIX message queue は IPC の手段として結構高パフォーマンスであるような調査結果もあります。

www.programering.com