John D'Emic is a technologist, developer and author. He is currently a Solutions Architect at MuleSoft and a co-author of both editions of Mule in Action. John is a DZone MVB and is not an employee of DZone and has posted 10 posts at DZone. You can read more from them at their website. View Full User Profile

Twitter Complex Event Processing with Esper and Drools

12.05.2012
| 2468 views |
  • submit to reddit

Complex event processing engines are a natural fit for event driven platforms like Mule. Native   support has been available in Mule since version 3.2 by way of the Drools Module.  The Esper Module now offers an alternate way to leverage CEP in your integration applicationsEsper is a robust, performant, open source, complex event processing engine.  Let’s take a look at how to use with Mule and then see how it compares to ’ CEP support.

Subscribing to a Stream of Tweets

The following Gist shows a Mule application that reads status updates from ’s public timeline and logs instances of updates that contain the “#mule” hashtag.

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter"
      xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz"
      
      xmlns:esper="http://www.mulesoft.org/schema/mule/esper"
      xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/3.2/mule-quartz.xsd
http://www.mulesoft.org/schema/mule/bpm http://www.mulesoft.org/schema/mule/bpm/3.2/mule-bpm.xsd
http://www.mulesoft.org/schema/mule/esper http://www.mulesoft.org/schema/mule/esper/1.0/mule-esper.xsd
http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/2.3/mule-twitter.xsd
">

    <description>
        Esper Cloud Connector example using Twitter
    </description>

    <esper:config configuration="esper-config.xml"/>

    <twitter:config consumerKey="****************" consumerSecret="***************"/>

    <flow name="main">
        <poll frequency="120000">
            <twitter:get-public-timeline/>
        </poll>
        <twitter:get-public-timeline/>
        <collection-splitter/>
        <esper:send eventPayload-ref="#[payload:]"/>
    </flow>

    <flow name="Event Listener Flow">
        <esper:listen statement="select count(hashtagEntities.where(p => p.text = 'mule')) as tagged
from Tweets having count(hashtagEntities.where(p => p.text = 'mule')) > 0"/>
        <logger category="INFO"/>
    </flow>
</mule>

To begin to use the Esper module we first need to create a configuration file that defines our event types.  Future versions of the module will likely remove this restriction, but for now you need to manually define an Esper configuration, like the following:

<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xmlns="http://www.espertech.com/schema/esper"
                     xsi:schemaLocation="
http://www.espertech.com/schema/esper
http://www.espertech.com/schema/esper/esper-configuration-2.0.xsd">

    <event-type name="Tweets" class="twitter4j.StatusJSONImpl"/>

</esper-configuration>

This registers an event stream called “Tweets”  whose events are the appropriate domain object in the twitter4j API.  Esper also supports Maps and XML documents as event types.

The “main” flow in the application above polls Twitter every 2 minutes for a sample of status updates (note that a “real” implementation of this application would probably subscribe to the firehose.)  A collection-splitter it used to split the List of status updates into atomic messages which are injected into the “Tweets” event stream using the <esper:send> message processor.

Now that we’re populating the event stream we can use Esper’s query language, EPL.  EPL is SQL-esque that simplifies querying event streams.  The <esper:listen> message source in the application above takes an EPL query to subscribe to events off a stream.  In this case we are setting a query that will generate a composite event when status updates with a “#mule” hashtag match the query.  These events are logged with the logger message processor.

Comparing to Drools CEP Support

Let’s see how Esper compares to Drool’s CEP support.  This Gist shows the Drools rules definition for the stock broker CEP example that ships with the latest version of Mule. This example demonstrates using CEP to alert when a given stock changes by a certain threshold in a given time window.  The Drools example using DRL is fairly verbose, as we can see from the Gist above.  The following illustrates the  same relevant functionality implemented with Esper:

<flow name="processStockTicks">
        <composite-source>
            <inbound-endpoint ref="stockTick"/>
            <ajax:inbound-endpoint channel="/services/cepExample/thresholdChange"/>
        </composite-source>
        <all>
            <ajax:outbound-endpoint channel="/services/cepExample/stockTick"/>
            <esper:send eventPayload-ref="#[payload:]"/>
        </all>
    </flow>

<flow name="sendAlerts">
        <esper:listen statement="
select symbol,price,(Math.abs(first(price) - last(price)) / first(price)) * 100.0 as percentChange
from StockTick.win:time(2 min) group by symbol
having (Math.abs(first(price) - last(price)) / first(price)) * 100.0 > 7.0 "/>
        <vm:outbound-endpoint path="alerts.out"/>
 </flow>

The EPL statement could be even more concise if we externally defined the average difference function.  If your application isn’t already using Drools then you may want to consider using Esper as an alternative.  The fact EPL is SQL-like also makes it an attractive option, as you don’t need to learn DRL to begin working with event streams.

Rate Limiting Traffic

I find CEP, and Esper in particular, extremely useful when developing and maintaining integration applications.  I previously blogged about using Esper in this context for doing QoS checks on endpoints.  You can also use it to rate limit traffic on a flow as illustrated in the following example:

 <flow name="Event Rate Limit Flow">
        <vm:inbound-endpoint path="filtered.in"/>
        <esper:filter eventPayload-ref="#[payload:]"
                      statement="select case when count(*) > 1000 then false else true end from TestEvent.win:time(5 min)"
                      key="case when count(*)>1000 then false else true end"/>
        <vm:outbound-endpoint path="filtered.out"/>
    </flow>

This example uses the <esper:filter> processor to stop traffic from passing if the given EPL statement evaluates to false.  In this example we’re stopping traffic if more then 1000 messages are received in a 5 minute window.

Functional use cases for CEP also abound.  The stock example that ships with Mule is an obvious one.  Rapidly changing data like stock prices, auction prices, RFID tag locations, etc all lend themselves to an event driven processing model.  A less obvious use case is observing the state of domain objects as they pass through a system.  You could, for instance, generate an event whenever an Order is submitting for a web store application.  A listener could then subscribe to the stream and generate a composite event when the order has been in an unfilled state for over a certain threshold of time.

Conclusion

Complex event processing is often a natural fit for integration applications built with Mule.  While Mule ships with CEP support via Drools’, the Esper module provides an alternative with a more concise configuration.

Published at DZone with permission of John D'Emic, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)