Spring Integration

Recently, we worked on a project that required frequent polling of a database table, processing each row, and writing the output to one or more other tables. The tables hold incoming and outgoing SMS messages, and the load can get quite high at times. We also had to support load balancing. Since our services were already built with Spring, the natural choice was to try Spring Integration.

So what is Spring Integration? “Spring Integration provides an extension of the Spring programming model to support Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.”

The basic concepts when designing Spring Integration applications are Messages (Java objects with metadata), Channels (pipes that messages go through) and Message Endpoints.
There are several kinds of message endpoints: Transformer (transforms data), Router (routes data), Filter (filters data), Splitter (splits messages), Aggregator (the inverse of Splitter), Service Activator (connects messages to services) and, most importantly, Channel Adapters.
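
As a rough illustration of how channels and endpoints fit together, here is a minimal sketch in the same XML style used later in this post. It is not taken from our project: the channel names and the smsService bean are made up for the example, and it assumes the usual int namespace prefix and String payloads.

	<!-- Hypothetical channels; the names are illustrative only -->
	<int:channel id="rawChannel"/>
	<int:channel id="validChannel"/>
	<int:channel id="textChannel"/>

	<!-- Filter: only lets messages with a non-empty String payload through -->
	<int:filter input-channel="rawChannel" output-channel="validChannel"
		expression="payload.length() > 0"/>

	<!-- Transformer: replaces the payload with its upper-cased version -->
	<int:transformer input-channel="validChannel" output-channel="textChannel"
		expression="payload.toUpperCase()"/>

	<!-- Service Activator: passes the message to a method of an assumed smsService bean -->
	<int:service-activator input-channel="textChannel" ref="smsService" method="handle"/>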

Channel Adapters connect Message Channels to the “external” world. They can be inbound (creating new messages) or outbound (consuming messages). The choice of connection types is pretty big: AMQP, JDBC, Web Services, FTP, File, HTTP, TCP, UDP, JMS, Mail, MongoDB, RMI, XML, XMPP, Stream and even Twitter.

Spring Integration Sample

Back to our application. It’s very easy to add polling of a database table. Just add a JDBC Inbound Channel Adapter that polls the table with a query you specify (load balancing works because we use Oracle’s “select ... for update skip locked” syntax, which skips rows already locked by another session). Then set properties such as channel (the name of the channel the message is sent to), data-source, row-mapper (converts table rows to Java objects), update (a query executed after the rows are read, typically to delete the processed rows or flag them) and fixed-rate (the polling interval). It is also possible to add transaction support or to use a cron expression instead of fixed-rate polling, but we didn’t need either. What we did need was a thread pool, which we added with a Spring Task Executor and combined with Spring Integration by making the message channel an ExecutorChannel.


	<int-jdbc:inbound-channel-adapter query="${db.select.userrequest.query}"
		channel="smsInboundChannel" data-source="dataSource" row-mapper="userRequestMapper"
		update="${db.delete.userrequest.query}"  max-rows-per-poll="${db.poller.maxrowsperpoll}" >
			<int:poller fixed-rate="${db.poller.fixedrate}"  >
			</int:poller>
	</int-jdbc:inbound-channel-adapter>
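
The thread pool part is not shown above, so here is a sketch of how it could look. The executor id and the pool and queue sizes are placeholders rather than our real settings, and the snippet assumes the Spring task namespace is declared alongside int. Giving a channel a dispatcher with a task-executor is what makes it an ExecutorChannel.

	<!-- Thread pool; the id and sizes are illustrative assumptions -->
	<task:executor id="smsExecutor" pool-size="10" queue-capacity="100"/>

	<!-- A dispatcher with a task-executor turns this channel into an ExecutorChannel,
	     so messages are handed to the pool instead of being handled on the poller thread -->
	<int:channel id="smsInboundChannel">
		<int:dispatcher task-executor="smsExecutor"/>
	</int:channel>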



That’s it. Now we just needed to build a Spring Integration chain: add a Splitter to produce one message per row, add a Service Activator to process each message, and finally add a JDBC Outbound Channel Adapter to write the result to the database. The possibilities are numerous. And since everything is done using Spring concepts, the code is easy to maintain and test.
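
For completeness, a sketch of what such a chain might look like. The bean name userRequestService, its process method, the smsOutboundChannel name and the SMS_OUT table with its columns are all invented for this example, so treat it as an outline under those assumptions rather than our actual configuration.

	<int:channel id="smsOutboundChannel"/>

	<int:chain input-channel="smsInboundChannel" output-channel="smsOutboundChannel">
		<!-- The inbound adapter delivers the polled rows as a list; split it into one message per row -->
		<int:splitter/>
		<!-- Hypothetical service bean; the object it returns becomes the payload of the outgoing message -->
		<int:service-activator ref="userRequestService" method="process"/>
	</int:chain>

	<!-- Writes each result to an assumed SMS_OUT table; the parameters refer to payload properties -->
	<int-jdbc:outbound-channel-adapter channel="smsOutboundChannel" data-source="dataSource"
		query="insert into SMS_OUT (MSISDN, TEXT) values (:payload.msisdn, :payload.text)"/>

The outbound adapter is kept outside the chain here simply to make the hand-off between processing and writing visible as a separate channel.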
