---
tags: BigData-BS-2020
title: Lab Block 5. MapReduce with Spark.
---
# Lab Block 5. MapReduce with Spark.
During this lab, you will learn how to compile and package applications as jar files for execution on the YARN resource manager. You will learn how to write applications in Scala, how to work with Spark, and how to submit an application to Hadoop YARN.
## Requirements
We assume you have completed the following:
- Configured Hadoop Cluster
- Successfully executed example application
- Downloaded and extracted [Spark 3.3.0](https://spark.apache.org/downloads.html)
- Installed and preconfigured Scala build tools (either with [IntelliJ IDEA](https://www.jetbrains.com/idea/download/) or with [separately installed sbt](https://www.scala-sbt.org/))
- Familiarized yourself with [Scala syntax](https://learnxinyminutes.com/docs/scala/)
:::success
We recommend IntelliJ IDEA. Students have a free premium subscription for one year, and an IDE will make your life easier. If an IDE is too cumbersome for your taste, install one of the modern programmer's text editors: [Atom](https://atom.io/) or [Visual Studio Code](https://code.visualstudio.com/). They provide syntax support for most programming languages through plugins.
:::
## Working with your Hadoop cluster
:::danger
For this lab, you will need somewhat more resources from your machine. If your computer struggles to run three VMs simultaneously, restrict yourself to a cluster of 1 or 2 VMs. If you use fewer than three nodes, ensure that the HDFS replication factor (`dfs.replication`) does not exceed the number of nodes.
:::
## Hadoop and Spark
Spark is a cluster computing system implemented in Scala. It supports several modes of processing, including batch processing, stream processing, and SQL queries (Spark SQL). It comes with libraries for machine learning and graph processing, optimized for distributed computation.
Spark executes applications on clusters. [Several cluster managers](https://spark.apache.org/docs/latest/cluster-overview.html) are currently supported:
- Apache Mesos
- Hadoop YARN
- Kubernetes
Today, we are going to use the Hadoop YARN cluster you have configured to execute a Spark application.
## Wordcount
Let's practice WordCount with [Alice in Wonderland](https://edisk.university.innopolis.ru/edisk/Public/Fall%202019/Big%20Data/alice.txt).
### Wordcount with Hadoop
Hadoop comes with its own WordCount example. To test it, first put the text file into HDFS:
```
hdfs dfs -put source destination
```
Then, run the WordCount example
```
hadoop jar hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar wordcount TextFile OutputDirectory
```
Hopefully, your current configuration is capable of executing the job. If the execution is stuck at 0%, the configuration is probably not suitable for the task; in that case, increase the memory available to the MapReduce tasks.
For those who are interested in Hadoop's WordCount implementation, here is the Java code:
```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: emits (word, 1) for every token of an input line
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also used as combiner): sums the counts collected for one word
  public static class IntSumReducer
      extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  // Driver: args[0] is the input path, args[1] is the output directory
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
### WordCount with Spark
With Spark, the WordCount application will look something like this
```scala
val inputPath = "path/to/input"
val outputPath = "path/to/output"
val textFile = sc.textFile(inputPath)
val counts = textFile.flatMap(line => line.split(" ")).
  map(word => (word, 1)).
  reduceByKey(_ + _)
counts.saveAsTextFile(outputPath)
```
Your Spark installation comes with `spark-shell`. It is a [REPL](https://en.wikipedia.org/wiki/Read%E2%80%93eval%E2%80%93print_loop) environment that allows you to execute Scala statements without compiling and packaging an application. `spark-shell` is located in `$SPARK_DIRECTORY/bin`. You can [add the `bin` folder to `PATH`](https://unix.stackexchange.com/questions/26047/how-to-correctly-add-a-path-to-path?answertab=votes#tab-top) to execute it from any directory.
You can run this example line-by-line in `spark-shell`
```
$ spark-shell
19/09/20 19:33:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.1.71:4040
Spark context available as 'sc' (master = local[*], app id = local-1568997196444).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala> paste lines here
```
The output is written to the output location. Spark creates a directory there with several files:
```bash
$ ls
_SUCCESS part-00000 part-00001
```
### Examine the Program
Let's try to understand what is happening under the hood when you execute this program.
#### Reading File
Overall, the Spark example follows Scala syntax. The first meaningful line specifies the location to read from:
```scala
val textFile = sc.textFile(inputPath)
```
It is important to note that `inputPath` can be either a path to a text file or a path to a directory. If `inputPath` is a directory, all the text files inside this directory are read.
Unlike in a regular program, this line does not read the text file right away. At this point, we have only specified that we want to read this file in the future.
When you executed the statement, you probably noticed a line similar to this:
```
org.apache.spark.rdd.RDD[String] = path/to/file MapPartitionsRDD[1] at textFile...
```
The part on the [LHS](https://en.wikipedia.org/wiki/Sides_of_an_equation) tells you the type of the returned object. In our case, it is an `RDD` of `String`. `RDD` stands for Resilient Distributed Dataset; it is the distributed counterpart of an `Array` in the world of Spark. More details about this object type will be given in the lectures. If you cannot wait, please read [the documentation](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds).
The newly created `RDD` `textFile` is a distributed array in which each element is a line of the input file. RDDs support a list of data [transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations). We will start with `map` and `reduce`.
#### Map and Reduce
```scala=
val counts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
```
The functions of the `map` family apply a transformation to each element of an RDD. These transformations should be independent of each other. With Spark, the transformation is defined with a lambda function. The first one is `line => line.split(" ")`. The part before `=>` specifies the input arguments. The rest is the body of the function. In this particular example, the lambda function accepts one argument that is referred to as `line` and is of type `String`. In the body of the lambda function, we call the `split` method of the `String` class.
The first map, however, is not a regular one. The result of the split is an array. `flatMap` assumes that the output of the transformation is a collection, and merges the arrays produced for all the elements into a single RDD.
The method `flatMap` returns an RDD, and on the next line, we apply another map to this new RDD. This time the lambda function accepts one argument and creates a tuple.
The last line applies `reduceByKey`. It assumes that the elements of the RDD are tuples. The first element of a tuple is used as an *id* (key), and the second as a *value*. The elements of the RDD with the same *id* are grouped together, and the reduce operation is applied to their values. You should be familiar with this procedure from the lectures on MapReduce. In this last transformation, the lambda function has an unusual syntax, which is called placeholder syntax. The lambda for the reduce operation should be of type `(V,V) => V`, i.e., it accepts two values and returns one. In our case, it returns their sum. If you feel uncomfortable with placeholder syntax, please familiarize yourself with the [Scala syntax](https://learnxinyminutes.com/docs/scala/).
The `reduceByKey` operation returns another RDD, which we assign to a constant `counts`. Note that after executing this line, the file has still not been read.
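To try the same three steps without a cluster, the sketch below mimics them on an ordinary Scala `Seq`. It is only a local analogy and not Spark code: the names `LocalWordCount` and `wordCount` are made up for illustration, and `groupBy` followed by `reduce` merely stands in for what `reduceByKey` does on an RDD.

```scala
object LocalWordCount {
  // Local analogue of the Spark pipeline, using plain Scala collections only.
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    // flatMap: every line yields an array of words; the arrays are merged into one sequence
    val words: Seq[String] = lines.flatMap(line => line.split(" "))
    // map: every word becomes a (key, value) tuple
    val pairs: Seq[(String, Int)] = words.map(word => (word, 1))
    // group the tuples by key, then reduce the values of each group;
    // `_ + _` is placeholder syntax for `(a, b) => a + b`
    pairs.groupBy(pair => pair._1).map { case (word, group) =>
      (word, group.map(pair => pair._2).reduce(_ + _))
    }
  }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("the cat", "the hat"))
    counts.foreach(println)
  }
}
```

Replacing `reduce(_ + _)` with `reduce((a, b) => a + b)` gives the same result, which may help if the placeholder syntax looks unfamiliar.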
#### Writing to File
The last line writes the result to a file.
```scala
counts.saveAsTextFile(outputPath)
```
Only when this line is executed is the data actually read and the processing performed. Previously, you only specified how to perform the computations; the result was not used by anyone and was therefore not computed. When you write to a file or to the console output, the results of the computation are requested.
Examine the content of the output files. Each line contains a tuple. This happens because the elements of the RDD returned by `reduceByKey` are tuples.
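The deferred execution described above can be imitated with a lazy view over a plain Scala collection. The sketch below is an analogy only and does not use Spark; `LazyDemo` and `run` are hypothetical names. The counter shows that the function passed to `flatMap` is not called when the pipeline is defined, only when the result is materialized.

```scala
object LazyDemo {
  // Returns (lines processed before forcing, lines processed after forcing, resulting words).
  def run(): (Int, Int, List[String]) = {
    var processed = 0
    val lines = List("a b", "c d e")
    // Defining the pipeline on a view does not call the function yet
    val words = lines.view.flatMap { line =>
      processed += 1
      line.split(" ").toList
    }
    val before = processed
    // Materializing the view forces the computation, similar to saving an RDD
    val result = words.toList
    (before, processed, result)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, result) = run()
    println(s"processed before forcing: $before, after forcing: $after")
    println(result)
  }
}
```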
### Executing a Script
`spark-shell` allows executing a script. For this, type
```bash
$ spark-shell -i my_script.scala
```
### Improving WordCount
Let's imagine that another application should read the output of your WordCount example. In this scenario, the tuple format of the output is not convenient. Let's change our application so that each output line contains the word and the count separated by a tab character.
To implement this, you need to add one more map to the processing pipeline that will transform a tuple into a properly formatted string.
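As a hint, the sketch below shows the difference between the default rendering of a tuple and a tab-separated string, using a single plain tuple rather than an RDD. The names `FormatHint` and `toTsv` are hypothetical; wiring such a function into the Spark pipeline is left to you.

```scala
object FormatHint {
  // Hypothetical helper: turns a (word, count) tuple into "word<TAB>count"
  def toTsv(pair: (String, Int)): String = s"${pair._1}\t${pair._2}"

  def main(args: Array[String]): Unit = {
    val pair = ("alice", 3)
    // Default tuple rendering, with parentheses and a comma
    println(pair.toString)
    // Tab-separated rendering that is easier for another program to parse
    println(toTsv(pair))
  }
}
```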
## `spark-shell` on Cluster
You can execute a shell on a cluster by specifying the `master` argument
```
$ spark-shell --master yarn
```
Before doing that, set two environment variables:
```bash
export YARN_CONF_DIR=path/to/hadoop/conf
export HADOOP_USER_NAME=vagrant
```
Spark does not know anything about your Hadoop cluster yet. You need to let it know by providing the path to Hadoop's configuration folder `etc/hadoop`. You can copy the configuration folder to another machine, but you need to make sure that the network addresses in it are still valid from that machine.
After configuring Spark for YARN, run `spark-shell` in YARN mode and try executing WordCount. Make sure to put the data into HDFS and to provide proper paths.
`spark-shell` uses the network to communicate with the cluster. You should be able to use the IP addresses specified in the `Vagrantfile`. If you cannot access a VM by IP, verify that the network interface `vboxnet` is up.
## Building Spark Application
Let's build a proper application.
Create the folder structure `src/main/scala`. To compile Scala code into an application, you need a top-level object with a `main` method (the counterpart of the main class in Java).
```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object MainClass {
  def main(args: Array[String]): Unit = {
    // The master is not set here; it is supplied by spark-submit
    val conf = new SparkConf().setAppName("appName")
    val sc = new SparkContext(conf)
    // Input and output locations are taken from the command line
    val inputPath = args(0)
    val outputPath = args(1)
    val textFile = sc.textFile(inputPath)
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(outputPath)
  }
}
```
Save this code into the `scala` folder. The name of the file should match the name of the main object (here, `MainClass.scala`). At the same level as the `src` folder, create a `build.sbt` file with the following content:
```
name := "spark-wordcount"
version := "1.0"
scalaVersion := "2.12.10"
val sparkVersion = "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
```
:::warning
Be sure to specify the Scala and Spark versions that match your Spark distribution. `spark-shell` prints both versions in its startup banner.
:::
Now, create a jar container by running `sbt package`.
## `spark-submit`
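Locate the newly created jar file and submit it to the cluster. With the `build.sbt` above, `sbt package` should place it at `target/scala-2.12/spark-wordcount_2.12-1.0.jar`; `application.jar` in the command below is a placeholder for that path.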
```bash
spark-submit --master yarn --class MainClass application.jar input/path output/path
```
## Sorting and Filtering
Look at the API for [transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations). Introduce the following modifications into your program:
- Filter out words that contain punctuation
- Sort the output by frequency in descending order
- Words and counts should be separated by a tab character
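As a starting point, the sketch below applies filtering and sorting to a small local `Seq` of `(word, count)` tuples instead of an RDD. Everything in it is illustrative: `SortFilterHint`, `isClean`, and `process` are hypothetical names, and treating "no punctuation" as "letters only" is an assumption you may want to change. RDDs offer `filter` and `sortBy` transformations as well; on an RDD, `sortBy` takes an `ascending` parameter, so negating the count is not needed there.

```scala
object SortFilterHint {
  // Assumption: a word is "clean" if it is non-empty and consists of letters only
  def isClean(word: String): Boolean = word.nonEmpty && word.forall(_.isLetter)

  // Keeps clean words and orders them by count, highest first
  def process(counts: Seq[(String, Int)]): Seq[(String, Int)] =
    counts
      .filter { case (word, _) => isClean(word) }
      .sortBy { case (_, count) => -count }

  def main(args: Array[String]): Unit = {
    val sample = Seq(("rabbit", 2), ("alice,", 1), ("alice", 5))
    process(sample).foreach(println)
  }
}
```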
# References
- [Help with Scala](https://learnxinyminutes.com/docs/scala/)
- Programming in Scala by Bill Venners and Martin Odersky
- RDD partitioning [1](https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0) [2](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html)
- Memory Issues [1](https://developer.ibm.com/hadoop/2016/02/16/beginners-guide-apache-spark-troubleshooting/)
- Stack overflow during execution [1](https://issues.apache.org/jira/browse/SPARK-1006)
- History Server [1](https://spark.apache.org/docs/latest/monitoring.html)
## Appendix
### Including Dependencies
When you configure an artifact with an IDE, you need to specify the dependencies. There is no need to bundle all the dependencies into the jar container; it is enough to add them to the manifest.
# Troubleshooting
## Sbt compilation error
There are issues with compiling Scala libraries with Java 9 and above. If you see an error during compilation, try downgrading to Java 8.
## Missing Winutils
There is a known issue with Spark on Windows: it requires `winutils.exe`. The solution can be found [here](https://stackoverflow.com/questions/35652665/java-io-ioexception-could-not-locate-executable-null-bin-winutils-exe-in-the-ha).
## No communication with the cluster (ConnectionRefused MachineName/127.0.0.1)
On Windows, your computer identifies itself with a domain name. The cluster has no way of resolving this domain name. Look into `SPARK_MASTER_HOST` or `SPARK_LOCAL_IP` in Spark's `spark_folder/conf/spark-env.sh`. If it does not exist, create it from a template.
Sometimes setting `SPARK_LOCAL_IP` does not work because of the following behavior:
> If the `SPARK_LOCAL_IP` environment variable is set to a hostname, then this hostname will be used. If `SPARK_LOCAL_IP` is set to an IP address, it will be resolved to a hostname.
## Class Not Found or Other Mysterious Issues with Java
Verify that the Scala version in your `build.sbt` matches the Scala version of your Spark distribution.
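One way to find out which Scala version a runtime uses is to ask the Scala standard library. The sketch below is a minimal example; `VersionCheck` and `binaryVersion` are hypothetical names. Evaluating `scala.util.Properties.versionNumberString` inside `spark-shell` should report the Scala version of the Spark distribution, whose first two components (the binary version, for example `2.12`) are what has to agree with `scalaVersion` in `build.sbt`.

```scala
object VersionCheck {
  // Hypothetical helper: "2.12.10" -> "2.12", the part that has to match
  def binaryVersion(full: String): String = full.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    // Version of the Scala library this program is running with
    val running = scala.util.Properties.versionNumberString
    println(s"Scala $running (binary version ${binaryVersion(running)})")
  }
}
```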
# Acceptance Criteria
- `sbt` compiles jar containers
- You can execute jar container on your cluster using `spark-submit`
- Features in section `Sorting and Filtering` are implemented
# Self-check Questions
1. How does Spark know how to access the Hadoop cluster?
2. Provide the list of computational clusters supported by Spark (i.e. where can Spark run)?
3. Why are applications written in Spark more concise than applications using Hadoop’s MapReduce in Java?
4. Is Scala a compiled or an interpreted language?
5. Explain why Java libraries are compatible with Scala. After all, those are two different languages.
6. What is RDD?
7. Describe different deploy modes used in spark-submit
8. Explain the syntax behind reduce operations. Why does reduce accept two arguments _ + _? How does reduce work if the collection consists of a single element? What are the mathematical requirements imposed on reduce operations?
9. What is the difference between class and object in Scala?
# Useful links
- [Spark Quick Start](https://spark.apache.org/docs/latest/quick-start.html)
- [Spark RDD Programming Guide](https://spark.apache.org/docs/latest/programming-guide.html#initializing-spark)
- [Spark Submit](https://spark.apache.org/docs/latest/submitting-applications.html)
- [Spark Overview](https://spark.apache.org/docs/latest/)
# Further Reading
- You can create a compute cluster only using Spark itself (no external resource manager). Read [the documentation](https://spark.apache.org/docs/latest/spark-standalone.html).