---
tags: IntroToBigData
title: Lab 5 - Apache Spark ML
---
# Lab 5 - Apache Spark ML
**Course:** Big Data - IU S23
**Author:** Firas Jolha
<!-- ## Dataset
-
## Readings
- -->
# Agenda
[toc]
# Prerequisites
- Installed Hortonworks Data Platform (HDP) Sandbox
- Installed pip and pandas packages
- Added Python interpreter to Zeppelin
# Objectives
- Run a Spark application on a cluster
- Learn methods to extract features in Spark MLlib
- Apply TF-IDF feature extraction to the movie titles
- Perform some regression tasks in Spark MLlib
<!-- - Perform some classification tasks in spark MLlib -->
<!-- - Perform some clustering tasks in spark MLlib -->
<!-- - Perform some graph analytics tasks in Neo4j -->
# Run a Spark application on the cluster
In the last lab, we used `master("local[*]")` to run the Spark application, but the execution happened on the local machine and not on the cluster (where `*` means to use as many CPU cores as are available). Apache Spark supports multiple cluster managers, including the standalone Spark manager, YARN, Mesos, etc. In this lab, we will use YARN to manage the cluster and learn how to track the status of Spark jobs in the Spark UI.
:::danger
**Note:** Before you start running your code on the YARN cluster, you need to set the following environment variable in the file `spark-env.sh`:
```bash!
export HADOOP_CONF_DIR=/etc/hadoop/conf
```
Just run the previous command in the shell of the cluster container. You can also find this configuration in the Spark2 configs window under `advanced spark2.env`, but you need to restart Spark2 after that update.

:::
You need to create a Zeppelin note where the default interpreter is `spark2`. To use YARN as the cluster manager, make sure that `yarn-client` is specified as the master in the interpreter configuration. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
:::info
Spark supports two modes for running on YARN, “yarn-cluster” mode and “yarn-client” mode. Broadly, yarn-cluster mode makes sense for production jobs, while yarn-client mode makes sense for interactive and debugging uses where you want to see your application’s output immediately.
:::

<center>
<p>
YARN client mode
</p>
</center>

<center>
<p>
YARN cluster mode
</p>
</center>
:::info
In interactive platforms like Zeppelin, we deploy the application only in client mode.
:::
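If you run a PySpark script outside Zeppelin, the cluster manager can also be selected when building the session. The snippet below is a minimal sketch (assuming `HADOOP_CONF_DIR` is exported as shown above); in Zeppelin you do not set this in the note itself, since the `master` property of the `spark2` interpreter plays this role.
```python!
from pyspark.sql import SparkSession

# Ask YARN for resources instead of running locally.
# In Spark 2.x the master is simply "yarn"; the deploy mode
# (client by default) is controlled by spark.submit.deployMode.
spark = SparkSession \
    .builder \
    .appName("yarn demo app") \
    .master("yarn") \
    .config("spark.submit.deployMode", "client") \
    .getOrCreate()

print(spark.sparkContext.master)   # yarn
```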
# Apache Spark MLlib
In Spark, there are two different MLlib APIs: the old RDD-based API (`pyspark.mllib`) and the new DataFrame-based API (`pyspark.ml`). As of Spark 2.0, the RDD-based APIs in the `pyspark.mllib` package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the `pyspark.ml` package.
# Basic data structures in Spark ML
Spark MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by [**Breeze**](http://www.scalanlp.org/).
## Local vector
A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: **dense** and **sparse**. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector `(1.0, 0.0, 3.0)` can be represented in dense format as `[1.0, 0.0, 3.0]` or in sparse format as `(3, [0, 2], [1.0, 3.0])`, where `3` is the size of the vector. MLlib recognizes NumPy arrays and Python lists as dense vectors. You can create sparse vectors using the `Vectors.sparse` factory method, which returns a `SparseVector`.
```python!
%pyspark
import numpy as np
from pyspark.mllib.linalg import Vectors as Vectors1
from pyspark.ml.linalg import Vectors as Vectors2
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors1.sparse(3, [0, 2], [1.0, 3.0])
print(sv1)
# Create a SparseVector.
sv2 = Vectors2.sparse(4, [0, 2], [1.0, 3.0])
# Create a DenseVector.
sv3 = Vectors2.dense([0, 1.0, 0.0, 3.0])
print(sv2)
print(sv2.toArray())
print(sv2.indices)
print(sv2.values)
print(sv2.norm(1)) # p = 1
print(sv2.norm(2)) # p = 2
print(sv2.norm(3)) # p = 3
print(sv3)
```
### Math operations
```python!
%pyspark
# Sparse vectors need to be converted to dense vectors before performing math operations:
v = Vectors2.dense(Vectors2.sparse(2, [0,1], [1.0, 2.0]).toArray())
u = Vectors2.dense(Vectors2.sparse(2, [0,1], [3.0, 4.0]).toArray())
# With dense vectors
v = Vectors2.dense([1.0, 2.0])
u = Vectors2.dense([3.0, 4.0])
print(v + u)
print(2 - v)
print(v / 2)
print(v * u)
print(u / v)
print(u % 2)
# print(-v) # Error
print(0-v) # Correct
```

<!--  -->
## Local matrix
A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. For example, the following dense matrix
$$
A= \begin{bmatrix}
1.0 & 2.0 \\
3.0 & 4.0 \\
5.0 & 6.0
\end{bmatrix}
$$
is stored in a one-dimensional array `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]` with the matrix size `(3, 2)`.
```python!
%pyspark
from pyspark.mllib.linalg import Matrices as Matrices1
from pyspark.ml.linalg import Matrices as Matrices2
# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices1.dense(3, 2, [1, 3, 5, 2, 4, 6])
dm3 = Matrices2.dense(3, 2, [1, 3, 5, 2, 4, 6])
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm2 = Matrices1.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
sm3 = Matrices2.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
```
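To check how the entries are laid out, the matrices can be converted back to NumPy arrays. This is a small sketch using `toArray()` on the dense matrix and `toDense()`/`toArray()` on the sparse one (methods of the `pyspark.ml.linalg` matrix classes):
```python!
%pyspark
# The column-major array [1, 3, 5, 2, 4, 6] is interpreted as a 3x2 matrix.
print(dm3.toArray())    # [[1. 2.], [3. 4.], [5. 6.]]
# Convert the sparse matrix to its dense form and inspect the entries.
print(sm3.toDense())
print(sm3.toArray())    # [[9. 0.], [0. 8.], [0. 6.]]
```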
<!--
### Math operations
```python!
# For sparse vectors we need to convert to dense vectors in order to perform math operations as follows:
v = Matrices2.sparse(2, [0,1], [1.0, 2.0]).toDense()
u = Matrices2.sparse(2, [0,1], [3.0, 4.0]).toDense()
# With dense vectors
v = Matrices2.dense([1.0, 2.0])
u = Matrices2.dense([3.0, 4.0])
print(v + u)
print(2 - v)
print(v / 2)
print(v * u)
print(u / v)
print(u % 2)
# print(-v) # Error
print(0-v) # Correct
``` -->
## Distributed matrix
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large distributed matrices, since converting a distributed matrix to a different format may require a global shuffle, which is quite expensive.
We recommend using local vectors stored in distributed DataFrames. Feel free to use distributed matrices if needed, but avoid local matrices, which operate only on a single machine.
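As an illustration of the RDD-based API, a `RowMatrix` distributes rows of local vectors over an RDD. A minimal sketch (Zeppelin's `spark2` interpreter already provides the `sc` SparkContext):
```python!
%pyspark
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

# Distribute three local vectors as the rows of a matrix.
rows = sc.parallelize([Vectors.dense([1.0, 2.0]),
                       Vectors.dense([3.0, 4.0]),
                       Vectors.dense([5.0, 6.0])])
mat = RowMatrix(rows)
print(mat.numRows(), mat.numCols())   # 3 2
```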
# Feature Extraction
For most ML pipelines, feature extraction is one of the first stages. Extracting features from the data has multiple advantages: it reduces the dimensionality of the data and can improve the performance of ML models.
## Example on TF-IDF
Open a Zeppelin note and set `spark2` as the default interpreter. Start each Python cell with `%pyspark`.
1. Import packages
```python!
%pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
```
2. Create session and get SparkContext
```python!
%pyspark
spark = SparkSession \
.builder \
.appName("my spark app") \
.getOrCreate()
sc = spark.sparkContext
```
3. Upload [`HollywoodMovies.csv`](https://raw.githubusercontent.com/reisanar/datasets/master/HollywoodMovies.csv) to HDFS (the next step assumes it is stored in the `/data` folder).
4. Read from HDFS
```python!
%pyspark
path = "/data/HollywoodMovies.csv" # /data is the HDFS folder where the file is stored
df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true")
df.show()
```
5. Select the `Movie` column
```python!
%pyspark
filmsdf = df.select("Movie")
filmsdf.show(truncate=False)
```
6. Create the pipeline
```python!
%pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer, StopWordsRemover
tokenizer = Tokenizer(inputCol="Movie", outputCol="words")
swremoval = StopWordsRemover(inputCol="words", outputCol="stremoved")
vectorizer = CountVectorizer(inputCol="stremoved", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages = [tokenizer, swremoval, vectorizer, idf])
model = pipeline.fit(filmsdf)
model.save("hdfs:///models/tfidfmodel")
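# The fitted PipelineModel can be loaded back from HDFS later on
# (a sketch, assuming the same path that was used above):
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("hdfs:///models/tfidfmodel")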
model
```
7. Transform the data
```python!
%pyspark
films = model.transform(filmsdf)
# Show first ten transformed features of films
films.show(10)
# Show first ten values of column `words`
films.select('words').show(10, truncate = False)
# Show first ten values of column `stremoved`
films.select('stremoved').show(10, truncate = False)
# Show first ten values of column `rawFeatures`
films.select('rawFeatures').show(10, truncate = False)
# Get the real values of the Spark DataFrame
films.select('rawFeatures').rdd.flatMap(lambda x : x).map(lambda x : x.toArray()).collect()
```
8. Get the total count for each term
```python!
%pyspark
# Getting the total count for each term
total_counts = model.transform(filmsdf) \
.select('rawFeatures').rdd \
.map(lambda row: row['rawFeatures'].toArray()) \
.reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])
total_counts
```
9. Get the vocabulary along with the counts
```python!
%pyspark
# Get the vocabulary along with the counts
vocabList = model.stages[2].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()
```
10. Get the vectorized and raw representation of the movies data
```python!
%pyspark
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType

# Build a UDF that maps a list of term indices back to vocabulary terms
def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))
vectorizerModel = model.stages[2]
vocabList = vectorizerModel.vocabulary
print(vocabList)
rawFeatures = model.transform(filmsdf).select('rawFeatures')
indices_udf = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
values_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))
rawFeatures.withColumn('indices', indices_udf(F.col('rawFeatures'))) \
.withColumn('values', values_udf(F.col('rawFeatures'))) \
.withColumn("Terms", termsIdx2Term(vocabList)("indices")).show()
```
<!--

<center>
<p>
Example on Term Frequency
</p>
</center>
-->
# Linear Regression
We will predict `AudienceScore` from the `Movie`, `LeadStudio`, and `RottenTomatoes` columns using Linear Regression from Spark MLlib.
```python!
%pyspark
# Select some features with the label
dfvec = df.select(["Movie", "LeadStudio", "RottenTomatoes", "AudienceScore"])
# Rename the label column
dfvec = dfvec.withColumnRenamed("AudienceScore","label")
# Separate the categorical features from numerical ones
categoricalCols = ['Movie', 'LeadStudio']
others = [c for c in dfvec.columns if c not in categoricalCols]
# Drop the label from the features list
others.remove("label")
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col
# Create a StringIndexer for each categorical field; each unique string gets a unique index
# The StringIndexer output is required as input for the One-Hot Encoder
# handleInvalid="skip" drops rows whose value was not seen during fitting
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]
# Encode the strings using One Hot encoding
# default setting: dropLast=True ==> For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]
# This will concatenate the input cols into a single column.
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol= "features")
# You can create a pipeline to use only a single fit and transform on the data.
pipeline = Pipeline(stages=indexers + encoders + [assembler])
# Drop any row that contains a NULL value in any column
dfvec = dfvec.na.drop()
# Fit the pipeline ==> this calls fit on every estimator stage
model = pipeline.fit(dfvec)
# Transform the data ==> this calls transform on every fitted stage
data = model.transform(dfvec)
# Keep only the features and label columns
data = data.select(["features", "label"])
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4
# distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
transformed = featureIndexer.transform(data)
# Display the output Spark DataFrame
transformed.show()
# Split the data into 60% training and 40% test (the split is not stratified)
(trainingData, testData) = data.randomSplit([0.6, 0.4])
# Create the Linear Regression model on top of the indexed features
lr = LinearRegression(featuresCol="indexedFeatures")
# Add the feature indexer and the model to a new pipeline
pipeline = Pipeline(stages=[featureIndexer, lr])
# Fit the data to the pipeline stages
model = pipeline.fit(trainingData)
# Transform the data (Prediction)
predictions = model.transform(testData)
# Display the predictions
predictions.show()
# Evaluate the performance of the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
```
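After fitting, the trained `LinearRegressionModel` is the last stage of the pipeline model, so its learned parameters can be inspected. A short sketch (the coefficient vector has one entry per assembled feature dimension):
```python!
%pyspark
# The last pipeline stage is the fitted LinearRegressionModel
lrModel = model.stages[-1]
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
# Summary statistics computed on the training split
trainingSummary = lrModel.summary
print("r2 on training data = %g" % trainingSummary.r2)
```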
<!-- # Math Operations on vec
As we have seen in the previous example, we ended up with a dataframe which has a `features` column containing vectors and a `label` column.
```python!
``` -->
<!-- Fix the others columsn and remove label column -->
# Spark on Colab
Please follow [this link](https://colab.research.google.com/drive/18KMqvX0l2UfZAbMRsmVphiO2gC0blIKY?usp=sharing) for the Colab notebook.
:::warning
You can continue learning classification, clustering, and other ML model examples from the [book](https://runawayhorse001.github.io/LearningApacheSpark/index.html).
:::
<!-- # Classification
# Clustering
-->
# References
- [Apache Spark](https://spark.apache.org)
- [Deep Dive into Apache Spark Transformations and Action](https://blog.knoldus.com/deep-dive-into-apache-spark-transformations-and-action/)
- [Apache Spark RDD vs DataFrame vs DataSet](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/)
- [Spark with Python (PySpark) Tutorial For Beginners](https://sparkbyexamples.com/pyspark-tutorial/)
- [Learning PySpark](https://www.amazon.com/Learning-PySpark-Tomasz-Drabas/dp/1786463709)