---
tags: IntroToBigData
title: Lab 5 - Apache Spark ML
---

# Lab 5 - Apache Spark ML
**Course:** Big Data - IU S23
**Author:** Firas Jolha

<!-- 
## Dataset
- 

## Readings
- 
-->

# Agenda

[toc]

# Prerequisites

- Installed Hortonworks Data Platform (HDP) Sandbox
- Installed pip and pandas packages
- Added Python interpreter to Zeppelin

# Objectives

- Run a Spark application on a cluster
- Learn methods to extract features in Spark MLlib
- Apply TF-IDF feature extraction to the movies dataset
- Perform some regression tasks in Spark MLlib
<!-- - Perform some classification tasks in spark MLlib -->
<!-- - Perform some clustering tasks in spark MLlib -->
<!-- - Perform some graph analytics tasks in Neo4j -->

# Run a Spark application on the cluster

In the last lab, we used `master("local[*]")` to run the Spark application, but the execution was done on the local machine and not on the cluster (`*` means to use as many CPU cores as are available). Apache Spark supports multiple cluster managers, including the standalone Spark manager, YARN, Mesos, etc. In this lab, we will use YARN to manage the cluster and will learn how to track the status of Spark jobs in the Spark UI.

:::danger
**Note:** Before you start running your code on the YARN cluster, you need to set the following environment variable in the file `spark-env.sh`:

```bash!
export HADOOP_CONF_DIR=/etc/hadoop/conf
```

Just run the previous command in the shell of the cluster container. You can also find this configuration in the Spark2 configs window under `advanced spark2.env`, but you need to restart Spark2 after that update.

![](https://i.imgur.com/Jx4VIeS.png)
:::

You need to create a Zeppelin note where the default interpreter is `spark2`. To specify the cluster manager as YARN, make sure that `yarn-client` is set as the master in the interpreter configuration. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

:::info
Spark supports two modes for running on YARN: "yarn-cluster" mode and "yarn-client" mode. Broadly, yarn-cluster mode makes sense for production jobs, while yarn-client mode makes sense for interactive and debugging uses where you want to see your application's output immediately.
:::

![](https://i.imgur.com/b1CbiIR.png)
<center>
<p>
Yarn client mode
</p>
</center>

![](https://i.imgur.com/M3o1dTd.png)
<center>
<p>
Yarn cluster mode
</p>
</center>

:::info
In interactive platforms like Zeppelin, we will deploy the application only in client mode.
:::
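For reference, if you run PySpark outside Zeppelin (for example from a plain Python script or the `pyspark` shell), the cluster manager can also be chosen programmatically when building the `SparkSession`. The sketch below is only an illustration and assumes `HADOOP_CONF_DIR` has already been exported as above; the application name is a placeholder. Inside Zeppelin, keep using the interpreter setting instead.

```python!
# A minimal sketch (not one of the lab steps): choosing YARN as the cluster
# manager when building the session yourself. In Zeppelin, the spark2
# interpreter creates the session for you according to its `master` setting.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("yarn demo app")   # placeholder application name
         .master("yarn")             # submit to YARN instead of local[*]
         .getOrCreate())

print(spark.sparkContext.master)     # should print "yarn"
```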
# Apache Spark MLlib

In Spark, we have two different MLlib APIs: the old API, which is RDD-based (`pyspark.mllib`), and the new API, which is DataFrame-based (`pyspark.ml`). As of Spark 2.0, the RDD-based APIs in the `pyspark.mllib` package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the `pyspark.ml` package.

# Basic data structures in Spark ML

Spark MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by [**Breeze**](http://www.scalanlp.org/).

## Local vector

A local vector has integer-typed, 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: **dense** and **sparse**. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, the vector `(1.0, 0.0, 3.0)` can be represented in dense format as `[1.0, 0.0, 3.0]` or in sparse format as `(3, [0, 2], [1.0, 3.0])`, where `3` is the size of the vector.

MLlib recognizes NumPy arrays and Python lists as dense vectors. You can create sparse vectors using MLlib's `Vectors.sparse` factory method, which returns a `SparseVector`.

```python!
%pyspark

import numpy as np
from pyspark.mllib.linalg import Vectors as Vectors1  # RDD-based API
from pyspark.ml.linalg import Vectors as Vectors2     # DataFrame-based API

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])

# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]

# Create a SparseVector (mllib API).
sv1 = Vectors1.sparse(3, [0, 2], [1.0, 3.0])
print(sv1)

# Create a SparseVector (ml API).
sv2 = Vectors2.sparse(4, [0, 2], [1.0, 3.0])
sv3 = Vectors2.dense([0, 1.0, 0.0, 3.0])

print(sv2)
print(sv2.toArray())
print(sv2.indices)
print(sv2.values)
print(sv2.norm(1)) # p = 1
print(sv2.norm(2)) # p = 2
print(sv2.norm(3)) # p = 3
print(sv3)
```

### Math operations

```python!
%pyspark

# For sparse vectors, convert to dense vectors first in order to perform math operations:
v = Vectors2.dense(Vectors2.sparse(2, [0, 1], [1.0, 2.0]).toArray())
u = Vectors2.dense(Vectors2.sparse(2, [0, 1], [3.0, 4.0]).toArray())

# With dense vectors
v = Vectors2.dense([1.0, 2.0])
u = Vectors2.dense([3.0, 4.0])

print(v + u)
print(2 - v)
print(v / 2)
print(v * u)
print(u / v)
print(u % 2)
# print(-v)  # Error
print(0 - v) # Correct
```

![](https://i.imgur.com/t2YxgA9.png)

<!-- ![](https://i.imgur.com/h6C7JQ6.png) -->

## Local matrix

A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. For example, the following dense matrix

$$
A=
\begin{bmatrix}
1.0 & 2.0 \\
3.0 & 4.0 \\
5.0 & 6.0
\end{bmatrix}
$$

is stored in the one-dimensional array `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]` (column-major order) with the matrix size `(3, 2)`.

```python!
%pyspark

from pyspark.mllib.linalg import Matrices as Matrices1  # RDD-based API
from pyspark.ml.linalg import Matrices as Matrices2     # DataFrame-based API

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices1.dense(3, 2, [1, 3, 5, 2, 4, 6])
dm3 = Matrices2.dense(3, 2, [1, 3, 5, 2, 4, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
# arguments: numRows, numCols, colPtrs, rowIndices, values (CSC format)
sm2 = Matrices1.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
sm3 = Matrices2.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
```

<!-- 
### Math operations

```python!
# For sparse vectors we need to convert to dense vectors in order to perform math operations as follows:
v = Matrices2.sparse(2, [0,1], [1.0, 2.0]).toDense()
u = Matrices2.sparse(2, [0,1], [3.0, 4.0]).toDense()

# With dense vectors
v = Matrices2.dense([1.0, 2.0])
u = Matrices2.dense([3.0, 4.0])

print(v + u)
print(2 - v)
print(v / 2)
print(v * u)
print(u / v)
print(u % 2)
# print(-v) # Error
print(0-v) # Correct
```
-->

## Distributed matrix

A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large distributed matrices, since converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. We recommend using local vectors and storing them in distributed DataFrames. Feel free to use distributed matrices if needed, but avoid using local matrices, which operate only on a single machine.
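As an optional illustration of this recommendation, the sketch below stores local vectors in a column of a distributed DataFrame and, for comparison, wraps an RDD of rows in a `RowMatrix`, one of MLlib's distributed matrix types. It assumes the Zeppelin-provided `spark` session and `sc` context; the column names and values are made up for the example.

```python!
%pyspark

from pyspark.ml.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

# Local vectors stored in a distributed DataFrame (the recommended approach).
vec_df = spark.createDataFrame(
    [(0, Vectors.dense([1.0, 0.0, 3.0])),
     (1, Vectors.sparse(3, [0, 2], [1.0, 3.0]))],
    ["id", "features"])
vec_df.show(truncate=False)

# A RowMatrix is a distributed matrix backed by an RDD of its rows,
# where each row is a local vector (RDD-based pyspark.mllib API).
rows = sc.parallelize([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
mat = RowMatrix(rows)
print(mat.numRows(), mat.numCols())
```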
# Feature Extraction

For most ML pipelines, feature extraction is one of the first stages. Extracting features from the data has multiple advantages; among them, it reduces the dimensionality of the data and can increase the performance of ML models.

## Example on TF-IDF

Open a Zeppelin note and set `spark2` as the default interpreter. Start each cell with `%pyspark` to write Python cells.

1. Import packages

```python!
%pyspark

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
```

2. Create a session and get the SparkContext

```python!
%pyspark

spark = SparkSession \
    .builder \
    .appName("my spark app") \
    .getOrCreate()

sc = spark.sparkContext
```

3. Upload [`HollywoodMovies.csv`](https://raw.githubusercontent.com/reisanar/datasets/master/HollywoodMovies.csv) to HDFS (in the following we assume it is stored under `/data`).

4. Read the file from HDFS

```python!
%pyspark

path = "/data/HollywoodMovies.csv" # /data is the HDFS folder where the file exists

df = spark.read.load(path, format="csv", sep=",", inferSchema="true", header="true")

df.show()
```

5. Select the `Movie` column

```python!
%pyspark

filmsdf = df.select("Movie")
filmsdf.show(truncate=False)
```

6. Create the pipeline

```python!
%pyspark

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="Movie", outputCol="words")
swremoval = StopWordsRemover(inputCol="words", outputCol="stremoved")
vectorizer = CountVectorizer(inputCol="stremoved", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, swremoval, vectorizer, idf])

model = pipeline.fit(filmsdf)

model.save("hdfs:///models/tfidfmodel")

model
```

7. Transform the data

```python!
%pyspark

films = model.transform(filmsdf)

# Show the first ten transformed rows
films.show(10)

# Show the first ten values of column `words`
films.select('words').show(10, truncate=False)

# Show the first ten values of column `stremoved`
films.select('stremoved').show(10, truncate=False)

# Show the first ten values of column `rawFeatures`
films.select('rawFeatures').show(10, truncate=False)

# Get the real values of the Spark DataFrame
films.select('rawFeatures').rdd.flatMap(lambda x: x).map(lambda x: x.toArray()).collect()
```

8. Get the total counts for each term

```python!
%pyspark

# Get the total count for each term
total_counts = model.transform(filmsdf) \
    .select('rawFeatures').rdd \
    .map(lambda row: row['rawFeatures'].toArray()) \
    .reduce(lambda x, y: [x[i] + y[i] for i in range(len(y))])

total_counts
```

9. Get the vocabulary along with the counts

```python!
%pyspark

# Get the vocabulary along with the counts
vocabList = model.stages[2].vocabulary
d = {'vocabList': vocabList, 'counts': total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(), list(d.keys())).show()
```

10. Get the vectorized and raw representation of the movies data

```python!
%pyspark

from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType

# Map term indices back to the terms themselves using the vocabulary
def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))

vectorizerModel = model.stages[2]
vocabList = vectorizerModel.vocabulary
print(vocabList)

rawFeatures = model.transform(filmsdf).select('rawFeatures')

# Extract the indices and values of each sparse vector
indices_udf = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
values_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))

rawFeatures.withColumn('indices', indices_udf(F.col('rawFeatures'))) \
    .withColumn('values', values_udf(F.col('rawFeatures'))) \
    .withColumn("Terms", termsIdx2Term(vocabList)("indices")).show()
```

<!-- 
![](https://i.imgur.com/l5qh1Rb.png)
<center>
<p>
Example on Term Frequency
</p>
</center>
-->
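Since the fitted pipeline was saved to HDFS in step 6, it can be loaded back later and reused without refitting. This is a short optional sketch, assuming the model was saved to `hdfs:///models/tfidfmodel` as above and `filmsdf` is still defined.

```python!
%pyspark

from pyspark.ml import PipelineModel

# Reload the fitted TF-IDF pipeline saved in step 6 and reuse it
reloaded = PipelineModel.load("hdfs:///models/tfidfmodel")
reloaded.transform(filmsdf).select("features").show(5, truncate=False)
```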
# Linear Regression

We will predict `AudienceScore` from the `Movie`, `LeadStudio`, and `RottenTomatoes` columns using Linear Regression from Spark MLlib. First we prepare the features:

```python!
%pyspark

# Select some features together with the label
dfvec = df.select(["Movie", "LeadStudio", "RottenTomatoes", "AudienceScore"])

# Rename the label column
dfvec = dfvec.withColumnRenamed("AudienceScore", "label")

# Separate the categorical features from the numerical ones
categoricalCols = ['Movie', 'LeadStudio']
others = [c for c in dfvec.columns if c not in categoricalCols]

# Drop the label from the features list
others.remove("label")

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col

# Create a StringIndexer for each categorical field: each unique string gets a unique index.
# The StringIndexer output is required as input for the One-Hot Encoder.
# handleInvalid is set to `skip` to drop rows with strings not seen during fitting.
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip")
    for c in categoricalCols
]

# Encode the indexed strings using One-Hot Encoding.
# Default setting: dropLast=True ==> for example, with 5 categories an input value of 2.0
# maps to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by
# default (configurable via dropLast), because including it would make the vector entries
# sum up to one, and hence be linearly dependent. So an input value of 4.0 maps to
# [0.0, 0.0, 0.0, 0.0].
encoders = [
    OneHotEncoder(inputCol=indexer.getOutputCol(),
                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

# The VectorAssembler concatenates the input columns into a single vector column.
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders] + others,
    outputCol="features")

# A pipeline lets us run a single fit and transform over the data.
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Drop any row that contains NULL in any column
dfvec = dfvec.na.drop()

# Fit the pipeline ==> this calls the fit function of every estimator stage
model = pipeline.fit(dfvec)

# Transform the data ==> this calls the transform function of every stage
data = model.transform(dfvec)

# Keep only the features and label columns
data = data.select(["features", "label"])
```
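Before training, it can help to sanity-check the prepared data. This optional sketch reuses the `model` and `data` variables from the previous cell to print the schema, a few assembled rows, and the labels learned by the first `StringIndexer`.

```python!
%pyspark

# Optional sanity check of the prepared data (not required for the lab)
data.printSchema()
data.show(5, truncate=False)

# Labels learned by the StringIndexer fitted on the `Movie` column
print(model.stages[0].labels[:10])
```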
Now we index the assembled feature vector, split the data, and train and evaluate the linear regression model:

```python!
%pyspark

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Automatically identify categorical features and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures",
                               maxCategories=4).fit(data)

# Display the indexed Spark DataFrame
transformed = featureIndexer.transform(data)
transformed.show()

# Split the data into 60% training and 40% test (the split is not stratified).
# We split the raw features/label data; the pipeline below applies the indexer to each split.
(trainingData, testData) = data.randomSplit([0.6, 0.4])

# Create the Linear Regression model, trained on the indexed features
lr = LinearRegression(featuresCol="indexedFeatures")

# Add the feature indexer together with the model to a pipeline
pipeline = Pipeline(stages=[featureIndexer, lr])

# Fit the pipeline stages on the training data
model = pipeline.fit(trainingData)

# Transform the test data (prediction)
predictions = model.transform(testData)

# Display the predictions
predictions.show()

# Evaluate the performance of the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
```

<!-- 
# Math Operations on vec

As we have seen in the previous example, we ended up with a dataframe which has a `features` column containing vectors and a `label` column.

```python!
```
-->

<!-- Fix the others columns and remove the label column -->

# Spark on Colab

Please follow [this link](https://colab.research.google.com/drive/18KMqvX0l2UfZAbMRsmVphiO2gC0blIKY?usp=sharing) for the Colab notebook.

:::warning
You can continue learning about classification, clustering, and other ML model examples from the [book](https://runawayhorse001.github.io/LearningApacheSpark/index.html).
:::

<!-- 
# Classification

# Clustering
-->

# References

- [Apache Spark](https://spark.apache.org)
- [Deep Dive into Apache Spark Transformations and Action](https://blog.knoldus.com/deep-dive-into-apache-spark-transformations-and-action/)
- [Apache Spark RDD vs DataFrame vs DataSet](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/)
- [Spark with Python (PySpark) Tutorial For Beginners](https://sparkbyexamples.com/pyspark-tutorial/)
- [Learning PySpark](https://www.amazon.com/Learning-PySpark-Tomasz-Drabas/dp/1786463709)