Tuesday 25 September 2012

Integrating JMS message queues with Spring Integration

Overview


A standalone Java client connects to a JMS queue running in an Apache Geronimo application server. The client uses the Spring Integration framework to communicate with the JMS queue.



This proof of concept uses Apache ActiveMQ message queues, which Apache Geronimo supports by default. That is why the ActiveMQ core libraries are included in the client application along with the JMS specification libraries. For other JMS implementations (such as Tibco EMS), the provider's own client libraries need to be included instead, so that the client can talk to that broker through the JMS API.

Also note that the ActiveMQ jars contain a basic JNDI implementation that is used to look up the JMS queues on the Geronimo server. Its initial context factory and provider URL (here using the tcp:// transport) need to be specified in the Spring JNDI template configuration.
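To make the JNDI settings concrete, here is a minimal plain-Java sketch of the same environment that the Spring JNDI template carries. The class name JndiEnvironmentSketch and the helper activeMqEnvironment are made up for illustration; the property values are the ones used in this post. The sketch only builds the environment and does not perform a lookup, since that would need the ActiveMQ client jars and a running broker.

```java
import java.util.Hashtable;
import javax.naming.Context;

public class JndiEnvironmentSketch {

    // Builds the JNDI environment for an ActiveMQ broker reachable at brokerUrl.
    // (Hypothetical helper, not part of the original project.)
    static Hashtable<String, String> activeMqEnvironment(String brokerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        // Context.PROVIDER_URL is the standard key "java.naming.provider.url"
        env.put(Context.PROVIDER_URL, brokerUrl);
        // Context.INITIAL_CONTEXT_FACTORY is the standard key "java.naming.factory.initial"
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, String> env = activeMqEnvironment("tcp://localhost:61616");
        env.forEach((key, value) -> System.out.println(key + " = " + value));
        // With the ActiveMQ client jars on the classpath, this environment could be
        // passed to new javax.naming.InitialContext(env) to perform lookups.
    }
}
```

Spring's JndiTemplate takes the same key/value pairs through its environment property, so switching to another JMS provider would mostly mean changing these two values and the client jars.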

Maven configuration for the application (standalone client)


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
  
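    <!-- groupId is required by Maven; com.mujo is an assumption based on the com.mujo.eai package -->
    <groupId>com.mujo</groupId>
    <artifactId>eai</artifactId>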
    <version>0.0.1-SNAPSHOT</version>
    <name>eai</name>
    <description>EAI Interaction</description>
  
    <dependencies>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>com.springsource.javax.jms</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>3.1.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
        </dependency>
    </dependencies>
    
</project>

Integration Flow

[Figure: the integration flow as seen in SpringSource Tool Suite (STS)]



The following are the individual components of the Spring Integration Flow/Pipeline:
  • A message-driven channel adapter to consume the messages from the JMS queue
  • An inbound channel to carry the messages from the inbound adapter
  • A service activator that invokes the MessageHandler POJO to handle the messages. It receives messages from the inbound channel and sends the replies to the outbound channel
  • An outbound channel to carry the messages to the outbound adapter
  • An outbound channel adapter to send the messages to the JMS queue
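The components listed above can be modelled in a few lines of plain Java. This is only a sketch of the message flow, not Spring Integration code: the two in-memory queues stand in for the inbound and outbound channels, and the names FlowSketch and activateOnce are invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.UnaryOperator;

public class FlowSketch {

    // Plays the service activator's role: takes one message from the inbound
    // channel, passes it to the handler and puts the reply on the outbound channel.
    // Returns false if there was no message to process.
    static boolean activateOnce(Queue<String> inbound,
                                UnaryOperator<String> handler,
                                Queue<String> outbound) {
        String message = inbound.poll();
        if (message == null) {
            return false;
        }
        outbound.add(handler.apply(message));
        return true;
    }

    public static void main(String[] args) {
        Queue<String> inbound = new ArrayDeque<>();
        Queue<String> outbound = new ArrayDeque<>();

        // Stands in for the message-driven channel adapter delivering a JMS message
        inbound.add("hello");

        // Echo handler: returns the payload unchanged
        activateOnce(inbound, payload -> payload, outbound);

        // Stands in for the outbound channel adapter sending to the JMS queue
        System.out.println("Outbound adapter would send: " + outbound.poll());
    }
}
```

In the real flow the framework does this wiring itself; the sketch only shows the order in which a message passes through the pieces.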

This is the XML source of the flow above (app-context.xml):


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
        http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">


    <import resource="jms-listener-container-config.xml"/>

    <int:annotation-config/>
        
    <int:channel id="inbound"/>
    <int:channel id="outbound"/>
    
    <int-jms:message-driven-channel-adapter id="jmsIn" 
                                            channel="inbound" 
                                            container="messageListenerContainer" 
                                            acknowledge="transacted"></int-jms:message-driven-channel-adapter>
    
    <int-jms:outbound-channel-adapter id="jmsOut" 
                                      channel="outbound"
                                      connection-factory="remoteConnectionFactory" 
                                      destination="receiveQueue" 
                                      delivery-persistent="true" ></int-jms:outbound-channel-adapter>
                                      
    <int:service-activator  input-channel="inbound"
                            output-channel="outbound" 
                            ref="messageHandler"
                            method="onMessage" 
                            requires-reply="true"></int:service-activator>
    
    <bean id="messageHandler" class="com.mujo.eai.MessageHandler"></bean>
    
</beans> 

Observations on above configuration:

The JMS listener and JNDI configuration (jms-listener-container-config.xml) is as follows:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="remoteJndiTemplate" class="org.springframework.jndi.JndiTemplate" lazy-init="false"> 
        <property name="environment"> 
            <props> 
                <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
                <prop key="java.naming.factory.url.pkgs">org.apache.activemq.jndi</prop>
                <prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
                <prop key="connectionFactoryNames">DefaultActiveMQConnectionFactory,QueueConnectionFactory</prop>
                <prop key="queue.SendReceiveQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/SendReceiveQueue</prop>
                <prop key="queue.SendQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/MDBTransferBeanOutQueue</prop> 
            </props> 
        </property> 
    </bean> 

    <bean id="remoteConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="false">
        <property name="jndiTemplate" ref="remoteJndiTemplate"/>
        <property name="jndiName" value="QueueConnectionFactory"/>
        <property name="lookupOnStartup" value="true" />
        <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
    </bean>
    
    <bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0">
            <value>SendReceiveQueue</value>
        </constructor-arg> 
    </bean>
    
    <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0">
            <value>SendQueue</value>
        </constructor-arg> 
    </bean>
    
    <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="remoteConnectionFactory"/>
        <property name="destination" ref="receiveQueue"/>
        <property name="sessionTransacted" value="true"/>
        <property name="maxConcurrentConsumers" value="1"/>
        <property name="concurrentConsumers" value="1"/>
        <property name="receiveTimeout" value="5000"/>
        <property name="recoveryInterval" value="60000"/>
        <property name="autoStartup" value="true"/>         
    </bean>
     
</beans>

Note the following features of the above configuration:
  • A Spring JMS message listener container is configured for receiving the messages
  • The Message Driven Channel Adapter then refers to the message listener container
  • The Spring JndiTemplate is configured with the JNDI environment for the JMS queues on Geronimo
  • Here we are using the default JMS queues available in Geronimo, which are built on ActiveMQ
  • The connection factory refers to the JndiTemplate
  • The message listener container refers to the connection factory
  • The concurrent consumers value needs to be tuned to the available resources, since each consumer runs on its own thread and consumes memory and CPU. For this test it has been set to 1
  • Note that lazy-init has been set to false (Spring's default for singleton beans) for the JNDI template and the connection factory, and lookupOnStartup is true, so the JNDI lookup happens at startup. In a real application the remote JMS broker may be unavailable at startup, which would prevent the application from starting. In that case lazy-init can be set to true, or lookupOnStartup to false so that the lookup is deferred until first use (this works because a proxyInterface is specified).
  • The recovery interval has been set to 60000 ms, i.e. if the connection to the JMS broker is lost, the message listener container waits 1 minute between attempts to re-establish it (the default is 5000 ms).
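The idea behind the recovery interval can be sketched in plain Java as a retry loop with a fixed pause. This is not the container's actual implementation: RecoverySketch, recover and the maxAttempts limit are assumptions made for this illustration (the limit is there only so that the sketch terminates), and a short interval is used instead of 60000 ms to keep the demo quick.

```java
import java.util.function.BooleanSupplier;

public class RecoverySketch {

    // Calls connect until it succeeds or maxAttempts is reached, sleeping
    // recoveryIntervalMillis between failed attempts.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int recover(BooleanSupplier connect, long recoveryIntervalMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (connect.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(recoveryIntervalMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical broker that becomes reachable on the third attempt
        int attempts = recover(() -> ++calls[0] >= 3, 100, 5);
        System.out.println("Connected after " + attempts + " attempts");
    }
}
```

A longer interval means fewer reconnect attempts against a broker that is down, at the cost of a longer delay before message consumption resumes once it is back.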

Running the Application

Java class with main method


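import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {

    public static void main(String[] args) {
        // Loading the context starts the message listener container (autoStartup is true)
        AbstractApplicationContext context = new ClassPathXmlApplicationContext("META-INF/spring/app-context.xml");
        // Close the context cleanly when the JVM shuts down
        context.registerShutdownHook();
    }

}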
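The call to registerShutdownHook() asks Spring to register a JVM shutdown hook that closes the context. The plain-Java sketch below shows that mechanism on its own, without Spring. The Resource class is a made-up stand-in for the application context, and ShutdownHookSketch and closingHook are names invented for this illustration.

```java
public class ShutdownHookSketch {

    // Minimal stand-in for an application context that can be closed
    static class Resource implements AutoCloseable {
        private volatile boolean closed;

        @Override
        public void close() {
            closed = true;
            System.out.println("Context closed");
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Creates (but does not register) a hook thread that closes the resource
    static Thread closingHook(Resource resource) {
        return new Thread(resource::close, "context-shutdown-hook");
    }

    public static void main(String[] args) {
        Resource context = new Resource();
        // The JVM runs registered hook threads when it shuts down
        Runtime.getRuntime().addShutdownHook(closingHook(context));
        System.out.println("Running; the hook fires when the JVM exits");
    }
}
```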

MessageHandler POJO


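package com.mujo.eai;

public class MessageHandler {

    // Invoked by the service activator with the JMS message payload;
    // the return value is sent to the outbound channel
    public String onMessage(String message) {
        System.out.println("In onMessage");
        System.out.println(message);
        
        String payload = message;
        
        return payload;
    }
}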

Geronimo Console


The “JMS Resource” link in the left-hand tree shows the JMS connection factories and queues currently configured.

Sending Message on JMS Queue using Geronimo Console


The standalone client receives the message from the JMS queue and sends it back to the same queue. Because the inbound and outbound adapters both use that queue, this repeats until the application is terminated.

Hope the above example is useful to someone.

Download Maven Project with Source Code