
Where Apache Camel meets Spring JMS

This post is for developers who know Apache Camel and use its JMS component to consume from a JMS broker. It covers only the message consumption part. I'm assuming you are familiar with Camel's terminology of routes, endpoints, processors and EIPs. If not, the following is a good place to start:

Link --->  Apache Camel Home 

Ever wondered what happens underneath when a Camel consumer route is defined to take messages from a JMS endpoint? This post is an attempt to explain how Camel integrates with JMS via Spring APIs and how the business logic defined in a Camel route is invoked.

The following is a simplified sequence diagram showing the set-up process which eventually leads to message consumption from the JMS broker.

JMS endpoints are created in a Camel context when they are defined in a route (via the Java or XML DSL). A JMS endpoint takes a number of configurable attributes, as seen here: Camel JMS Component.

As soon as the context is bootstrapped, if it finds a JMS endpoint defined in a "from" clause, the Camel machinery creates a consumer for it (a JmsEndpoint and a JmsConsumer instance), which internally refers to a DefaultJmsMessageListenerContainer instance.

The container IS-A specialization of Spring's well-known DefaultMessageListenerContainer. The initialization process also installs a javax.jms.MessageListener implementation, EndpointMessageListener, which comes from the Camel API.


EndpointMessageListener is the entry point from where our business route definition will be invoked later on.


Another important thing that happens is the selection of the task executor, which is a cached thread pool by default. All message consumption and every "onMessage" callback happen on this pool.

As soon as the JmsConsumer starts (auto-start by default), it posts N AsyncMessageListenerInvoker instances on the task execution pool. These instances are Runnables.

Here N is the number of concurrent consumers specified in the endpoint configuration.
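To make the "N Runnables on a cached thread pool" idea concrete, here is a minimal JDK-only sketch. It is a simplified model, not the Spring or Camel code: ConcurrentConsumersSketch, Listener, PollingInvoker and consumeAll are hypothetical names, and a BlockingQueue stands in for the JMS broker.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model only: hypothetical stand-ins, not Camel or Spring classes.
class ConcurrentConsumersSketch {

    // Stand-in for javax.jms.MessageListener.
    interface Listener {
        void onMessage(String message);
    }

    // Stand-in for an invoker: a Runnable that keeps polling the "broker".
    static final class PollingInvoker implements Runnable {
        private final BlockingQueue<String> broker;
        private final Listener listener;
        private final CountDownLatch remaining;

        PollingInvoker(BlockingQueue<String> broker, Listener listener, CountDownLatch remaining) {
            this.broker = broker;
            this.listener = listener;
            this.remaining = remaining;
        }

        @Override
        public void run() {
            try {
                while (remaining.getCount() > 0) {
                    // Receive with a timeout, then loop again if nothing arrived.
                    String message = broker.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        listener.onMessage(message);
                        remaining.countDown();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the pool is shutting down
            }
        }
    }

    // Posts N invokers on a cached thread pool and waits (up to 5 seconds)
    // until every message has been handed to the listener.
    static List<String> consumeAll(List<String> messages, int concurrentConsumers) {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>(messages);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(messages.size());
        ExecutorService taskExecutor = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < concurrentConsumers; i++) {
                taskExecutor.execute(new PollingInvoker(broker, handled::add, remaining));
            }
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            taskExecutor.shutdownNow();
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> handled = consumeAll(Arrays.asList("m1", "m2", "m3", "m4", "m5"), 3);
        System.out.println("Handled " + handled.size() + " messages: " + handled);
    }
}
```

With three invokers the order in which messages are handled is not deterministic, which is also what you should expect from a real endpoint configured with more than one concurrent consumer.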


From here Spring will take over...



AsyncMessageListenerInvoker IS-A Runnable which was posted on the task executor. It keeps polling the JMS broker via the message listener container (because the container IS-A polling consumer as well, see the first diagram).

As soon as a message is received, the EndpointMessageListener is invoked. It holds a reference to the first Processor in the route definition, so invoking it triggers the business logic.
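The hand-off from listener to route can be pictured with a small JDK-only sketch. It is a simplified model under my own assumptions: Step, Pipeline, RouteInvokingListener and deliver are hypothetical stand-ins for Camel's Processor, the route and EndpointMessageListener, and a plain String replaces the JMS message and the Camel exchange.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Simplified model only: hypothetical stand-ins, not the real Camel types.
class ListenerToRouteSketch {

    // Stand-in for a Camel Processor working on a plain String body.
    interface Step {
        String process(String body);
    }

    // Stand-in for the route: each step hands its result to the next one.
    static final class Pipeline implements Step {
        private final List<Step> steps;

        Pipeline(Step... steps) {
            this.steps = Arrays.asList(steps);
        }

        @Override
        public String process(String body) {
            String current = body;
            for (Step step : steps) {
                current = step.process(current);
            }
            return current;
        }
    }

    // Stand-in for EndpointMessageListener: it only knows the first step.
    static final class RouteInvokingListener {
        private final Step firstStep;
        private String lastResult;

        RouteInvokingListener(Step firstStep) {
            this.firstStep = firstStep;
        }

        void onMessage(String message) {
            lastResult = firstStep.process(message);
        }

        String lastResult() {
            return lastResult;
        }
    }

    // Builds a three-step "route" and delivers one message through the listener.
    static String deliver(String message) {
        Step route = new Pipeline(
                String::trim,
                body -> body.toUpperCase(Locale.ROOT),
                body -> "processed:" + body);
        RouteInvokingListener listener = new RouteInvokingListener(route);
        listener.onMessage(message);
        return listener.lastResult();
    }

    public static void main(String[] args) {
        System.out.println(deliver("  order-42  "));
    }
}
```

The point of the sketch is that the container never sees the route itself. It only calls "onMessage", and the listener forwards to whatever it was wired with.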

Note that "onMessage" is invoked inside a try-catch block which catches all Throwables. That means Spring always has an opportunity to deal with issues that remained unhandled in our business route.

If Spring catches an exception, it rolls back the JMS transaction, provided the session is transacted. If there are no errors, the transaction is committed. For non-transacted sessions, no rollback/commit takes place.
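The commit/rollback decision described above can be sketched in a few lines of plain Java. This is a simplified model of the behaviour as described in this post, not Spring's actual code: TransactedDeliverySketch, Outcome, Listener and deliver are hypothetical names, and the outcome is returned as a value instead of being applied to a real JMS session.

```java
// Simplified model only: hypothetical stand-ins, not the real Spring or JMS types.
class TransactedDeliverySketch {

    // What the container would do with the session after the callback.
    enum Outcome { COMMIT, ROLLBACK, NONE }

    // Stand-in for javax.jms.MessageListener.
    interface Listener {
        void onMessage(String message);
    }

    // Invokes the listener inside a catch-all block and decides the outcome.
    static Outcome deliver(boolean sessionTransacted, Listener listener, String message) {
        try {
            listener.onMessage(message);
        } catch (Throwable t) {
            // Anything the route left unhandled ends up here.
            return sessionTransacted ? Outcome.ROLLBACK : Outcome.NONE;
        }
        return sessionTransacted ? Outcome.COMMIT : Outcome.NONE;
    }

    public static void main(String[] args) {
        Listener ok = message -> { };
        Listener failing = message -> { throw new IllegalStateException("route failed"); };
        System.out.println(deliver(true, ok, "m1"));
        System.out.println(deliver(true, failing, "m2"));
        System.out.println(deliver(false, failing, "m3"));
    }
}
```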

In any case, if there is an error, the "onMessage" handler of EndpointMessageListener invokes the route-specific ErrorHandler (synchronously or asynchronously), and the error then propagates back to Spring, which catches it and deals with it.


This discussion shows the points in time when a JMS commit or rollback actually takes place and how Spring gracefully deals with errors.



