
Stage III - Predictive Data Analytics

Course: Big Data - IU S23
Author: Firas Jolha

Prerequisites

  • HDP 2.6.5 is installed
  • Stage I is done
    • The relational database is built.
    • The database is imported to HDFS via Sqoop.
    • The tables are stored in HDFS as Avro data files and compressed with Snappy.
    • The schemas of the Avro files are stored in HDFS.
  • Stage II is done
    • The Hive tables are created.
    • You performed EDA on the data stored in HDFS.
    • You saved the output of each query in a CSV file for the presentation stage.

Objectives

  • Build Spark ML model
  • Train and evaluate the model
  • Hyperparameter tuning and Grid search

Description

In this stage, we will build an ML model for our dataset and tune its hyperparameters via grid search.

Dataset Description

The dataset is about the departments and employees in a company as well as their salary categories. It consists of two .csv files.

The file emps.csv contains information about employees:

  • EMPNO is a unique employee number; it is the primary key of the employee table.
  • ENAME stores the employee's name.
  • The JOB attribute stores the name of the job the employee does.
  • The MGR attribute contains the employee number of the employee who manages that employee. If the employee has no manager, the MGR column for that employee is set to null.
  • The HIREDATE column stores the date on which the employee joined the company.
  • The SAL column contains the details of employee salaries.
  • The COMM attribute stores values of commission paid to employees. Not all employees receive commission, in which case the COMM field is set to null.
  • The DEPTNO column stores the number of the department in which each employee is based. This data item acts as a foreign key, linking the employee details stored in the EMP table with the details of the departments in which employees work, which are stored in the DEPT table.

The file depts.csv contains information about departments:

  • DEPTNO: The primary key containing the department numbers used to identify each department.
  • DNAME: The name of each department.
  • LOC: The location where each department is based.

I created these csv files from the tables provided in the link.

Preparation

Before starting with Spark ML, make sure that you have built the Hive tables and tested them via EDA in the previous stage.

Modeling in Spark ML

In this part of the project, we will build an ML model. Here I will explain two modes for performing analysis in HDP. The first one (non-interactive, via spark-submit) is used for deployment and the second one (interactive, via Zeppelin) is used for development. We suggest using both: perform the analysis in an interactive Zeppelin note, then run the code via spark-submit after changing some configurations in the Spark session.

Performing PDA should include:
- Building the model.
- Tuning the model parameters.
- Performing predictions.

I recommend using the tables created in Hive (partitioned, bucketed, or neither) rather than the tables generated by Sqoop. Reading the schema of Sqoop-generated tables is a common source of errors, whereas tables whose schema is generated by Hive should not cause such issues.

In case you encounter similar issues, please contact your TA.

1. Non-interactive analysis via spark-submit

Save the code in a file model.py and run it on Spark with the spark-submit tool. You need to add some jars so that Spark SQL can properly import the Hive tables:

spark-submit --jars /usr/hdp/current/hive-client/lib/hive-metastore-1.2.1000.2.6.5.0-292.jar,/usr/hdp/current/hive-client/lib/hive-exec-1.2.1000.2.6.5.0-292.jar --packages org.apache.spark:spark-avro_2.12:3.0.3 scripts/model.py 
  1. Connect to Hive.
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("BDT Project")\
        .config("spark.sql.catalogImplementation","hive")\
        .config("hive.metastore.uris", "thrift://sandbox-hdp.hortonworks.com:9083")\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()
  1. List all databases
print(spark.catalog.listDatabases())
  1. List all tables
print(spark.catalog.listTables("projectdb"))
  1. Read Hive table
emps = spark.read.format("avro").table('projectdb.employees_part')
emps.createOrReplaceTempView('employees')

depts = spark.read.format("avro").table('projectdb.departments_buck')
depts.createOrReplaceTempView('departments')

Now we can use depts and emps as input DataFrames for our model.

  1. Run some queries
emps.printSchema()
depts.printSchema()

spark.sql("SELECT * FROM employees WHERE deptno=10").show()

spark.sql("SELECT * FROM departments").show()

Possible issues with Hive will be discussed with students and added later to this document.

2. Interactive analysis via Zeppelin

  1. Create a new note and set python2 as the default interpreter.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline


spark = SparkSession.builder\
    .appName("BDT Project")\
    .master("local[*]")\
    .config("hive.metastore.uris", "thrift://sandbox-hdp.hortonworks.com:9083")\
    .config("spark.sql.catalogImplementation","hive")\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .config("spark.jars", "file:///usr/hdp/current/hive-client/lib/hive-metastore-1.2.1000.2.6.5.0-292.jar,file:///usr/hdp/current/hive-client/lib/hive-exec-1.2.1000.2.6.5.0-292.jar")\
    .config("spark.jars.packages","org.apache.spark:spark-avro_2.12:3.0.3")\
    .enableHiveSupport()\
    .getOrCreate()


sc = spark.sparkContext

print(sc)

I do not recommend running the application on the cluster with the spark2 interpreter since it leads to Hive exceptions.

  1. List all databases
print(spark.catalog.listDatabases())
  1. List all tables
print(spark.catalog.listTables("projectdb"))
  1. Read Hive table
emps = spark.read.format("avro").table('projectdb.employees_part')
emps.createOrReplaceTempView('employees')

depts = spark.read.format("avro").table('projectdb.departments_buck')
depts.createOrReplaceTempView('departments')

Now we can use depts and emps as input DataFrames for our model.

  1. Run some queries
emps.printSchema()
depts.printSchema()

spark.sql("SELECT * FROM employees WHERE deptno=10").show()

spark.sql("SELECT * FROM departments").show()

spark.sql("SELECT AVG(SAL) FROM employees").show()
spark.sql("SELECT * FROM employees WHERE comm IS NULL").show()

You can also perform EDA here via Spark SQL, but it is optional.

3. ML Model

Here I will show a simple example to predict the salaries of the employees via linear regression.

1. Preprocessing the data

A. Selecting the features


# If you have a date column, Sqoop will probably convert it to a timestamp, and you will get the date in Unix time format.
# You can encode it as a numerical feature (not recommended) or apply a sine/cosine transformation. For simplicity, we will discard it.

# We will use the following features
# Excluded 'comm' because it has a lot of nulls
features = ['empno', 'ename', 'job', 'mgr', 'deptno']

# The output/target of our model
label = 'sal'


# Remove the quotes before and after each string in job and ename columns.
emps = emps.withColumn("job", F.translate("job","'",""))
emps.show()
emps = emps.withColumn("ename", F.translate("ename","'",""))
emps.show()


# Keep only the selected features and the label, and drop rows with nulls
emps = emps.select(features + [label]).na.drop()
# Generate a new column ename_job by concatenating ename and job with '_'
# It will be encoded later with Word2Vec
emps = emps.withColumn("ename_job", F.concat(F.col('ename'), F.lit("_"), F.col('job')))
# Rename the target column to "label", the default label column name in Spark ML
emps = emps.withColumnRenamed("sal","label")

emps.show()
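
The comments above mention a sine/cosine transformation as one way to encode a date feature instead of discarding it. The following is a minimal sketch in plain Python of what such an encoding could look like for the month of HIREDATE. The function name `encode_month_cyclic`, the sample hire date, and the assumption that the timestamp is in seconds (divide by 1000 first if your values are in milliseconds) are illustrative and not part of the lab code.

```python
import math
from datetime import datetime, timezone


def encode_month_cyclic(unix_seconds):
    """Map the month of a Unix timestamp (in seconds) to a point on the unit circle."""
    month = datetime.fromtimestamp(unix_seconds, tz=timezone.utc).month  # 1..12
    angle = 2 * math.pi * (month - 1) / 12
    return math.sin(angle), math.cos(angle)


# Hypothetical hire date: 1981-12-03 00:00:00 UTC
hire_ts = int(datetime(1981, 12, 3, tzinfo=timezone.utc).timestamp())
sin_m, cos_m = encode_month_cyclic(hire_ts)
print("sin = %.3f, cos = %.3f" % (sin_m, cos_m))
```

With this encoding, December and January end up close to each other on the circle, which a plain numerical month (12 vs. 1) would not capture. In Spark, the same formula could presumably be applied to a column with the functions in pyspark.sql.functions.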

B. Building the Pipeline


from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col

categoricalCols = ['deptno']
textCols = ['ename_job']
others = ['empno', 'mgr']


# Tokenizer only splits on white space, so we use RegexTokenizer to split on '_'
# Then we create a Word2Vec model on the resulting tokens

# tokenizer = Tokenizer(inputCol="ename", outputCol="ename_tokens")
# emps_tok = tokenizer.transform(emps)
tokenizer = RegexTokenizer(inputCol=textCols[0], outputCol="ename_job_tokens", pattern="_")
# emps_tok = tokenizer.transform(emps)
# emps_tok.show()

word2Vec = Word2Vec(vectorSize=5, seed=42, minCount=1, inputCol="ename_job_tokens", outputCol="ename_enc")
# word2VecModel = word2Vec.fit(emps_tok)
# print(word2VecModel)

# emps_tok = word2VecModel.transform(emps_tok)
# emps_tok.show()

# Add the name of the encoded ename_job column to the list of other columns
others += ["ename_enc"]


# Create a StringIndexer for each categorical column: each unique value gets a unique index
# The StringIndexer output is required as input for the One-Hot Encoder
# handleInvalid="skip" drops rows whose value was not seen when the indexer was fitted
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]

# Encode the strings using One Hot encoding
# default setting: dropLast=True ==> For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]

# This will concatenate the input cols into a single column.
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol= "features")

# You can create a pipeline to use only a single fit and transform on the data.
pipeline = Pipeline(stages=[tokenizer, word2Vec] + indexers + encoders + [assembler])


# Fit the pipeline ==> This will call the fit function of every stage that has one
model=pipeline.fit(emps)
# Transform the data ==> This will call the transform functions of all stages
data = model.transform(emps)

data.show()

# We drop all other columns and keep only the features and label columns
data = data.select(["features", "label"])


from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4
# distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
transformed = featureIndexer.transform(data)

# Display the output Spark DataFrame
transformed.show()
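
To make the dropLast behavior described in the OneHotEncoder comment above more concrete, here is a small plain-Python sketch that mimics it with dense lists. The helper `one_hot` is hypothetical and only illustrates the mapping; Spark itself produces sparse vectors.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for a category index, mimicking dropLast."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    # With drop_last=True the last category has no slot and maps to all zeros
    if index < size:
        vector[index] = 1.0
    return vector


print(one_hot(2, 5))                   # [0.0, 0.0, 1.0, 0.0]
print(one_hot(4, 5))                   # [0.0, 0.0, 0.0, 0.0]
print(one_hot(4, 5, drop_last=False))  # [0.0, 0.0, 0.0, 0.0, 1.0]
```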

2. Modeling

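# Split the data into 60% training and 40% test (the split is not stratified)
# We split `data` rather than `transformed`: `transformed` already contains the indexedFeatures
# column, so applying featureIndexer to it again in the pipeline would fail
(trainingData, testData) = data.randomSplit([0.6, 0.4])

# Create the Linear Regression model and train it on the indexed features
lr = LinearRegression(featuresCol="indexedFeatures", labelCol="label")

# Add the fitted feature indexer and the model to the pipeline
pipeline = Pipeline(stages=[featureIndexer, lr])

# Fit the pipeline stages on the training data
model = pipeline.fit(trainingData)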

3. Prediction

# Transform the data (Prediction)
predictions = model.transform(testData)

# Display the predictions
predictions.show()

4. Evaluation

from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
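
For reference, the RMSE reported by the evaluator is the square root of the mean squared difference between labels and predictions. The sketch below computes it by hand in plain Python on three made-up salary values; the helper `rmse` and the numbers are illustrative assumptions, not output of the model above.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long lists of numbers."""
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up salaries and predictions: errors are -200, 100 and 200
labels = [800.0, 1600.0, 3000.0]
predictions = [1000.0, 1500.0, 2800.0]

value = rmse(labels, predictions)
print("RMSE = %g" % value)  # RMSE = 173.205
```

Because the errors are squared before averaging, large mistakes weigh more than small ones, and the result is expressed in the same unit as the label (here, salary).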

5. Saving the results (Spark DataFrame as csv file)

You can export the prediction results to a CSV file as follows:

predictions.coalesce(1)\
    .select("prediction",'label')\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .csv("/project/output/lr_predictions.csv")

By default, the path in Zeppelin refers to HDFS, so the output is stored in HDFS. Spark writes it as a directory named lr_predictions.csv that contains the part file. You can move it to the local file system and put it in the output folder of the project repository using hdfs dfs commands.

We used the coalesce function to reduce the DataFrame to a single partition and get only one CSV file; otherwise, you could get multiple files, one per partition of the DataFrame.

For other options for mode, check the documentation.

For the project, you need to complete PDA according to the criteria in the project description.

Upcoming lab

  • Build a simple dashboard for the presentation stage.
  • Present the project results and data insights in the dashboard.
