I'm an Enterprise Software Architect working predominantly on integration solutions in the financial software arena. Core competencies include Java, Message Driven Architectures, Spring, Spring Integration and data storage technologies. Matt is a DZone MVB and is not an employee of DZone and has posted 13 posts at DZone.

Spring Integration - Message Gateway Adapters

07.31.2012

This article presents a technique for handling response types from message sub-systems in a service-based Message Driven Architecture (MDA). It provides some theory, some experience of message sub-system development and some robust working code that's running in several financial institution production deployments.

As Spring Integration is underpinned by a well established set of Spring APIs familiar to most developers, I have chosen this as a basis for this article.

If developers only ever had to build integration flows to handle positive business responses, this article would not be necessary. However, real world development, particularly within service integration environments, results in developers having to deal not just with positive business responses but business exceptions, technical exceptions and services that become unresponsive.

Spring Integration Gateways

Spring Integration (SI) Gateways are entry points for message sub-systems. A gateway is specified as a Java interface; the SI framework generates a dynamic proxy implementation at runtime, which can be used by any bean into which it is injected. Incidentally, SI gateways also offer a number of other benefits such as request and reply timeout behaviour.
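To make the "interface in, generated implementation out" idea concrete, here is a minimal sketch that uses only the JDK's `java.lang.reflect.Proxy`. It is not how Spring Integration builds its gateways internally, just an illustration of a runtime-generated implementation of an interface. The `EchoGateway` interface, the `createGateway` helper and the reply text are all hypothetical names invented for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical gateway interface; the name and method are illustrative only. */
interface EchoGateway {
    String send(String payload);
}

final class GatewayProxySketch {

    /** Builds a runtime implementation of the interface that forwards every call to the handler. */
    static <T> T createGateway(Class<T> gatewayInterface, InvocationHandler handler) {
        Object proxy = Proxy.newProxyInstance(
                gatewayInterface.getClassLoader(),
                new Class<?>[] {gatewayInterface},
                handler);
        return gatewayInterface.cast(proxy);
    }

    public static void main(String[] args) {
        // The handler stands in for "send the request to a channel and wait for the reply".
        EchoGateway gateway = createGateway(
                EchoGateway.class,
                (proxy, method, methodArgs) -> "reply to " + methodArgs[0]);
        System.out.println(gateway.send("MT541"));
    }
}
```

The calling code only ever sees the `EchoGateway` interface, which is what allows a gateway to be injected into new or existing beans without those beans knowing anything about messaging.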

Typical Message Driven Architecture (MDA) Applications

As with Java design patterns for application development, similar patterns exist for integration applications. One such common integration pattern is service-chaining: a number of services are connected in series or parallel that together perform a business function. If you've built and deployed services using MuleSoft's Mule, SpringSource's Spring Integration, FuseSource's ServiceMix (Camel), Oracle's Service Bus or IBM's WebSphere ESB, the chances are that you've already built an application using chained services. This pattern will become ever more widespread as the software industry moves away from client-server topologies towards service-based architectures.

A recent engagement in which Incept5 supplied architectural consultancy will be used as an example implementation of the service-chain application pattern. The engagement required the team to build a solution that would transition financial messages (SWIFT FIN) from a raw (ISO15022) state through binding, validation and transformation (into XML) and ultimately added to a data store for further processing or business exception management.

Using EIP graph notation, the first phase service composition could be represented as follows. A document message arrives into the application domain, is parsed and bound to a Java object, semantically validated as a SWIFT message, transformed to XML and then stored in a datastore. As a side note, the binding, semantic validation and transformation are all performed by C24's iO product. Furthermore, the full solution sample that can be found in GitHub for this project contains dispatcher configurations so that thread pools can be used for each message sub-system; for the sake of brevity and clarity, such details are omitted from this article.
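For readers who prefer code to notation, the same series composition can be sketched in plain Java with `Function.andThen`. This is only an illustration of the chaining idea: the `BoundMessage` type, the four stage methods, the toy validation rule and the in-memory `DATA_STORE` are assumptions made for this example and stand in for the C24 iO binding, validation and transformation used in the real solution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ServiceChainSketch {

    /** Hypothetical stand-in for the Java object a message is bound to. */
    static final class BoundMessage {
        final String raw;
        BoundMessage(String raw) { this.raw = raw; }
    }

    /** In-memory list standing in for the datastore. */
    static final List<String> DATA_STORE = new ArrayList<>();

    static BoundMessage parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
        return new BoundMessage(raw.trim());
    }

    static BoundMessage validate(BoundMessage message) {
        // Toy check only; real semantic validation is far richer.
        if (!message.raw.startsWith("{1:")) {
            throw new IllegalStateException("unexpected message format");
        }
        return message;
    }

    static String toXml(BoundMessage message) {
        return "<message>" + message.raw + "</message>";
    }

    static String store(String xml) {
        DATA_STORE.add(xml);
        return xml;
    }

    /** Connects the four services in series. */
    static Function<String, String> chain() {
        Function<String, BoundMessage> parse = ServiceChainSketch::parse;
        return parse
                .andThen(ServiceChainSketch::validate)
                .andThen(ServiceChainSketch::toXml)
                .andThen(ServiceChainSketch::store);
    }

    public static void main(String[] args) {
        System.out.println(chain().apply("{1:F01BANKBEBBAXXX0000000000}"));
    }
}
```

Note that if any stage throws, the caller of `chain().apply(...)` receives the exception with no record of which stage was running or which message was in flight.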

Although this configuration specifies a chain of services, it's not adequate to form the basis of a robust production deployment. An exception thrown by any of the services would be thrown straight back to the entry gateway, thus losing context, i.e. which service threw the exception. Any unresponsive code invoked by a service may result in its Java thread getting parked. Null values returned by a service may cause unexpected problems, or even cause the entry gateway to hang, if an entry gateway is used (unlike in this diagram).

Unresponsive Service Invocation

The Spring Integration specific construct for accessing a message sub-system is the gateway. Although apparently simple, the SI gateway is a powerful feature that results in a dynamic proxy generated by Spring Integration's GatewayProxyFactoryBean. This bean can be injected into new or existing code or services as an implementation of the interface. The SI gateway also provides facilities to deal with timeouts and some error handling facilities. A typical Spring Integration namespace XML configuration is as follows:

<int:channel id="parsing-gw-request-channel" datatype="java.lang.String">
  <int:queue capacity="${gateway.parse.queue.capacity}"/>
</int:channel>

<int:gateway id="parseGateway"
             service-interface="com.c24.solution.swift.flatten.gateway.ParseGateway"
             default-request-channel="parsing-gw-request-channel"
             default-reply-channel="parsing-gw-reply-channel"
             default-reply-timeout="${gateway.parse.timeout}"/>

<int:channel id="parsing-gw-reply-channel"
             datatype="biz.c24.io.api.data.ComplexDataObject"/>

Evolving the design, the next phase of the application architecture needs to take advantage of these gateways. The Spring Integration context can be extended quite simply by creating a Java interface for each service. Building message sub-system gateways leads us towards a design model like the following:

A small amount of additional configuration and some very simple Java interfaces mean that developers can now configure request/reply timeouts on the gateway and avoid being blocked by unresponsive code, assuming (as we always should) that code written locally or supplied by 3rd parties is capable of misbehaving.
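The effect of a reply timeout can be illustrated without any framework. The sketch below is an assumption-laden analogy rather than Spring Integration code: it runs a service on another thread, waits a bounded time for the reply and returns null if nothing arrives in time. The class name `ReplyTimeoutSketch` and the helper `callWithTimeout` are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReplyTimeoutSketch {

    /**
     * Runs the service on the executor and waits at most timeoutMillis for its reply.
     * Returns null when no reply arrives in time.
     */
    static <T> T callWithTimeout(ExecutorService executor, Callable<T> service, long timeoutMillis) {
        Future<T> reply = executor.submit(service);
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            reply.cancel(true); // interrupt the worker rather than leave it parked
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            reply.cancel(true);
            return null;
        } catch (ExecutionException e) {
            // The service itself failed; wrap and rethrow its cause.
            throw new IllegalStateException("service failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String fast = callWithTimeout(executor, () -> "parsed", 1000);
            String slow = callWithTimeout(executor, () -> {
                Thread.sleep(5000); // simulates an unresponsive service
                return "too late";
            }, 100);
            System.out.println(fast + " / " + slow);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The caller is never blocked for longer than the timeout, but it now has to decide what a null reply means, which is the question the rest of this article addresses.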

Additionally, the SI gateway allows specification of an error handling channel so that you have the opportunity to handle errors. The design presented here does not use error channels but handles errors in a different way; the reasons should become clearer as the design evolves.

Specific semantics and configuration examples for gateways and gateway timeouts can be found in reference material provided by SpringSource and other blogs.

A significant benefit to the integration solution has been added with the use of Spring Integration gateways. However, further improvements need to be made for the following reasons:

  • A gateway timeout will result in a null value being returned to the calling, or outer, flow. This is a business exception condition that means that the payload that was undergoing processing needs to be pushed into a business exception management process in order that it can be progressed through to conclusion, normally dictated by the business. If a transient technical issue caused this problem, resubmission may solve the exception; otherwise further investigation will be required. Whatever the eventual outcome, context about the failure and its location needs to be recorded and made available to the business exception management operative. The context must contain information about the message sub-system that failed to process the message and the message itself.

  • Exceptions generated by message sub-systems can be handled in a few different ways: through the error channel inside the gateway undergoing message processing failure, or by the calling flow. Again, context needs to be recorded. In this case, the technical exception needs to be added to the context, along with the gateway processing the message undergoing failure and any additional information that may be used within the business exception management process for resolution.

    If transactions were involved in this flow, for example a transactional JMS message consumption flow trigger, it would be possible to roll back the transaction so that the message can be re-queued to a DLQ. However, the design in this software compensates for failures by pushing failing messages directly to a destination configured by the developer; this may of course be a JMS queue if that's required. This avoids transaction rollback and adds the benefit of exception context directly, rather than operations staff and developers having to scour logs to locate exception details.
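The compensation idea can be sketched in a few lines of plain Java. The enum names below mirror `ExceptionContext.PARSE_FAILURE` and the `ExceptionSubContext` values used by the adapter code later in this article, but their shape here, the `FailedMessage` holder, the `dispatch` method and the in-memory queue standing in for the configured destination are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class FailureDispatchSketch {

    /** Which message sub-system failed (assumed shape). */
    enum ExceptionContext { PARSE_FAILURE }

    /** How it failed (assumed shape). */
    enum ExceptionSubContext { NULL_GATEWAY_RESPONSE, EXCEPTION_GATEWAY_RESPONSE }

    /** The failing message together with everything an operator needs to resolve it. */
    static final class FailedMessage {
        final ExceptionContext context;
        final ExceptionSubContext subContext;
        final String payload;
        final Throwable cause; // null when the gateway simply returned nothing

        FailedMessage(ExceptionContext context, ExceptionSubContext subContext,
                      String payload, Throwable cause) {
            this.context = context;
            this.subContext = subContext;
            this.payload = payload;
            this.cause = cause;
        }
    }

    // Stands in for the developer-configured destination, e.g. a JMS queue.
    private final BlockingQueue<FailedMessage> destination;

    FailureDispatchSketch(BlockingQueue<FailedMessage> destination) {
        this.destination = destination;
    }

    /** Pushes the failing message and its context to the destination instead of rolling back. */
    void dispatch(ExceptionContext context, ExceptionSubContext subContext,
                  String payload, Throwable cause) {
        destination.add(new FailedMessage(context, subContext, payload, cause));
    }

    public static void main(String[] args) {
        BlockingQueue<FailedMessage> exceptionQueue = new LinkedBlockingQueue<>();
        FailureDispatchSketch dispatcher = new FailureDispatchSketch(exceptionQueue);
        dispatcher.dispatch(ExceptionContext.PARSE_FAILURE,
                ExceptionSubContext.NULL_GATEWAY_RESPONSE, "raw message text", null);
        FailedMessage failed = exceptionQueue.peek();
        System.out.println(failed.context + " / " + failed.subContext + " / " + failed.payload);
    }
}
```

Because the context travels with the message, whoever picks it up from the destination can see where and how it failed without consulting the logs.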

    The Gateway Adapter

    In order to handle all exceptions taking place within a message sub-system, and also to handle null values, a Gateway Adapter class can be used. The reason that the SI gateway error channel is not used for this purpose is that it would be complex to define and would have to be done in several places. The gateway adapter allows all business exception conditions to be handled in one place and treated in the same way. The Gateway Adapter is a custom-written Spring bean that invokes the injected gateway directly and manages null and exception responses before allowing the invocation request to return to the caller (or outer flow).

    The architectural design diagram evolved from the previous design phase includes a service activator backed by a Gateway Adapter bean; this bean calls the injected gateway (dynamic proxy), which in turn calls the business service.
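Stripped of Spring and messaging types, the adapter idea can be sketched as a small generic wrapper. This is a simplified illustration, not the project's implementation: the gateway is modelled as a plain `Function`, the exception management destination as a `List<String>`, and the names `GatewayAdapterSketch` and `NullGatewayResponseException` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Raised when the wrapped gateway returns null, for example after a reply timeout. */
final class NullGatewayResponseException extends RuntimeException {
    NullGatewayResponseException(String gatewayName) {
        super("Null response from gateway: " + gatewayName);
    }
}

final class GatewayAdapterSketch<I, O> {

    private final String name;               // identifies the message sub-system
    private final Function<I, O> gateway;    // stands in for the injected gateway proxy
    private final List<String> failureSink;  // stands in for the exception destination

    GatewayAdapterSketch(String name, Function<I, O> gateway, List<String> failureSink) {
        this.name = name;
        this.gateway = gateway;
        this.failureSink = failureSink;
    }

    /** Invokes the gateway and handles both failure modes in one place. */
    O service(I request) {
        O response;
        try {
            response = gateway.apply(request);
        } catch (RuntimeException e) {
            // Technical exception: record sub-system, message and cause, then rethrow.
            failureSink.add(name + ": exception for [" + request + "]: " + e);
            throw e;
        }
        if (response == null) {
            // Null reply: record sub-system and message, then interrupt the calling flow.
            failureSink.add(name + ": null response for [" + request + "]");
            throw new NullGatewayResponseException(name);
        }
        return response;
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();
        GatewayAdapterSketch<String, Integer> adapter = new GatewayAdapterSketch<>(
                "parse", s -> s.isEmpty() ? null : Integer.valueOf(s.length()), failures);
        System.out.println(adapter.service("abc"));
        try {
            adapter.service("");
        } catch (NullGatewayResponseException e) {
            System.out.println(failures);
        }
    }
}
```

The outer flow only ever sees a valid response or an exception, and every failure has already been recorded with its context by the time that exception surfaces.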

    Nuts and Bolts

    The design diagrams are a useful guide for making the point, but perhaps the more interesting part is the configuration and code itself. As with the entire solution, the outer or calling flow can be seen in the project located in GitHub. The most useful snippets to view here are the gateway adapter namespace configuration, the gateway, the gateway adapter and the gateway service configuration. As a number of gateways exist in this application, and they all follow the same pattern, the following configuration and code demonstrate just one of them.

    Gateway Adapter Namespace Configuration

    <int:channel id="message-parse-channel" datatype="java.lang.String"/>
    <int:chain input-channel="message-parse-channel" 
               output-channel="message-validate-channel">
        <int:service-activator ref="parseGatewayService" method="service"/>
    </int:chain>

    Gateway

    package com.c24.solution.swift.flatten.gateway;
    
    import org.springframework.integration.Message;
    
    /**
     * @author Matt Vickery - matt.vickery@incept5.com
     * @since 17/05/2012
     */
    public interface ParseGateway {
        public Message<?> send(Message<String> message);
    }

    GatewayAdapter

    package com.c24.solution.swift.flatten.gateway.adapter;
    
    import biz.c24.io.api.data.ComplexDataObject;
    import com.c24.solution.swift.flatten.exception.ExceptionContext;
    import com.c24.solution.swift.flatten.exception.ExceptionSubContext;
    import com.c24.solution.swift.flatten.gateway.ExceptionGatewayService;
    import com.c24.solution.swift.flatten.gateway.ParseGateway;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.Message;
    import org.springframework.util.Assert;
    
    /**
     * @author Matt Vickery - matt.vickery@incept5.com
     * @since 17/05/2012
     */
    public class ParseGatewayService extends AbstractGatewayService {

      private static final Logger LOG =
          LoggerFactory.getLogger(ParseGatewayService.class);
      private final ParseGateway parseGateway;
      private final ExceptionGatewayService exceptionGatewayService;

      public ParseGatewayService(
              final ParseGateway parseGateway,
              final ExceptionGatewayService exceptionGatewayService) {
          this.parseGateway = parseGateway;
          this.exceptionGatewayService = exceptionGatewayService;
      }

      @SuppressWarnings("unchecked") // the payload type is checked before the cast
      public Message<ComplexDataObject> service(Message<String> message) {

          Message<?> response;
          try {
              LOG.debug("Entering parse gateway.");
              response = parseGateway.send(message);
          } catch (RuntimeException e) {
              // Exception from the sub-system: record context, dispatch, rethrow.
              LOG.error("Exception response .. {}, exception: {}", getClass(), e);
              LOG.error("Invoking ... process because: {}.", e.getCause());
              buildExceptionContextAndDispatch(
                  message,
                  ExceptionContext.PARSE_FAILURE,
                  ExceptionSubContext.EXCEPTION_GATEWAY_RESPONSE,
                  exceptionGatewayService);
              throw e;
          }

          if (response != null) {
              // A reply arrived: make sure it carries the expected payload type.
              if (!(response.getPayload() instanceof ComplexDataObject))
                  throw new IllegalStateException(INTERRUPTING_..._EXCEPTION);
          } else {
              // Null reply, e.g. after a gateway reply timeout: record and dispatch.
              LOG.info("Null response received ....", getClass());
              buildExceptionContextAndDispatch(
                  message,
                  ExceptionContext.PARSE_FAILURE,
                  ExceptionSubContext.NULL_GATEWAY_RESPONSE,
                  exceptionGatewayService);
              throw new GatewayAdapterException(NULL_GATEWAY_RESPONSE_CAUGHT);
          }

          Assert.state(response.getPayload() instanceof ComplexDataObject);
          return (Message<ComplexDataObject>) response;
      }
    }

    GatewayService

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:c24="http://schema.c24.biz/spring-integration"
           xmlns:int="http://www.springframework.org/schema/integration"
           xsi:schemaLocation="http://www.springframework.org/schema/integration
     http://www.springframework.org/schema/integration/spring-integration.xsd
     http://schema.c24.biz/spring-integration
     http://schema.c24.biz/spring-integration.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <int:channel id="parsing-gw-request-channel"
                     datatype="java.lang.String">
            <int:queue capacity="${gateway.parse.queue.capacity}"/>
        </int:channel>

        <int:gateway
          id="parseGateway"
          service-interface="com.c24.solution.swift.flatten.gateway.ParseGateway"
          default-request-channel="parsing-gw-request-channel"
          default-reply-channel="parsing-gw-reply-channel"
          default-reply-timeout="${gateway.parse.timeout}"/>

        <int:channel id="parsing-gw-reply-channel"
                     datatype="biz.c24.io.api.data.ComplexDataObject"/>

        <int:chain input-channel="parsing-gw-request-channel"
                   output-channel="parsing-gw-reply-channel">
            <int:poller fixed-delay="50"
                        task-executor="binding-thread"/>
            <c24:unmarshalling-transformer
              id="c24Mt541UnmarshallingTransformer"
              source-factory-ref="textualSourceFactory"
              model-ref="mt541Model"/>
        </int:chain>
    
    </beans>

     

    Summary


    Whatever type of message-based integration system you are designing, once you get beyond the use of sample code for prototyping a technology (and let's face it, there are millions of lines of code out there that are copied from trivial examples), you need to consider invocation in the face of technical exceptions, business exceptions and non-responsive services. Using Spring Integration Gateways to represent entry to message sub-systems means that we must be able to cope with all of these types of behaviour. The Gateway Adapter pattern provides a single, central processing location for such conditions.


    The intent of the Gateway Adapter should be clear from the example configuration and code provided; the full source, located on GitHub, can also be run and tested. Please leave any feedback or comments as you see fit.
    Published at DZone with permission of Matt Vickery, author and DZone MVB. (source)