
John D'Emic is a technologist, developer and author. He is currently a Solutions Architect at MuleSoft and a co-author of both editions of Mule in Action.

Lightweight RPC with ZeroMQ (ØMQ) and Protocol Buffers

11.26.2012

A frequent issue I come across when writing integration applications with Mule is deciding how to communicate back and forth between my front-end application, typically a web or mobile application, and a flow hosted on Mule.

I could use web services and do something like annotate a component with JAX-RS and expose it over HTTP. This is potentially overkill, particularly if I only want to host a few methods, the methods are asynchronous or I don’t want to deal with the overhead of HTTP. It could also be a lot of extra effort if the only consumers of the API, at least initially, are internal-facing applications.

Another choice is to use “synchronous” JMS with temporary reply queues. While Mule makes this easy to do, particularly with MuleClient, I now have to deal with the overhead of spinning up a JMS infrastructure. I could also be limited to Java-only clients, depending on which JMS broker I choose. The latter is particularly significant, as Java probably isn’t the technology of choice on the web or mobile layer.

ØMQ for RPC

ØMQ, or ZeroMQ, is a networking library designed from the ground up to ease integration between distributed applications. It supports a variety of messaging patterns, which are enumerated in the extremely well-written guide. The core library is platform-agnostic native code (C++ behind a C API) with bindings for languages like Java, Python and Ruby.

These features make it a good candidate to solve the challenges I introduced above, particularly since a community-contributed Mule module for ØMQ was released recently. Let’s consider a simple service that accepts a request for a range of stock quotes and returns the results, and see how we can host this service with Mule and expose it with the ØMQ Module.

Data Serialization with Protocol Buffers

Data is transported back and forth over ØMQ as byte arrays. As such, we need to decide on a way to serialize our stock quote requests and responses “on the wire.” Before we do that, however, let’s take a look at the Java canonical data model we’re using on the client and server side. The following snippets show the important bits of the StockQuote and StockQuoteRequest classes and the StockDataService interface.

public class StockQuote implements Serializable {

    String symbol;
    Date date;
    Double open;
    Double high;
    Double low;
    Double close;
    Long volume;
    Double adjustedClose;

    // getters and setters omitted
}

public class StockQuoteRequest implements Serializable {

    String symbol;
    Date startDate;
    Date endDate;

    // getters and setters omitted
}

public interface StockDataService {

    public List<StockQuote> getQuote(StockQuoteRequest request);

}

We could use Java serialization to get the objects into byte arrays. Ignoring the other deficiencies of default Java serialization, the main drawback is that it limits our clients to ones running on a JVM. XML or JSON provide better alternatives, but for the purposes of this example we’ll assume we want a more compact representation of the data. This isn’t totally unrealistic: stock quote data can be extremely time sensitive, and we probably want to minimize serialization and deserialization overhead.
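To get a rough feel for the size argument, here is a minimal sketch that compares default Java serialization with a hand-rolled binary layout. The Request class is a trimmed-down, hypothetical stand-in for StockQuoteRequest, and the compact layout is only an illustration of a fixed binary format, not the Protocol Buffers encoding itself.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Date;

final class SerializationSizeSketch {

    // Hypothetical stand-in for the article's StockQuoteRequest.
    static final class Request implements Serializable {
        private static final long serialVersionUID = 1L;
        final String symbol;
        final Date startDate;
        final Date endDate;

        Request(String symbol, Date startDate, Date endDate) {
            this.symbol = symbol;
            this.startDate = startDate;
            this.endDate = endDate;
        }
    }

    // Default Java serialization: class metadata travels along with the field values.
    static byte[] javaSerialize(Request request) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(request);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hand-rolled layout: length-prefixed symbol followed by two epoch-millisecond longs.
    static byte[] compactEncode(Request request) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(request.symbol);
            out.writeLong(request.startDate.getTime());
            out.writeLong(request.endDate.getTime());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Request request = new Request("FB", new Date(0L), new Date(86_400_000L));
        System.out.println("Java serialization: " + javaSerialize(request).length + " bytes");
        System.out.println("Compact encoding:   " + compactEncode(request).length + " bytes");
    }
}
```

The compact form here is 20 bytes (a 2-byte length, 2 bytes for "FB" and two 8-byte longs), while the Java-serialized form is several times larger because it also describes the classes involved. A schema-driven format keeps that description out of every message.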

Protocol Buffers provide a good middle ground and also boast a Mule Module to provide the necessary transformers we need to move back and forth from the byte array representations.  Let’s define two .proto files to define the wire format and generate the intermediary stubs for serialization.

// First .proto file: the quote and response messages

package com.acmesoft.zeromq;

option java_package = "com.acmesoft.stock.model.serialization.protobuf";
option optimize_for = SPEED;
option java_multiple_files = true;

message StockQuoteResponseBuffer {
  repeated StockQuoteBuffer result = 1;
}

message StockQuoteBuffer {
  required string symbol = 1;
  required int64 date = 2;
  required double open = 3;
  required double high = 4;
  required double low = 5;
  required double close = 6;
  required int64 volume = 7;
  required double adjustedClose = 8;
}

// Second .proto file: the request message

package com.acmesoft.zeromq;

option java_package = "com.acmesoft.stock.model.serialization.protobuf";
option optimize_for = SPEED;
option java_multiple_files = true;

message StockQuoteRequestBuffer {
  required string symbol = 1;
  required int64 start = 2;
  required int64 end = 3;
}
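To see how little ends up on the wire for a StockQuoteRequestBuffer, the following sketch encodes its three fields by hand using the Protocol Buffers wire format (varint keys, length-delimited strings, varint int64 values). It is only an illustration: the class and method names are hypothetical, and in the real project the generated classes do this work.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class WireFormatSketch {

    // Base-128 varint: 7 payload bits per byte, high bit set while more bytes follow.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // A field key is (fieldNumber << 3) | wireType; wire type 0 = varint, 2 = length-delimited.
    static void writeKey(ByteArrayOutputStream out, int fieldNumber, int wireType) {
        writeVarint(out, ((long) fieldNumber << 3) | wireType);
    }

    // Encodes symbol (field 1), start (field 2) and end (field 3) in field-number order.
    static byte[] encodeRequest(String symbol, long start, long end) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] symbolBytes = symbol.getBytes(StandardCharsets.UTF_8);
        writeKey(out, 1, 2);
        writeVarint(out, symbolBytes.length);
        out.write(symbolBytes, 0, symbolBytes.length);
        writeKey(out, 2, 0);
        writeVarint(out, start);
        writeKey(out, 3, 0);
        writeVarint(out, end);
        return out.toByteArray();
    }

    static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            if (hex.length() > 0) {
                hex.append(' ');
            }
            hex.append(String.format("%02X", b & 0xFF));
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        // Small values keep the output readable; real epoch-millisecond timestamps need more varint bytes.
        System.out.println(toHex(encodeRequest("FB", 1L, 300L))); // prints 0A 02 46 42 10 01 18 AC 02
    }
}
```

Each field costs one key byte plus its payload, and no field names or type descriptions are transmitted. Both sides rely on the shared .proto definition to interpret the bytes.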

You would typically use the “protoc” compiler to generate the Java stubs by hand. This is tedious, however, so we’ll instead modify the pom.xml of our project to compile the .proto files as part of the Maven build:

<?xml version="1.0" encoding="UTF-8"?>
<plugin>
   <groupId>com.google.protobuf.tools</groupId>
   <artifactId>maven-protoc-plugin</artifactId>
   <configuration>
      <protocExecutable>/usr/local/bin/protoc</protocExecutable>
   </configuration>
   <executions>
      <execution>
         <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
         </goals>
      </execution>
   </executions>
</plugin>

Since we already have a domain model, we’ll add some helper methods to the domain classes to simplify the serialization tasks on both the client and the server side.

    // In StockQuoteRequest: serialize this request to its Protocol Buffer byte form.
    public byte[] toProtocolBufferAsBytes() {
        return StockQuoteRequestBuffer.newBuilder()
                .setSymbol(symbol)
                .setStart(startDate.getTime())
                .setEnd(endDate.getTime()).build().toByteArray();
    }

    // In StockQuoteRequest: map the generated buffer class back to the domain object.
    public static StockQuoteRequest fromProtocolBuffer(StockQuoteRequestBuffer buffer) {

        StockQuoteRequest request = new StockQuoteRequest();
        request.setSymbol(buffer.getSymbol());
        request.setStartDate(new Date(buffer.getStart()));
        request.setEndDate(new Date(buffer.getEnd()));

        return request;
    }

    // In StockQuote: wrap a list of quotes in a response buffer.
    public static StockQuoteResponseBuffer toProtocolBuffer(List<StockQuote> quotes) {
        StockQuoteResponseBuffer.Builder responseBuilder = StockQuoteResponseBuffer.newBuilder();

        for (StockQuote quote : quotes) {
            responseBuilder.addResult(StockQuoteBuffer.newBuilder()
                    .setAdjustedClose(quote.getAdjustedClose())
                    .setClose(quote.getClose())
                    .setDate(quote.getDate().getTime())
                    .setHigh(quote.getHigh())
                    .setLow(quote.getLow())
                    .setOpen(quote.getOpen())
                    .setSymbol(quote.getSymbol())
                    .setVolume(quote.getVolume()).build());
        }
        return responseBuilder.build();
    }

    public static List<StockQuote> listOfStockQuotesFromBytes(byte[] bytes) {
        List<StockQuoteBuffer> buffer;
        try {
            buffer = StockQuoteResponseBuffer.parseFrom(bytes).getResultList();
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException(e);
        }

        List<StockQuote> quotes = new ArrayList<StockQuote>();

        for (StockQuoteBuffer stockQuoteBuffer : buffer) {
            StockQuote stockQuote = new StockQuote();
            stockQuote.setClose(stockQuoteBuffer.getClose());
            stockQuote.setDate(new Date(stockQuoteBuffer.getDate()));
            stockQuote.setHigh(stockQuoteBuffer.getHigh());
            stockQuote.setOpen(stockQuoteBuffer.getOpen());
            stockQuote.setSymbol(stockQuoteBuffer.getSymbol());
            stockQuote.setVolume(stockQuoteBuffer.getVolume());
            stockQuote.setAdjustedClose(stockQuoteBuffer.getAdjustedClose());
            stockQuote.setLow(stockQuoteBuffer.getLow());
            quotes.add(stockQuote);
        }

        return quotes;
    }

Configuring StockDataService

Now that we have a canonical data model and a wire format defined, we’re ready to wire up a Mule flow to expose the service. Note that for this to work you need to have jzmq installed locally on your system. The following dependency needs to be added to your pom.xml once it’s installed:

<dependency>
   <groupId>org.zeromq</groupId>
   <artifactId>zmq</artifactId>
   <version>2.2.0</version>
   <systemPath>/usr/local/lib/zmq.jar</systemPath>
   <scope>system</scope>
</dependency>

Where systemPath is the location of the zmq.jar on your filesystem.

Once that’s out of the way we can configure the flow, as illustrated below:

<flow name="main">
        <zeromq:inbound-endpoint address="tcp://*:9090" socket-operation="bind"
                                 exchange-pattern="request-response"/>
        <protobuf:deserialize
                protobufClass="com.acmesoft.stock.model.serialization.protobuf.StockQuoteRequestBuffer"/>
        <expression-transformer
                expression="com.acmesoft.stock.model.StockQuoteRequest.fromProtocolBuffer(payload)"/>
        <component class="com.acmesoft.stock.service.StockDataServiceImpl"/>
        <expression-transformer
                expression="return com.acmesoft.stock.model.StockQuote.toProtocolBuffer(payload)"/>
    </flow>

The ZeroMQ inbound-endpoint binds to TCP port 9090 on all interfaces with a request-response exchange pattern. The deserialize message processor in the protobuf module deserializes the incoming byte array into the generated StockQuoteRequestBuffer class. From there we use MEL (the Mule Expression Language) to invoke the helper method on StockQuoteRequest, which transforms the intermediary class into the domain model.

The List of StockQuotes returned from StockDataService is then transformed by the second MEL expression, which calls the “toProtocolBuffer” helper method on the domain model. The Protocol Buffer Module is smart enough to implicitly transform the intermediary object to a byte array for the response.
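Stripped of the Mule specifics, the flow is a chain of transformations from request bytes to response bytes. The sketch below models that shape in plain Java with hypothetical stand-in types and a canned service result. It doesn't use Mule, ØMQ or Protocol Buffers, and is only meant to show the order of the stages.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class FlowPipelineSketch {

    // Hypothetical stand-in for the domain request; only the shape of the chain matters here.
    static final class Request {
        final String symbol;

        Request(String symbol) {
            this.symbol = symbol;
        }
    }

    // Stages 1 and 2: bytes to domain request (protobuf:deserialize, then fromProtocolBuffer).
    static final Function<byte[], Request> TO_REQUEST =
            bytes -> new Request(new String(bytes, StandardCharsets.UTF_8));

    // Stage 3: the service component; returns a canned answer in this sketch.
    static final Function<Request, String> SERVICE =
            request -> request.symbol + "=42.0";

    // Stage 4: domain result back to bytes (toProtocolBuffer, then the implicit byte array conversion).
    static final Function<String, byte[]> TO_BYTES =
            result -> result.getBytes(StandardCharsets.UTF_8);

    // The flow is the composition of the stages in the order they appear in the XML.
    static final Function<byte[], byte[]> FLOW = TO_REQUEST.andThen(SERVICE).andThen(TO_BYTES);

    public static void main(String[] args) {
        byte[] reply = FLOW.apply("FB".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(reply, StandardCharsets.UTF_8)); // prints FB=42.0
    }
}
```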

Consuming the Service from the Client Side

Now that the server is ready we can turn our attention to the client side code to invoke the remote service.  Let’s take a look at how this works:

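// Ask for one week of quotes, ending now.
StockQuoteRequest stockQuoteRequest = new StockQuoteRequest();
stockQuoteRequest.setSymbol("FB");
stockQuoteRequest.setStartDate(new Date(new Date().getTime() - (86400000L * 7)));
stockQuoteRequest.setEndDate(new Date());

// Assumed here: a context with one I/O thread. The original snippet does not show how zmqContext is created.
ZMQ.Context zmqContext = ZMQ.context(1);
ZMQ.Socket zmqSocket = zmqContext.socket(ZMQ.REQ);
zmqSocket.setReceiveTimeOut(RECEIVE_TIMEOUT); // timeout in milliseconds, defined elsewhere
zmqSocket.connect("tcp://localhost:9090");
zmqSocket.send(stockQuoteRequest.toProtocolBufferAsBytes(), 0);

List<StockQuote> quotes = StockQuote.listOfStockQuotesFromBytes(zmqSocket.recv(0));

// Release the native resources when done.
zmqSocket.close();
zmqContext.term();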

We start off by defining the StockQuoteRequest object to give us all the quotes for Facebook stock from the last week.  We can then open up a ZMQ socket, set the timeout, connect to the ZMQ socket on the remote Mule instance and send the byte representation of the StockQuoteRequest to it.
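The one-week window is computed from a raw millisecond constant. As a small sketch of an alternative, the hypothetical helper below uses TimeUnit so the arithmetic stays in long, which matters once the window grows beyond a few weeks.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;

final class QuoteWindowSketch {

    // Returns the instant `days` whole days before `end`, using long arithmetic throughout.
    static Date daysBefore(Date end, int days) {
        return new Date(end.getTime() - TimeUnit.DAYS.toMillis(days));
    }

    public static void main(String[] args) {
        Date end = new Date();
        Date start = daysBefore(end, 7);
        System.out.println("Window: " + start + " to " + end);

        // The int expression is fine for 7 days but wraps around from 25 days on.
        int wrapped = 86400000 * 30;
        long correct = 86400000L * 30;
        System.out.println(wrapped + " vs " + correct); // prints -1702967296 vs 2592000000
    }
}
```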

zmqSocket.recv is then used to receive the bytes back from Mule.  From here we can use the listOfStockQuotesFromBytes helper method we wrote above to convert the Protocol Buffer representation to a List of StockQuotes.  Despite the fair bit of plumbing we did above, this is a pretty concise bit of client side code to invoke the remote service.
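One thing the client code glosses over is what happens when the receive timeout expires. To my knowledge, jzmq's recv returns null when no message arrives in time, so passing its result straight to a parser risks a NullPointerException. The hypothetical helper below is a minimal sketch of a guard you could wrap around the reply before decoding it. It is plain Java and makes no ØMQ calls itself.

```java
final class ReplyGuardSketch {

    // Hypothetical helper: fail with a clear message instead of handing null to the parser.
    static byte[] requireReply(byte[] reply, int timeoutMillis) {
        if (reply == null) {
            throw new IllegalStateException("No reply received within " + timeoutMillis + " ms");
        }
        return reply;
    }

    public static void main(String[] args) {
        byte[] reply = requireReply(new byte[] {0x0A, 0x00}, 1000);
        System.out.println("Got " + reply.length + " bytes"); // prints Got 2 bytes

        try {
            requireReply(null, 1000); // simulates a timed-out receive
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints No reply received within 1000 ms
        }
    }
}
```

In the client this would wrap the receive call, along the lines of requireReply(zmqSocket.recv(0), RECEIVE_TIMEOUT). Keep in mind that a REQ socket that timed out is still waiting for its reply, so the usual advice is to close it and open a new one before retrying.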

Conclusion

This blog post only touched on the features of ØMQ and the ØMQ Mule Module.  In addition to request-reply, other exchange-patterns are supported, like one-way, push and pull.  This effectively gives you the benefits of a reliable, asynchronous messaging layer without a centralized infrastructure.  I hope to cover this in a later post.

Protocol Buffers also seem like a natural fit as a wire format for ØMQ. They echo ØMQ’s principles of being lightweight, fast and platform agnostic. These are also, not coincidentally, principles Mule shares as an integration framework.

The project for this example is available on GitHub.

Published at DZone with permission of John D'Emic, author and DZone MVB. (source)
