Course: Big Data - IU S23
Author: Firas Jolha
If you have installed Python and pip from previous labs, then you just need to run the following command:
pip install pyspark
Otherwise, you need to return to the previous labs and install them.
Scala is the native language for writing Spark applications, but Apache Spark also provides APIs for other languages such as Python (the PySpark package), Java, and R.
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, the pandas API on Spark for pandas workloads (Spark 3 only), MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
Spark introduces the concept of an RDD (Resilient Distributed Dataset), an immutable, fault-tolerant, distributed collection of objects that can be operated on in parallel. An RDD can contain any type of object and is created by loading an external dataset or by distributing a collection from the driver program. Each RDD is split into multiple partitions (smaller chunks of the data), which may be computed on different nodes of the cluster.
RDD stands for:
Resilient – capable of rebuilding data on failure
Distributed – data is distributed among the various nodes of the cluster
Dataset – a collection of partitioned data with values
The Spark driver contains several components responsible for translating user code into actual jobs executed on the cluster.
Spark has a small code base and the system is divided into various layers. Each layer has its own responsibilities, and the layers are independent of each other.
The first layer is the interpreter; Spark uses a Scala interpreter with some modifications. As you enter your code in the Spark console (creating RDDs and applying operators), Spark builds an operator graph. When the user runs an action (like collect), the graph is submitted to the DAG scheduler. The DAG scheduler divides the operator graph into (map and reduce) stages. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map operators can be scheduled into a single stage. This optimization is key to Spark's performance. The final result of the DAG scheduler is a set of stages, which are passed on to the task scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos); it does not know about dependencies among stages.
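A quick way to see this stage structure from PySpark is RDD.toDebugString(), which prints the lineage of an RDD together with its shuffle boundaries. The snippet below is a minimal sketch; it assumes the SparkContext sc created later in this lab, the sample data is made up, and the exact output format depends on the Spark version.

# Sketch: inspect how Spark groups operators into stages
words = sc.parallelize(["spark", "hadoop", "spark", "hive"])

# The map() below is a narrow operation and gets pipelined into one stage;
# reduceByKey() introduces a shuffle, i.e. a new stage
counts = words.map(lambda w: (w, 1)) \
              .reduceByKey(lambda a, b: a + b)

lineage = counts.toDebugString()
# In some PySpark versions toDebugString() returns bytes
print(lineage.decode() if isinstance(lineage, bytes) else lineage)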
There are two main types of Spark operations: Transformations and Actions.
Spark RDD transformations are functions that take an RDD as the input and produce one or more RDDs as the output. They do not change the input RDD (RDDs are immutable, hence one cannot change them), but always produce one or more new RDDs by applying the computations they represent, e.g. map(), filter(), reduceByKey(), etc.
Note that when you apply a transformation to an RDD, the operation is not performed immediately. Spark creates a DAG (Directed Acyclic Graph) using the applied operation, the source RDD and the transformation function, and it keeps building this graph until you apply an action on the last RDD in the chain. That is why transformations in Spark are lazy.
Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate.
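To see this laziness in practice, the following minimal sketch (using the SparkContext sc created later in this lab and made-up numbers) shows that a transformation returns immediately, while the computation only runs when an action is called.

numbers = sc.parallelize(range(10))

# filter() is a transformation: Spark only records the operation, nothing runs yet
evens = numbers.filter(lambda x: x % 2 == 0)

# count() is an action: now the filter is actually executed
print(evens.count()) # 5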
There are two types of transformations:
Narrow transformation – In a narrow transformation, all the elements required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of the partitions is needed to calculate the result. Narrow transformations are the result of map() and filter().
Wide transformation – In a wide transformation, the elements required to compute the records in a single partition may live in many partitions of the parent RDD. Wide transformations are the result of groupByKey() and reduceByKey().
Actions, on the other hand, compute a result based on an RDD and either return it to the driver program or save it to an external storage system (e.g., HDFS).
An action is one of the ways of sending data from the executors to the driver. Executors are agents that are responsible for executing tasks, while the driver is a JVM process that coordinates the workers and the execution of the tasks.
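Below is a short sketch of some common actions, again assuming the SparkContext sc from this lab and using illustrative data.

rdd = sc.parallelize([3, 1, 4, 1, 5, 9])

rdd.collect() # returns all elements to the driver: [3, 1, 4, 1, 5, 9]
rdd.count() # returns the number of elements: 6
rdd.take(3) # returns the first 3 elements: [3, 1, 4]
rdd.reduce(lambda a, b: a + b) # aggregates the elements: 23
rdd.saveAsTextFile("file:///tmp/rdd_output") # writes the RDD to storage (illustrative path; fails if it already exists)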
Spark paired RDDs are RDDs whose elements are key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, whereas the value is the data corresponding to that key.
Spark operations work on RDDs containing any type of objects; however, key-value pair RDDs additionally support a few special operations, such as distributed "shuffle" operations and grouping or aggregating the elements by a key.
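For example, the following sketch (with made-up sales figures) builds a paired RDD and aggregates the values by key with reduceByKey().

# Each element is a (key, value) pair: (city, amount); the numbers are illustrative
sales = sc.parallelize([("moscow", 10), ("kazan", 5), ("moscow", 7), ("kazan", 3)])

# reduceByKey() aggregates the values of each key (a wide transformation)
totals = sales.reduceByKey(lambda a, b: a + b)
totals.collect() # e.g. [('moscow', 17), ('kazan', 8)] (order may vary)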
The dataset for this demo is movies data and can be downloaded from here
import pandas as pd
import pyspark
import pymongo
import numpy as np
from pyspark.sql import SparkSession
from os.path import join
spark = SparkSession \
.builder \
.appName("my spark app") \
.master("local[*]") \
.getOrCreate()
sc = spark.sparkContext
There are multiple ways to create RDDs in PySpark.
parallelize() function
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
rdd = sc.parallelize(data) # Spark RDD creation
rdd.collect() # Spark action
rdd.toDF() # Converts RDD into Spark DataFrame
createDataFrame() function
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe
df.rdd # Convert Spark DataFrame into RDD
path = "file:///sparkdata/movies.csv"
df1 = spark.read.format("csv") \
.option("sep", ",") \
.option("inferSchema", "true") \
.option("header", "true") \
.load(path)
df1.show() # Display the dataframe
df1.printSchema() # print the schema of the dataframe
import pymongo
# The default configuration
# localhost:27017
client = pymongo.MongoClient()
db = client['moviesdb'] # client['<db_name>']
# A pymongo Cursor
# db.<collection_name>
movies_cur = db.movies.find() # Get all documents
# Convert to Pandas DataFrame
df1 = pd.DataFrame(movies_cur)
from pyspark.sql.types import *
schema = StructType([
# StructField(<fieldname>, <fieldtype>, <nullability>)
StructField("Audience score %", IntegerType(), True),
StructField("Film", StringType(), True),
StructField("Genre", StringType(), True),
StructField("Lead Studio", StringType(), True),
StructField("Profitability", FloatType(), True),
StructField("Rotten Tomatoes %", IntegerType(), True),
StructField("Worldwide Gross", StringType(), True),
StructField("Year", IntegerType(), True),
StructField("_id", StringType(), True)
])
# Try to run spark.createDataFrame(movies_cur)
# Note: a pymongo cursor can be iterated only once, and it was already consumed
# by pd.DataFrame() above, so re-fetch the documents into a list first
# (the ObjectId values in _id may also need to be converted to str to match StringType)
movies_cur = list(db.movies.find())
# Convert immediately to Spark DataFrame
df3 = spark.createDataFrame(movies_cur, schema)
# Convert to RDD then to Spark DataFrame
df4 = spark.createDataFrame(sc.parallelize(movies_cur), schema)
To read data from MongoDB via PySpark directly, you need to download the MongoDB Spark connector and put the jar file in the jars directory of SPARK_HOME.
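As a rough sketch of how such a read could look, assuming the MongoDB Spark connector 3.x is on the classpath (it exposes the "mongo" data source; the newer 10.x connector uses "mongodb" instead) and the moviesdb.movies collection from above:

# Sketch: read the moviesdb.movies collection through the MongoDB Spark connector
# The URI matches the default local MongoDB used earlier in this lab
df_mongo = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/moviesdb.movies") \
    .load()
df_mongo.printSchema()
df_mongo.show()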
path = "hdfs:///data/movies.csv"
df3 = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") # a spark dataframe
df3 = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true") # a spark dataframe
df3.show()
rdd3 = sc.textFile(path) # a spark rdd
The file movies.csv is uploaded to HDFS and stored in the folder /data.
map() returns a new RDD by applying a function to each element of this RDD.
# df1 is Spark DataFrame for the previous movies.csv data
films = df1.select(['Film']).rdd.flatMap(lambda x : x)
# flatmap() - Returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results
rdd1 = films
rdd2 = rdd1.map(lambda row : (row, 1))
# map() - Returns a new RDD by applying a function to each element of this RDD
rdd1.take(10)
rdd2.take(10)
genres = df1.select(['Genre']).rdd.flatMap(lambda x : x)
temprdd1 = genres.groupBy(lambda x : x)
temprdd2 = temprdd1.map(lambda x : (x[0], len(x[1])))
temprdd2.collect()
tempdf = temprdd2.toDF()
tempdf.show()
# Returns the number of film records for each genre
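For comparison, the same per-genre count can be computed with a paired RDD and reduceByKey(), which pre-aggregates values within each partition before the shuffle instead of grouping all values first. This is only an alternative sketch of the step above.

# Alternative: count genres with a (genre, 1) paired RDD and reduceByKey()
genre_counts = genres.map(lambda g: (g, 1)) \
                     .reduceByKey(lambda a, b: a + b)
genre_counts.toDF(["Genre", "count"]).show()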
filter() returns a new dataset formed by selecting those elements of the source on which the function returns true.
genres = df1.select(['Genre']).rdd.flatMap(lambda x : x)
genres.filter(lambda x : not x.lower().startswith("c")).collect()
# Returns the genres which do not start with "c" (case insensitive)
Write a Spark application for counting the number of words in the titles of the films.
Entity Resolution is a technique to identify data records in a single data source or across multiple data sources that refer to the same real-world entity and to link these records together. In Entity Resolution, strings that are nearly identical, but maybe not exactly the same, are matched without relying on a unique identifier.
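As a tiny illustration of the idea (not the method required by the exercise), the sketch below matches two nearly identical film titles by normalizing them and comparing their token sets with a Jaccard similarity; the titles and the 0.5 threshold are made up.

def tokens(title):
    # Normalize: lowercase, keep alphanumeric words only
    return set(''.join(c if c.isalnum() else ' ' for c in title.lower()).split())

def jaccard(a, b):
    # |intersection| / |union| of the two token sets
    return len(a & b) / len(a | b)

t1, t2 = "The Godfather, Part II", "Godfather Part 2 (The)"
print(jaccard(tokens(t1), tokens(t2))) # 0.6, since the tokens differ on "ii" vs "2"
# The pair is considered a match if the similarity exceeds a chosen threshold, e.g. 0.5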
In this exercise, you will read the data from the MongoDB database mflix from lab 03.
Exercises on PySpark
You need to work on a Zeppelin note.
The second part of the Spark lab will be dedicated to learning more about data analytics in Spark and how to use the pyspark.ml package for building distributed ML models for some classification and clustering problems.