EasyNetQ, a simple .NET API for RabbitMQ
For the shootout tests spraying byte arrays around was fine, but in the real world, we want our messages to be .NET types. I also wanted to provide developers with a very simple API that abstracted away the Exchange/Binding/Queue model of AMQP and instead provides a simple publish/subscribe and request/response model. My inspiration was the excellent work done by Dru Sellers and Chris Patterson with MassTransit (the new V2.0 beta is just out).
The code is on GitHub here:
https://github.com/mikehadlow/EasyNetQ
The API centres around an IBus interface that looks like this:
/// <summary>
/// Provides a simple Publish/Subscribe and Request/Response API for a message bus.
/// </summary>
public interface IBus : IDisposable
{
/// <summary>
/// Publishes a message.
/// </summary>
/// <typeparam name="T">The message type</typeparam>
/// <param name="message">The message to publish</param>
void Publish<T>(T message);
/// <summary>
/// Subscribes to a stream of messages that match a .NET type.
/// </summary>
/// <typeparam name="T">The type to subscribe to</typeparam>
/// <param name="subscriptionId">
/// A unique identifier for the subscription. Two subscriptions with the same subscriptionId
/// and type will get messages delivered in turn. This is useful if you want multiple subscribers
/// to load balance a subscription in a round-robin fashion.
/// </param>
/// <param name="onMessage">
/// The action to run when a message arrives.
/// </param>
void Subscribe<T>(string subscriptionId, Action<T> onMessage);
/// <summary>
/// Makes an RPC style asynchronous request.
/// </summary>
/// <typeparam name="TRequest">The request type.</typeparam>
/// <typeparam name="TResponse">The response type.</typeparam>
/// <param name="request">The request message.</param>
/// <param name="onResponse">The action to run when the response is received.</param>
void Request<TRequest, TResponse>(TRequest request, Action<TResponse> onResponse);
/// <summary>
/// Responds to an RPC request.
/// </summary>
/// <typeparam name="TRequest">The request type.</typeparam>
/// <typeparam name="TResponse">The response type.</typeparam>
/// <param name="responder">
/// A function to run when the request is received. It should return the response.
/// </param>
void Respond<TRequest, TResponse>(Func<TRequest, TResponse> responder);
}
To create a bus, just use a RabbitHutch, sorry I couldn’t resist it :)
var bus = RabbitHutch.CreateRabbitBus("localhost");
You can just pass in the name of the server to use the default Rabbit virtual host ‘/’, or you can specify a named virtual host like this:
var bus = RabbitHutch.CreateRabbitBus("localhost/myVirtualHost");
The first messaging pattern I wanted to support was publish/subscribe. Once you’ve got a bus instance, you can publish a message like this:
var message = new MyMessage {Text = "Hello!"};
bus.Publish(message);
This publishes the message to an exchange named by the message type.
You subscribe to a message like this:
bus.Subscribe<MyMessage>("test", message => Console.WriteLine(message.Text));
This creates a queue named ‘test_<message type>’ and binds it to the message type’s exchange. When a message is received it is passed to the Action<T> delegate. If there are more than one subscribers to the same message type named ‘test’, Rabbit will hand out the messages in a round-robin fashion, so you get simple load balancing out of the box. Subscribers to the same message type, but with different names will each get a copy of the message, as you’d expect.
The second messaging pattern is an asynchronous RPC. You can call a remote service like this:
var request = new TestRequestMessage {Text = "Hello from the client! "};
bus.Request<TestRequestMessage, TestResponseMessage>(request, response =>
Console.WriteLine("Got response: '{0}'", response.Text));
This first creates a new temporary queue for the TestResponseMessage. It then publishes the TestRequestMessage with a return address to the temporary queue. When the TestResponseMessage is received, it passes it to the Action<T> delegate. RabbitMQ happily creates temporary queues and provides a return address header, so this was very easy to implement.
To write an RPC server. Simple use the Respond method like this:
bus.Respond<TestRequestMessage, TestResponseMessage>(request =>
new TestResponseMessage { Text = request.Text + " all done!" });
This creates a subscription for the TestRequestMessage. When a message is received, the Func<TRequest, TResponse> delegate is passed the request and returns the response. The response message is then published to the temporary client queue.
Once again, scaling RPC servers is simply a question of running up new instances. Rabbit will automatically distribute messages to them.
The features of AMQP (and Rabbit) make creating this kind of API a breeze. Check it out and let me know what you think.
(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)




