    Counting User Tweets using Apache Spark Streaming
    Published on: 16th August 2016
    Posted By: Amit Kumar

    This tutorial will provide the instructions for developing a Java program for counting the tweets of a user from Twitter data using Apache Spark Streaming.

    Prerequisites


    In this tutorial, we will be demonstrating how to develop Java applications in Apache Spark Streaming using Eclipse IDE and Apache Maven. Since our main focus is on Apache Spark Streaming application development, we will assume that you are already accustomed to these tools. Along the same lines, please make sure you have completed the following before proceeding -

    • Get accustomed to Apache Spark if you are new to it
    • Register for a Twitter developer account to procure the following four keys - Consumer Key, Consumer Key Secret, Access Token and Access Token Secret
    • JDK 8, since we will be using lambda expressions
    • Eclipse IDE with the Apache Maven plugin

     

    Introduction to Apache Spark Streaming


    Apache Spark Streaming is a Spark library that wraps the Spark Core engine to provide stream processing capabilities. Spark Streaming internally batches the messages received in a stream for a configurable duration and then passes each batch (called a micro-batch) to the Spark Core engine for processing. It repeats this cycle of batching and processing indefinitely, which enables stream processing. This means that a data stream in Spark Streaming is effectively a sequence of RDDs.
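
    To make the micro-batching concrete, here is a minimal sketch (assuming Spark 1.6.x and the Java API, as used later in this tutorial) of creating a streaming context with a 10-second batch interval - every operation defined on a stream built from this context runs once per such micro-batch RDD.

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    public class MicroBatchSketch {
    	public static void main(String[] args) {
    		// local[2] i.e. embedded mode: one thread for the receiver, one for processing
    		final SparkConf conf = new SparkConf().setAppName("Micro Batch Sketch").setMaster("local[2]");
    		// Messages received within each 10-second window form one micro-batch, i.e. one RDD
    		final JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
    		// Streams created from ssc are then processed as a sequence of such RDDs
    	}
    }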

    (Image: Spark Streaming Architecture)

    The Spark Streaming library includes connectors to consume streams of messages from many sources such as Apache Kafka, Apache Flume, HDFS, Amazon S3, Amazon Kinesis and Twitter, to name a few. We will be using the Twitter connector in this tutorial to consume and process tweets.

    Similarly, Spark Streaming also comes with connectors to write the processed stream data out to destinations such as HDFS and various databases. A sketch of wiring a different source and sink follows below.
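
    As an illustration of swapping the source and sink (not part of this tutorial's build), here is a hedged sketch that consumes a Kafka topic and writes each processed micro-batch to HDFS. It assumes the spark-streaming-kafka_2.11 artifact is on the classpath; the ZooKeeper quorum zookeeper:2181, the topic name tweets and the HDFS path are all placeholders.

    import java.util.Collections;
    
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    // Given a JavaStreamingContext named streamingContext, as created earlier:
    // consume the "tweets" topic with a single receiver thread
    JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
    		streamingContext, "zookeeper:2181", "tweet-counter-group",
    		Collections.singletonMap("tweets", 1));
    // Persist each micro-batch as text files under the given prefix (placeholder path)
    kafkaStream.dstream().saveAsTextFiles("hdfs://namenode:8020/tweets/batch", "txt");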

    (Image: Spark Interfaces)

     

    Setting Up Maven Java Project


    Creating Maven Project:

    We will start by creating a Maven project in Eclipse IDE, following the steps below -

    1. Open the New Project wizard in Eclipse IDE as shown below:

      (Screenshot: New Maven Project)
    2. On the next screen, select the option Create a simple project for a quick setup, as below:

      (Screenshot: New Maven Project Type and Location Selection)
    3. Enter Group Id and Artifact Id on the next screen and finally click Finish to create the project, as below:

      (Screenshot: Group and Artifact Id Selection)

    At this point, you will start seeing your new project (in my case, it is spark-streaming-basics) in Project Explorer.
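
    Assuming the wizard followed the standard Maven conventions, the new project's layout should look roughly like this -

    spark-streaming-basics
    ├── src/main/java    (application sources go here)
    ├── src/test/java
    └── pom.xml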

     

    Adding Spark Dependency:

    The next step is to add the Apache Spark Streaming and Spark Twitter libraries to our newly created project. In order to do so, we will add the following Maven dependencies to our project's pom.xml file.

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-streaming_2.11</artifactId>
    	<version>1.6.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter_2.11 -->
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-streaming-twitter_2.11</artifactId>
    	<version>1.6.2</version>
    </dependency>

    For completeness, here is what my pom.xml looks like after adding the above dependencies -

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.aksain.spark-streaming.basics</groupId>
    	<artifactId>spark-streaming-basics</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<properties>
    		<spark.version>1.6.2</spark.version>
    	</properties>
    
    	<dependencies>
    		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming_2.11</artifactId>
    			<version>${spark.version}</version>
    		</dependency>
    		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter_2.11 -->
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming-twitter_2.11</artifactId>
    			<version>${spark.version}</version>
    		</dependency>
    	</dependencies>
    </project>

    After adding these dependencies, Eclipse will automatically start downloading the libraries from the Maven repository. Please be patient, as it may take a while for Eclipse to download the jars and build your project.
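
    If you would rather verify the resolved dependencies from the command line, running Maven's standard dependency-tree goal in the project directory should now list the Spark artifacts -

    mvn dependency:tree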

     

    Developing User Tweets Counting Program


    Now, we will create a Java program for counting the tweets of a user on Twitter as below (please put in your consumer keys and access tokens before executing the program) -

    package com.aksain.sparkstreaming.basics.twitter;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.twitter.TwitterUtils;
    
    import scala.Tuple2;
    import twitter4j.Status;
    import twitter4j.auth.Authorization;
    import twitter4j.auth.OAuthAuthorization;
    import twitter4j.conf.Configuration;
    import twitter4j.conf.ConfigurationBuilder;
    
    import com.google.common.collect.Iterables;
    
    /**
     * @author Amit Kumar
     *
     * Demonstrates Apache Spark Streaming functioning by consuming data from Twitter and printing the number of tweets
     *  written by a user in a fixed period (10 seconds in our case).
     */
    public class SparkTwitterDataProcessor {
    	public static void main(String[] args) {
    		// Prepare the Spark configuration by setting the application name and master as "local[10]" i.e. embedded mode with 10 threads
    		final SparkConf sparkConf = new SparkConf().setAppName("Twitter Data Processing").setMaster("local[10]");
    		// Create a streaming context using the Spark configuration and the duration for which messages will be batched and fed to Spark Core
    		final JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
    		
    		// Prepare configuration for Twitter authentication and authorization
    		final Configuration conf = new ConfigurationBuilder().setDebugEnabled(false)
    										.setOAuthConsumerKey("<put-your-consumer-key>")
    										.setOAuthConsumerSecret("<put-your-consumer-key-secret>")
    										.setOAuthAccessToken("<put-your-access-token>")
    										.setOAuthAccessTokenSecret("<put-your-access-token-secret>")
    										.build();
    		// Create Twitter authorization object by passing prepared configuration containing consumer and access keys and tokens
    		final Authorization twitterAuth = new OAuthAuthorization(conf);
    		// Create a data stream using streaming context and Twitter authorization
    		final JavaReceiverInputDStream<Status> inputDStream = TwitterUtils.createStream(streamingContext, twitterAuth, new String[]{});
    		// Create a new stream by filtering out the non-English tweets from the earlier stream
    		final JavaDStream<Status> enTweetsDStream = inputDStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
    		// Convert stream to pair stream with key as user screen name and value as tweet text
    		final JavaPairDStream<String, String> userTweetsStream = 
    								enTweetsDStream.mapToPair(
    									(status) -> new Tuple2<String, String>(status.getUser().getScreenName(), status.getText())
    								);
    		
    		// Group the tweets for each user
    		final JavaPairDStream<String, Iterable<String>> tweetsReducedByUser = userTweetsStream.groupByKey();
    		// Create a new pair stream by replacing each iterable of tweets in the older pair stream with the number of tweets
    		final JavaPairDStream<String, Integer> tweetsMappedByUser = tweetsReducedByUser.mapToPair(
    					userTweets -> new Tuple2<String, Integer>(userTweets._1, Iterables.size(userTweets._2))
    				);
    		// Iterate over each micro-batch RDD of the stream and print every (user, count) pair on the console
    		tweetsMappedByUser.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) pairRDD ->
    			pairRDD.foreach(tuple -> System.out.println(tuple._1() + "," + tuple._2()))
    		);
    		// Triggers the start of processing. Nothing happens if streaming context is not started
    		streamingContext.start();
    		// Keeps the processing live by halting here unless terminated manually
    		streamingContext.awaitTermination();
    		
    	}
    }
    

    The main highlights of the program are that we create a Spark configuration and a Java streaming context with batches of tweets received every 10 seconds, and then use this context to perform the following operations -

    • Create a Twitter data stream of type JavaReceiverInputDStream using the Twitter utilities provided in the Spark Twitter library
    • Create a filtered data stream containing only English tweets, i.e. those with lang as "en"
    • Convert each tweet to a key/value pair with the user screen name as key and the tweet text as value
    • Group the tweet key/value pairs by user screen name. This results in a pair stream with the user screen name as key and an Iterable of tweets as value
    • Convert the value of the pair stream from an Iterable of tweets to the count of tweets for each key (user); a leaner reduceByKey alternative is sketched after this list
    • Iterate over the stream and print all the pairs on the console
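
    As a side note on the grouping step: groupByKey materialises every tweet text per user only for us to count them. A leaner alternative (a sketch, not what the program above uses) is to map each tweet straight to a count of 1 and sum the counts with reduceByKey, reusing enTweetsDStream from the program -

    // Map each English tweet to (screenName, 1) and sum the counts per user
    final JavaPairDStream<String, Integer> tweetCounts = enTweetsDStream
    		.mapToPair(status -> new Tuple2<String, Integer>(status.getUser().getScreenName(), 1))
    		.reduceByKey((a, b) -> a + b);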

     

    Executing Program


    Since we have set the master as "local[10]", we can simply run this program in Eclipse just like any other Java program - Right Click On Program -> Run As -> Java Application.

    In order to submit this program to a Spark cluster, you will need to remove the setMaster("local[10]") call from the SparkConf object, as the master information will be provided at the time of submitting the code to the cluster. We will first need to create a JAR file (spark-streaming-basics.jar) of our code and use the below command to submit it to Spark running on a YARN cluster -

    ./bin/spark-submit \
    	--class com.aksain.sparkstreaming.basics.twitter.SparkTwitterDataProcessor \
    	--deploy-mode cluster \
    	--master yarn \
    	spark-streaming-basics.jar
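
    A note on creating that JAR - a plain mvn package will not bundle the Twitter connector, which is not part of a standard Spark distribution. One common approach (a sketch; the plugin version is an assumption) is to add the Maven Shade plugin under the build/plugins section of pom.xml, so that mvn package produces an uber JAR containing spark-streaming-twitter and its twitter4j dependencies -

    <plugin>
    	<groupId>org.apache.maven.plugins</groupId>
    	<artifactId>maven-shade-plugin</artifactId>
    	<version>2.4.3</version>
    	<executions>
    		<execution>
    			<phase>package</phase>
    			<goals>
    				<goal>shade</goal>
    			</goals>
    		</execution>
    	</executions>
    </plugin>

    The spark-streaming dependency itself can be marked with provided scope, since the cluster already supplies it.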

     

    Either way, here is sample output (actual output will differ based on the tweets fetched at the time) that this program will generate -

    DonnaPa85694862,1
    mawiekaels,1
    BYEjas_,1
    PEETthePOOT,1
    phiiiiaaaaa,1
    Skooshbag,1
    iakinfvinograd5,1
    AnjuRavinder,1
    EstevamLouis,1
    Ellie1029,1
    Queird_kpopstan,1
    fuxiexx,1
    latestnews003,1
    SingaporeAdv,1
    Consult_Energy,1
    Terlingua_Texas,1
    Ballsmcgee_9,1
    WSJecon,1
    BlauesAugeBlond,1
    abutler04,1
    toppingzouis,1
    JenniesMagic,1
    Detroit_Tigers2,1
    bellatrixx7,1
    GTAJBaker1,1
    UBD_26,1
    StupidRS,1
    KimKg23,1
    pri_kachu,1
    test5f1798,1
    AlMightyLil,1
    AdamFreers,1
    Sonsyrae,1
    jay_flooooo,1
    Farah_fir,1
    manuelwhistrem1,1
    HarryLo70670681,1
    pheel_lip,1
    luckyandpat,1
    TCBKelvis,1
    My1stRecord,1
    QDPurdu,1
    LotusCarClub,1
    ghia1212,1
    MzChi_az,1
    PrincePurrsia,1
    SarahEmDubois,1
    CelestineErski1,1

    Thank you for reading through the tutorial. In case of any feedback/questions/concerns, please let us know through the comments and we shall get back to you as soon as possible.
