---
tags: BigData
title: Lab 04 - Apache Spark - Part 1
---

# Lab 04: Apache Spark - Part 1

**Course:** Big Data - IU S23
**Author:** Firas Jolha

<!-- ## Dataset - ## Readings - -->

# Agenda

[toc]

# Prerequisites

- Installed Hortonworks Data Platform (HDP) Sandbox
- Installed pip and pandas packages
- Added Python interpreter to Zeppelin

# Objectives

- Install pyspark on HDP Sandbox
- Distribute data in HDFS
- I/O in pyspark
- Transformations & actions in pyspark
- Learn how to analyze data in pyspark
- Write Spark applications
- Run and monitor Spark applications

<!-- Install mongodb on HDP cluster https://www.youtube.com/watch?v=5AUe7y6hnL8 -->
<!-- # Introduction -->

# Install PySpark on HDP Sandbox

If you installed Python and pip in the previous labs, then you just need to run the following command:

```sh
pip2 install pyspark
```

Otherwise, you need to return to the previous labs and install them first.

:::warning
[Scala is the native language](https://www.oreilly.com/library/view/data-algorithms-with/9781492082378/ch01.html) for writing Spark applications, but Apache Spark also supports drivers for other languages such as Python (the PySpark package), Java, and R.
:::

# Intro to Apache Spark

**Apache Spark** is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads (only in Spark 3), MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

![](https://i.imgur.com/N20Uo5D.png)

Spark introduces the concept of an **RDD (Resilient Distributed Dataset)**, an immutable, fault-tolerant, distributed collection of objects that can be operated on in parallel. An RDD can contain any type of object and is created by loading an external dataset or distributing a collection from the driver program. Each RDD is split into multiple partitions (smaller pieces of the same dataset), which may be computed on different nodes of the cluster.

RDD means:

- **Resilient** – capable of rebuilding data on failure
- **Distributed** – distributes data among various nodes in the cluster
- **Dataset** – collection of partitioned data with values

## Core Concepts in Spark

The following terms appear throughout Spark's documentation and web UI (see the sketch after this list):

- **Job:** A piece of code which reads some input from HDFS or the local file system, performs some computation on the data, and writes some output data.
- **Stages:** Jobs are divided into stages. Stages are classified as map or reduce stages (this is easier to understand if you have worked on Hadoop and want to correlate). Stages are divided based on computational boundaries; not all computations (operators) can run in a single stage, so a job is executed over many stages.
- **Tasks:** Each stage has some tasks, one task per partition. One task is executed on one partition of data on one executor (machine).
- **DAG:** DAG stands for Directed Acyclic Graph; in the present context it is a DAG of operators.
- **Executor:** The process responsible for executing a task.
- **Master:** The machine on which the Driver program runs.
- **Slave:** The machine on which the Executor program runs.
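To connect these terms to code, here is a minimal sketch (the session name, numbers, and variable names are illustrative, not part of the lab code). One action submits one job, narrow operators are pipelined into one stage, the shuffle needed by `reduceByKey()` starts a new stage, and each stage runs one task per partition. You can verify this in the Spark UI after running it.

```python
from pyspark.sql import SparkSession

# Illustrative local session just for this sketch.
spark = SparkSession.builder.appName("core concepts demo").master("local[*]").getOrCreate()
sc = spark.sparkContext

# 4 partitions -> each stage of the job runs 4 tasks.
rdd = sc.parallelize(range(100), 4)

# Narrow transformations (filter, map) are pipelined into a single stage.
evens = rdd.filter(lambda x: x % 2 == 0).map(lambda x: (x % 10, x))

# A wide transformation (reduceByKey) requires a shuffle, so it starts a new stage.
sums = evens.reduceByKey(lambda a, b: a + b)

# Nothing has run yet; this action submits the job (two stages) to the scheduler.
print(sums.collect())
```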
## Spark Components

- Spark Driver
  - A separate process to execute user applications
  - Creates a **SparkContext** to schedule job execution and negotiate with the cluster manager
- Executors
  - Run tasks scheduled by the driver
  - Store computation results in memory, on disk, or off-heap
  - Interact with storage systems
- Cluster Manager
  - Mesos
  - YARN
  - Spark Standalone

![](https://i.imgur.com/K7xzq0s.png)

The Spark Driver contains more components responsible for translating user code into actual jobs executed on the cluster:

![](https://i.imgur.com/OMrw8jE.png)

- **SparkContext** - represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster
- **DAGScheduler** - computes a DAG of stages for each job and submits them to the TaskScheduler; it also determines preferred locations for tasks (based on cache status or shuffle file locations) and finds a minimum schedule to run the jobs
- **TaskScheduler** - responsible for sending tasks to the cluster, running them, retrying if there are failures, and mitigating stragglers
- **SchedulerBackend** - backend interface for scheduling systems that allows plugging in different implementations (Mesos, YARN, Standalone, local)
- **BlockManager** - provides interfaces for putting and retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap)

## How does Spark work?

Spark has a small code base and the system is divided into various layers. Each layer has some responsibilities, and the layers are independent of each other.

![](https://i.imgur.com/vLg1PoN.png)

The first layer is the interpreter; Spark uses a Scala interpreter with some modifications. As you enter your code in the Spark console (creating RDDs and applying operators), Spark creates an operator graph. When the user runs an action (like `collect`), the graph is submitted to the DAG scheduler. The DAG scheduler divides the operator graph into (map and reduce) stages. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final result of the DAG scheduler is a set of stages. The stages are passed on to the task scheduler, which launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn't know about dependencies among stages.

<!-- ## Word Count Example -->

# Spark Operations

There are two main types of Spark operations: Transformations and Actions.

![](https://i.imgur.com/5Rzg8rU.png)

<!-- ![](https://i.imgur.com/zomLW1S.png) -->

## Spark Transformations

Spark RDD transformations are functions that take an RDD as the input and produce one or many RDDs as the output. They do not change the input RDD (since RDDs are immutable and hence cannot be changed), but always produce one or more new RDDs by applying the computations they represent, e.g. map(), filter(), reduceByKey(), etc.

Note that when you apply a transformation on an RDD, the operation is not performed immediately. Spark creates a DAG (Directed Acyclic Graph) from the applied operation, the source RDD, and the transformation function, and it keeps building this graph until you apply an action on the last RDD in the chain. That is why transformations in Spark are lazy. Transformations construct a new RDD from a previous one; for example, one common transformation is filtering data that matches a predicate. The sketch below demonstrates this laziness.
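A minimal sketch of lazy evaluation (the session, dataset, and names are illustrative, not part of the lab code): the transformations return immediately because they only record lineage; the data is only processed when an action is called.

```python
from pyspark.sql import SparkSession

# Illustrative local session just for this sketch.
spark = SparkSession.builder.appName("lazy demo").master("local[*]").getOrCreate()
sc = spark.sparkContext

nums = sc.parallelize(range(1000000), 4)

# Transformations: these return new RDDs immediately; no computation happens yet,
# Spark only records the lineage (the DAG) behind each RDD.
squares = nums.map(lambda x: x * x)
multiples_of_seven = squares.filter(lambda x: x % 7 == 0)

print(multiples_of_seven.toDebugString())  # shows the recorded lineage

# Action: only now is the whole chain actually executed on the partitions.
print(multiples_of_seven.count())
```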
There are two types of transformations:

**Narrow transformation** — In a narrow transformation, all the elements required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of partitions is used to calculate the result. Narrow transformations are the result of operations such as map() and filter().

![](https://i.imgur.com/LFPbHzv.png)

**Wide transformation** — In a wide transformation, all the elements required to compute the records in a single partition may live in many partitions of the parent RDD. Wide transformations are the result of operations such as groupByKey() and reduceByKey().

![](https://i.imgur.com/RqpWt7h.png)

![](https://i.imgur.com/DbTTs5o.png)

## Spark Actions

Actions, on the other hand, compute a result based on an RDD and either return it to the driver program or save it to an external storage system (e.g., HDFS). An action is one of the ways of sending data from the executors to the driver. Executors are agents responsible for executing tasks, while the driver is a JVM process that coordinates the workers and the execution of the task.

![](https://i.imgur.com/XOM1KVq.png)

Spark paired RDDs are RDDs containing key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, and the value is the data corresponding to that key. Spark operations work on RDDs containing any type of object; however, key-value pair RDDs support a few special operations, such as distributed "shuffle" operations and grouping or aggregating the elements by key.

![](https://i.imgur.com/vdaocQQ.png)

# Demo on Spark Operations

The dataset for this demo is movies data and can be downloaded from [here](https://gist.githubusercontent.com/tiangechen/b68782efa49a16edaf07dc2cdaa855ea/raw/0c794a9717f18b094eabab2cd6a6b9a226903577/movies.csv).

## Import required packages

```python
import pandas as pd
import pyspark
import pymongo
import numpy as np
from pyspark.sql import SparkSession
from os.path import join
```

## Open a SparkSession

```python
spark = SparkSession \
    .builder \
    .appName("my spark app") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext
```

## Spark RDD & DataFrame

There are multiple ways to create RDDs and DataFrames in PySpark.

### 1. Using the `parallelize()` function

```python
data = [(1, 2, 3, 'a b c'), (4, 5, 6, 'd e f'), (7, 8, 9, 'g h i')]
rdd = sc.parallelize(data)  # Spark RDD creation
rdd.collect()               # Spark action
rdd.toDF()                  # Converts the RDD into a Spark DataFrame
```

### 2. Using the `createDataFrame()` function

```python
data = [(1, 2, 3, 'a b c'), (4, 5, 6, 'd e f'), (7, 8, 9, 'g h i')]
df = spark.createDataFrame(data)  # a Spark DataFrame
df.rdd                            # Convert the Spark DataFrame into an RDD
```

### 3. Read from a local file

```python
path = "file:///sparkdata/movies.csv"

df1 = spark.read.format("csv") \
    .option("sep", ",") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(path)

df1.show()        # Display the dataframe
df1.printSchema() # Print the schema of the dataframe
```
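As an optional variant (a sketch, not required by the lab), you can skip `inferSchema` and pass an explicit schema when reading the CSV. It reuses the `spark` session and `path` defined above; the column names and types below are assumptions based on the movies.csv file from the gist.

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Assumed columns of movies.csv; adjust if the header differs.
movies_schema = StructType([
    StructField("Film", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Lead Studio", StringType(), True),
    StructField("Audience score %", IntegerType(), True),
    StructField("Profitability", FloatType(), True),
    StructField("Rotten Tomatoes %", IntegerType(), True),
    StructField("Worldwide Gross", StringType(), True),
    StructField("Year", IntegerType(), True),
])

# Reading with an explicit schema avoids the extra pass over the data that
# inferSchema needs and guarantees the column types you expect.
df_explicit = spark.read.csv(path, sep=",", header=True, schema=movies_schema)
df_explicit.printSchema()
```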
### 4. Read from MongoDB via PyMongo

```python
import pymongo

# The default configuration
# localhost:27017
client = pymongo.MongoClient()

db = client['moviesdb'] # client['<db_name>']

# A pymongo Cursor
# db.<collection_name>
movies_cur = db.movies.find() # Get all documents

# A cursor can be consumed only once, so materialize it as a list first
movies = list(movies_cur)

# Convert to a pandas DataFrame
pdf = pd.DataFrame(movies)

from pyspark.sql.types import *

schema = StructType([
    # StructField(<fieldname>, <fieldtype>, <nullability>)
    StructField("Audience score %", IntegerType(), True),
    StructField("Film", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Lead Studio", StringType(), True),
    StructField("Profitability", FloatType(), True),
    StructField("Rotten Tomatoes %", IntegerType(), True),
    StructField("Worldwide Gross", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("_id", StringType(), True)
])

# Try to run spark.createDataFrame(movies)

# Convert immediately to a Spark DataFrame
df3 = spark.createDataFrame(movies, schema)

# Convert to an RDD, then to a Spark DataFrame
df4 = spark.createDataFrame(sc.parallelize(movies), schema)
```

:::info
To read data from MongoDB directly via PySpark (without PyMongo), you need to download the MongoDB Spark connector and put the jar file in the `jars` directory of *SPARK_HOME*.
:::

### 5. Read from HDFS

```python!
path = "hdfs:///data/movies.csv"

df3 = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") # a spark dataframe
df3 = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true") # a spark dataframe

df3.show()

rdd3 = sc.textFile(path) # a spark rdd
```

The file `movies.csv` was uploaded to HDFS and stored in the folder `/data`.

## map and flatMap operations

`map()` returns a new RDD by applying a function to each element of this RDD; `flatMap()` does the same and then flattens the results.

```python!
# df1 is the Spark DataFrame for the previous movies.csv data
films = df1.select(['Film']).rdd.flatMap(lambda x : x) # flatMap() - Returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results

rdd1 = films
rdd2 = rdd1.map(lambda row : (row, 1)) # map() - Returns a new RDD by applying a function to each element of this RDD

rdd1.take(10)
rdd2.take(10)
```

## groupBy operation

```python!
genres = df1.select(['Genre']).rdd.flatMap(lambda x : x)

temprdd1 = genres.groupBy(lambda x : x)
temprdd2 = temprdd1.map(lambda x : (x[0], len(x[1])))

temprdd2.collect()

tempdf = temprdd2.toDF()
tempdf.show() # Returns the number of film records for each genre
```

## filter operation

Returns a new dataset formed by selecting those elements of the source on which `func` returns true.

```python!
genres = df1.select(['Genre']).rdd.flatMap(lambda x : x)

genres.filter(lambda x : not x.lower().startswith("c")).collect() # Returns the genres which do not start with "c" (case insensitive)
```
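The genre count above can also be written with the wide transformation `reduceByKey()`, which pre-aggregates within each partition before the shuffle. The following is a sketch (reusing `df1` from the demo; the variable names are illustrative), not part of the required lab code.

```python!
# Count films per genre with a (key, 1) pair RDD and reduceByKey.
genre_counts = df1.select(['Genre']).rdd \
    .flatMap(lambda x: x) \
    .map(lambda genre: (genre, 1)) \
    .reduceByKey(lambda a, b: a + b)

# Sort by count, descending, and bring the result to the driver.
genre_counts.sortBy(lambda kv: kv[1], ascending=False).collect()
```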
# Spark Exercise 1

Write a Spark application for counting the number of words in the titles of the films.

# Spark Exercise 2

Entity Resolution is a technique to identify data records in a single data source or across multiple data sources that refer to the same real-world entity and to link the records together. In Entity Resolution, strings that are nearly identical, but maybe not exactly the same, are matched without having a unique identifier.

In this exercise, you will read the data from the MongoDB database `mflix` from lab 03.

:::success
**Exercises on PySpark**

You need to work on a Zeppelin note.

1. Import the data from the MongoDB database.
2. In Zeppelin, access the database and read the descriptions of the movies.
3. Search for character names in the descriptions of the movies.
4. Link the matched entities to the character names and display their information.
:::

# Spark Exercise 3

:::success
**Exercises on PySpark**

1. Download an article whose size is at least 200 MB from the [Wikimedia dump Feb. 2023](https://dumps.wikimedia.org/enwiki/20230201).
2. Prepare a Zeppelin note to extract the words from the article. Get rid of digits and numbers and exclude any words containing digits.
3. Analyze the distribution of words in the article.
4. Draw a histogram that shows the distribution of the top 20 words from the article.
5. Answer the following queries:
    - What is the most popular word in the article?
    - Is there a semantic relationship between the most popular word and the title of the article?
    - Can you formulate a new meaningful title from the top words?
    - Do you think that the top words can be helpful for title generation?
    - Can you check whether the distribution of words is normal or not? You can use any normality test you know from your advanced statistics class. A helpful [resource](https://machinelearningmastery.com/a-gentle-introduction-to-normality-tests-in-python/).
    - What is the relative matching among the top words? You can use any string matching distance/measure, but you need to implement it in pyspark.
:::

# Upcoming Lab

The second part of the Spark lab will be dedicated to learning more about data analytics in Spark and how to use the `pyspark.ml` package for building distributed ML models for some classification and clustering problems.

# References

- [Apache Spark](https://spark.apache.org)
- [Deep Dive into Apache Spark Transformations and Action](https://blog.knoldus.com/deep-dive-into-apache-spark-transformations-and-action/)
- [Apache Spark RDD vs DataFrame vs DataSet](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/)
- [Spark with Python (PySpark) Tutorial For Beginners](https://sparkbyexamples.com/pyspark-tutorial/)
- [Learning PySpark](https://www.amazon.com/Learning-PySpark-Tomasz-Drabas/dp/1786463709)