Bilgin Ibryam is a software engineer with a Master's degree in Computer Science, currently working for Red Hat in London. He is the author of the book "Instant Apache Camel Message Routing", an open source enthusiast, and an Apache OFBiz and Apache Camel committer. In his spare time, he enjoys contributing to open source projects and blogging at ofbizian.com. Bilgin is a DZone MVB and is not an employee of DZone.

Transactional Caching for Camel With Infinispan

07.31.2013

Some time ago I created a Redis connector for Camel. Redis is an awesome key-value store (and a lot more), but then I needed a cache running in the same JVM as Camel and noticed Infinispan, which has just switched to ASL v2. There are already other connectors in Camel for caching on the JVM, such as Hazelcast and EHCache, but if you are already using Camel as part of other Red Hat products, or want to see how LIRS eviction outperforms LRU, Infinispan is worth trying.

Briefly, Infinispan is a transactional in-memory key-value store and data grid. When used in embedded mode, Infinispan resides in the same JVM as Camel and allows a Camel consumer to receive cache change notifications:

<route>
  <from uri="infinispan://localhost?cacheContainer=#cacheContainer&amp;cacheName=orders&amp;eventTypes=CACHE_ENTRY_CREATED"/>
  <filter>
    <simple>${in.header.CamelInfinispanIsPre} == true</simple>
    <to uri="log:com.mycompany.order?showHeaders=true"/>
  </filter>
</route>

In the example above, when a cache entry is created, Infinispan will fire two events - one before and one after the cache entry has been created. It is also possible to receive the events synchronously, meaning in the same thread that processes the cache action, or asynchronously in a separate thread without blocking the cache action.
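To make the pre/post and synchronous/asynchronous distinction concrete, here is a minimal plain-Java sketch of a cache that notifies listeners around entry creation. It does not use the Infinispan API: the class `NotifyingCache`, the `EntryCreatedListener` interface and its `isPre` flag are hypothetical names chosen for illustration, with `isPre` playing the role of the `CamelInfinispanIsPre` header.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy cache (not Infinispan) that fires an event before and after an entry is created. */
class NotifyingCache {
    /** Hypothetical listener type; isPre is true for the "before" event. */
    interface EntryCreatedListener {
        void onCreated(String key, boolean isPre);
    }

    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final List<EntryCreatedListener> syncListeners = new CopyOnWriteArrayList<>();
    private final List<EntryCreatedListener> asyncListeners = new CopyOnWriteArrayList<>();
    private final ExecutorService notifier = Executors.newSingleThreadExecutor();

    void addSyncListener(EntryCreatedListener l) { syncListeners.add(l); }
    void addAsyncListener(EntryCreatedListener l) { asyncListeners.add(l); }

    void put(String key, String value) {
        // Check-then-put is not atomic; good enough for a single-writer sketch.
        boolean creating = !store.containsKey(key);
        if (creating) fire(key, true);   // "pre" event
        store.put(key, value);
        if (creating) fire(key, false);  // "post" event
    }

    private void fire(String key, boolean isPre) {
        // Synchronous listeners run in the thread that performs the cache action.
        for (EntryCreatedListener l : syncListeners) l.onCreated(key, isPre);
        // Asynchronous listeners run on a separate thread and do not block the caller.
        for (EntryCreatedListener l : asyncListeners) notifier.execute(() -> l.onCreated(key, isPre));
    }

    /** Stops the notifier thread after pending asynchronous events are delivered. */
    void shutdown() {
        notifier.shutdown();
        try {
            notifier.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A synchronous listener can veto or delay the cache action because it runs inline, while an asynchronous one only observes it; that is the trade-off to weigh when choosing between the two modes.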

Using Infinispan as a local cache is simple: it exposes a ConcurrentMap interface and has the usual features such as expiration, eviction, passivation, persistent stores and querying. What also makes Infinispan a data grid is the ability of its nodes to discover other nodes and replicate or distribute data among themselves. Replication allows sharing data across a cluster, whereas distribution uses a consistent hashing algorithm to achieve better scalability.

In client-server mode, Infinispan runs as a standalone application, and a Camel producer can send messages using Infinispan's Hot Rod client. Hot Rod is a binary, language-neutral, intelligent protocol that allows interaction with Infinispan servers in a topology-aware and hash-distribution-aware fashion.
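The idea behind distribution by consistent hashing can be sketched in a few lines of plain Java. This is a generic hash ring, not Infinispan's actual implementation: the class `ConsistentHashRing`, the use of CRC32 as the hash function, and the virtual-node naming scheme are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Generic consistent-hash ring (illustrative only, not Infinispan's algorithm). */
class ConsistentHashRing {
    // Ring position -> owning node name.
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) { this.virtualNodes = virtualNodes; }

    /** Places several virtual points for the node on the ring to even out the load. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) ring.putIfAbsent(hash(node + "#" + i), node);
    }

    /** Removes only the points that belong to this node. */
    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) ring.remove(hash(node + "#" + i), node);
    }

    /** The owner is the first node point at or after the key's position, wrapping around. */
    String ownerOf(String key) {
        if (ring.isEmpty()) throw new IllegalStateException("no nodes in the ring");
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    private static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

The property that matters for scalability is that when a node leaves, only the keys it owned move to other nodes; every other key keeps its owner, so the cluster does not reshuffle all of its data on each topology change.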

The Infinispan producer in Camel currently offers GET, PUT, REMOVE and CLEAR operations. Here is an example of the producer putting data into the orders cache:

<route>
  <from uri="direct:orderCache"/>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${in.header.orderId}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.orderTotal}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheName=orders"/>
</route>

Let's create a more interesting example. Infinispan is also JTA compliant and can participate in transactions. We will create a REST API for registering persons, which first persists the person in a relational database using the Camel SQL component and then puts the firstName into an Infinispan cache in the same transaction. We will do that using a transacted Camel route, so if an error occurs at any stage of routing, Camel will make sure the transaction (for both the cache and the database) is rolled back, and the database and the cache always stay in a consistent state.

<route>
  <from uri="restlet:/persons?restletMethod=POST"/>
  <transacted/>

  <!-- PERSIST TO DB -->
  <to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/>

  <!-- DAMN EXCEPTION THROWER-->
  <filter>
    <simple>${in.header.lastName} == "damn"</simple>
    <throwException ref="damn"/>
  </filter>

  <!-- PUT TO CACHE -->
  <to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/>
  <setHeader headerName="personId">
    <simple>${body[0][ID]}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${headerAs(personId, String)}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.firstName}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheContainer=#cacheContainer&amp;cacheName=orders"/>
</route>

As you can see, there is no magic or extra configuration in the route; it is a standard route. We have a small bit of code that throws an exception when the person's lastName is "damn", to simulate errors in the middle of the route.

The application runs in standalone mode with the Atomikos JTA transaction manager. First we create a JtaTransactionManager:

<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> 
  <constructor-arg ref="userTransaction"/>
  <constructor-arg ref="userTransactionManager"/>
</bean>

Then we wrap our XA DataSource in an AtomikosDataSourceBean so that it can enlist in JTA transactions:

public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {
    EmbeddedXADataSource ds = new EmbeddedXADataSource();
    ds.setCreateDatabase("create");
    ds.setDatabaseName("target/testdb");
    ds.setUser("");
    ds.setPassword("");
 
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(ds);
    xaDataSource.setUniqueResourceName("xaDerby");
    return xaDataSource;
}

Finally, using a TransactionManagerLookup, we tell Infinispan to participate in the same transaction:

public BasicCacheContainer basicCacheContainer() throws Throwable {
    GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();

    Configuration loc = new ConfigurationBuilder()
        .transaction().transactionMode(TransactionMode.TRANSACTIONAL)
        .transactionManagerLookup(new TransactionManagerLookup() {
            @Override
            public TransactionManager getTransactionManager() throws Exception {
                return jtaTransactionManager.getTransactionManager();
            }
        }).build();

    return new DefaultCacheManager(glob, loc, true);
}

After all this boilerplate code, we have our datasource, cache and Camel route participating in the same transaction. To see the full REST example with two-phase commit and rollback, get the source code from GitHub and play with it.
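For readers new to two-phase commit, the all-or-nothing behaviour that the transaction manager provides can be sketched in plain Java. This is a simplified illustration and not how Atomikos or Infinispan implement it: `TwoPhaseCommitSketch`, the `Participant` interface and the `StagedStore` class are hypothetical, with one store standing in for the database and one for the cache.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified two-phase commit coordinator (illustrative only). */
class TwoPhaseCommitSketch {
    /** Hypothetical participant contract, loosely modelled on what an XA resource offers. */
    interface Participant {
        boolean prepare();  // vote: true means "ready to commit"
        void commit();
        void rollback();
    }

    /** In-memory store that stages writes until the coordinator decides. */
    static class StagedStore implements Participant {
        final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> staged = new HashMap<>();
        private final boolean failOnPrepare;

        StagedStore(boolean failOnPrepare) { this.failOnPrepare = failOnPrepare; }

        void write(String key, String value) { staged.put(key, value); }

        @Override public boolean prepare() { return !failOnPrepare; }
        @Override public void commit() { committed.putAll(staged); staged.clear(); }
        @Override public void rollback() { staged.clear(); }
    }

    /** Phase 1 collects votes; phase 2 commits everywhere or rolls back everywhere. */
    static boolean run(List<? extends Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) q.rollback();
                return false;
            }
        }
        for (Participant p : participants) p.commit();
        return true;
    }
}
```

If the cache participant votes "no" during prepare, the database write is discarded as well, which mirrors what happens in the route when the exception is thrown between the SQL insert and the cache put.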

BTW, the camel-infinispan component is not yet part of Camel trunk, so to run the example you will need that too.


Published at DZone with permission of Bilgin Ibryam, author and DZone MVB. (source)