Wednesday, November 13, 2013

Adding New Modules to Spring XD Part 1

Spring XD is a unified, distributed, and extensible system for data ingestion, real time analytics, batch processing, and data export. The project’s goal is to simplify the development of big data applications.
So a question popped into my head: can XD handle all the ways that data can be transmitted or pulled from existing infrastructure, new data sources, or devices? While XD already ships with several ingestors, processors, and exporters, what if the data comes from a source that it doesn't currently support? In Part 1, we'll discuss how to add modules that are not in XD but are present in Spring Integration, without writing any code. Part 2 will discuss how to create a module that is not already present in Spring Integration.

Getting Started

If you haven't downloaded and installed Spring XD, this article does a great job of covering installation and the basics:

Modules

XD supports 3 types of modules: sources, processors, and sinks.

The Example

I want to gather earthquake data for the last hour from the USGS and then show that data in the log. The USGS offers a RESTful service that returns current and historical earthquake data. So I guess step one is to break out my IDE and start coding some Java/Groovy, since this source isn't here.... WAIT! XD is built on existing Spring projects (Spring Batch, Spring Data, Spring Integration, ...). So can't I take advantage of that? The answer is yes. Spring Integration offers several adapters and gateways, so in this case we will take advantage of them.

The Solution 

So to get this data I'm going to use the HttpOutboundGateway offered by Spring Integration.  
Note
The HttpOutboundGateway is used because it sends a data request outside our application and captures the response from the data provider.

So let's look at usgs.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:beans="http://www.springframework.org/schema/beans"
 xmlns:int-http="http://www.springframework.org/schema/integration/http"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/integration
  http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/integration/http
  http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

     <int-http:outbound-gateway id='USGSGateway'
          request-channel='input'
          url='http://earthquake.usgs.gov/earthquakes/feed/geojson/all/hour'
          http-method='GET'
          expected-response-type='java.lang.String'
          charset='UTF-8'
          reply-timeout='5000'
          reply-channel='output'>    
     </int-http:outbound-gateway>

     <channel id="output"/>
     <channel id="input"/>

</beans:beans>

In the example above we created an HttpOutboundGateway that executes a GET against the hourly feed from the USGS, with a reply timeout of 5 seconds (5000 ms), and expects the response as a UTF-8 string.
The input channel accepts the message from XD that kicks off our usgs module, and the response from the USGS is placed on the output channel.

Deploy

  1. Copy usgs.xml to {xd.home}/xd/modules/processor.
  2. Start up XD: {xd.home}/xd/bin/xd-singlenode
  3. Start up the XD shell: {xd.home}/shell/bin/xd-shell
  4. From the shell, let's create a stream to kick off an earthquake request. It should look something like this:
xd:> stream create myusgsStream --definition "trigger |usgs|log"

In the xd-singlenode console we will see the earthquake JSON data written to the log. For example:
14:51:00,330  WARN task-scheduler-3 logger.myusgsStream:145 - {"type":"FeatureCollection","metadata":{"generated": 1378925455000,"url": "http://earthquake.usgs.gov/earthquakes/feed/geojson/all/hour","title": "USGS All Earthquakes, Past Hour","subTitle": "Real-time, worldwide earthquake list for the past hour", "cacheMaxAge": 60},"features":[{"type":"Feature","properties":{"mag":1.3,"place":"36km NNE of Talkeetna, Alaska","time":"1378924976000","updated":"1378925341225","tz":-480,"url":"http://earthquake.usgs.gov/earthquakes/eventpage/ak10803891","felt":null,"cdi":null,"mmi":null,"alert":null,"status":"AUTOMATIC","tsunami":null,"sig":"26","net":"ak","code":"10803891","ids":",ak10803891,","sources":",ak,","types":",general-link,geoserve,nearby-cities,origin,tectonic-summary,","nst":null,"dmin":null,"rms":"0.13","gap":null,"magnitudeType":"Ml"},"geometry":{"type":"Point","coordinates":[-149.8367,62.6285,51.2]},"id":"ak10803891"}]}
WOOHOO You did it!

So what did we do here?

First, by copying usgs.xml to the {xd.home}/xd/modules/processor directory, we declared that usgs.xml is the usgs processor. The source for our stream is the trigger, the processor is our usgs module, and the sink is the log, which writes the earthquake data to the XD log. Also, any time you deploy a new module you will have to bounce XD so that it recognizes the new module.

Let's make the USGS module a little bit more generic.

First off, let's change the name of our usgs module to httpsource, which can be done by renaming usgs.xml to httpsource.xml. Now that the name is changed, let's replace the USGS URL with a parameter that has a default value. In this case replace url='http://earthquake.usgs.gov/earthquakes/feed/geojson/all/hour' with url='${url:http://earthquake.usgs.gov/earthquakes/feed/geojson/all/hour}'. Now we can pass any URL to httpsource from the XD shell. For example:
xd:> stream create myusgsStream --definition "trigger |httpsource --url='http://www.cnn.com'|log"
However, if I leave out the --url parameter, it will default to the http://earthquake.usgs.gov/earthquakes/feed/geojson/all/hour URL. For example:
xd:> stream create myusgsStream --definition "trigger |httpsource |log"
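
Putting the rename and the parameterized URL together, httpsource.xml would look roughly like the following. This is a sketch that assumes everything except the url attribute stays exactly as it was in usgs.xml. The XML comments are added here for explanation only and are not part of the original file.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xmlns:int-http="http://www.springframework.org/schema/integration/http"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/http
    http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

  <!-- Same gateway as in usgs.xml; only the url attribute changes.
       The placeholder takes its value from the url option given in the
       stream definition and falls back to the USGS hourly feed when the
       option is omitted. -->
  <int-http:outbound-gateway id='USGSGateway'
      request-channel='input'
      url='${url:http://earthquake.usgs.gov/earthquakes/feed/geojson/all/hour}'
      http-method='GET'
      expected-response-type='java.lang.String'
      charset='UTF-8'
      reply-timeout='5000'
      reply-channel='output'>
  </int-http:outbound-gateway>

  <!-- XD wires the stream to these two channels. -->
  <channel id="output"/>
  <channel id="input"/>

</beans:beans>
```

As with usgs.xml, the file goes in the processor modules directory, and XD needs to be bounced before the new module name is recognized.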

4 comments:

  1. What is the typical location of {xd.home} on a Mac?

    1. When you run xd-singlenode, it shows the following information on my Mac:

      Using local mode JMX is disabled for XD components
      XD Configuration:
      XD_HOME=/usr/local/Cellar/springxd/1.0.0.M4/libexec/xd
      XD_TRANSPORT=local
      XD_STORE=memory
      XD_ANALYTICS=memory
      XD_HADOOP_DISTRO=hadoop12

      so XD_HOME=/usr/local/Cellar/springxd/1.0.0.M4/libexec/xd

  2. Can HttpOutboundGateway be defined as a Source instead of as a Processor?