Enterprise Integration Zone is brought to you in partnership with:

John D'Emic is a technologist, developer and author. He is currently a Solutions Architect at MuleSoft and a co-author of both editions of Mule in Action. John is a DZone MVB and is not an employee of DZone and has posted 10 posts at DZone. You can read more from them at their website. View Full User Profile

Asynchronous Message Processing with Mule

12.01.2012
| 2735 views |
  • submit to reddit

Processing messages asynchronously is an important technique when developing integration applications .   Asynchronous applications are typically easier to scale, allow for the implementation of reliability patterns and sometimes better reflect use cases in the real world.  Mule, not surprisingly, offers a wealth of opportunities to process messages asynchronously.

Asynchronous Flows

Setting exchange-pattern of a message source to “one-way” enables asynchronous processing for a flow. Some transports and connectors, like JMS or the VM transport, are asynchronous by default. Other transports which are inherently synchronous, like HTTP, need there exchange pattern explicitly set. Setting one-way exchange patterns on these transports allows you to simulate asynchronous behavior with protocols that would otherwise  not be asynchronous.    The following Gist demonstrates how to asynchronously bridge an HTTP request to JMS.

<flow name="HTTP to JMS Flow">
        <http:inbound-endpoint address="http://localhost:8080/foo" exchange-pattern="one-way"/>
        <jms:outbound-endpoint queue="messages"/>
 </flow>

Message Aggregation

You can process asynchronously dispatched messages in groups by using the collection- aggregator. Message groups are defined by setting the correlationId property of a MuleMessage or by setting the MULE_CORRELATION_ID outbound header. The correlationGroupSize property of MuleMessage, or the MULE_CORRELATION_GROUP_SIZE header, define the amount of messages in a group.

The following demonstrates how the collection-aggregator can be used to asynchronously wait and collect the contents of a correlation group arriving on a VM inbound-endpoint.

<flow name="aggregate.messages">
  <vm:inbound-endpoint path="foo.bar" exchange-pattern=”one-way”/>
  <collection-aggregator timeout="6000" failOnTimeout="false"/>
  <vm:outbound-endpoint path="messages.out"/>
</flow>

Message Splitting

Some message payloads, like collections or XML documents, can be split and dispatched asynchronously.  Here’s some of the message splitters Mule supports:

  • collection-splitter:  Splits a List payload into individual messages.
  • splitter: Split a message using the Mule Expression Language.
  • mulexml:filter-based-splitter: Spits an XML document payload using an XPath expression.

The following Gist demonstrates splitting a java.util.List and routing to a JMS queue.

  <collection-splitter/>
  <jms:outbound-endpoint queue="order.queue"/>

Message Chunking

Split message payloads can be reassembled by using the message-chunk-aggregator. By default the message-chunk-aggregator will use the correlationId and correlationGroupSize propertis of the MuleMessage for reassembly. You can define an optional “correlationIdExpression” to reassemble with a different message property.

The following flow illustrates how to assemble a group of split messages back together.

<flow name="aggregate.chunks">
  <vm:inbound-endpoint path="foo.bar"/>
  <message-chunk-aggregator/>
  <vm:outbound-endpint path="chunk.out"/>
</flow>

Tuning Asynchronous Flows

Asynchronous processing for a flow can be tuned by defining a queued-asynchronous-processing-strategy. Multiple queued-asynchronous-processing-strategy can be defined and set using the flow’s “processingStrategy” attribute. The following illustrates how to configure a flow to use up to 500 threads to asynchronously process messages arriving a VM inbound- endpoint.

<queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/>

<flow name="acceptOrders" processingStrategy="allow500Threads">
  <vm:inbound-endpoint path="acceptOrders" exchange-pattern="one-way"/>
  <vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>

Wrapping Up

Asynchronous message handling is one of the keys to using Mule effectively.  Hopefully this post illustrated some of Mule’s features that make dispatching, sending and tuning asynchronous message flows easy.

Related posts:

  1. Twitter Complex Event Processing (CEP) with Esper and Drools
  2. ESB or not to ESB Revisited – Part 3, API Layer and Grid Processing Architecture
  3. Routing with Message Processors in Flows (Part 1)
  4. Mule 3 Architecture, Part 2: Introducing the Message Processor
Published at DZone with permission of John D'Emic, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)