メッセージキュー事始め in Ruby
ruby でメッセージキューを扱いたく、stomp クライアントライブラリを試してみました。
って、奥さんのエントリとシンクロしているようですが、気のせいではありません(笑)。
1. メッセージキューを起動
ruby の stomp はクライアントだけなので、キュー(サーバ側) は ActiveMQ を使うか、Perl の POE::Component::MessageQueue で立てる必要があります。
今回は奥さんとのメッセージのやりとりが目標なので、奥さんのプログラム( POE::Component::MessageQueue 使用)をそのまま使わせてもらいました。
/usr/bin/mq.pl --data-dir /tmp/mqプチTIPS: POE::Component::MessageQueue でキューを立てて、subscribe したり、subscribe されていない状態で send したりすると次のようなエラーが出て最初ハマってしまいました。
QUEUE: Message 10 intended for 8 on /queue/test could not be deliveredメッセージが直接的でないのでわかりにくいですが、実は POE::Component::MessageQueue をインスタンス化したときに指定されたストレージへの保存に失敗しているケースが考えられます。 というわけで、自分の場合は DBI-SQLite を入れたらエラーが出なくなりました。
2. ruby stomp のインストール
ruby stomp は gem でさっくりインストールできます。
gem install stomp
この手のライブラリの例に漏れずドキュメントはないので(苦笑)、ソースを読むか、 unit test のコードを眺めるかして心の準備をします。
3. ruby stomp でキューにメッセージを送る
メッセージを送るには Stomp::Connection#send を用います。
require 'rubygems' require 'stomp' conn = Stomp::Connection.open "test", "", "localhost", 61613 destination = "/queue/test" conn.send destination, "message 1\n"
4. ruby stomp でキューからメッセージを受け取る
メッセージを受け取るには Stomp::Connection#subscribe と receive を用います。
require 'rubygems' require 'stomp' conn = Stomp::Connection.open "test", "", "localhost", 61613 destination = "/queue/test" conn.subscribe destination while true msg = conn.receive # キューが空の場合は待ち受け puts msg.body end
# acknowledge する場合。
conn.subscribe destination, :ack => 'client'
while true
msg = conn.receive # キューが空の場合は待ち受け
puts msg.body
conn.ack msg.headers["message-id"] # メッセージ処理後、受け取り完了を知らせる
end
これで POE::Component::MessageQueue を通じて stompy( for Perl ) とのメッセージの送受信が可能であることを確認しました。簡単。
Ruby の中だけで(分散)非同期処理を行うなら、dRuby とかを使うのが楽ちんですが、他プラットフォームも混ざった状態であるなら、stomp もお手頃で良さそうです。
なお、Stomp::Client というクラスもあり、こちらを使えばメッセージ受け取り時のイベントリスナーをブロックで記述できる「はず」なのですが、手元ではどうしても期待通りに動作させることが出来ませんでした。
Stomp::Client も Stomp::Connection を使って実装されているので、ソースを丹念に読めばきっと原因はわかると思うんですが、手抜きして追いかけてません……