    Apache Storm Integration with Apache Kafka
    Published on: 12th September 2016
    Posted By: Amit Kumar

    This tutorial provides a Java program that consumes messages from Apache Kafka using Apache Storm.

    Abstract


    Nowadays, quite a few systems are modeled using the event stream processing paradigm. These applications treat client requests or state changes as a set of events (immutable messages). To achieve scalability and reliability, these events are published to a messaging system. The applications responsible for processing the events then subscribe to that messaging system.

    This paradigm requires us to integrate messaging systems with processing applications. In this tutorial, we will look at integrating Apache Kafka (a distributed publish-subscribe messaging system) with Apache Storm (a distributed real-time processing engine).
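    To make the paradigm concrete, here is a minimal in-memory sketch in Java. It is only an illustration under simplifying assumptions (a single process, synchronous delivery, no persistence). InMemoryTopic and Event are hypothetical names. A real messaging system such as Kafka adds durability, partitioning and independently tracked consumer positions on top of this basic idea.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory illustration of the event stream paradigm:
// immutable events are published to a topic and every subscriber sees each one.
final class InMemoryTopic {

    // An immutable event: all fields are final and set once in the constructor.
    static final class Event {
        final String type;
        final String payload;

        Event(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return type + ":" + payload;
        }
    }

    private final List<Consumer<Event>> subscribers = new ArrayList<>();
    private final List<Event> log = new ArrayList<>();

    // Registers an application that wants to process every published event.
    void subscribe(Consumer<Event> subscriber) {
        subscribers.add(subscriber);
    }

    // Appends the event to the topic's log and hands it to every subscriber.
    void publish(Event event) {
        log.add(event);
        for (Consumer<Event> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Read-only view of everything published so far.
    List<Event> events() {
        return Collections.unmodifiableList(log);
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> auditTrail = new ArrayList<>();
        // Two independent "applications" subscribe to the same topic.
        topic.subscribe(e -> auditTrail.add("audit " + e));
        topic.subscribe(e -> System.out.println("processing " + e));
        topic.publish(new Event("OrderPlaced", "order-42"));
        System.out.println(auditTrail);
    }
}
```

    Each subscriber receives every event, which is the property that lets several processing applications consume the same stream independently.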

    Note: The instructions in this tutorial only work with Kafka client API version 0.8.2.2 and below. Since Kafka has drastically changed its consumer API, a new Storm Kafka client API has been developed, which we will cover in subsequent tutorials. You can subscribe to our tutorials to get notified by email whenever we publish a new tutorial.

     

    Pre-requisites


    Here are the prerequisites for following the instructions in this tutorial effectively -

    1. Basic knowledge of Apache Storm
    2. Basic knowledge of Apache Kafka
    3. Running cluster of Apache ZooKeeper (required by Kafka)
    4. Running cluster of Apache Kafka
    5. Eclipse with Maven Plugin

     

    Setting Up Maven Java Project


    Creating Maven Project:

    We will start by creating a Maven project in the Eclipse IDE using the following steps -

    1. Open the New Project wizard in the Eclipse IDE as shown below:

      [Screenshot: New Maven Project]
    2. On the next screen, select the option Create a simple project to create a quick project as below:

      [Screenshot: New Maven Project Type and Location Selection]
    3. Enter the Group Id and Artifact Id on the next screen and finally click Finish to create the project as below:

      [Screenshot: Group and Artifact Id Selection]

    At this point, you should see your new project (in my case, kafka-storm-integration) in the Project Explorer.

     

    Adding Kafka and Storm Dependencies:

    Here is how our pom.xml looks after adding the dependencies for Storm and Kafka -

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.aksain.kafka.integrations</groupId>
    	<artifactId>kafka-storm-integration</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    
    	<properties>
    		<storm.version>1.0.2</storm.version>
    		<kafka.version>0.8.2.2</kafka.version>
    	</properties>
    
    	<dependencies>
    		<!-- Storm library -->
    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-core</artifactId>
    			<version>${storm.version}</version>
    		</dependency>
    		<!-- Storm-Kafka integration library -->
    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-kafka</artifactId>
    			<version>${storm.version}</version>
    		</dependency>
    		<!-- Kafka client libraries as Storm-Kafka integration library does not include these -->
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka_2.11</artifactId>
    			<version>${kafka.version}</version>
    			<exclusions>
    				<!-- Excluded to avoid version issues between Kafka zookeeper api and 
    					Storm-kafka zookeeper api -->
    				<exclusion>
    					<groupId>org.apache.zookeeper</groupId>
    					<artifactId>zookeeper</artifactId>
    				</exclusion>
    				<!-- Excluded to avoid the pre-emptive StackOverflowError caused by version/implementation 
    					conflicts between Kafka's slf4j/log4j dependencies and Storm's slf4j/log4j dependencies -->
    				<exclusion>
    					<groupId>org.slf4j</groupId>
    					<artifactId>slf4j-log4j12</artifactId>
    				</exclusion>
    				<exclusion>
    					<groupId>log4j</groupId>
    					<artifactId>log4j</artifactId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    
    	</dependencies>
    </project>

    After adding these dependencies, Eclipse will automatically start downloading the libraries from the Maven repository. Please be patient, as it may take a while for Eclipse to download the jars and build your project.

     

    Writing a program for integrating Kafka with Storm


    Since we are ready with our Java project and all the required dependencies, it's time to write some code!

    We will start by writing a class called LoggerBolt that logs the messages consumed from Kafka -

    package com.aksain.kafka.storm.bolt;
    
    import org.apache.log4j.Logger;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
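    /**
     * Terminal bolt that logs every message consumed from Kafka.
     *
     * @author Amit Kumar
     */
    public class LoggerBolt extends BaseBasicBolt {
    	
    	private static final long serialVersionUID = 1L;
    	private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
    
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		// With StringScheme, KafkaSpout emits each Kafka message as a single string field
    		LOG.info(input.getString(0));
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		// This bolt emits nothing, so no downstream component consumes this field
    		declarer.declare(new Fields("message"));
    	}
    }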
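    LoggerBolt reads the first field of each incoming tuple with input.getString(0). When KafkaSpout is configured with StringScheme (as the demo class later in this tutorial does), each raw Kafka message payload is turned into a single string field, named "str" in storm-kafka 1.0.x. The plain-Java sketch below approximates that decoding step outside Storm, assuming UTF-8 payloads. PayloadDecoder is a hypothetical helper and not part of storm-kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper approximating how a string scheme turns a raw Kafka
// payload into the single string field that LoggerBolt reads via getString(0).
final class PayloadDecoder {

    private PayloadDecoder() {
    }

    // Decodes the remaining bytes of the buffer as UTF-8. A duplicate view is
    // used so the caller's buffer position is left unchanged.
    static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer raw = ByteBuffer.wrap(
                "First message sent by Kafka console producer".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(raw));
    }
}
```

    If your producers publish something other than UTF-8 text, a different scheme (or a custom one) would be needed instead of StringScheme.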

    We, however, will not create a spout for Kafka, as Storm provides an out-of-the-box implementation called KafkaSpout for consuming messages. We will only pass it a SpoutConfig object containing the following details, taken from command line arguments -

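    1. ZooKeeper host addresses, such as localhost:2181
    2. Topic name
    3. ZooKeeper root path (path in ZooKeeper under which KafkaSpout stores its consumer offsets)
    4. Consumer id (uniquely identifies this spout; its offsets are kept under <root path>/<consumer id>)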
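    The four values listed above map one-to-one onto the SpoutConfig constructor arguments used in the demo class. The sketch below is a hypothetical, Storm-free helper (SpoutArgs is our own illustrative name) that validates the arguments much like the demo's main method does. It also shows the ZooKeeper node under which, according to the storm-kafka documentation, KafkaSpout keeps its offsets: <zk-root>/<consumer-id>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Storm-free holder for the demo's four command-line arguments:
// <zk-hosts> <kafka-topic> <zk-path> <clientid>
final class SpoutArgs {

    final List<String> zkHosts;  // entries of the ZooKeeper connect string, e.g. localhost:2181
    final String topic;          // Kafka topic to consume, e.g. storm-test-topic
    final String zkRoot;         // ZooKeeper root path under which offsets are stored
    final String consumerId;     // id that keeps this spout's offsets apart from others

    private SpoutArgs(List<String> zkHosts, String topic, String zkRoot, String consumerId) {
        this.zkHosts = zkHosts;
        this.topic = topic;
        this.zkRoot = zkRoot;
        this.consumerId = consumerId;
    }

    // Same minimum-argument check as the demo's main method, plus basic sanity checks.
    static SpoutArgs parse(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "Required arguments: <zk-hosts> <kafka-topic> <zk-path> <clientid>");
        }
        List<String> hosts = new ArrayList<>();
        // A ZooKeeper connect string is a comma-separated list of host:port entries.
        for (String host : args[0].split(",")) {
            String trimmed = host.trim();
            if (trimmed.isEmpty()) {
                throw new IllegalArgumentException("Empty entry in ZooKeeper hosts: " + args[0]);
            }
            hosts.add(trimmed);
        }
        if (!args[2].startsWith("/")) {
            throw new IllegalArgumentException("ZooKeeper paths must start with '/': " + args[2]);
        }
        return new SpoutArgs(Collections.unmodifiableList(hosts), args[1], args[2], args[3]);
    }

    // ZooKeeper node under which the spout's offsets would be kept: <zk-root>/<consumer-id>.
    String offsetRootPath() {
        String root = zkRoot.endsWith("/") ? zkRoot.substring(0, zkRoot.length() - 1) : zkRoot;
        return root + "/" + consumerId;
    }

    public static void main(String[] args) {
        SpoutArgs parsed = parse(new String[] {
                "localhost:2181", "storm-test-topic", "/brokers", "storm-consumer"});
        System.out.println(parsed.zkHosts + " " + parsed.topic + " -> " + parsed.offsetRootPath());
    }
}
```

    With the program arguments used later in this tutorial (localhost:2181 storm-test-topic /brokers storm-consumer), main prints [localhost:2181] storm-test-topic -> /brokers/storm-consumer. Reusing the same consumer id across runs is what lets the spout resume from its stored offsets.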

    Here is the main demo class, which builds a topology with KafkaSpout and LoggerBolt and finally submits it to a LocalCluster (an embedded Storm instance running inside Eclipse) -

    package com.aksain.kafka.storm;
    
    import java.util.HashMap;
    
    import org.apache.log4j.Logger;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;
    
    import com.aksain.kafka.storm.bolt.LoggerBolt;
    
    /**
     * @author Amit Kumar
     */
    public class KafkaStormIntegrationDemo {
    	
    	private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);
    
    	public static void main(String[] args) {
    		// Log program usage and exit if fewer than 4 command line arguments are given
    		if(args.length < 4) {
    			LOG.fatal("Incorrect number of arguments. Required arguments: <zk-hosts> <kafka-topic> <zk-path> <clientid>");
    			System.exit(1);
    		}
    		
    		// Build Spout configuration using input command line parameters
    		final BrokerHosts zkrHosts = new ZkHosts(args[0]);
    		final String kafkaTopic = args[1];
    		final String zkRoot = args[2];
    		final String clientId = args[3];
    		final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
    		kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    
    		// Build a topology that consumes messages from Kafka and logs them to the console
    		final TopologyBuilder topologyBuilder = new TopologyBuilder();
    		// Create KafkaSpout instance using Kafka configuration and add it to topology
    		topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
    		// Route the output of KafkaSpout to LoggerBolt to log the messages consumed from Kafka
    		topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");
    		
    		// Submit topology to local cluster i.e. embedded storm instance in eclipse
    		final LocalCluster localCluster = new LocalCluster();
    		localCluster.submitTopology("kafka-topology", new HashMap<>(), topologyBuilder.createTopology());
    	}
    }
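    The demo connects LoggerBolt to the spout with globalGrouping. According to the Storm documentation, global grouping sends the entire stream to a single task of the bolt, specifically the one with the lowest task id. Since LoggerBolt is added with the default parallelism of one, it has a single task here, but the rule matters if you later raise its parallelism. The plain-Java sketch below only simulates that routing rule. GlobalGroupingSim and the task ids are hypothetical, and none of Storm's API is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of Storm's global grouping: every tuple emitted by
// the upstream component is delivered to the downstream task with the lowest id.
final class GlobalGroupingSim {

    private GlobalGroupingSim() {
    }

    // Returns, for each target task id, the messages that task would receive.
    static Map<Integer, List<String>> route(List<String> messages, List<Integer> targetTaskIds) {
        if (targetTaskIds.isEmpty()) {
            throw new IllegalArgumentException("At least one target task is required");
        }
        Map<Integer, List<String>> received = new LinkedHashMap<>();
        for (Integer taskId : targetTaskIds) {
            received.put(taskId, new ArrayList<String>());
        }
        // Global grouping ignores tuple contents and always picks the lowest task id.
        int chosen = Collections.min(targetTaskIds);
        for (String message : messages) {
            received.get(chosen).add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("First message", "Second message", "Third message");
        // Pretend LoggerBolt was given a parallelism of 3 with task ids 5, 3 and 4.
        System.out.println(route(messages, Arrays.asList(5, 3, 4)));
    }
}
```

    Running main prints {5=[], 3=[First message, Second message, Third message], 4=[]}: only task 3 receives anything. This keeps all messages in one place, at the cost of not spreading the bolt's work across tasks.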

    Finally, let's add a log4j.properties file with the following contents to src/main/resources to enable logging -

    # Direct log messages to stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    
    # Root logger option
    log4j.rootLogger=INFO, stdout

     

    Executing the program


    It's now time to execute our program. However, we first need to ensure that we have a topic with some messages in our Apache Kafka cluster.

    You can execute the following command from your Kafka home directory to create a topic called 'storm-test-topic' -

    ./bin/kafka-topics.sh --create --topic storm-test-topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1

    Execute the following command to verify that the topic has been created successfully -

    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic storm-test-topic

    You can now publish messages to this new topic using the following command. Simply type messages on the console and press Enter to send each one.

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic storm-test-topic

    Now that we have a topic with a few messages, let's run our program to consume them.

    We can run this program on the embedded Storm cluster in Eclipse just like any other Java program with command line arguments: right-click KafkaStormIntegrationDemo -> Run As -> Run Configurations...

    In the Run Configurations dialog box, switch to the Arguments tab and add the following to Program arguments -

    localhost:2181 storm-test-topic /brokers storm-consumer

    Once the program is running, it will take some time to start Storm and then connect to ZooKeeper and Kafka. After our topology has been activated, you will start seeing the Kafka messages on the console. Here are some of the log messages that were printed on my console -

    22377 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - First message sent by Kafka console producer
    22379 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - Second message to be consumed by Storm
    22380 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - Yet another message for Storm to consume and log on console
    22383 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - This is demo message for demo of kafka and storm integration

    In fact, you can send messages while the program is running, and Storm will consume them and log them to the Eclipse console.

     

    Thank you for reading this tutorial. If you have any feedback, questions or concerns, please share them in the comments and we will get back to you as soon as possible.
