Running ActiveMQ using Spring

Since the ActiveMQ web site is not really clear on how to integrate it with the Spring framework I decided to write this post to clarify some points.
The really good news I can already point out is that it is possible running a JMS inside a servlet container like Tomcat without any JCA adapter, so you do not need Jencks or similar.
Furthermore the configuration of a complete running environment can be completely embedded in a Spring configuration file and a very easy way. Here is an example:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms"    
xsi:schemaLocation="http://www.springframework.org/schema/beans <a href="http://www.springframework.org/schema/beans/spring-beans.xsd" title="http://www.springframework.org/schema/beans/spring-beans.xsd">http://www.springframework.org/schema/beans/spring-beans.xsd</a> <a href="http://www.springframework.org/schema/jms" title="http://www.springframework.org/schema/jms">http://www.springframework.org/schema/jms</a> <a href="http://www.springframework.org/schema/jms/spring-jms-2.5.xsd"><br />
<amq:broker" title="http://www.springframework.org/schema/jms/spring-jms-2.5.xsd"><br />
<amq:broker">http://www.springframework.org/schema/jms/spring-jms-2.5.xsd"><br />
<amq:broker</a> brokername="test-broker" start="true">
  <amq:persistenceAdapter>
      <amq:amqPersistenceAdapter directory="/opt/activemq" maxFileLength="32mb"/>
  </amq:persistenceAdapter>
  <amq:transportconnectors>
      <amq:transportconnector uri="tcp://localhost:7171"/>
  </amq:transportconnectors>
</amq:broker>
<amq:connectionFactory id="amqConnectionFactory"  brokerURL="vm://test-broker"/>
<bean class="org.springframework.jms.connection.CachingConnectionFactory" id="connectionFactory">
  <constructor-arg ref="amqConnectionFactory"/>
  <property name="sessionCacheSize" value="100"/>
</bean>    
<amq:queue physicalName="testQueue" />
<bean class="org.springframework.jms.core.JmsTemplate"  id="jmsTemplate">
  <constructor-arg ref="connectionFactory"/> 
</bean>
<bean class="packageName.className" id="queueProducer">
 <property name="jmsTemplate" ref="jmsTemplate"/>
 <property name="queueName" value="testQueue"/>
</bean>
<bean class="packageName.className" id="queueListener"/>    
<jms:listener-container concurrency="10"
 connectionfactory="connectionFactory">
  <jms:listener destination="testQueue" ref="queueListener"/>
</jms:listener-container>
</beans>



Let's analyze all the tags, one by one:
<amq:broker brokername="test-broker" start="true">
  <amq:persistenceAdapter>
    <amq:amqPersistenceAdapter directory="/opt/activemq"
        maxFileLength="32mb"/>
   </amq:persistenceAdapter>
  <amq:transportconnectors>
    <amq:transportconnector uri="tcp://localhost:7171"/>
  </amq:transportconnectors>
</amq:broker>

This is the definition for an embedded JMS broker called test-broker which listens on port 7171 using the tcp protocol and which persist data using the default AMQ Message Store in the /opt/activemq directory.


<amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://test-broker"/>

JMS connection factory which connects to the broker using a VM transport. In this way the communication is made at JVM level avoiding network overhead


<amq:queue id="testQueue" physicalname="TestQueue">

Defines the queue we are going to use


<bean class="org.springframework.jms.connection.CachingConnectionFactory" id="connectionFactory">
   <constructor-arg ref="amqConnectionFactory"/>
   <property name="sessionCacheSize" value="100"/>
</bean>

This is the connection factory we are really going to use in our application. It caches connections, sessions and even MessageProducer. It is important to set the value of the sessionCacheSize property since the default value is 1.


<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
   <constructor-arg ref="connectionFactory"></constructor-arg>
</bean>

For those who know the Spring integration with Hibernate this class should sound familiar since it has a very similar design to the Hibernate Template.
It hides most of the JMS-related level details like obtaining a session or handling acknowledgments. By default session are not transacted and auto-acknowledge.
The connectionFactory is passed as a property to the constructor.
One of its most important method is send(String destinationName,MessageCreator messageCreatpr): MessageCreator is an interface defined by Spring having only the createMessage(Session session) method, its output is the JMS message that will be sent to destinationName.


<bean class="packageName.className" id="queueProducer">
    <property name="jmsTemplate" ref="jmsTemplate"></property>
    <property name="queueName" value="testQueue"></property>
</bean>

We finally find an object coming from our application, it is the producer of the queue and send messages to it through the jmsTemplate.


<bean class="packageName.className" id="queueListener"/>

The consumer of ours queue. It needs to implement the javax.jms.MessageListener interface (or some Spring specific interface like SessionAwareMessageListener).
In Spring terms it is a Message Driven Pojo, the difference with a standard Message Driven Bean is that it does not need to run in an EJB container.


<jms:listener-container concurrency="1" connection-factory="connectionFactory">
    <jms:listener destination="AggregateQueue" ref="queueListener"></jms:listener>
</jms:listener-container>

This is the bean who makes the magic: it registers itself as a listener for all the queue's and execute the callback on the listener as a message is received in an asynchronous way using a Spring TaskExecutor. The concurrency attribute defines how many MessageConsumer to create for each listener. If you use a value > 1 remember to set the prefetchLimit to 1 in order to avoid undelivered messages to the consumers.







Have fun :)

Trackback URL for this post:

http://www.func.nl/trackback/110

Nice writeup, I wrote

Nice writeup, I wrote something similar up at http://codedependents.com/2009/10/16/efficient-lightweight-jms-with-spri... and I have a few more additions to make to it coming up. For instance, I figured out when you want to use the CachingConnectionFactory from Spring vs the PooledConnectionFactory from amq.

I've also gone back and edited the activemq.apache.org wiki to hopefully make more of this clearer. I'd love any feed back you have on the online documentation.

Great stuff

Thanks so much for this; it's been a huge help.

One extra thing: how much work is it to replace the local consumer with a send to another ActiveMQ broker? I'm trying to use this configuration as a way of implementing local persistence. So:

Instead of local app sending to remote broker, now local app sends to local persistent broker, which sends to remote broker when connection available.

Any pointers very helpful! Thanks anyway.

Post new comment

The content of this field is kept private and will not be shown publicly.
  • Web page addresses and e-mail addresses turn into links automatically.
  • Lines and paragraphs break automatically.
  • You can enable syntax highlighting of source code with the following tags: <code>, <blockcode>. Beside the tag style "<foo>" it is also possible to use "[foo]".