For the past couple of weeks I have been working on a way to connect JRuby with RabbitMQ message queuing server by using Workling plugin for Rails.
For this I had to use official RabbitMQ Java client since I gave up on all available gems for this purpose since they either didn't support JRuby or required Evented Mongrel web server. I had a bit of trouble so this post is to help everyone start with this.
Workling
This part was pretty easy. Just run:
script/plugin install git://github.com/purzelrakete/workling.git
and all required files should be placed in your projects vendor/plugins/workling folder.
Java client
The RabbitMQ Java client library allows Java code to interface to messaging servers. Workling will use this client with all communication to RabbitMq. Required files can be found on RabbitMQ site.
For this purpose we shall need commons-cli-1.1.jar, commons-io-1.2.jar and rabbitmq-client.jar. Place these files in appropriate folder in the workling directory.
AMQP Client
What we'll have to do is tell Workling how to use our Java client to communicate with RabbitMQ server. This is done by overriding default implementation of amqp_client.rb found in \vendor\plugins\workling\lib\workling\clients\ folder.
Before we start we'll have to import our jars first. This is best done in the "try_load_an_amqp_client" in the workling.rb file. Put something like this:
require 'java' require 'path_to_file/commons-cli-1.1.jar' require 'path_to_file/commons-io-1.2.jar' require 'path_to_file/rabbitmq-client.jar'
As you can see class AmqpClient extends Workling::Clients::Base "interface"
defines 5 methods for us to implement: request, retrieve, subscribe, connect and close.
At the beginning of the class implementation we shall have to load all required java classes by using java_import method. We shall require the following:
java_import 'com.rabbitmq.client.Address' java_import 'com.rabbitmq.client.Connection' java_import 'com.rabbitmq.client.ConnectionFactory' java_import 'com.rabbitmq.client.Channel' java_import 'com.rabbitmq.client.Consumer'
Now we can override connect method by inserting the following code:
def connect conn_factory = ConnectionFactory.new conn_factory.setUsername( "guest") conn_factory.setPassword("guest") conn_factory.setHost("localhost") conn_factory.setPort(5672) @connection = conn_factory.new_connection @channel = @connection.create_channel end
This code is pretty simple. We just created connection to our server by using default local parameters.
After this we can implement request and retrieve methods required for rails app to connect and post messages to queue and for workers to retrieve these messages.
We should have containers for all the objects we create so they wouldn't be recreated all over again and we can destroy them later on. For this simple example we shall use two hash maps where we will set are the objects declared. Later you can create more complex structures.
def initialize_objects(key) if !@exchanges[:test] @exchanges[:test] = true @channel.exchange_declare("test", "fanout", false, false, nil) end id !@queues[key] @queues[key] = true @channel.queue_bind(key, "test", key) end end def request(key, value) initialize_objects(key) message_body_byte = Marshal.dump(value).to_java_bytes @channel.basicPublish("test", key,nil, message_body_byte) puts " Message #{value} sent to queue #{key}" end def retrieve(key) initialize_objects(key) message = nil delivery = @channel.basic_get(key, false) if delivery message = Marshal.load(String.from_java_bytes(delivery.get_body)) puts " Received #{message}" end message end
These are two simple methods which create exchange and queue (if required), send messages and print result. You can later create more complex durable queues with acks and everything else you need. Now all you need is to implement close method by calling @channel.queue_unbind, @channel.close and @connection.close methods.
Our last task is to set the dispatcher and the client in the development.rb:
config.after_initialize do Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new Workling::Remote.dispatcher.client = Workling::Clients::AmqpClient.new end
Notice we are only using retrieve for this example since I couldn't find a way to use subscribe with this client and without "evented" server.
When you load data into queues (by using async calls in your app) you can start the workers by running listen.rb in the \vendor\plugins\workling\script folder. We have to call it manually since JRuby does not support fork method used by commonly used workling_client.
Well, hope this is enough to get you going with JRuby, Workling and RabbitMQ. Enjoy ;)