Guaranteed Delivery in Spring Integration

Wilfred Springer

This is not a rant against ESBs. I am not saying that ESBs never serve a purpose, nor am I suggesting that it's all just a scam. If, after reading this post, you get the impression that I suspect a conspiracy behind ESBs, then let me tell you up front that this is certainly not what I intended to say.


The only claim I am actually making is that, if you are interested in implementing integration patterns, you can certainly achieve that with a whole lot less. And as always, less is clearly more. You do not always need a monumental solution to solve an integration problem. Spring Integration and Camel are wonderful alternatives, and way less intrusive.

That doesn't mean that it's all roses and sunshine. There is certainly still something left to be desired. One of the things I find missing in Spring Integration is an easy way to get guaranteed message delivery, so that's what this post is about.

Channels

One of Spring Integration's major abstractions is the channel. Think of it as a queue that is not necessarily backed by, well, a queue. It's just the place where you hand off or receive a message, without the sender and receiver necessarily being aware of each other's existence. When the receiver receives the message, that message might be pulled from a buffer kept inside the channel (as in a QueueChannel), but it might just as well be delivered synchronously, by the same thread that sent it to the channel (as in a DirectChannel). From a receiver's point of view, it's all the same.
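To make the distinction concrete, here is a minimal sketch in plain Java. The types `SimpleChannel`, `BufferedChannel` and `PassThroughChannel` are hypothetical illustrations, not Spring Integration's actual classes: one buffers messages until a receiver polls for them, the other hands each message to its subscriber on the sender's own thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins: NOT Spring Integration's real channel classes.
interface SimpleChannel<T> {
    // Returns true if the channel accepted the message.
    boolean send(T message);
}

// Keeps messages in an in-memory buffer until a receiver polls for them
// (roughly the role of a QueueChannel).
class BufferedChannel<T> implements SimpleChannel<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();

    @Override
    public boolean send(T message) {
        return buffer.offer(message);
    }

    // Returns the oldest buffered message, or null if the buffer is empty.
    public T receive() {
        return buffer.poll();
    }
}

// Invokes the subscribed handler directly, on the sender's own thread
// (roughly the role of a DirectChannel with a single subscriber).
class PassThroughChannel<T> implements SimpleChannel<T> {
    private Consumer<T> subscriber;

    public void subscribe(Consumer<T> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public boolean send(T message) {
        if (subscriber == null) {
            return false; // nobody to hand the message to
        }
        subscriber.accept(message); // runs synchronously, in the caller's thread
        return true;
    }
}
```

Both implementations sit behind the same `send` call, which is what lets a sender stay unaware of how (and when) the message is actually delivered.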

Spring Integration Channel Abstractions

Now, if you're dealing with a channel that is backed by an actual in-memory BlockingQueue, then you do have some degree of reliability. You know the message will be delivered exactly once (at least once and at most once), and that order is preserved, but only if your system is never shut down. If it is shut down, either because you stop it yourself or because of an unexpected error, all of the messages inside those channels can essentially be considered lost: when the system restarts, all of those queues will be empty.

Message loss in regular channels, versus KahaChannel

Idempotency to the rescue?

You could argue that you could just replay all of your messages and that, as long as the receivers and intermediaries are all idempotent receivers, everything will work out fine, but I tend to disagree. First of all, you do not want to replay your entire message history at system startup. So you probably need some mechanism that prevents messages that have already traversed the entire chain of message handlers from being sent again. If the idempotent receiver is the only trick up your sleeve, you end up doing a hell of a lot of work yourself, and it is not going to make your life any easier.
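For reference, an idempotent receiver boils down to something like the following sketch. `IdMessage` and `IdempotentReceiver` are hypothetical names, and the key assumption is that every message carries a unique id that stays the same when it is replayed. Note the set of processed ids: that is exactly the bookkeeping you end up maintaining yourself, and it has to survive a restart as well.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical message type carrying a unique, stable id (assumption: the
// sender assigns the same id when a message is replayed).
final class IdMessage {
    final String id;
    final String payload;

    IdMessage(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

// Idempotent Receiver: forwards each id to the real handler at most once
// (within this process) and silently drops replays.
class IdempotentReceiver implements Consumer<IdMessage> {
    // In-memory bookkeeping: lost on restart, so in practice it needs a durable store.
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<IdMessage> delegate;

    IdempotentReceiver(Consumer<IdMessage> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(IdMessage message) {
        if (processedIds.contains(message.id)) {
            return; // duplicate: already handled successfully
        }
        delegate.accept(message);
        // Record the id only after the handler succeeded, so a failed attempt can be retried.
        processedIds.add(message.id);
    }
}
```

The sketch is not thread-safe; concurrent receivers would need a concurrent set plus some care around the check-then-act sequence.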

Apart from that, if you do want to replay a fair selection of the messages that entered your system in the past, you still need the ability to reliably store those messages at the outer rim of your system. So even if you gave up on reliable delivery guarantees inside your system, you would still not be able to avoid a reliable hand-off at its edge. (That is, unless you convince your callers (business partners, external clients) to do all of this bookkeeping for you, but I bet you will not be able to pull that off.)

JMS to the rescue?

Now, I said that Spring Integration does not support guaranteed delivery, which was actually a lie. You can have reliable delivery if you route your messages through JMS and use a persistent queue. So it can be done. However, why would you want the overhead of the JMS API and everything that sits behind it, if you could also implement the PollableChannel interface in a reliable way directly?

Kaha DB to the rescue!

So, that's what I did. I present to you the KahaChannel. Kaha is the data store backing ActiveMQ's persistent message queues. So essentially, all I did was rip Kaha out from underneath ActiveMQ and implement the PollableChannel interface on top of it. The result is a native PollableChannel that stores messages directly in ActiveMQ's highly optimized data store. And since Kaha is file-based, it does not require any other external data store.
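Kaha itself is far more sophisticated (a journal plus indexes), but the essential idea of a channel whose contents survive a restart can be sketched with the JDK alone. The `FileBackedQueue` below is a hypothetical illustration, not the KahaChannel source and not Kaha's API: it simply rewrites a snapshot file after every send and receive, which keeps the example short but is much slower than a real journal.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a restart-safe queue of String messages.
// NOT KahaChannel and NOT Kaha's API.
class FileBackedQueue {
    private final Path file;
    private final Deque<String> items = new ArrayDeque<>();

    FileBackedQueue(Path file) {
        this.file = file;
        // On "restart", reload whatever was still queued when we went down.
        if (Files.exists(file)) {
            try (DataInputStream in = new DataInputStream(
                    new BufferedInputStream(Files.newInputStream(file)))) {
                int count = in.readInt();
                for (int i = 0; i < count; i++) {
                    items.addLast(in.readUTF());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    synchronized boolean send(String message) {
        items.addLast(message);
        try {
            persist();
        } catch (UncheckedIOException e) {
            items.removeLast(); // keep memory consistent with what is on disk
            throw e;
        }
        return true;
    }

    // Non-blocking receive: returns null when the queue is empty.
    synchronized String receive() {
        String head = items.pollFirst();
        if (head != null) {
            try {
                persist();
            } catch (UncheckedIOException e) {
                items.addFirst(head); // not consumed after all
                throw e;
            }
        }
        return head;
    }

    // Writes a full snapshot to a temp file, then renames it over the old one,
    // so the file is replaced in one step rather than overwritten in place.
    // No fsync is done, so an OS crash or power loss may still lose recent writes.
    private void persist() {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try {
            try (DataOutputStream out = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(tmp)))) {
                out.writeInt(items.size());
                for (String item : items) {
                    out.writeUTF(item); // limited to 65,535 bytes of modified UTF-8 per message
                }
            }
            Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Creating a second `FileBackedQueue` over the same file simulates a restart: messages that were sent but not yet received are still there.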

(Obviously, the PollableChannel interface could also easily be implemented on top of some other journalling system, such as Howl. However, Kaha seems to be way more actively developed than any of the other journalling systems I am aware of.)

So now what?

Using the KahaChannel is fairly simple. It is just a drop-in replacement for any other PollableChannel. Unfortunately, it does not blend in nicely with the Spring Integration namespaces (yet), so you need to construct a KahaChannel instance using vanilla Spring configuration.

So, if this was your channel configuration before:

    <si:channel id="inbox">
        <si:queue capacity="1101"/>
    </si:channel>

Then all you need to do to make your system a little more reliable is drop in this replacement:

    <bean id="inbox" class="nl.flotsam.spring.integration.kaha.KahaChannelFactory">
        <property name="directory" value="..."/>
    </bean>

The example above uses the KahaChannelFactory to construct an instance. That's not a requirement. You can also create a KahaChannel directly, passing in the parameters as constructor arguments, but then the name of the container created underneath will not automatically correspond to the id of the bean.

Performance

Replacing an in-memory, buffer-based channel with a disk-based one that offers reliable delivery guarantees obviously has its price. However, the performance penalty is not necessarily huge; it all depends on the size and complexity of your messages. If your messages carry hardly any data, the penalty is fairly small: pumping 10,000 messages through an in-memory channel took 3.2 seconds on my system. Running the same messages through a KahaChannel took only 4.3 seconds. You do the math.
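Doing that math for the figures above (the same 10,000 small messages, on the same machine):

```latex
\frac{3.2\ \mathrm{s}}{10\,000} = 0.32\ \mathrm{ms/message}, \qquad
\frac{4.3\ \mathrm{s}}{10\,000} = 0.43\ \mathrm{ms/message}, \qquad
\frac{4.3\ \mathrm{s}}{3.2\ \mathrm{s}} \approx 1.34
```

That is roughly 0.11 ms of extra cost per message, or about a third more time overall. This only holds for near-empty messages; larger or more complex messages will change the picture.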

Encoding Messages

For the KahaChannel to be a truly reliable channel that survives a crash, the messages obviously need to be stored, which means they first have to be encoded. Unfortunately, Spring Integration does not (yet) define an abstraction for encoding and decoding messages.

The Kaha abstractions let you encode to a DataOutput and decode from a DataInput, so every message eventually needs to be mapped onto those abstractions. Rather than trying to implement the ultimate way of encoding and decoding a message itself (if such a thing exists at all), the KahaChannel takes the stance that it is up to you to provide a sensible codec. For that purpose, it defines its own message codec interface:

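    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public interface MessageCodec<T> { // Message is Spring Integration's Message type
        void encode(Message<T> value, DataOutput out) throws IOException; // write one message
        Message<T> decode(DataInput in) throws IOException; // read it back
    }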

It does, however, also ship with its own SimpleMessageCodec, which implements a ridiculously simple strategy for persisting your messages: it supports only basic types such as int, boolean and String for the payload and headers.
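As an illustration of what such a codec might look like, here is a type-tagged sketch in the spirit of the SimpleMessageCodec. It is not its actual source: `SimpleMsg` is a hypothetical stand-in for Spring Integration's Message (so the sketch runs without Spring on the classpath), and the tag values are arbitrary. Each value is prefixed with a one-byte type tag so that the decoder knows which DataInput method to call.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Spring Integration Message: a payload plus headers.
final class SimpleMsg {
    final Object payload;
    final Map<String, Object> headers;

    SimpleMsg(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

// Sketch of a type-tagged codec: only int, boolean and String values are supported.
final class TaggedCodec {
    private static final byte INT = 1, BOOLEAN = 2, STRING = 3; // arbitrary tag values

    static void encode(SimpleMsg msg, DataOutput out) throws IOException {
        writeValue(msg.payload, out);
        out.writeInt(msg.headers.size());
        for (Map.Entry<String, Object> e : msg.headers.entrySet()) {
            out.writeUTF(e.getKey());
            writeValue(e.getValue(), out);
        }
    }

    static SimpleMsg decode(DataInput in) throws IOException {
        Object payload = readValue(in);
        int count = in.readInt();
        Map<String, Object> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            headers.put(key, readValue(in));
        }
        return new SimpleMsg(payload, headers);
    }

    // Writes a one-byte type tag followed by the value itself.
    private static void writeValue(Object v, DataOutput out) throws IOException {
        if (v instanceof Integer) {
            out.writeByte(INT);
            out.writeInt((Integer) v);
        } else if (v instanceof Boolean) {
            out.writeByte(BOOLEAN);
            out.writeBoolean((Boolean) v);
        } else if (v instanceof String) {
            out.writeByte(STRING);
            out.writeUTF((String) v);
        } else {
            throw new IOException("Unsupported type: " + (v == null ? "null" : v.getClass().getName()));
        }
    }

    // Reads the tag, then dispatches to the matching DataInput method.
    private static Object readValue(DataInput in) throws IOException {
        byte tag = in.readByte();
        switch (tag) {
            case INT: return in.readInt();
            case BOOLEAN: return in.readBoolean();
            case STRING: return in.readUTF();
            default: throw new IOException("Unknown type tag: " + tag);
        }
    }
}
```

Rejecting unsupported types at encoding time, rather than silently dropping them, means a message that cannot be persisted fails loudly at the moment it is sent.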

Conclusions

Spring Integration itself does not provide a persistent message channel that guarantees your messages will eventually arrive at the receiver, even across system restarts. However, leaving that aside, you end up with a toolbox that provides most of what you would expect from a full-blown ESB solution, at least in terms of its support for implementing integration patterns. A JMS Message Broker could be used to add durable message persistence to the mix.

However, even though Spring Integration has no built-in support for persistent message channels, it does allow you to add your own. Implementing a persistent channel on top of its abstractions turns out to be fairly easy if you rely on a data store such as Kaha. And a lightweight, native persistent message channel seems to be a more natural fit for Spring Integration's lightweight approach. In a sense, it is similar to the NMR in JBI implementations: you know your messages are safe once they are inside. It may be backed by a JMS Message Broker, but to the ESB developer it's just a native component of the product.

Comments (12)

  1. jos

    November 27, 2009 at 11:57 am

    JBI NMR is not systematically backed by a persistent message store: NMR is used to manage communication exchanges between endpoints and to support patterns (in-out, in-only, ...) and sync or async communication; the exchange end state tells the initiator whether it was successful or not.
    NMR can be persistent (slow) or SEDA based (fast but not persistent). In this case safe endpoints must deal with persistency and retry (such as with JMS endpoint for instance) to manage exchange failures.
    Regards

  2. Mark Fisher

    November 27, 2009 at 8:20 pm

    Nice post! We should be able to support a custom "queue" strategy within Spring Integration such that namespace support would be available with something like a "ref" attribute or inner-bean defined within a queue element. On one hand, with the introduction of MessageStore in 2.0, we will have a persistence option (via a DataSource-backed implementation), but that does not provide queue characteristics inherently. On the other hand, we could simply delegate to any java.util.concurrent.BlockingQueue reference, but that defines more methods than we really need.

    As in your example, all that is really needed is something to support "addLast" and "removeFirst", and *timeouts* for each if supported by the underlying implementation. These could be mapped directly to our channel's send and receive calls or we could provide something like a BlockingQueueAdapter that does implement BlockingQueue but then delegates to such a strategy interface. In any case, my goal would be to not only support this with a single strategy hook but also to provide one or more persistent options out of the box within the 2.0 timeframe.

    Regards,
    Mark

  3. Erik Rozendaal

    November 30, 2009 at 10:32 am

    So how much faster is the KahaChannel compared to the JMS one?

  4. Wilfred Springer

    November 30, 2009 at 10:39 am

    I haven't tried yet. 🙂

  5. Age Mooy

    November 30, 2009 at 10:57 am

    Why would you not use a standardized API implemented by the same backend you're talking to now? Not invented here?

  6. Wilfred Springer

    November 30, 2009 at 9:43 pm

    Basically cutting out abstractions and making things a little bit more understandable. Nobody likes the JMS APIs, but we still accept them in between everything we do. If I can do with less, then I'd prefer to do so.

    Another argument is performance. It turns out that we lose a freaking amount of cycles nowadays to piles of pointless abstractions.

    And then of course, I didn't invent ActiveMQ. 😉 But in my defense I will say that I didn't invent Kaha and I didn't invent Spring Integration either. In fact, I didn't invent anything at all, so this could never have been the not invented here syndrome manifesting itself.

  7. Mark Fisher

    December 2, 2009 at 3:32 pm

    Wilfred,

    I would be very interested to see the results of some crude performance tests between your Kaha-only channel, and Spring Integration's new JMS-backed channel using ActiveMQ. That would be a great way to measure the "freaking amounts of cycles" 😉

    Whatever performance difference exists could be considered in the trade-off, but on the other side, there are additional benefits of the "full" ActiveMQ solution (queue management/monitoring via JMX, the ability to *share* a channel within a load-balancing/failover strategy, transactions with redelivery, etc).

    Regards,
    Mark

  8. Kshitiz

    September 3, 2010 at 6:29 pm

    Hi Wilfred,

    Thanks for the informative post!

    I am aiming to create a notification server (a pub/sub and p2p model) and am excited to use Spring Integration's features. To make that notification server an isolated black box, I'm thinking of exposing its functionality through REST-based APIs. Since ActiveMQ is highly configurable, I'm probably going to use it for the message management part.

    Do You think that it's a feasible & a wise decision to go for an architecture like this considering scalability & extensibilty ?

    [edited on 6th Sep] External systems > Restful APIs on Tomcat > Spring Integration acting as lightweight ESB > ActiveMQ (+MySql for message persistence and tomcat i.e. not the ‘embedded’ jetty) > Spring Integration > Restful APIs > External systems

    What all pros & cons do you suggest in this scenario.....

    Many thanks,
    Kshitiz

  9. Shashank

    December 13, 2010 at 3:50 am

    Hi All,
    We have a requirement to use SI to get guaranteed delivery. By guaranteed delivery I mean best effort, like a retry policy in case of exceptions.

    We thought of using a JMS adaptor in front of the channel and in case of exceptions in service activator the message gets back to the queue and gets retried.

    An issue arises if we are running something async in the service activator. So I'm not sure what the possibilities are in that case. Do we execute a poller or depend on a callback? In either case the message context has to be persisted for that period.

    Thoughts ??

    Regards
    Shashank

  10. Iwein Fuld

    March 4, 2011 at 10:57 am

    @Shashank: it is possible to take a message from a jms backed queue in a transaction by setting the transaction manager on the poller. If you ensure that "running something async" doesn't mean that the transaction is closed prematurely, then there is no problem. This does mean you'd have to wait for the asynchronous process to complete, for example using a gateway (which will automatically wait for the reply message).

    In most cases however, having to wait for the process to complete naturally results in asynchronous processing not being that much of an advantage. I'd recommend that you use a synchronous process at first, that keeps things much simpler.

  11. srini

    March 7, 2011 at 7:48 am

    Hi All,
    We need to persist messages, including their payload, in a database as they flow through Spring Integration JMS message channels. Some possible solutions are a message store, KahaDB, or JDBC with transactions.

    What are the pros and cons of these approaches, and are there any other solutions?

    Thanks in Advance
    Srini

  12. Raghu

    August 31, 2011 at 6:26 pm

    Hi All,
    Thanks for posting this informative article. I was wondering if one could use KahaDB as a persistent message store in a cluster.
    We are planning to deploy Spring Integration in a clustered environment.
    Can multiple nodes share the same KahaDB?

    Thanks in Advance
