--- tags: BigData title: Lab 05 - Apache Spark - Part 2 --- # Lab 05 : Apache Spark - Part 2 **Course:** Big Data - IU S23 **Author:** Firas Jolha <!-- ## Dataset - ## Readings - --> # Agenda [toc] # Prerequisites - Installed Hortonworks Data Platform (HDP) Sandbox - Installed pip and pandas packages - Added Python interpreter to Zeppelin # Objectives - Run Spark application to a cluster - Learns methods to extract features in spark MLlib - Apply TF-IDF feature extraction for description of movies - Perform some regression tasks in spark MLlib <!-- - Perform some classification tasks in spark MLlib --> <!-- - Perform some clustering tasks in spark MLlib --> <!-- - Perform some graph analytics tasks in Neo4j --> # Run spark application on the cluster In the last lab, we used `master("local[*]")` to run the spark application but the execution was done on the local machine and not on the cluster (where `*` means to use as many CPU cores as exist). Apache Spark supports multiple cluster managers, including standalone spark manager, Yarn, Mesos,...etc. In this lab, we will use Yarn to manage the cluster and will learn how we can track the status of Spark jobs on Spark UI. You need to create a Zeppelin note where the default interpreter is `spark2`. To specify the cluster manager as `Yarn`, make sure that `yarn-client` is specified as master in the configuration of the interpreter. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN. :::info Spark supports two modes for running on YARN, “yarn-cluster” mode and “yarn-client” mode. Broadly, yarn-cluster mode makes sense for production jobs, while yarn-client mode makes sense for interactive and debugging uses where you want to see your application’s output immediately. ::: ![](https://i.imgur.com/b1CbiIR.png) <center> <p> Yarn client mode </p> </center> ![](https://i.imgur.com/M3o1dTd.png) <center> <p> Yarn cluster mode </p> </center> :::info In interactive platforms like Zeppelin, we will deploy the application only on client mode. ::: # Feature Extraction For most ML pipelines, feature extraction is one of the first stages to do. The need to extract features from the data has multiple advantages. One of them is to reduce the dimensionality of the data, and also to increase the performance of ML models. ## Example on TF-IDF Open a Zeppelin note and set `spark2` as the default interpreter. For each cell, you need to use `%pyspark` to write Python cells. 1. Import packages ```python! %pyspark import pandas as pd import numpy as np from pyspark.sql import SparkSession ``` 2. Create session and get SparkContext ```python! %pyspark spark = SparkSession \ .builder \ .appName("my spark app") \ .getOrCreate() sc = spark.sparkContex ``` 3. Upload [`HollywoodMovies.csv`](https://raw.githubusercontent.com/reisanar/datasets/master/HollywoodMovies.csv) to HDFS. 4. Read from HDFS ```python! %pyspark path = "hdfs:///data/HollywoodMovies.csv" # /data is folder where the file exists df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") df.show() ``` 5. Select `movies` column ```python! %pyspark filmsdf = df.select("Movie") filmsdf.show(truncate=False) ``` 6. Create the pipeline ```python! from pyspark.ml import Pipeline from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer, StopWordsRemover %pyspark tokenizer = Tokenizer(inputCol="Movie", outputCol="words") swremoval = StopWordsRemover(inputCol="words", outputCol="stremoved") vectorizer = CountVectorizer(inputCol="stremoved", outputCol="rawFeatures") idf = IDF(inputCol="rawFeatures", outputCol="features") pipeline = Pipeline(stages = [tokenizer, swremoval, vectorizer, idf]) model = pipeline.fit(filmsdf) model.save("hdfs:///models/tfidfmodel") model ``` 7. Transform the data ```python! %pyspark films = model.transform(filmsdf) # Show first ten transformed features of films films.show(10) # Show first ten values of column `words` films.select('words').show(10, truncate = False) # Show first ten values of column `stremoved` films.select('stremoved').show(10, truncate = False) # Show first ten values of column `rawFeatures` films.select('rawFeatures').show(10, truncate = False) # Get the real values of the Spark DataFrame films.select('rawFeatures').rdd.flatMap(lambda x : x).map(lambda x : x.toArray()).collect() ``` 8. Get the total counts for each column ```python! %pyspark # Getting the total count for each term total_counts = model.transform(filmsdf) \ .select('rawFeatures').rdd \ .map(lambda row: row['rawFeatures'].toArray()) \ .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))]) total_counts ``` 9. Get the vocabulary along with count ```python! %pyspark # Get the vocabulary along with the counts vocabList = model.stages[2].vocabulary d = {'vocabList':vocabList,'counts':total_counts} spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show() ``` 10. Get the vectorized and raw representation of the movies data ```python! %pyspark from pyspark.sql.types import ArrayType, StringType def termsIdx2Term(vocabulary): def termsIdx2Term(termIndices): return [vocabulary[int(index)] for index in termIndices] return udf(termsIdx2Term, ArrayType(StringType())) vectorizerModel = model.stages[2] vocabList = vectorizerModel.vocabulary print(vocabList) rawFeatures = model.transform(filmsdf).select('rawFeatures') from pyspark.sql.functions import udf import pyspark.sql.functions as F from pyspark.sql.types import StringType, DoubleType, IntegerType indices_udf = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType())) values_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType())) rawFeatures.withColumn('indices', indices_udf(F.col('rawFeatures'))) \ .withColumn('values', values_udf(F.col('rawFeatures'))) \ .withColumn("Terms", termsIdx2Term(vocabList)("indices")).show() ``` ![](https://i.imgur.com/l5qh1Rb.png) <center> <p> Example on Term Frequency </p> </center> # Linear Regression We will predict the AudienceScore from `Movie`, `LeadStudio` and `RottenTomatoes` columns using Linear Regression from Spark MLlib. ```python! %pyspark dfvec = df.select(["Movie", "LeadStudio", "RottenTomatoes", "AudienceScore"]) dfvec = dfvec.withColumnRenamed("AudienceScore","label") categoricalCols = ['Movie', 'LeadStudio'] others = [c for c in dfvec.columns if c not in categoricalCols] from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler from pyspark.sql.functions import col indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ] # default setting: dropLast=True encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ] assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol= "features") pipeline = Pipeline(stages=indexers + encoders + [assembler]) dfvec = dfvec.na.drop() model=pipeline.fit(dfvec) data = model.transform(dfvec) data = data.select(["features", "label"]) from pyspark.ml.regression import LinearRegression from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) transformed = featureIndexer.transform(data) transformed.show() # split the data into 60% training (trainingData, testData) = transformed.randomSplit([0.6, 0.4]) lr = LinearRegression() pipeline = Pipeline(stages=[featureIndexer, lr]) model = pipeline.fit(trainingData) predictions = model.transform(testData) predictions.show() from pyspark.ml.evaluation import RegressionEvaluator evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse") rmse = evaluator.evaluate(predictions) print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) ``` :::warning You can should complete learning classification, clustering, other ML model examples from the [book](https://runawayhorse001.github.io/LearningApacheSpark/index.html). ::: <!-- # Classification # Clustering --> # References - [Apache Spark](https://spark.apache.org) - [Deep Dive into Apache Spark Transformations and Action](https://blog.knoldus.com/deep-dive-into-apache-spark-transformations-and-action/) - [Apache Spark RDD vs DataFrame vs DataSet](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/) - [Spark with Python (PySpark) Tutorial For Beginners](https://sparkbyexamples.com/pyspark-tutorial/) - [Learning PySpark](https://www.amazon.com/Learning-PySpark-Tomasz-Drabas/dp/1786463709)