# Spark Streaming
###### tags: `Spark` `Streaming`
![](https://i.imgur.com/43NM5xm.png)
Types of queries one wants to answer on a data stream:
- Sampling data from a stream
  - Construct a random sample (see the sketch after this list)
- Queries over sliding windows
  - Number of items of type x in the last k elements of the stream
- Filtering a data stream
  - Select elements with property x from the stream
- Counting distinct elements
  - Number of distinct elements in the last k elements of the stream
- Estimating moments
  - Estimate average/std deviation of last k elements
- Finding frequent elements
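As a concrete taste of the first item, here is a minimal reservoir-sampling sketch in plain Java (an illustration for this note, not part of the Spark examples below): it maintains a uniform random sample of `k` elements from a stream of unknown length.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Reservoir sampling (Algorithm R): uniform sample of k items from a stream. */
public final class ReservoirSampler<T> {
    private final List<T> reservoir;
    private final int k;
    private final Random rng = new Random();
    private long seen = 0;

    public ReservoirSampler(int k) {
        this.k = k;
        this.reservoir = new ArrayList<>(k);
    }

    /** Feed the next stream element. */
    public void offer(T item) {
        seen++;
        if (reservoir.size() < k) {
            reservoir.add(item);              // fill the reservoir first
        } else {
            long j = (long) (rng.nextDouble() * seen);
            if (j < k) {
                reservoir.set((int) j, item); // keep item with probability k/seen
            }
        }
    }

    public List<T> sample() {
        return reservoir;
    }
}
```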
## Quick Example: WordCount
In this example we **read files written to a directory** as a stream of data.
### Step 1. Create A Java Project
#### Canonical Maven directory structure
```sh
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/JavaNetworkWordCount.java
```
The contents of [`./pom.xml`](https://hackmd.io/s/B1zXag5pl#project-dependencies) and [`./src/main/java/JavaNetworkWordCount.java`](https://hackmd.io/s/B1zXag5pl#main-class) are provided below.
#### Main Class
```java
/* JavaNetworkWordCount.java */
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Counts words in UTF8 encoded, '\n' delimited text files that appear in a
 * monitored directory, in batches of one second.
 */
public final class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

        String dataDirectory = "remoteFolder/streaming";
        JavaDStream<String> lines = ssc.textFileStream(dataDirectory);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)));
        JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();

        ssc.start();
        ssc.awaitTermination();
    }
}
```
Spark Streaming will monitor the directory `dataDirectory` (= `"remoteFolder/streaming"`) and process any files created in that directory (files written to nested directories are not supported).
Note that
- All files must have the same data format.
- The files must appear in `dataDirectory` atomically, i.e. by moving or renaming them into it.
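For instance, a minimal sketch of such an atomic drop via the Hadoop `FileSystem` API (the staging path and class name here are illustrative assumptions): write the file outside the monitored directory first, then rename it in; an HDFS rename is atomic, so Spark Streaming never sees a half-written file.
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class AtomicDrop {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // 1. The data has been fully written to a staging path first...
        Path staged = new Path("remoteFolder/tmp/hello");
        // 2. ...then is atomically renamed into the monitored directory.
        fs.rename(staged, new Path("remoteFolder/streaming/hello"));
    }
}
```
The shell command used in Step 5 below, `hadoop fs -copyFromLocal`, is also safe here: it writes to a temporary `._COPYING_` name and renames on completion, so the file only becomes visible under its final name once the copy finishes.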
#### Project Dependencies
```xml
<!--pom.xml-->
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.nchc.spark</groupId>
  <artifactId>spark-sample</artifactId>
  <name>JavaNetworkWordCount</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <properties>
    <java.version>1.8</java.version>
    <spark.version>1.5.0</spark.version>
  </properties>
  <dependencies>
    <!-- Spark core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
```
- Note that we keep `SparkConf().setAppName(...)` in the main Java class consistent with `project->name` in pom.xml (the names need not match for Spark to run, but keeping them identical avoids confusion).
- Note that `project->dependencies` in pom.xml must contain every library we import in our Java classes.
### Step 2. `package` The Project Using Maven
```sh
# Package a JAR containing your application
$ mvn clean package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
```
### Step 3. Prepare Streaming Data
Create a directory with the same name as the `dataDirectory` variable you set in your project.
```shell
$ hadoop fs -mkdir remoteFolder/streaming
```
Then create a file named `hello` containing the data you want to receive as a stream.
```shell
$ vi hello
hello world
```
### Step 4. Run The Project
Open a terminal ("Terminal A") and execute the following command.
```shell
$ spark-submit --class "JavaNetworkWordCount" JavaNetworkWordCount/target/spark-sample-0.0.1.jar
```
### Step 5. Move Files into Streaming Directory
Open another terminal ("Terminal B") and execute the following commands.
```shell
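# remove any previous copy (the -rm simply reports an error the first time)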
$ hadoop fs -rm remoteFolder/streaming/hello
$ hadoop fs -copyFromLocal hello remoteFolder/streaming
$ hadoop fs -cat remoteFolder/streaming/*
```
Then in "Terminal A" you would see the following information show on screen.
```shell
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
```
If you don't see it, re-run the same three commands (`-rm`, `-copyFromLocal`, `-cat`) until you do; a batch only picks up files newly moved into the directory.
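Note that the printed counts cover only the files that arrived in that one-second batch; they do not accumulate across batches. A hedged sketch of a cumulative variant using `updateStateByKey` (Spark 1.x, where `Optional` is Guava's `com.google.common.base.Optional`; stateful transformations also require `ssc.checkpoint(...)`):
```java
// Continuing from the wordCounts DStream in the example above.
// ssc.checkpoint("checkpointDir"); // required for stateful transformations
JavaPairDStream<String, Integer> runningCounts = wordCounts.updateStateByKey(
    (values, state) -> {
        Integer sum = state.or(0);          // previous total, or 0 for a new word
        for (Integer v : values) sum += v;  // add this batch's count
        return Optional.of(sum);
    });
runningCounts.print();
```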
## DataFrame and SQL Operations
You can easily use DataFrames and SQL operations on streaming data.
### Step 1. Create A Java Project
#### Canonical Maven directory structure
```sh
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/JavaRow.java
./src/main/java/JavaSQLNetworkWordCount.java
```
The contents of [`./pom.xml`](https://hackmd.io/s/B1zXag5pl#project-dependencies-add-spark-sql), [`./src/main/java/JavaSQLNetworkWordCount.java`](https://hackmd.io/s/B1zXag5pl#classes), and [`./src/main/java/JavaRow.java`](https://hackmd.io/s/B1zXag5pl#classes) are provided below.
#### Classes
```java
/* JavaRow.java */

/** JavaBean class for converting an RDD to a DataFrame. */
public class JavaRow implements java.io.Serializable {
    private String word;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}
```
```java
/* JavaSQLNetworkWordCount.java */
import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Counts words in UTF8 encoded, '\n' delimited text files that appear in a
 * monitored directory, using DataFrames and SQL.
 */
public final class JavaSQLNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // Create the context with a 5 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("JavaSQLNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        String dataDirectory = "remoteFolder/streaming";
        JavaDStream<String> lines = ssc.textFileStream(dataDirectory);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)));

        words.foreachRDD((rdd, time) -> {
            // Get the singleton instance of SQLContext
            SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

            // Convert RDD[String] to RDD[JavaRow] to DataFrame
            JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
                JavaRow record = new JavaRow();
                record.setWord(word);
                return record;
            });
            DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

            // Register as table
            wordsDataFrame.registerTempTable("words");

            // Do word count on table using SQL and print it
            DataFrame wordCountsDataFrame =
                sqlContext.sql("select word, count(*) as total from words group by word");
            wordCountsDataFrame.show();
            return null;
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
```
Spark Streaming will monitor the directory `dataDirectory` (= `"remoteFolder/streaming"`) and process any files created in that directory (files written to nested directories are not supported).
Note that
- All files must have the same data format.
- The files must appear in `dataDirectory` atomically, i.e. by moving or renaming them into it (see the atomic-rename sketch in the first example).
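Relating back to the "queries over sliding windows" item listed at the top, here is a hedged sketch of a windowed variant using `reduceByKeyAndWindow` (the 30-second window and 10-second slide are illustrative; both must be multiples of the batch interval):
```java
// Continuing from the words DStream in the example above.
JavaPairDStream<String, Integer> windowedCounts = words
    .mapToPair(s -> new Tuple2<>(s, 1))
    .reduceByKeyAndWindow(
        (i1, i2) -> i1 + i2,     // associative reduce function
        Durations.seconds(30),   // window length: count over the last 30 seconds
        Durations.seconds(10));  // slide interval: emit a result every 10 seconds
windowedCounts.print();
```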
#### Project Dependencies (add spark-sql)
```xml
<!--pom.xml-->
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.nchc.spark</groupId>
  <artifactId>spark-sample</artifactId>
  <name>JavaSQLNetworkWordCount</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <properties>
    <java.version>1.8</java.version>
    <spark.version>1.5.0</spark.version>
  </properties>
  <dependencies>
    <!-- Spark core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
```
- Note that we keep `SparkConf().setAppName(...)` in the main Java class consistent with `project->name` in pom.xml.
- Note that `project->dependencies` in pom.xml must contain every library we import in our Java classes.
### Step 2. `package` The Project Using Maven
```sh
# Package a JAR containing your application
$ mvn clean package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
```
### Step 3. Prepare Streaming Data
Create a directory with the same name as the `dataDirectory` variable you set in your project.
```shell
$ hadoop fs -mkdir remoteFolder/streaming
```
Then create a file named `hello` containing the data you want to receive as a stream.
```shell
$ vi hello
hello world
```
### Step 4. Run The Project
Open a terminal ("Terminal A") and execute the following command.
```shell
$ spark-submit --class "JavaSQLNetworkWordCount" JavaSQLNetworkWordCount/target/spark-sample-0.0.1.jar
```
### Step 5. Move Files into Streaming Directory
Open another terminal ("Terminal B") and execute the following commands.
```shell
$ hadoop fs -rm remoteFolder/streaming/hello
$ hadoop fs -copyFromLocal hello remoteFolder/streaming
$ hadoop fs -cat remoteFolder/streaming/*
```
Then in "Terminal A" you would see the following information show on screen.
```shell
+-----+-----+
| word|total|
+-----+-----+
|hello| 1|
|world| 1|
+-----+-----+
```
If you don't see it, re-run the same commands until you do.
## Summarize the HPC Stream Every 5 Seconds
In this example we use the same project structure as in the previous example, except that we execute the following SQL query:
```sql
SELECT LOGIN_NAME,
       sum(q_TIME)   AS SUMQ_TIME,
       sum(ELP_TIME) AS SUMELP_TIME,
       sum(SU)       AS SUMSU
FROM HPC
GROUP BY LOGIN_NAME
```
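For reference, the same aggregation can be written with the DataFrame API instead of SQL; a sketch of a drop-in replacement for the `sqlContext.sql(...)` call in the code below, with `sum` statically imported from `org.apache.spark.sql.functions`:
```java
import static org.apache.spark.sql.functions.sum;

DataFrame summary = hpcDataFrame.groupBy("LOGIN_NAME")
    .agg(sum("q_TIME").alias("SUMQ_TIME"),
         sum("ELP_TIME").alias("SUMELP_TIME"),
         sum("SU").alias("SUMSU"));
summary.show();
```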
### Step 1. Create A Java Project
#### Canonical Maven directory structure
```sh
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/JavaRow.java
./src/main/java/HPCStreamSummary.java
```
#### Classes
```java
/* JavaRow.java */

/** JavaBean class for converting an RDD of CSV lines to a DataFrame. */
public class JavaRow implements java.io.Serializable {
    private String LOGIN_NAME;
    private Double SU;
    private Integer ELP_TIME;
    private Integer q_TIME;
    private Integer REAL_CPU;

    public Double getSU() {
        return SU;
    }

    // Setters take the raw CSV string fields and parse them.
    public void setSU(String SU) {
        this.SU = Double.parseDouble(SU);
    }

    public Integer getELP_TIME() {
        return ELP_TIME;
    }

    // ... getters/setters for the remaining fields follow the same pattern
}
```
```java
/* HPCStreamSummary.java */
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Sums up HPC accounting lines per user.
 */
public final class HPCStreamSummary {
    private static final Pattern COMMA = Pattern.compile(",");

    public static void main(String[] args) throws Exception {
        // Create the context with a 5 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("HPCStreamSummary");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        String dataDirectory = "remoteFolder/streaming";
        JavaDStream<String> lines = ssc.textFileStream(dataDirectory);

        lines.foreachRDD((rdd, time) -> {
            // Get the singleton instance of SQLContext
            SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

            // Convert RDD[String] (CSV lines) to RDD[JavaRow] to DataFrame
            JavaRDD<JavaRow> rowRDD = rdd.map(line -> {
                String[] words = COMMA.split(line);
                JavaRow record = new JavaRow();
                record.setLOGIN_NAME(words[2]);
                record.setREAL_CPU(words[9]);
                record.setq_TIME(words[10]);
                record.setELP_TIME(words[11]);
                record.setSU(words[13]);
                return record;
            });
            DataFrame hpcDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

            // Register as table
            hpcDataFrame.registerTempTable("HPC");

            // Aggregate per user using SQL and print it
            DataFrame hpcCountsDataFrame = sqlContext.sql(
                "select LOGIN_NAME, sum(q_TIME) as SUMQ_TIME, "
                + "sum(ELP_TIME) as SUMELP_TIME, sum(SU) as SUMSU "
                + "from HPC group by LOGIN_NAME");
            System.out.println("Users in the last 5 secs:");
            hpcCountsDataFrame.show();
            return null;
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
```
Spark Streaming will monitor the directory `dataDirectory` (= `"remoteFolder/streaming"`) and process any files created in that directory (files written to nested directories are not supported).
Note that
- All files must have the same data format.
- The files must appear in `dataDirectory` atomically, i.e. by moving or renaming them into it (see the atomic-rename sketch in the first example).
#### Project Dependencies
```xml
<!--pom.xml-->
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.nchc.spark</groupId>
  <artifactId>spark-sample</artifactId>
  <name>HPCStreamSummary</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <properties>
    <java.version>1.8</java.version>
    <spark.version>1.5.0</spark.version>
  </properties>
  <dependencies>
    <!-- Spark core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
```
- Note that we keep `SparkConf().setAppName(...)` in the main Java class consistent with `project->name` in pom.xml.
- Note that `project->dependencies` in pom.xml must contain every library we import in our Java classes.
### Step 2. `package` The Project Using Maven
```sh
# Package a JAR containing your application
$ mvn clean package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
```
### Step 3. Prepare Streaming Data
Create a directory with the same name as the `dataDirectory` variable you set in your project.
```shell
$ hadoop fs -mkdir remoteFolder/streaming
```
Then extract 100 lines from `qacct_2011.csv` and save them as `qacct_test` (e.g. `head -n 100 qacct_2011.csv > qacct_test`); this file serves as the streaming data.
### Step 4. Run The Project
Open a terminal ("Terminal A") and execute the following command.
```shell
$ spark-submit --class "HPCStreamSummary" HPCStreamSummary/target/spark-sample-0.0.1.jar
```
### Step 5. Move Files into Streaming Directory
Open another terminal ("Terminal B") and execute the following command.
```shell
$ hadoop fs -copyFromLocal qacct_test remoteFolder/streaming
```
Then in "Terminal A" you would see the following information show on screen.
```shell
+----------+---------+-----------+------------------+
|LOGIN_NAME|SUMQ_TIME|SUMELP_TIME| SUMSU|
+----------+---------+-----------+------------------+
| sxxxxxx5| 74| 971| 9.709999999999999|
| mxn| 0| 161| 0.0335|
| sxxxxxx8| 3966| 11669|116.69000000000001|
| sxxxxxx2| 4| 5832| 933.12|
| sxxxcc24| 0| 24497| 5.1035|
| sxxxcc32| 52611| 30229|446.93250000000006|
| root| 0| 346| 0.0721|
| jxxxng00| 9| 0| 0.0|
| sxxxcc48| 0| 4932| 1.0275|
| jxxxbe00| 4| 0| 0.0|
| uxxxcl00| 4| 0| 0.0|
| gxm| 0| 40| 0.0083|
| cxd| 0| 19| 0.004|
| cxxm| 0| 16| 0.0034|
| sxxxcc01| 4| 282| 0.47|
| sxxxcc03| 3| 0| 0.0|
+----------+---------+-----------+------------------+
```
## References
- [Spark Java API](https://spark.apache.org/docs/1.5.0/api/java/index.html)
- [Spark Streaming Programming Guide](https://spark.apache.org/docs/1.5.0/streaming-programming-guide.html)