Sunday 18 September 2011

Using RabbitMQ with JRuby and Workling plugin




For the past couple of weeks I have been working on a way to connect JRuby to the RabbitMQ message queuing server using the Workling plugin for Rails.
For this I had to use the official RabbitMQ Java client, because I gave up on all the available gems: they either didn't support JRuby or required the Evented Mongrel web server. I had a bit of trouble, so this post is here to help everyone get started.

Workling
This part was pretty easy. Just run:
script/plugin install git://github.com/purzelrakete/workling.git
and all the required files should be placed in your project's vendor/plugins/workling folder.

Java client
The RabbitMQ Java client library allows Java code to interface with messaging servers. Workling will use this client for all communication with RabbitMQ. The required files can be found on the RabbitMQ site.
For this purpose we need commons-cli-1.1.jar, commons-io-1.2.jar and rabbitmq-client.jar. Place these files in an appropriate folder inside the workling directory.

AMQP Client
What we have to do is tell Workling how to use our Java client to communicate with the RabbitMQ server. This is done by overriding the default implementation of amqp_client.rb found in the \vendor\plugins\workling\lib\workling\clients\ folder.
Before we start, we have to load our jars. This is best done in "try_load_an_amqp_client" in the workling.rb file. Put something like this:

require 'java'
require 'path_to_file/commons-cli-1.1.jar'
require 'path_to_file/commons-io-1.2.jar'
require 'path_to_file/rabbitmq-client.jar'

As you can see, the AmqpClient class extends Workling::Clients::Base, an "interface" that defines 5 methods for us to implement: request, retrieve, subscribe, connect and close.
At the beginning of the class implementation we have to load all the required Java classes with the java_import method. We need the following:

java_import 'com.rabbitmq.client.Address'
java_import 'com.rabbitmq.client.Connection'
java_import 'com.rabbitmq.client.ConnectionFactory'
java_import 'com.rabbitmq.client.Channel'
java_import 'com.rabbitmq.client.Consumer'

Now we can override the connect method by inserting the following code:

def connect
  # Default credentials, host and port of a local RabbitMQ installation.
  conn_factory = ConnectionFactory.new
  conn_factory.setUsername("guest")
  conn_factory.setPassword("guest")
  conn_factory.setHost("localhost")
  conn_factory.setPort(5672)
  @connection = conn_factory.new_connection
  @channel = @connection.create_channel
end

This code is pretty simple. We just create a connection to our server using the default local parameters.
After this we can implement the request and retrieve methods, which the Rails app needs to post messages to the queue and the workers need to retrieve them.
We should keep containers for all the objects we create, so that they are not recreated over and over and so that we can destroy them later on. For this simple example we shall use two hash maps in which we record the objects that have already been declared. Later you can create more complex structures.

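def initialize_objects(key)
  # Hashes that record which exchanges and queues were already set up.
  @exchanges ||= {}
  @queues ||= {}

  if !@exchanges[:test]
    @exchanges[:test] = true
    # Arguments: name, type, durable, auto-delete, extra arguments.
    # Note: a fanout exchange ignores the routing key when delivering.
    @channel.exchange_declare("test", "fanout", false, false, nil)
  end

  if !@queues[key]
    @queues[key] = true
    # Declare the queue first; binding a queue that does not exist fails.
    # Arguments: name, durable, exclusive, auto-delete, extra arguments.
    @channel.queue_declare(key, false, false, false, nil)
    @channel.queue_bind(key, "test", key)
  end
end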

def request(key, value)
  initialize_objects(key)
  message_body_byte = Marshal.dump(value).to_java_bytes
  @channel.basicPublish("test", key, nil, message_body_byte)
  puts " Message #{value} sent to queue #{key}"
end

def retrieve(key)
  initialize_objects(key)
  message = nil
  delivery = @channel.basic_get(key, false)
  if delivery
    message = Marshal.load(String.from_java_bytes(delivery.get_body))
    puts " Received #{message}"
  end
  message
end
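
The request and retrieve methods above both rely on Marshal to turn a Ruby object into bytes and back. As a minimal sketch of just that round trip, the snippet below runs in plain Ruby and leaves out the JRuby-specific to_java_bytes and String.from_java_bytes conversions; the payload contents are made up for illustration.

```ruby
# Round trip of a job payload through Marshal, the same serialization
# that request (dump) and retrieve (load) use.
# The payload below is a hypothetical example, not taken from Workling.
payload = { :uid => "abc123", :user_id => 42, :tags => ["a", "b"] }

# Marshal.dump returns a binary String. Under JRuby this is the value
# that gets converted with to_java_bytes before publishing.
encoded = Marshal.dump(payload)

# Marshal.load rebuilds an equal but distinct object on the worker side.
decoded = Marshal.load(encoded)

puts decoded == payload       # structural equality of the two hashes
puts decoded.equal?(payload)  # object identity
```

Keep in mind that Marshal output is tied to Ruby, so this only works when both the Rails app and the workers run Ruby code that knows the classes being sent.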

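The request and retrieve methods create the exchange and queue (if required), send and receive messages, and print the result. Note that retrieve passes false as the second (autoAck) argument of basic_get and never acknowledges the delivery, so messages stay unacknowledged on the broker; later you can build more complex durable queues with proper acks and everything else you need. Now all you need is to implement the close method by calling the @channel.queue_unbind, @channel.close and @connection.close methods.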
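
One possible shape for the close method is sketched below. It assumes the instance variables used in this post (@channel, @connection and a @queues hash keyed by queue name) and the "test" exchange. The class name ClosableClient and the FakeResource stand-in are hypothetical; they exist only so the sketch runs without JRuby or a broker. In the real client the method body would go straight into AmqpClient.

```ruby
# Hypothetical stand-in for AmqpClient, holding only what close needs.
class ClosableClient
  def initialize(connection, channel, queues)
    @connection = connection
    @channel = channel
    @queues = queues
  end

  def close
    # Remove every binding that initialize_objects created on "test".
    @queues.keys.each do |key|
      @channel.queue_unbind(key, "test", key)
    end
    @queues.clear
    # Close the channel before the connection that owns it.
    @channel.close
    @connection.close
  end
end

# Records calls instead of talking to RabbitMQ (not part of any library).
class FakeResource
  attr_reader :calls

  def initialize
    @calls = []
  end

  def queue_unbind(queue, exchange, routing_key)
    @calls << [:queue_unbind, queue, exchange, routing_key]
  end

  def close
    @calls << [:close]
  end
end

channel = FakeResource.new
connection = FakeResource.new
client = ClosableClient.new(connection, channel, { "jobs" => true })
client.close
p channel.calls
p connection.calls
```

Whether you want to unbind the queues on every close is a design choice; if other processes still publish to them, you may prefer to only close the channel and the connection.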

Our last task is to set the dispatcher and the client in development.rb:

config.after_initialize do
  Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
  Workling::Remote.dispatcher.client = Workling::Clients::AmqpClient.new
end

Notice that we only use retrieve in this example, since I couldn't find a way to use subscribe with this client without an "evented" server.
Once you have loaded data into the queues (by making async calls in your app) you can start the workers by running listen.rb in the \vendor\plugins\workling\script folder. We have to call it manually because JRuby does not support the fork method used by the commonly used workling_client.
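
Because we rely on retrieve instead of subscribe, messages are pulled from the queue rather than pushed to the worker. The sketch below illustrates that pull style with a hypothetical drain_queue helper and an in-memory stand-in for the client; it is not how Workling's own listener is written, only an illustration of polling retrieve(key) until it returns nil.

```ruby
# Hypothetical helper: keeps calling client.retrieve(key) until the queue
# is empty (retrieve returns nil) or max_messages have been handled.
# Yields each message and returns the number of messages handled.
def drain_queue(client, key, max_messages = 100)
  handled = 0
  while handled < max_messages
    message = client.retrieve(key)
    break if message.nil?
    yield message
    handled += 1
  end
  handled
end

# In-memory stand-in for the AMQP client so the sketch runs without a broker.
class InMemoryClient
  def initialize(messages)
    @messages = messages
  end

  # Same contract as retrieve in this post: next message or nil.
  def retrieve(key)
    (@messages[key] || []).shift
  end
end

client = InMemoryClient.new({ "jobs" => [{ :id => 1 }, { :id => 2 }] })
seen = []
count = drain_queue(client, "jobs") { |message| seen << message }
puts "handled #{count} messages"
```

A real worker would sleep for a short while between polls instead of exiting when the queue is empty.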

Well, hope this is enough to get you going with JRuby, Workling and RabbitMQ. Enjoy ;)