Wednesday, November 13, 2013

Adding New Modules to Spring XD Part 1

Spring XD is a unified, distributed, and extensible system for data ingestion, real time analytics, batch processing, and data export. The project’s goal is to simplify the development of big data applications.
So a question popped into my head, "Can XD handle all the ways that data can be transmitted or pulled from existing infrastructure, new data sources or devices?  While XD has several ingestors, processors and exporters already present, what if the data comes from a source that it doesn't currently support?  So for part 1, we'll discuss how to add  modules that are not in XD, but are present in Spring Integration, without writing any code.  Part 2 will discuss how to create a module, that is not already present in Spring Integration.  

Getting Started

If you haven't downloaded and installed Spring XD, this article does a great job of covering installation and the basics:


XD supports 3 types of modules:

The Example

I want to gather earthquake data for the last hour from the USGS then show that data in the log.  The USGS offers a Restful service that returns current and historical earthquake data.  So I guess step one is is to breakout my IDE and start coding some Java/Groovy since this source isn't here.... WAIT!   XD is built on existing Spring projects (Spring Batch, Spring Data, Spring Integration,...).  So can't I take advantage of that?  And the answer is yes.  Spring integration offers several adapters and gateways.   So in this case we will take advantage of these features.  

The Solution 

So to get this data I'm going to use the HttpOutboundGateway offered by Spring Integration.  
The HttpOutboundGateway is used because it sends a data request outside our application and captures the results from the data provider

So lets look at usgs.xml .

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns=""

     <int-http:outbound-gateway id='USGSGateway'

     <channel id="output"/>
     <channel id="input"/>


In the example above we see that we created a HTTPOutboundGateway that will execute a GET against  the hourly feed from the USGS with a reply time out of 5 seconds and expects a UTF-8 string result.
The input channel will accept a message from XD kicking off our USGS module and the output from the USGS will be placed on the output-channel.


  1. Now copy the usgs.xml to the {xd.home}/xd/modules/processor. 
  2. Startup XD {xd.home}/xd/bin/xd-singlenode
  3. Startup XD-Shell {xd.home}/shell/bin/xd-shell
  4. From the shell let's create a stream to kick off a earthquake request It should look something like this:
xd:> stream create myusgsStream --definition "trigger |usgs|log"

From the xd-single-node we will see the earthquake json data output to log/console:  For example:
14:51:00,330  WARN task-scheduler-3 logger.myusgsStream:145 - {"type":"FeatureCollection","metadata":{"generated": 1378925455000,"url": "","title": "USGS All Earthquakes, Past Hour","subTitle": "Real-time, worldwide earthquake list for the past hour", "cacheMaxAge": 60},"features":[{"type":"Feature","properties":{"mag":1.3,"place":"36km NNE of Talkeetna, Alaska","time":"1378924976000","updated":"1378925341225","tz":-480,"url":"","felt":null,"cdi":null,"mmi":null,"alert":null,"status":"AUTOMATIC","tsunami":null,"sig":"26","net":"ak","code":"10803891","ids":",ak10803891,","sources":",ak,","types":",general-link,geoserve,nearby-cities,origin,tectonic-summary,","nst":null,"dmin":null,"rms":"0.13","gap":null,"magnitudeType":"Ml"},"geometry":{"type":"Point","coordinates":[-149.8367,62.6285,51.2]},"id":"ak10803891"}]}
WOOHOO You did it!

So what did we do here:

First by copying the usgs.xml to the {xd.home}/modules/process directory we declared that the usgs.xml is the usgs processor.  The source for our stream is the trigger, the processor is your usgs module, and the sink is the log which will write the earthquake data to the XD log.  Also anytime you deploy  a new module you will have to bounce XD so that it will recognize the new module. 

Lets make the USGS module a little bit more generic.

First off lets change the name of our usgs module to httpsource, and this can be done by renaming usgs.xml to httpsource.xml.   Now that the name is changed lets replace the usgs url with a parameter with a default.  In this case replace url=''  with url='${url:}' .    Now we can send any url to the httpsource from the XD-Shell.   For example: 
xd:> stream create myusgsStream --definition "trigger |usgs --url=''|log"
However if I decide not to put the --url parameter it will default to the url.  For example:
xd:> stream create myusgsStream --definition "trigger |usgs |log"


  1. What is the typical location of {xd.home} on a Mac?

    1. when you run xd-singlenode, it shows following information on my Mac

      Using local mode JMX is disabled for XD components
      XD Configuration:

      so XD_HOME=/usr/local/Cellar/springxd/1.0.0.M4/libexec/xd

  2. This comment has been removed by the author.

  3. Can be HttpOutboundGateway defined as a Source instead of as a Processor?

  4. Hi, I am not able to deploy and getting SpringXDException: could not find module with name usgs and type processor.

    I followed above given steps for deployement.
    and do we need to set any environment variable like XD_HOME and if yes where should it point?

    I read other posts for deployment but not working. It seems things have changed in Spring XD

    1. I am working on windows with spring-xd-1.1.2.RELEASE

  5. This comment has been removed by the author.

  6. xd:>stream create myusgsStream --definition "trigger |usgs|log"
    Command failed Could not find module with name 'log' and type 'sink'.. while creating the stream i am facing this error. kindly help me with this error