---
tags: BigData,IntroToBigData
title: Stage III - Predictive Data Analytics
---
# Stage III - Predictive Data Analytics
**Course:** Big Data - IU S23
**Author:** Firas Jolha
# Dataset
- [Some emps and depts](http://www.cems.uwe.ac.uk/~pchatter/resources/html/emp_dept_data+schema.html)
# Agenda
[toc]
# Prerequisites
- HDP 2.6.5 is installed.
- Stage I is done:
    - The relational database is built.
    - The database is imported to HDFS via Sqoop.
    - The tables are stored in HDFS as Avro data files compressed with Snappy.
    - The schemas of the Avro files are stored in HDFS.
- Stage II is done:
    - The Hive tables are created.
    - You performed EDA on the data stored in HDFS.
    - You saved the output of each query in a csv file for the presentation stage.
# Objectives
- Build Spark ML model
- Train and evaluate the model
- Hyperparameter tuning and Grid search
# Description
In this stage, we will build an ML model for our dataset and perform hyperparameter tuning via grid search.
# Dataset Description
The dataset is about the departments and employees in a company as well as their salary categories. It consists of two `.csv` files.
:::spoiler
The file `emps.csv` contains information about employees:
- EMPNO is a unique employee number; it is the primary key of the employee table.
- ENAME stores the employee's name.
- The JOB attribute stores the name of the job the employee does.
- The MGR attribute contains the employee number of the employee who manages that employee. If the employee has no manager, then the MGR column for that employee is left set to null.
- The HIREDATE column stores the date on which the employee joined the company.
- The SAL column contains the details of employee salaries.
- The COMM attribute stores the value of commission paid to each employee. Not all employees receive a commission; in that case, the COMM field is set to null.
- The DEPTNO column stores the department number of the department in which each employee is based. This data item acts as a foreign key, linking the employee details stored in the EMP table with the details of the departments in which employees work, which are stored in the DEPT table.
The file `depts.csv` contains information about departments:
- DEPTNO: The primary key containing the department numbers used to identify each department.
- DNAME: The name of each department.
- LOC: The location where each department is based.
<!-- The file `salgrade.csv` contains information about salary categories:
- GRADE: A numeric identifier for the category of the salary.
- LOSAL: The lowest salary in this category.
- HISAL: The highest salary in this category.
-->
:::
:::info
I created these `csv` files from the tables provided in the link.
:::
# Preparation
Before starting with Spark ML, make sure that you built Hive tables and tested them via EDA in the previous stage.
# Modeling in Spark ML
In this part of the project, we will build an ML model. Here I will explain two modes for performing the analysis in HDP: the first is used for deployment and the second for development. We suggest using both of them: perform the analysis in an interactive Zeppelin note, then run the code via `spark-submit` after changing some configurations in the Spark session.
Performing PDA should include:
- Building the model.
- Tuning the model parameters.
- Performing predictions.
:::danger
I recommend using Hive tables created in Hive (with partitioning, bucketing, or neither), since there is a common issue with reading the schema of tables generated by Sqoop; tables whose schema was generated by Hive should not cause such issues.
In case you encounter similar issues, please contact your TA.
:::
## 1. Non-interactive analysis via `spark-submit`
Save the code in a file `model.py` and run it on Spark using the `spark-submit` tool. You should add some jars so that the Hive tables can be properly imported via Spark SQL, as follows:
```powershell!
spark-submit --jars /usr/hdp/current/hive-client/lib/hive-metastore-1.2.1000.2.6.5.0-292.jar,/usr/hdp/current/hive-client/lib/hive-exec-1.2.1000.2.6.5.0-292.jar --packages org.apache.spark:spark-avro_2.12:3.0.3 scripts/model.py
```
<!-- You can also work on Zeppelin note. You can run your app on YARN cluster by using the configuration `--master=yarn`. -->
<!-- ### Read the data from Hive -->
1. Connect to Hive.
```python!
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.appName("BDT Project")\
.config("spark.sql.catalogImplementation","hive")\
.config("hive.metastore.uris", "thrift://sandbox-hdp.hortonworks.com:9083")\
.config("spark.sql.avro.compression.codec", "snappy")\
.enableHiveSupport()\
.getOrCreate()
```
<!-- from pyspark.sql import SparkSession
spark = SparkSession.builder\
.appName("BDT Project")\
.config("spark.sql.warehouse.dir","hdfs://sandbox-hdp.hortonworks.com:8020/apps/hive/warehouse")\
.config("spark.sql.catalogImplementation","hive")\
.config("hive.metastore.uris", "thrift://sandbox-hdp.hortonworks.com:9083")\
.config("spark.sql.avro.compression.codec", "snappy")\
.config("spark.jars", "/usr/hdp/current/hive-client/lib/hive-metastore-1.2.1000.2.6.5.0-292.jar,/usr/hdp/current/hive-client/lib/hive-exec-1.2.1000.2.6.5.0-292.jar")\
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.4")\
.enableHiveSupport()\
.getOrCreate() -->
2. List all databases
```python!
print(spark.catalog.listDatabases())
```
3. List all tables
```python!
print(spark.catalog.listTables("projectdb"))
```
4. Read Hive table
```python!
emps = spark.read.format("avro").table('projectdb.employees_part')
emps.createOrReplaceTempView('employees')
depts = spark.read.format("avro").table('projectdb.departments_buck')
depts.createOrReplaceTempView('departments')
```
Now we can use `depts` and `emps` as input dataframes for our model.
5. Run some queries
```python!
emps.printSchema()
depts.printSchema()
spark.sql("SELECT * FROM employees WHERE deptno=10").show()
spark.sql("SELECT * FROM departments").show()
```
:::danger
Possible issues with Hive will be discussed with students and added later to this document.
:::
## 2. Interactive analysis via Zeppelin
1. Create a new note and set `python2` as the default interpreter.
```python!
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
spark = SparkSession.builder\
.appName("BDT Project")\
.master("local[*]")\
.config("hive.metastore.uris", "thrift://sandbox-hdp.hortonworks.com:9083")\
.config("spark.sql.catalogImplementation","hive")\
.config("spark.sql.avro.compression.codec", "snappy")\
.config("spark.jars", "file:///usr/hdp/current/hive-client/lib/hive-metastore-1.2.1000.2.6.5.0-292.jar,file:///usr/hdp/current/hive-client/lib/hive-exec-1.2.1000.2.6.5.0-292.jar")\
.config("spark.jars.packages","org.apache.spark:spark-avro_2.12:3.0.3")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
print(sc)
```
:::warning
I do not recommend running the application on the cluster using the `spark2` interpreter, since it leads to Hive exceptions.
:::
2. List all databases
```python!
print(spark.catalog.listDatabases())
```
3. List all tables
```python!
print(spark.catalog.listTables("projectdb"))
```
4. Read Hive table
```python!
emps = spark.read.format("avro").table('projectdb.employees_part')
emps.createOrReplaceTempView('employees')
depts = spark.read.format("avro").table('projectdb.departments_buck')
depts.createOrReplaceTempView('departments')
```
Now we can use `depts` and `emps` as input dataframes for our model.
5. Run some queries
```python!
emps.printSchema()
depts.printSchema()
spark.sql("SELECT * FROM employees WHERE deptno=10").show()
spark.sql("SELECT * FROM departments").show()
spark.sql("SELECT AVG(SAL) FROM employees;").show()
spark.sql("SELECT * from employees where comm is NULL;").show()
```
:::warning
You can also perform EDA here via Spark SQL, but it is optional.
:::
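The model below uses only employee attributes, but if you want department attributes (e.g. `dname`, `loc`) as additional features, you can join the two dataframes on `deptno`. A minimal sketch, assuming the `emps` and `depts` dataframes read above (the joined result is not used in the model that follows):
```python!
# A hedged sketch: enrich employees with department attributes via a join on deptno.
# Column names follow the dataset description; adjust them if your Hive schema differs.
emps_depts = emps.join(depts, on="deptno", how="left")
emps_depts.select("empno", "ename", "deptno", "dname", "loc").show()
```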
## 3. ML Model
Here I will show a simple example of predicting the salaries of the employees via linear regression.
### 1. Preprocessing the data
#### A. Selecting the features
```python!
# If you have a date column, Sqoop will probably convert it to a timestamp and you will get the date in Unix time format.
# You can encode it as a numerical feature (not recommended) or apply a sin/cos transformation (see the sketch after this block).
# For simplicity, we discard it here and use the following features.
# 'comm' is excluded because it has a lot of nulls.
features = ['empno', 'ename', 'job', 'mgr', 'deptno']
# The output/target of our model
label = 'sal'
# Remove the quotes before and after each string in job and ename columns.
emps = emps.withColumn("job", F.translate("job","'",""))
emps.show()
emps = emps.withColumn("ename", F.translate("ename","'",""))
emps.show()
# I am thinking of generating a new column out of ename and job.
# The column will have ename and job concatenated with '_'
# Then we use word2Vec to encode it
emps = emps.select(features + [label]).na.drop()
emps = emps.withColumn("ename_job", F.concat(F.col('ename'), F.lit("_"), F.col('job')))
emps = emps.withColumnRenamed("sal","label")
emps.show()
```
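The comments above mention a sin/cos transformation as an alternative to discarding the hire date. Below is a minimal sketch under two assumptions: `hiredate` was imported as a Unix timestamp in seconds, and the transformation is applied to the original `emps` dataframe before the `select` above. It is not used in the model that follows.
```python!
import math
import pyspark.sql.functions as F

# A hedged sketch: cyclic (sin/cos) encoding of the hire month, so that
# December and January end up close to each other on the circle.
# If hiredate is stored in milliseconds, divide it by 1000 first.
emps_dates = emps.withColumn("hire_month", F.month(F.from_unixtime(F.col("hiredate"))))
emps_dates = emps_dates\
    .withColumn("hire_month_sin", F.sin(2 * math.pi * F.col("hire_month") / 12))\
    .withColumn("hire_month_cos", F.cos(2 * math.pi * F.col("hire_month") / 12))
emps_dates.select("hiredate", "hire_month", "hire_month_sin", "hire_month_cos").show()
```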
#### B. Building the Pipeline
```python!
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col
categoricalCols = ['deptno']
textCols = ['ename_job']
others = ['empno', 'mgr']
# Since the Tokenizer only returns tokens separated by white spaces, I used RegexTokenizer to tokenize by '_'.
# Then we create the Word2Vec model.
# tokenizer = Tokenizer(inputCol="ename", outputCol="ename_tokens")
# emps_tok = tokenizer.transform(emps)
tokenizer = RegexTokenizer(inputCol=textCols[0], outputCol="ename_job_tokens", pattern="_")
# emps_tok = tokenizer.transform(emps)
# emps_tok.show()
word2Vec = Word2Vec(vectorSize=5, seed=42, minCount=1, inputCol="ename_job_tokens", outputCol="ename_enc")
# word2VecModel = word2Vec.fit(emps_tok)
# print(word2VecModel)
# emps_tok = word2VecModel.transform(emps_tok)
# emps_tok.show()
# Add the encoded ename_job column to the list of other columns
others += ["ename_enc"]
# Create a StringIndexer for each categorical column: each unique string gets a unique numeric index.
# The StringIndexer output is required as input for the One-Hot Encoder.
# handleInvalid is set to `skip` so that rows with categories unseen during fitting are skipped.
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]
# Encode the strings using One Hot encoding
# Default setting: dropLast=True. For example, with 5 categories an input value of 2.0 maps to an output vector of [0.0, 0.0, 1.0, 0.0].
# The last category is not included by default (configurable via dropLast), because it would make the vector entries sum up to one,
# and hence be linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]
# The VectorAssembler concatenates the input columns into a single feature vector column.
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol= "features")
# You can create a pipeline to use only a single fit and transform on the data.
pipeline = Pipeline(stages=[tokenizer, word2Vec] + indexers + encoders + [assembler])
# Fit the pipeline ==> this calls the fit method of every estimator stage
model = pipeline.fit(emps)
# Transform the data ==> this calls the transform method of every fitted stage
data = model.transform(emps)
data.show()
# Keep only the assembled features vector and the label column
data = data.select(["features", "label"])
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4
# distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
transformed = featureIndexer.transform(data)
# Display the output Spark DataFrame
transformed.show()
```
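After fitting, you can inspect individual stages of the preprocessing pipeline model. For example, a small sketch that prints the word vectors learned by the Word2Vec stage (assuming the stage order defined above, where `word2Vec` is the second stage):
```python!
# Inspect the fitted Word2Vec stage of the preprocessing pipeline model.
# model.stages follows the order of the `stages` list: tokenizer is 0, word2Vec is 1.
w2v_model = model.stages[1]
w2v_model.getVectors().show(truncate=False)
```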
### 2. Modeling
```python!
# Split the (un-indexed) data into 60% training and 40% test (it is not stratified).
# We split `data`, not `transformed`, because featureIndexer is applied again inside the pipeline below.
(trainingData, testData) = data.randomSplit([0.6, 0.4])
# Create the Linear Regression model
lr = LinearRegression()
# Chain the feature indexer and the model in a pipeline
pipeline = Pipeline(stages=[featureIndexer, lr])
# Fit the data to the pipeline stages
model = pipeline.fit(trainingData)
```
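You can also inspect the fitted linear regression stage, for example (a small sketch, assuming the pipeline defined above where the regression model is the last stage):
```python!
# The last stage of the fitted pipeline is the LinearRegressionModel
lr_model = model.stages[-1]
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))
```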
### 3. Prediction
```python!
# Transform the data (Prediction)
predictions = model.transform(testData)
# Display the predictions
predictions.show()
```
### 4. Evaluation
```python!
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluate the performance of the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
```
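The objectives also call for hyperparameter tuning via grid search. Below is a minimal sketch using `ParamGridBuilder` and `CrossValidator`, reusing the pipeline, the RMSE evaluator, and the train/test split defined above; the grid values are illustrative, so adapt them to your model.
```python!
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Grid of illustrative hyperparameter values for the linear regression stage
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# 3-fold cross-validation over the grid, reusing the RMSE evaluator from above
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)
cvModel = cv.fit(trainingData)

# Average cross-validation RMSE for each point of the grid
print(cvModel.avgMetrics)

# Evaluate the best model found by the grid search on the held-out test data
best_predictions = cvModel.transform(testData)
print("Best model RMSE on test data = %g" % evaluator.evaluate(best_predictions))
```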
### 5. Saving the results (Spark DataFrame as csv file)
You can export the results of prediction to `csv` file as follows:
```python!
predictions.coalesce(1)\
.select("prediction",'label')\
.write\
.mode("overwrite")\
.format("csv")\
.option("sep", ",")\
.option("header","true")\
.csv("/project/output/lr_predictions.csv")
```
By default, this path in Zeppelin refers to HDFS, so the file will be stored in HDFS. You can then move it to the local file system and put it in the `output` folder of the project repository using `hdfs dfs` commands.
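For example (a small sketch; the local `output` path is assumed to be the output folder of the project repository):
```powershell!
# Copy the single part file produced by Spark from HDFS to the local output folder
# (paths are illustrative; adjust them to your repository layout)
hdfs dfs -get /project/output/lr_predictions.csv/*.csv output/lr_predictions.csv
```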

:::info
We used the `coalesce` function to repartition the dataframe into a single partition and get only one csv file; otherwise you could get multiple files due to the multiple partitions of the dataframe.
For other options of `mode`, check the documentation.

:::
:::danger
For the project, you need to complete PDA according to the criteria in project description.
:::
# Upcoming lab
- Build a simple dashboard for the presentation stage.
- Present the project results and data insights in the dashboard.
<!--
:::info
If the HDF write procedure to local file system or HDFS is slow, you can check the following optimizations.

:::
-->
<!--
## How to enable Hive LLAP
1. Enable pre-emption from Yarn configs. Save the change and click OK then Proceed even if you some alerts.
<center>
<img src="https://i.imgur.com/Rmui0O0.png" width="200" />
</center>
2. Restart Yarn (Restart all affected is enough).
3. Go to Hive configs and enable interactive query option.
<center>
<img src="https://i.imgur.com/ltnxznd.png" width="200" />
</center>
It will show you a window to select the HiveServer2. Just click select since you have only one host in the cluster.
<center>
<img src="https://i.imgur.com/z1TiabT.png" width="600" />
</center>
4. Keep the default setting of Hive LLAP and save the change.
<center>
<img src="https://i.imgur.com/QriLDOD.png" width="200" />
</center>
5. Restart all Hive components and check the Hive interactive UI as shown below.

-->
<!-- :::warning
Do not forget to clear the folder `/project` if exists before importing the data.
::: -->
# References
- [emps-depts-dataset](https://www.cs.uct.ac.za/mit_notes/database/htmls/chp03.html)