Doing Nothing
Tuesday, 26 August 2014
Tuesday, 25 September 2012
Integrating JMS message queues with Spring Integration
Overview
A standalone java client connects to JMS queue running in a Geronimo App Server. The standalone client uses Spring Integration framework to enable the communication to the JMS queue.
The current Proof-Of-Concept uses the Apache Active MQ message queues
which are supported by default in Apache Geronimo server and that is why the Apache
Active MQ core libraries are included in the client application alongwith the
JMS specification libraries. In case of other JMS implementations (like those
on Tibco EMS etc) those specific libraries (related to communication from
client app) will need to be included in the client application so that the
messages are sent using the JMS protocol.
Maven configuration for the application (standalone client)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>eai</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>eai</name>
<description>EAI Interaction</description>
<dependencies>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>com.springsource.javax.jms</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
</project>
Integration Flow
As seen in Springsource Tool Suite
Flow in STS |
The following are the individual components of the Spring Integration Flow/Pipeline:
- A message driven channel adapter to consume the messages from JMS queue
- An inbound channel to channel the messages from the inbound adapter
- A Service Adapter message handler POJO to handle the messages. The Service Adapter is linked to the inbound channel to receives and an outbound channel on which to send the messages
- An outbound channel to channel the messages to the outbound adapter
- An outbound channel adapter to send messages to JMS queue
And this is the source representation of the above (app-context.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="jms-listener-container-config.xml"/>
<int:annotation-config/>
<int:channel id="inbound"/>
<int:channel id="outbound"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="inbound"
container="messageListenerContainer"
acknowledge="transacted"></int-jms:message-driven-channel-adapter>
<int-jms:outbound-channel-adapter id="jmsOut"
channel="outbound"
connection-factory="remoteConnectionFactory"
destination="receiveQueue"
delivery-persistent="true" ></int-jms:outbound-channel-adapter>
<int:service-activator input-channel="inbound"
output-channel="outbound"
ref="messageHandler"
method="onMessage"
requires-reply="true"></int:service-activator>
<bean id="messageHandler" class="com.mujo.eai.MessageHandler"></bean>
</beans>
Observations on above configuration:
- A Message Driven Channel Adapter has been used instead of Inbound Channel Adapter. The Message Channel Adapter can be used in concert with a Message Listener Container. For more details refer: http://static.springsource.org/spring-integration/reference/htmlsingle/#jms-message-driven-channel-adapter
- Instead of a Transaction Manager, the acknowledge property has been set to "transacted". Spring recommends this when not using a transaction manager.
The JMS Listener configuration and JNDI configuration (jms-listener-container-config.xml) is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="remoteJndiTemplate" class="org.springframework.jndi.JndiTemplate" lazy-init="false">
<property name="environment">
<props>
<prop key="java.naming.provider.url">tcp://localhost:61616</prop>
<prop key="java.naming.factory.url.pkgs">org.apache.activemq.jndi</prop>
<prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
<prop key="connectionFactoryNames">DefaultActiveMQConnectionFactory,QueueConnectionFactory</prop>
<prop key="queue.SendReceiveQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/SendReceiveQueue</prop>
<prop key="queue.SendQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/MDBTransferBeanOutQueue</prop>
</props>
</property>
</bean>
<bean id="remoteConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="false">
<property name="jndiTemplate" ref="remoteJndiTemplate"/>
<property name="jndiName" value="QueueConnectionFactory"/>
<property name="lookupOnStartup" value="true" />
<property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>
<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>SendReceiveQueue</value>
</constructor-arg>
</bean>
<bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>SendQueue</value>
</constructor-arg>
</bean>
<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="remoteConnectionFactory"/>
<property name="destination" ref="receiveQueue"/>
<property name="sessionTransacted" value="true"/>
<property name="maxConcurrentConsumers" value="1"/>
<property name="concurrentConsumers" value="1"/>
<property name="receiveTimeout" value="5000"/>
<property name="recoveryInterval" value="60000"/>
<property name="autoStartup" value="true"/>
</bean>
</beans>
- A Spring JMS message listener container is configured for receiving the messages
- The Message Driven Channel Adapter then refers to the message listener container
- The Spring JNDITemplate is configured with the JNDI for JMS queues on Geronimo
- Here we are using the default JMS queues available in Geronimo which are built on Active MQ
- The connection factory refers to the JNDITemplate
- The Message Listener container refers to the connection factory
- The concurrent consumers value needs to be set in tune with the JVM settings since each consumer is a JVM thread that consumes memory and CPU. For our test purpose this has been set to 1
- Note the bean lazy-init has been set to false for the connection factory and JNDI lookup. This can be set to true (i.e. default) in a real application where on startup it is possible that the remote JMS queue is not available which can prevent the application from starting up.
- The recovery interval has been set to 60000 ms i.e. after every 1 minute the message listener will check whether the JMS queue is available. This is in case the JMS queue becomes unavailable for whatever reasons.
Running the Application
Java class with main method
public class App {
public static void main(String args[]) {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("META-INF/spring/app-context.xml");
context.registerShutdownHook();
}
}
MessageHandler POJO
public class MessageHandler {
public String onMessage(String message) {
System.out.println("In onMessage");
System.out.println(message);
String payload = message;
return payload;
}
}
Geronimo Console
The “JMS Resource” link in the LHS tree show the current JMS connection factories and queues configured
Sending Message on JMS Queue using Geronimo Console
The standalone client will receive the message from the JMS queue and send it back to the same queue. This will be repeated till the application is terminated.
Hope the above example is useful to someone.
Download Maven Project with Source Code
Labels:
JMS,
Spring Integration
Subscribe to:
Posts (Atom)