---
tags: IntroToBigData
title: Lab 4 - Apache Spark Core & SQL
---

# Lab 4 - Apache Spark Core & SQL

**Course:** Big Data - IU S23
**Author:** Firas Jolha

## Dataset
- [Top gross movies between 2007 and 2011](https://raw.githubusercontent.com/firas-jolha/BigData-IU-S23/main/movies.csv)
- [World cup results from 1872 till 2022](https://raw.githubusercontent.com/aminebennaji19/FIFA-World-Cup-Qatar-2022/main/data/results.csv)

## Readings
- [Spark 3.0.3 Python API Docs](https://spark.apache.org/docs/3.0.3/api/python/index.html)

# Agenda
[toc]

# Prerequisites
- Installed Hortonworks Data Platform (HDP) Sandbox
- Installed pip and pandas packages
- Added Python interpreter to Zeppelin

# Objectives
- Install pyspark on HDP Sandbox
- Distribute data in HDFS
- Transformations & actions in pyspark
- Learn how to analyze data in pyspark
- Write spark applications

<!-- Install mongodb on HDP cluster https://www.youtube.com/watch?v=5AUe7y6hnL8 -->
<!-- # Introduction -->

# Intro to Apache Spark [review]

:::warning
This section gives a theoretical introduction to Spark, which will be covered in the lecture. Feel free to skip it if you have enough theoretical knowledge about Spark.
:::

**Apache Spark** is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads (Spark 3.2 and later only), MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

![](https://i.imgur.com/N20Uo5D.png)

Spark introduces the concept of an **RDD (Resilient Distributed Dataset)**, an immutable, fault-tolerant, distributed collection of objects that can be operated on in parallel.
An RDD can contain any type of object and is created by loading an external dataset or distributing a collection from the driver program. Each RDD is split into multiple partitions (smaller subsets of the same data), which may be computed on different nodes of the cluster.

RDD means:
- **Resilient** – capable of rebuilding data on failure
- **Distributed** – distributes data among various nodes in cluster
- **Dataset** – collection of partitioned data with values

## Core Concepts in Spark
- **Application:** A user program built on Spark. Consists of a driver program and executors on the cluster.
- **Driver Program:** The process running the `main()` function of the application and creating the `SparkContext`.
- **Cluster Manager:** An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN). Selected with the `--master` option of the `spark-submit` tool.
- **Deploy mode:** Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster. Selected with the `--deploy-mode` option of the `spark-submit` tool.
- **Job:** A piece of code which reads some input from HDFS or the local file system, performs some computation on the data and writes some output data.
- **Stages:** Jobs are divided into stages. Stages are classified as map or reduce stages. Stages are divided at computational boundaries: not all computations (operators) can be executed in a single stage, so a job runs over many stages.
- **Tasks:** Each stage has some tasks, one task per partition. One task is executed on one partition of data on one executor (machine).
- **DAG:** DAG stands for Directed Acyclic Graph; in the present context it is a DAG of operators.
- **Executor:** The process responsible for executing a task.
- **Master:** The machine on which the Driver program runs
- **Slave:** The machine on which the Executor program runs

Check the glossary from [here](https://spark.apache.org/docs/3.0.3/cluster-overview.html).

<!-- ![](https://i.imgur.com/F4FrNnG.png) -->

## Spark Application Model
Apache Spark is widely considered to be the successor to MapReduce for general purpose data processing on Apache Hadoop clusters.

In **MapReduce**, the highest-level unit of computation is a job. A job loads data, applies a map function, shuffles it, applies a reduce function, and writes data back out to persistent storage.

In **Spark**, the highest-level unit of computation is an application. A Spark application can be used for a single batch job, an interactive session with multiple jobs, or a long-lived server continually satisfying requests. A Spark job can consist of more than just a single map and reduce.

MapReduce starts a process for each task. In contrast, a Spark application can have processes running on its behalf even when it is not running a job. Furthermore, multiple tasks can run within the same executor. Both combine to enable extremely fast task startup time as well as in-memory data storage, resulting in orders of magnitude faster performance over MapReduce.

## Spark Execution Model
At runtime, a Spark application maps to a single driver process and a set of executor processes distributed across the hosts in a cluster.

The driver process manages the job flow and schedules tasks and is available the entire time the application is running. Typically, this driver process is the same as the client process used to initiate the job, although when run on YARN, the driver can run in the cluster. In interactive mode, the shell itself is the driver process.

The executors are responsible for executing work, in the form of tasks, as well as for storing any data that you cache. Executor lifetime depends on whether dynamic allocation is enabled.
An executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime.

![](https://i.imgur.com/kErC1WW.png)

Invoking an `action` operation inside a Spark application triggers the launch of a **job** to fulfill it. Spark examines the dataset on which that action depends and formulates an execution plan. The execution plan assembles the dataset transformations into **stages**. A stage is a collection of **tasks** that run the same code, each on a different subset of the data.

## Spark Components
- Spark Driver
- Executors
- Cluster Manager

![](https://i.imgur.com/K7xzq0s.png)

The Spark Driver contains further components responsible for translating user code into actual jobs executed on the cluster:

![](https://i.imgur.com/OMrw8jE.png)

- **SparkContext** - represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster
- **DAGScheduler** - computes a DAG of stages for each job and submits them to the TaskScheduler, determines preferred locations for tasks (based on cache status or shuffle file locations) and finds a minimal schedule to run the jobs
- **TaskScheduler** - responsible for sending tasks to the cluster, running them, retrying if there are failures, and mitigating stragglers
- **SchedulerBackend** - backend interface for scheduling systems that allows plugging in different implementations (Mesos, YARN, Standalone, local)
- **BlockManager** - provides interfaces for putting and retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap)

## Spark Architecture
Apache Spark works in a master-slave architecture where the **master** is called **“Driver”** and **slaves** are called **“Workers”**.
![](https://i.imgur.com/3Tj1QUG.png)

When you run a Spark application, the Spark Driver creates a context that is the entry point to your application. All operations (transformations and actions) are executed on worker nodes, and the resources are managed by the Cluster Manager.

## How Spark works?
Spark has a small code base and the system is divided into various layers. Each layer has some responsibilities. The layers are independent of each other.

![](https://i.imgur.com/vLg1PoN.png)

The first layer is the interpreter: Spark uses a Scala interpreter, with some modifications. As you enter your code in the Spark console (creating RDDs and applying operators), Spark creates an operator graph. When the user runs an action (like collect), the graph is submitted to a DAG scheduler. The DAG scheduler divides the operator graph into (map and reduce) stages. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph. For example, many map operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final result of a DAG scheduler is a set of stages. The stages are passed on to the task scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn’t know about dependencies among stages.

## Spark Features
- In-memory computation
    - PySpark loads the data from disk, processes it in memory, and keeps the data in memory.
    - This is the main difference between PySpark and MapReduce (I/O intensive).
- Distributed processing using parallelize
    - When you create an RDD from data, Spark partitions the data elements. By default, Spark creates one partition for each core.
- Can be used with many cluster managers (Spark Standalone, YARN, Mesos, etc.)
- Fault-tolerant
- It can be used to read and write files from distributed file systems like HDFS.
- Immutable
    - Once RDDs are created they cannot be modified; any change is expressed by creating a new RDD.
- Lazy evaluation
    - PySpark does not evaluate RDD transformations as the driver encounters them. Instead, it records all transformations in a graph (DAG) and evaluates them when it sees the first RDD action.
- Cache & persistence
    - We can also cache/persist the RDD in memory to reuse previous computations.
- Built-in optimization when using DataFrames
- Supports SQL
    - We can perform analysis on the cluster via queries written in SQL and executed on the Spark engine.

## Supported Cluster Managers
Spark supports four cluster managers:
- *Standalone* – a simple cluster manager included with Spark that makes it easy to set up a cluster.
- *Hadoop YARN* – the resource manager in Hadoop 2. This is the most commonly used cluster manager.
- *Apache Mesos* – a cluster manager that can also run Hadoop MapReduce and PySpark applications.
- *Kubernetes* – an open-source system for automating deployment, scaling, and management of containerized applications.
- **local** – not really a cluster manager, but you can still use “local” for `master()` in order to run Spark on your local machine.

# PySpark
PySpark is a Spark library written in Python to run Python applications using Apache Spark capabilities. Using PySpark we can run applications in parallel on the distributed cluster (multiple nodes). In other words, PySpark is a Python API for Apache Spark.

**Spark** is written in **Scala**. Later on, due to its industry adoption, the PySpark API was released for Python. It uses the Py4J Java library, which is integrated within PySpark and allows Python to dynamically interface with JVM objects. Hence, to run PySpark you need Java to be installed along with Python and Apache Spark.
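
As a quick sanity check of these requirements, the sketch below reports whether a `java` executable and the `pyspark` package are visible from the current Python process. It is only an illustration: the function name `check_pyspark_prerequisites` is made up for this example, and it assumes a Python 3 interpreter (it relies on `shutil.which` and `importlib.util.find_spec`), so it will not run under Python 2.

```python
import importlib.util
import shutil


def check_pyspark_prerequisites():
    """Report which PySpark prerequisites are visible from this process.

    Hypothetical helper for illustration; it only checks that the pieces
    can be found, not that their versions are compatible.
    """
    return {
        # Py4J needs a JVM, so a `java` executable must be on the PATH
        "java_on_path": shutil.which("java") is not None,
        # True when `import pyspark` would find the package
        "pyspark_importable": importlib.util.find_spec("pyspark") is not None,
    }


status = check_pyspark_prerequisites()
for name, ok in status.items():
    print("{}: {}".format(name, "found" if ok else "missing"))
```

If either entry is reported as missing, install that component before continuing with the lab.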
## PySpark Modules & Packages
- **PySpark RDD** (`pyspark.rdd`)
- **PySpark DataFrame and SQL** (`pyspark.sql`)
- Pandas-On-Spark (new in PySpark 3.2.0+)
    - `pyspark.pandas`
- PySpark ML
    - RDD-based (`pyspark.mllib`)
    - Spark DataFrame-based (`pyspark.ml`)
    - Next week.
- PySpark Streaming (`pyspark.streaming`)
    - Structured Streaming (new in PySpark 3.2.0+)
        - `pyspark.sql.streaming`
    - Last week.
- PySpark GraphFrames (GraphFrames)
    - Spark GraphX supported only in Scala
    - Last week.
- PySpark Resource (`pyspark.resource`)
    - new in PySpark 3.0

Besides these, if you want to use third-party libraries, you can find them at https://spark-packages.org/ . This page is a kind of repository of all Spark third-party libraries.

## Install PySpark on HDP Sandbox
If you have installed Python and pip from previous labs, then you just need to run the following command:

```sh
pip2 install pyspark
```

Otherwise, you need to return to the previous labs and install them.

:::warning
[Scala is the native language](https://www.oreilly.com/library/view/data-algorithms-with/9781492082378/ch01.html) for writing Spark applications but Apache Spark supports drivers for other languages such as Python (PySpark package), Java, and R.
:::

## Running PySpark applications
You can run PySpark statements in interactive mode in the `pyspark` shell, or you can write them in a Python file and submit it to Spark using the tool `spark-submit`.

# Spark RDD
RDD is a fundamental building block of Spark. RDDs are immutable distributed collections of objects. Immutable means that once you create an RDD you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

In other words, an RDD is a collection of objects similar to a list in Python, with the difference that an RDD is computed on several processes scattered across multiple physical servers (also called nodes) in a cluster, while a Python collection lives and is processed in just one process.
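
The contrast with a plain Python list can be made concrete without Spark. The sketch below is a toy model, not Spark's implementation: the helper names `split_into_partitions` and `map_partitions` are invented for this example, threads stand in for executors on different nodes, and the way elements are sliced into partitions is just one reasonable choice. It assumes Python 3.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(items, num_partitions):
    """Slice a collection into contiguous chunks of near-equal size."""
    items = list(items)
    size, extra = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions receive one additional element
        end = start + size + (1 if i < extra else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def map_partitions(partitions, func):
    """Apply func to every element, using one worker thread per partition."""
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        # pool.map keeps the results in the same order as the partitions
        return list(pool.map(lambda part: [func(x) for x in part], partitions))


partitions = split_into_partitions(range(1, 11), 4)
squared = map_partitions(partitions, lambda x: x * x)

print(partitions)  # the input list, split into 4 partitions
print(squared)     # a new collection; the input partitions are unchanged
```

Note that `map_partitions` returns new lists instead of modifying `partitions` in place, which mirrors the immutability of RDDs described above.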

Additionally, RDDs provide an abstraction over the partitioning and distribution of the data, designed to run computations in parallel on several nodes. While applying transformations on an RDD we do not have to worry about parallelism, as Spark provides it by default.

## Spark RDD Operations
There are two main types of Spark operations: Transformations and Actions.

![](https://i.imgur.com/5Rzg8rU.png)

## Spark RDD Transformations
Spark RDD Transformations are functions that take an RDD as the input and produce one or many RDDs as the output. They do not change the input RDD (since RDDs are immutable), but always produce one or more new RDDs by applying the computations they represent, e.g. `map()`, `filter()`, `reduceByKey()` etc.

Spark supports lazy evaluation: when you apply a transformation on any RDD, it will not perform the operation immediately. It will create a DAG (Directed Acyclic Graph) using 1) the applied operation, 2) the source RDD and 3) the function used for the transformation. It will keep on building this graph using the references until you apply an action on the last lined-up RDD. That is why the transformations in Spark are lazy.

Transformations construct a new RDD from a previous one.

![](https://i.imgur.com/GJgpmBr.png)

For example, we can build a simple Spark application for counting the words in a text file. A Spark job of two stages needs to be created (we do not create stages manually, but we define a pipeline for which the stages will be determined):
- Stage 1
    - Read the data from the files
        - input: text file
        - output: RDD1
    - Split the sentences in the RDD into words
        - input: RDD1
        - output: RDD2
        - operation: flatMap(split the words by space)
        - The elements of RDD are only values.
    - Initialize the counters to 1 for each word.
        - input: RDD2
        - output: RDD3
        - operation: map(initialize the counters to 1 for each word)
        - The elements of RDD are key-value pairs.
            - PairRDD
    - Aggregates the counts of words where the group key is the word.
        - input: RDD3
        - output: RDD4
        - operation: reduceByKey(aggregates the counts based on the word which is the key in the key-value pair)
- Stage 2
    - Print the word counts to the screen
        - input: RDD4
        - output: RDD5
        - operation: foreach(prints the words with their counts)

![](https://i.imgur.com/zomLW1S.png)

There are two types of transformations:

**Narrow transformation**: all the elements that are required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of partitions is used to calculate the result. Narrow transformations are the result of `map()` and `filter()`.

![](https://i.imgur.com/LFPbHzv.png)

**Wide transformation**: the elements that are required to compute the records in a single partition may live in many partitions of the parent RDD. Wide transformations are the result of `groupByKey()` and `reduceByKey()`.

**Spark Pair RDDs** are RDDs whose elements are key-value pairs. A key-value pair (KVP) consists of two linked data items: the key is the identifier, whereas the value is the data corresponding to that key. Most Spark operations work on RDDs containing any type of objects. However, a few special operations are available only on key-value pair RDDs, such as distributed “shuffle” operations that group or aggregate the elements by key.

The following Spark transformations accept as input an RDD consisting of single values.

![](https://i.imgur.com/RqpWt7h.png)

Whereas the transformations below accept as input a pair RDD consisting of key-value pairs.

![](https://i.imgur.com/DbTTs5o.png)

## Spark RDD Actions
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

The following actions are applied on RDDs which contain single values.
![](https://i.imgur.com/XOM1KVq.png)

An action is one of the ways of sending data from the executors to the driver. Executors are agents that are responsible for executing a task, while the driver is a JVM process that coordinates workers and the execution of tasks.

The following actions are applied on pair RDDs which contain key-value pairs.

![](https://i.imgur.com/vdaocQQ.png)

# Spark Context
The dataset for this demo is the movies data and can be downloaded from the link attached to this document.

## Import required packages

```python
import pandas as pd
import pyspark
import pymongo
import numpy as np
from pyspark.sql import SparkSession
from os.path import join
```

## Create a SparkContext

```python!
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession
# local[n] uses n cores/threads for running the Spark job; local[*] uses all of them
spark = SparkSession \
    .builder \
    .appName("my spark app") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Or

# Import SparkContext and SparkConf
from pyspark import SparkContext, SparkConf

# Create SparkContext
conf = SparkConf() \
    .setAppName("my spark app") \
    .setMaster("local[*]")

# getOrCreate reuses an already running SparkContext instead of raising an error
sc = SparkContext.getOrCreate(conf=conf)

# config() takes the SparkConf object through the keyword argument `conf`
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
```

:::warning
Here we are using the local machine as the resource manager with as many threads/cores as it has.
:::

:::warning
A Spark Driver is an application that creates a `SparkContext` for executing one or more jobs in the cluster. It allows your Spark/PySpark application to access the Spark cluster with the help of the resource manager.

When you create a `SparkSession` object, a `SparkContext` is also created and can be retrieved using `spark.sparkContext`. The `SparkContext` is created only once for an application; even if you try to get another one with `getOrCreate()`, it still returns the existing `SparkContext`.
:::

:::danger
In order to change the settings of a SparkSession, you need to kill the application and create it again.
If your application is running on the YARN cluster manager, then you can kill it using its application_id as follows.

```powershell!
yarn application -kill <application_id>
```

If your application is running on the local machine, then you can kill it from the Spark Web UI as follows:
- Open Spark History Server UI.
    - In HDP you follow the link
        - `http://localhost:18081/history/<application_id>/jobs/`
        - Replace `<application_id>` with the id of your application
- Select the jobs tab.
- Find the job you want to kill.
- Select kill to stop the job.

![](https://i.imgur.com/9YKkrqH.png)
:::

# Spark RDD
There are multiple ways to create RDDs in PySpark.

### 1. using `parallelize()` function

```python
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
rdd = sc.parallelize(data) # Spark RDD creation
# rdd = sc.parallelize(data, n) # n is the number of partitions
```

This function will split the dataset into multiple partitions. You can get the number of partitions by using the function `<rdd>.getNumPartitions()`.

<!--
### 2. using `createDataFrame()` function

```python
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe
df.rdd # Convert Spark DataFrame into RDD
```
-->

### 2. using `textFile` or `wholeTextFiles` functions

#### Read from a local file

```python
path = "file:///sparkdata/movies.csv"
rdd = sc.textFile(path) # a spark rdd
```

The file `movies.csv` is uploaded to the local file system and stored in the folder `/sparkdata`.

#### Read from HDFS

```python!
path = "hdfs:///data/movies.csv"
rdd3 = sc.textFile(path) # a spark rdd
```

The file `movies.csv` is uploaded to HDFS and stored in the folder `/data`.

:::info
The function `wholeTextFiles` reads each whole file as a single record in the RDD, a `(path, content)` pair, whereas the function `textFile` reads each line of the file as a record in the RDD.
:::

:::warning
When we use the `parallelize()`, `textFile()` or `wholeTextFiles()` methods of `SparkContext` to create an RDD, it automatically splits the data into partitions based on resource availability. When you run it on a local machine, it creates as many partitions as there are cores available on your system.
:::

## Create empty RDD

```python!
rdd = sc.emptyRDD()               # an empty RDD without partitions
rdd = sc.parallelize([], 10)      # an empty RDD with 10 partitions
```

## Repartition and Coalesce
Sometimes we may need to repartition the RDD. PySpark provides two ways to repartition: the `repartition()` method, which shuffles data from all nodes (also called a full shuffle), and the `coalesce()` method, which moves data from as few nodes as possible. For example, if you have data in 4 partitions, then `coalesce(2)` moves data from just 2 nodes.

```python!
rdd = sc.parallelize(range(1,100), 10)
print(rdd.getNumPartitions())
print(rdd.collect())

rdd2 = rdd.repartition(4)
print(rdd2.getNumPartitions())
print(rdd2.collect())

rdd3 = rdd.coalesce(4)
print(rdd3.getNumPartitions())
print(rdd3.collect())
```

Note that `repartition()` is a very expensive operation as it shuffles data from all nodes in a cluster. Both functions return an RDD.

:::info
- `collect` is a Spark action and returns all elements of the RDD.
- `repartition` and `coalesce` are considered as transformations in Spark.
:::

## PySpark RDD Transformations
Transformations are lazy operations: instead of updating an RDD, these operations return another RDD.

### map and flatMap
The `map` operation returns a new RDD by applying a function to each element of this RDD. `flatMap` applies the map operation and then flattens the RDD rows. We use this function when the map operation returns a list of values for each element; flattening converts the resulting list of lists of values into a single list of values.

```python!
# Read file
rdd1 = sc.textFile("movies.csv")
rdd1.take(10)

# tokenize
rdd2 = rdd1.flatMap(lambda x : x.split(","))
rdd2.take(10)

# Or
# def f(x):
#     return x.split(",")
# rdd2 = rdd1.flatMap(f)
# rdd2.take(10)

# Remove the additional spaces
rdd3 = rdd2.map(lambda x : x.strip())
rdd3.take(10)
```

:::info
`take(k)` is a Spark action and returns the first $k$ elements of the RDD.
:::

### filter
Returns a new RDD containing only the elements for which the filter function returns true.

```python!
# Returns only values which are digits
rdd4 = rdd3.filter(lambda x : str(x).isdigit())
print(rdd4.count())
```

:::info
`count` is a Spark action and returns the number of elements in the RDD.
:::

### distinct
Returns a new RDD after eliminating all duplicated elements.

```python!
# Returns unique number of values which are only digits
rdd5 = rdd4.distinct()
print(rdd5.count())
```

### sample
Returns a sampled subset of this RDD.

```python!
rdd6 = rdd5.sample(withReplacement=False, fraction=0.6, seed=0)
print(rdd6.count())
```

### randomSplit
Splits the RDD by the weights specified in the argument. For example, `rdd.randomSplit([0.7, 0.3])`.

```python!
# Weights that do not sum to 1 are normalized by Spark
rdd7, rdd8 = rdd1.randomSplit(weights=[0.2, 0.3], seed=0)
print(rdd7.count())
print(rdd8.count())
```

### mapPartitions and mapPartitionsWithIndex
`mapPartitions` is similar to `map`, but executes the transformation function once per partition. This can give better performance than the `map` function, for example when an expensive setup step is needed only once per partition. `mapPartitionsWithIndex` returns a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. The function passed to either of them should return an iterable, for example a generator.

```python!
# a generator function: receives an iterator over one partition
def f9(iterator):
    for v in iterator:
        yield (v, 1)

rdd9 = rdd3.mapPartitions(f9)
print(rdd9.take(10))

# a generator function: also receives the index of the partition
def f10(index, iterator):
    for v in iterator:
        yield (v, index)

rdd10 = rdd3.mapPartitionsWithIndex(f10)
print(rdd10.take(10))
```

### sortBy and groupBy

```python!
# Compute the word-digit frequency in the file and show the top 10 words.

# Group by all tokens
rdd11 = rdd3.groupBy(lambda x : x)

# Calculate the length of the list of token duplicates
rdd12 = rdd11.map(lambda x: (x[0], len(x[1])))
# or
# rdd12 = rdd11.mapValues(len)

# Sort the results
rdd13 = rdd12.sortBy(lambda x : x[1], ascending=False)

# Take the first elements of the RDD and display
print(rdd13.take(10))
```

![](https://i.imgur.com/7HDMygw.png)

### sortByKey and reduceByKey

```python!
# Compute the digit frequency in the file and show the top 10 digits.

# Get all digits
rdd14 = rdd3.filter(lambda x: x.isdigit())

# Initialize the counters
rdd15 = rdd14.map(lambda x : (x, 1))

# Aggregate the counters who have same key which is here a digit
rdd16 = rdd15.reduceByKey(lambda x, y : x+y)

# Sort the results
rdd17 = rdd16.sortBy(lambda x : x[1], ascending=False)
# or, with sortByKey after swapping key and value
# rdd17 = rdd16.map(lambda x : (x[1], x[0])).sortByKey(ascending=False)

# Take the first elements of the RDD and display
print(rdd17.take(10))
```

![](https://i.imgur.com/cXhsMEs.png)

## PySpark RDD Actions
RDD action operations return the values from an RDD to the driver program. In other words, any RDD function that returns a non-RDD is considered an action.

### collect
Returns the complete dataset as a list.

### max, min, first, top, take
`max` returns the maximum value from the dataset whereas `min` returns the minimum value from the dataset. `first` returns the first element in the dataset. `top` returns the top `n` elements from the dataset (after sorting them in descending order). `take` returns the first `n` elements of the dataset.

![](https://i.imgur.com/2OfDP3p.png)

:::info
The operation `take` in Spark RDD is the same as `head` in pandas DataFrame whereas `top` returns the first elements after sorting them in descending order.
:::

### count, countByValue
`count` returns the count of elements in the dataset.

:::info
There are other similar operations. `countApprox(timeout, confidence=0.95)` is the approximate version of `count()` and returns a potentially incomplete result within a timeout, even if not all tasks have finished.
`countApproxDistinct(relativeSD)` returns an approximate number of distinct elements in the dataset.

**Note:** These operations are used when you have a very large dataset for which an exact count takes a lot of time.
:::

`countByValue` returns a dictionary where each key is a unique value in the dataset and each value is the number of times that value occurs.

```python!
print(rdd3.countByValue())
```

![](https://i.imgur.com/hBLLvZe.png)

### reduce, treeReduce
`reduce` reduces the elements of the dataset using the specified binary operator. `treeReduce` reduces the elements of this RDD in a multi-level tree pattern. The output is the same.

```python!
result = rdd3 \
    .map(lambda x : 1) \
    .reduce(lambda x, y : x+y)

resultTree = rdd3 \
    .map(lambda x : 1) \
    .treeReduce(lambda x, y : x+y)

# You should get the same results as rdd3.count() operation
assert rdd3.count()==result==resultTree
```

:::warning
Here I showed some of the operations, but you can find more in the [documentation](https://spark.apache.org/docs/3.0.3/api/python/pyspark.html).
:::

### saveAsTextFile
Used to save the RDD to an external data store.

```python!
rdd3.saveAsTextFile("/root/myrdd")
```

![](https://i.imgur.com/qJv2oqR.png)

## RDD Persistence
Persistence is useful because it is:
- Cost efficient – Spark computations are expensive, so reusing them saves cost.
- Time efficient – Reusing repeated computations saves a lot of time.
- Execution time – It reduces the execution time of the job, which allows us to perform more jobs on the same cluster.

There are different storage levels: memory or disk, serialized or deserialized, replicated or unreplicated. You can check [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence) for the available options.

### RDD Cache
The PySpark RDD `cache()` method saves the RDD computation at the default storage level `MEMORY_ONLY`, meaning the data is kept in memory and partitions that do not fit are recomputed when they are needed.

```python!
cachedRDD = rdd.cache()
cachedRDD.collect()
```

### RDD Persist
The PySpark `persist()` method is used to store the RDD at a specific storage level.

```python!
import pyspark
persistedRDD = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
persistedRDD.collect()
```

### RDD Unpersist
PySpark automatically monitors every `persist()` and `cache()` call you make, checks usage on each node, and drops persisted data that is not used, following a least-recently-used (LRU) policy. You can also remove it manually using the `unpersist()` method. `unpersist()` marks the RDD as non-persistent and removes all blocks for it from memory and disk.

```python!
unpersistedRDD = persistedRDD.unpersist()
unpersistedRDD.collect()
```

## Shuffling in Spark engine
Shuffling is the mechanism Spark uses to redistribute the data across different executors and even across machines. PySpark shuffling is triggered when we perform certain transformation operations like `groupByKey()`, `reduceByKey()`, `join()` on RDDs.

Shuffling is an expensive operation since it involves the following:
- Disk I/O
- Data serialization and deserialization
- Network I/O

For example, when we perform a `reduceByKey()` operation, PySpark does the following:
1. The Spark engine first runs map tasks on all partitions, which group all values for a single key.
2. The results of the map tasks are kept in memory.
3. When the results do not fit in memory, PySpark stores the data on disk.
4. PySpark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate.
5. It runs garbage collection.
6. Finally, it runs reduce tasks on each partition based on key.

PySpark RDD triggers shuffle and repartition for several operations like `repartition()`, `groupByKey()`, and `reduceByKey()`, and for `coalesce()` when it is called with `shuffle=True`.

Depending on your dataset size, number of cores, and memory size, shuffling can benefit or harm your Spark jobs.
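
The `reduceByKey()` steps listed above can be modelled in plain Python to see where data moves. This is a simplified sketch, not Spark's implementation: the helper names `combine_locally`, `shuffle` and `reduce_bucket` are invented for this example, Python's built-in `hash` stands in for Spark's partitioner, and spilling to disk, serialization and network transfer are left out.

```python
from collections import defaultdict
from operator import add


def combine_locally(partition, func):
    """Map side: merge the values of each key inside a single partition."""
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined


def shuffle(combined_partitions, num_partitions):
    """Send every (key, partial value) to the bucket chosen by hashing the key."""
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    for combined in combined_partitions:
        for key, value in combined.items():
            buckets[hash(key) % num_partitions][key].append(value)
    return buckets


def reduce_bucket(bucket, func):
    """Reduce side: merge the partial values that arrived for each key."""
    reduced = {}
    for key, values in bucket.items():
        total = values[0]
        for value in values[1:]:
            total = func(total, value)
        reduced[key] = total
    return reduced


# Three input partitions of (word, 1) pairs, as in the word count example
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("b", 1), ("c", 1)],
    [("a", 1)],
]

combined = [combine_locally(p, add) for p in partitions]
buckets = shuffle(combined, num_partitions=2)
reduced = [reduce_bucket(b, add) for b in buckets]

# Merge the output partitions only to display the final counts
counts = {key: value for part in reduced for key, value in part.items()}
print(sorted(counts.items()))
```

Which bucket a word lands in may differ between runs, but all partial counts of the same word always meet in the same bucket, so the final counts are stable. The number of buckets plays the role of the number of shuffle partitions discussed in this section.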
When you deal with a small amount of data, you should typically reduce the number of partitions; otherwise you will end up with many partitions with few records in each, which results in running many tasks with little data to process.

On the other hand, when you have a lot of data and too few partitions, you get fewer, longer-running tasks, and sometimes you may also get an out-of-memory error.

Getting the right number of shuffle partitions is always tricky and takes many runs with different values to reach an optimized number. This is one of the key properties to look at when you have performance issues in Spark jobs.

## Shared Variables
When Spark executes a transformation using `map` or `reduce` operations, it executes the transformation on a remote node using the variables that are shipped with the tasks. These variables are not sent back to the PySpark driver, hence there is no way to reuse and share variables across tasks. PySpark shared variables solve this problem. PySpark provides two types of shared variables:
- Broadcast variables (read-only shared variable)
- Accumulator variables (updatable shared variables)

### Broadcast variables
We can create a broadcast variable with `SparkContext.broadcast()`, here called as `sc.broadcast`. Access its value through `value`.

```python!
v = sc.broadcast(range(1, 100))
print(v.value)
```

### Accumulator variables
An accumulator is a shared variable that can be accumulated, i.e., it has a commutative and associative `add` operation. Worker tasks on a Spark cluster can add values to an accumulator with the `+=` operator, but only the driver program is allowed to access its value, using `value`. Updates from the workers get propagated automatically to the driver program.

```python!
acc = sc.accumulator(0)
acc += 10
acc.add(10)
print(acc.value) # 20
```

# Spark DataFrame

**DataFrame** is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

**PySpark DataFrame** is mostly similar to **Pandas DataFrame**, with the exception that PySpark DataFrames are **distributed** in the **cluster** (meaning the data in a DataFrame is stored on different machines in the cluster) and any operation in PySpark executes in parallel on all machines, whereas a Pandas DataFrame is stored and processed on a **single machine**. Thanks to parallel execution on all cores of multiple machines, PySpark can run operations on large datasets faster than pandas.

Each record in the dataframe is of type `pyspark.sql.Row` whereas each column is of type `pyspark.sql.Column`. There are multiple ways to create a DataFrame in PySpark:

## 1. using `createDataFrame()` function

```python!
data = [(1,2,3, 'a b c'), (4,5,6, 'd e f'), (7,8,9,'g h i')]
df = spark.createDataFrame(data) # is a dataframe
df.rdd # Convert Spark DataFrame into RDD
```

## 2. using `toDF()` function

```python!
rdd = sc.parallelize(data)
df = rdd.toDF()
```

## 3. Read from a local file

```python!
path = "file:///sparkdata/movies.csv"

df1 = spark.read.format("csv") \
    .option("sep", ",") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(path)

df1.show() # Display the dataframe
df1.printSchema() # print the schema of the dataframe
```

## 4. Read from MongoDB via PyMongo

```python!
import pandas as pd
import pymongo
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# The default configuration
# localhost:27017
client = pymongo.MongoClient()

db = client['moviesdb'] # client['<db_name>']

# A pymongo Cursor can be iterated only once,
# so we materialize the documents into a list first
# db.<collection_name>
movies = list(db.movies.find()) # Get all documents

# ObjectId is not a Spark type, so we store `_id` as a string
for doc in movies:
    doc['_id'] = str(doc['_id'])

# Convert to Pandas DataFrame
df1 = pd.DataFrame(movies)

schema = StructType([
    # StructField(<fieldname>, <fieldtype>, <nullability>)
    StructField("Audience score %", IntegerType(), True),
    StructField("Film", StringType(), True),
    StructField("Genre", StringType(), True),
    StructField("Lead Studio", StringType(), True),
    StructField("Profitability", FloatType(), True),
    StructField("Rotten Tomatoes %", IntegerType(), True),
    StructField("Worldwide Gross", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("_id", StringType(), True)
])

# Try to run spark.createDataFrame(movies)

# Convert immediately to Spark DataFrame
df3 = spark.createDataFrame(movies, schema)

# Convert to RDD then to Spark DataFrame
df4 = spark.createDataFrame(sc.parallelize(movies), schema)
```

## 5. Read from HDFS

```python!
path = "hdfs://sandbox-hdp.hortonworks.com:8020/data/movies.csv" # See note below

df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true") # a spark dataframe

# OR
df = spark.read.csv(path, sep = ",", inferSchema = "true", header = "true") # a spark dataframe

df.printSchema()
df.show(truncate=False)
```

The file `movies.csv` is uploaded to HDFS and stored in the folder `/data`.

:::warning
**Note:** To access HDFS from a Spark application, you need to use the hostname `sandbox-hdp.hortonworks.com` or the IP address assigned to the cluster container (for instance, it is `172.18.0.2` on my machine). You can get that address from the setting `fs.defaultFS` in the HDFS advanced configuration. Here `localhost` will not work due to the different schema `hdfs`.
![](https://i.imgur.com/qE3b8u5.png)
:::

## StructType & StructField

The `StructType` and `StructField` classes are used to programmatically specify the schema of a DataFrame and to create complex columns like nested struct, array, and map columns. `StructType` is a collection of `StructField`s, each of which defines the column name, the column data type, a boolean specifying whether the field is nullable, and metadata.

```python!
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# A sample data
data = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
]

schema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
    ])),
    StructField('id', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', IntegerType(), True)
])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
```

## Spark DataFrame Operations

### show [Action]

`show()` is used to display the contents of the DataFrame in a table (row and column) format. By default, it shows only 20 rows, and the column values are truncated at 20 characters.

```python!
df.show()
df.show(5)
df.show(5, truncate=False)
df.show(10, truncate=False, vertical=True)
```

### collect [Action]

`collect()` is an action that retrieves all the elements of the dataset (from all nodes) to the driver node. It returns all elements of a DataFrame as a `list` of `Row` objects. We should use `collect()` on smaller datasets, usually after `filter()`, `groupBy()`, etc. Retrieving larger datasets can result in an *OutOfMemory* error. You can use the `head` operation to get only the first rows/records.

```python!
df.collect() # all elements
df.collect()[0] # first row
df.collect()[0][0] # first cell at first row and first column
```

### select [Transformation]

The `select()` function is used to select a single column, multiple columns, columns by index, all columns from a list, and nested columns from a DataFrame. This function returns a DataFrame with the selected columns.

```python!
from pyspark.sql.functions import col

df.select("name", \
    "name.firstname", \
    df.id, \
    df['gender'], \
    col("salary")) \
    .show()
```

![](https://i.imgur.com/x7JOSWW.png)

```python!
df.select("*").show()
```

![](https://i.imgur.com/wK8K36m.png)

```python!
df.select([c for c in df.columns]).show()
```

![](https://i.imgur.com/F3qeTRp.png)

```python!
df.select(df.columns[:2]).show()
```

![](https://i.imgur.com/kDklnx4.png)

### withColumn, withColumnRenamed, drop [Transformation]

`withColumn()` is a DataFrame transformation used to change the values of an existing column, convert its datatype, create a new column, and more.

```python!
# Read the data
path = "hdfs://sandbox-hdp.hortonworks.com:8020/data/movies.csv" # See note below

df = spark.read.load(path, format="csv", sep = ",", inferSchema = "true", header = "true")

# Print Schema
df.printSchema()
```

1. Change the datatype of the column.

```python!
import pyspark.sql.functions as F

# Convert the `Worldwide Gross` column to double
# 1. Remove the $ sign
df2 = df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', ''))
# 2. Cast the column to double
df2 = df2.withColumn("Worldwide Gross", F.col("Worldwide Gross").cast("Double"))

# You can merge the previous operations into one operation as shown below
# The result `df2` is used in the rest of the examples
df2 = df.withColumn("Worldwide Gross", F.translate('Worldwide Gross', '$', '').cast("Double"))
df2.show(5)
```

![](https://i.imgur.com/uRr3CBw.png)

2. Update the values in a column

```python!
col_name = df2.columns[3]
df2.withColumn(col_name, F.col(col_name)/100).show(5)
```

![](https://i.imgur.com/XAduwQ3.png)

3. Create a Column from an existing one

```python!
col_name = df2.columns[3]
df2.withColumn("score", F.col(col_name)/100).show(5)
```

![](https://i.imgur.com/7J4SF67.png)

4. Add a New Column with fixed value

```python!
df2.withColumn("Country", F.lit("Russia")).show()
```

5. Rename a column

```python!
df2.withColumnRenamed(df2.columns[3], "score").show(5)
```

![](https://i.imgur.com/zDDZ153.png)

6. Drop a column

```python!
df2.drop("Year").show(5)
```

![](https://i.imgur.com/uqHXiIJ.png)

### filter, where [Transformation]

The PySpark `filter()` function is used to filter the rows of an RDD/DataFrame based on a given condition or SQL expression. If you are coming from an SQL background, you can use the `where()` clause instead of `filter()`; both functions operate exactly the same.

```python!
df2.filter((df2.Year == 2008) & (df2['Film'].startswith("Wh"))).show(5)
```

![](https://i.imgur.com/1uZZDVE.png)

```python!
df2.filter(~F.col('Genre').isin(['Comedy', 'Drama'])).show(5)
```

![](https://i.imgur.com/ZuQTK8O.png)

### distinct, dropDuplicates [Transformation]

The PySpark `distinct()` function is used to remove duplicate rows (comparing all columns) from a DataFrame, and `dropDuplicates()` is used to drop rows that are duplicated on selected (one or multiple) columns.

```python!
print(df2.count() - df2.distinct().count())
```

![](https://i.imgur.com/U5LsUIF.png)

```python!
df2.dropDuplicates(['Genre', 'Lead Studio']).show(5)
```

![](https://i.imgur.com/jVKnkRh.png)

### groupby [Transformation]

Similar to the SQL `GROUP BY` clause, the PySpark `groupBy()` function is used to collect identical data into groups on a DataFrame and perform count, sum, avg, min, and max functions on the grouped data. When we perform `groupBy()` on a PySpark DataFrame, it returns a `GroupedData` object which contains aggregate functions. Some of them are `avg`, `sum`, `min`, `max`.

:::warning
Notice that the aggregate functions are transformations and will return a DataFrame. You need to call an action to see the output of the aggregation.
:::

1.
Total gross for each film genre.

```python!
df2.groupby("Genre").sum("Worldwide Gross").show()
```

![](https://i.imgur.com/acvOGm3.png)

2. Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50.

```python!
df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"),
        F.max(df2.columns[6]).alias("max_gross")
    ).where(F.col("max_gross")>=50) \
    .show(5)

# Equivalent SQL Query
# SELECT Genre,
#        Year,
#        avg(`Audience score %`) AS avg_score,
#        max(`Worldwide Gross`) AS max_gross
# FROM movies
# GROUP BY Genre, Year
# HAVING max_gross >= 50
```

![](https://i.imgur.com/kiVRe3U.png)

### orderBy, sort [Transformation]

You can use either the `sort` or the `orderBy` function of a PySpark DataFrame to sort it in ascending or descending order based on single or multiple columns. You can also sort using the PySpark SQL sorting functions.

**Example:** Calculate the average score for audience and max gross for each film genre every year. Exclude elements whose max gross is less than 50. Sort the result by max gross in ascending order, then by average score in descending order.

```python!
df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"),
        F.max(df2.columns[6]).alias("max_gross")
    ).where(F.col("max_gross")>=50) \
    .sort(F.col("max_gross").asc(), F.col("avg_score").desc()) \
    .show(5)

# Equivalent SQL Query
# SELECT Genre,
#        Year,
#        avg(`Audience score %`) AS avg_score,
#        max(`Worldwide Gross`) AS max_gross
# FROM movies
# GROUP BY Genre, Year
# HAVING max_gross >= 50
# ORDER BY max_gross asc, avg_score desc
```

### Join

Join is used to combine two DataFrames, and by chaining joins you can combine multiple DataFrames. It supports all basic join types available in traditional SQL, such as INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN.

```python!
df3 = df2.groupby("Genre", 'Year') \
    .agg(
        F.avg("Audience score %").alias("avg_score"),
        F.max(df2.columns[6]).alias("max_gross")
    ).where(F.col("max_gross")>=50)

df3.join(df2, (df2.Genre==df3.Genre) & (df2.Year==df3.Year), how="inner").show(5)
```

![](https://i.imgur.com/VfGHevN.png)

### UDF (User Defined Function)

PySpark UDFs are one of the most useful features of Spark SQL & DataFrame, as they extend the built-in capabilities of PySpark. I will show here the steps for creating a UDF that capitalizes the first character of each word.

Steps of creating UDFs are:

1. Create a Python function.

```python!
def convertCase(s):
    # Capitalize the first character of every word.
    # x[:1] (instead of x[0]) also works for empty words
    # that appear when the title contains double spaces.
    return " ".join(x[:1].upper() + x[1:] for x in s.split(" "))
```

2. Convert a Python function to PySpark UDF

```python!
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

capitalizeUDF = F.udf(lambda x: convertCase(x),StringType())

# Since the default return type of the udf() is StringType, you can write it as follows
capitalizeUDF = F.udf(lambda x: convertCase(x))
```

3. Use the UDF

```python!
df2.select("Film", capitalizeUDF(F.col("Film")).alias("Capitalized_Film")).show(5, truncate = False)
```

![](https://i.imgur.com/84tA0GJ.png)

:::danger
Note: Spark treats UDFs as a black box, so it cannot optimize them and you lose all the optimizations PySpark applies to DataFrames/Datasets. We recommend using UDFs only when no built-in function does the job.
:::

# Spark SQL

It is a module used for structured data processing. Spark SQL allows you to query structured data using either SQL or the DataFrame API.

`pyspark.sql` is a module in Spark that is used to perform SQL-like operations on the data stored in memory. You can either use the programming API to query the data or use ANSI SQL queries as in an RDBMS. You can also mix both, for example, use the API on the result of an SQL query.

<!-- Following are the important classes from the SQL module.
- `pyspark.sql.SparkSession` – SparkSession is the main entry point for DataFrame and SQL functionality.
- `pyspark.sql.DataFrame` – DataFrame is a distributed collection of data organized into named columns.
- `pyspark.sql.Column` – A column expression in a DataFrame.
- `pyspark.sql.Row` – A row of data in a DataFrame.
- `pyspark.sql.GroupedData` – An object type that is returned by DataFrame.groupBy().
- `pyspark.sql.DataFrameNaFunctions` – Methods for handling missing data (null values).
- `pyspark.sql.DataFrameStatFunctions` – Methods for statistics functionality.
- `pyspark.sql.functions` – List of standard built-in functions.
- `pyspark.sql.types` – Available SQL data types in Spark.
- `pyspark.sql.Window` – Would be used to work with window functions.

Regardless of what approach you use, you have to create a SparkSession which is an entry point to the Spark application.
-->

**Spark SQL** is one of the most used Spark modules for processing structured, columnar data. Once you have created a DataFrame, you can interact with the data using SQL syntax. In other words, Spark SQL brings native raw SQL queries to Spark, meaning you can run traditional ANSI SQL on a Spark DataFrame.

In order to use SQL, first register a **temporary table/view on the DataFrame** using the `createOrReplaceTempView()` function. Once created, this table can be accessed throughout the SparkSession using `sql()`, and it will be dropped when your SparkSession terminates.

Use the `sql()` method of the SparkSession object to run the query; this method returns a new DataFrame.

## Data Description

This dataset includes $44,341$ results of international football matches starting from the very first official match in $1872$ up to $2022$. The matches range from FIFA World Cup to FIFI Wild Cup to regular friendly matches.
The matches are strictly men's full internationals and the data does not include Olympic Games or matches where at least one of the teams was the nation's B-team, U-23 or a league select team.

**results.csv** includes the following columns:
- **date** - date of the match
- **home_team** - the name of the home team
- **away_team** - the name of the away team
- **home_score** - full-time home team score including extra time, not including penalty-shootouts
- **away_score** - full-time away team score including extra time, not including penalty-shootouts
- **tournament** - the name of the tournament
- **city** - the name of the city/town/administrative unit where the match was played
- **country** - the name of the country where the match was played
- **neutral** - TRUE/FALSE column indicating whether the match was played at a neutral venue

For the dataset of scorers and shootouts you can check this [Kaggle data card](https://www.kaggle.com/datasets/martj42/international-football-results-from-1872-to-2017).

## Spark SQL Examples

Here we will use the football results dataset described above.

1. Create SQL View

- Load the data and read the `results` dataframe.

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType

schema = StructType([
    StructField("date", DateType(), False),
    StructField("home_team", StringType(), False),
    StructField("away_team", StringType(), False),
    StructField("home_score", IntegerType(), False),
    StructField("away_score", IntegerType(), False),
    StructField("tournament", StringType(), False),
    StructField("city", StringType(), False),
    StructField("country", StringType(), False),
    StructField("neutral", BooleanType(), False),
])

# You can also use spark.read.csv function
df = spark.read.format("csv").load("results.csv", header = True, schema = schema)
df
```

- Create the temporary view.

```python!
df.createOrReplaceTempView("results_table")
```

2. Spark SQL to Select Columns

```python!
# DataFrame API Select query
df.select("home_team","city","country","tournament") \
    .show(5)

# SQL Select query
spark.sql("SELECT home_team, city, country, tournament FROM RESULTS_TABLE") \
    .show(5)
```

3. Filter Rows

To filter the rows from the data, you can use the `where()` function from the DataFrame API.

```python!
# DataFrame API where()
df.select("country","city","home_team","tournament") \
    .where("city == 'Moscow'") \
    .show(5)
```

Similarly, in SQL you can use the WHERE clause as follows.

```python!
# SQL where
spark.sql("""
    SELECT country, city, home_team, tournament
    FROM RESULTS_TABLE
    WHERE city = 'Moscow'
""").show(5)
```

4. Sorting

```python!
# sorting
df.select("country","city","home_team","tournament") \
    .where("city in ('London','Paris','Moscow')") \
    .orderBy("city") \
    .show(10)

# SQL ORDER BY
spark.sql("""
    SELECT country, city, home_team, tournament
    FROM RESULTS_TABLE
    WHERE city in ('London','Paris','Moscow')
    order by city
""").show(10)
```

5. Grouping

```python!
# grouping
df.groupBy("city").count() \
    .show()

# SQL GROUP BY clause
spark.sql("""
    SELECT city, count(*) as count
    FROM RESULTS_TABLE
    GROUP BY city
""").show()
```

6. SQL Join Operations

The PySpark SQL join has the syntax below and can be accessed directly from a DataFrame.

```python!
join(self, other, on=None, how=None)
```

The `join()` operation takes the parameters below and returns a DataFrame.
- param *other*: Right side of the join
- param *on*: a string for the join column name, a list of column names, or a join expression (`Column`)
- param *how*: default inner. Must be one of inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

You can also write a join expression by adding the `where()` and `filter()` methods on the DataFrame, and you can join on multiple columns.

- Create two Spark dataframes

```python!
emp = [(1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
]
empColumns = ["emp_id","name","superior_emp_id","year_joined",
    "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
```

- Create two tables

```python!
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
```

- Inner join

```python!
# Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"inner") \
    .show(truncate=False)

# SQL INNER JOIN
# show() returns None, so keep the DataFrame and display it separately
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)

# SQL INNER JOIN
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(truncate=False)
```

- Left join

A **Left** join, a.k.a. **Left outer join**, returns all rows from the left dataset regardless of whether a match is found in the right dataset. When the join expression does not match, it assigns null to the right-side columns of that record, and it drops the records from the right dataset for which no match is found.

```python!
# Left Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "left") \
    .show(truncate=False)

# SQL LEFT JOIN
joinDF = spark.sql("select * from EMP e LEFT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)
```

- Right Join

```python!
# Right Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "right") \
    .show(truncate=False)

# SQL RIGHT JOIN
# show() returns None, so keep the DataFrame and display it separately
joinDF = spark.sql("select * from EMP e RIGHT OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)
```

- Full join

```python!
# Full Join in pyspark.sql.DataFrame API
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, "full") \
    .show(truncate=False)

# SQL FULL JOIN
joinDF = spark.sql("select * from EMP e FULL OUTER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)
```

You can read about Anti-joins, semi-joins and unions from [here]

<!-- # Spark Exercise 1

Write a Spark application for counting the number of words in the titles of the films.
-->

<!-- # Spark Exercise 2

Entity Resolution is a technique to identify data records in a single data source or across multiple data sources that refer to the same real-world entity and to link the records together. In Entity Resolution, the strings that are nearly identical, but maybe not exactly the same, are matched without having a unique identifier. In this exercise, you will read the data from a MongoDB database `mflix` from the lab 03.

:::success
**Exercises on PySpark**
You need to work on a Zeppelin note.
1. Import the data from MongoDB database.
2. In Zeppelin, access the database and read the description of movies.
3. Search for character names in the description of movies.
4. Link the matched entities to the character names and display their information
:::

# Spark Exercise 3

:::success
**Exercises on PySpark**
1. Download an article whose size is at least 200 MB from [Wikimedia dump Feb.2023](https://dumps.wikimedia.org/enwiki/20230201).
2. Prepare a Zeppelin note to extract the words from the article. Get rid of digits and numbers and exclude any words contain digits.
3. Analyze the distribution of words in the article.
4. Draw a histogram shows the distribution of 20 top words from the article.
5.
Answer the following queries:
    - What is the most popular word in the article?
    - Is there a semantic relationship between the most popular word and the title of the article?
    - Can you formulate a new meaningful title from the top words?
    - Do you think that the top words can be helpful for title generation?
    - Can you check if the distribution of words is normal or not? you can use any normality tests you know from your advanced statistics class. A helpful [resource](https://machinelearningmastery.com/a-gentle-introduction-to-normality-tests-in-python/).
    - What is the relative matching among top words? You can use any string matching distance/measure but you need to implement it in pyspark.
:::
-->

<!-- # Upcoming Lab

The second part of Spark lab will be dedicated to learn more data analytics in Spark and how to use the package `pyspark.ml` for building distributed ML models for some classification problems.
-->

# References

- [Apache Spark](https://spark.apache.org)
- [Deep Dive into Apache Spark Transformations and Action](https://blog.knoldus.com/deep-dive-into-apache-spark-transformations-and-action/)
- [Apache Spark RDD vs DataFrame vs DataSet](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/)
- [Spark with Python (PySpark) Tutorial For Beginners](https://sparkbyexamples.com/pyspark-tutorial/)
- [Learning PySpark](https://www.amazon.com/Learning-PySpark-Tomasz-Drabas/dp/1786463709)