# Apache Spark

## What is Apache Spark?

* Unified analytics engine for big data and machine learning
* In-memory cluster computing
* Builds on and generalizes the MapReduce programming model
* Cluster deployment: Standalone, YARN, Kubernetes (K8s)
* Features: Spark SQL, Spark Core (RDD), Spark Streaming, MLlib, ...

![image](https://hackmd.io/_uploads/HJtmW9KUkl.png)

Key characteristics:

Speed:
* runs on cheap commodity servers and exploits multithreading and parallel processing
* builds the computation as a directed acyclic graph (DAG), which is broken into tasks that run in parallel across workers
* generates optimized bytecode for execution (Tungsten, the physical execution engine)
* keeps intermediate results in memory

Ease of Use:
* provides a fundamental abstraction, the Resilient Distributed Dataset (**RDD**), upon which all other higher-level structured data abstractions, such as DataFrames and Datasets, are constructed.

Modularity:
* supports many types of workloads through its core modules: Spark SQL, Spark Structured Streaming, Spark MLlib, and GraphX.

![image](https://hackmd.io/_uploads/r167m9Y81x.png)

Extensibility:
* Spark focuses on its fast, parallel computation engine rather than on storage; it decouples computation from storage.
* a rich ecosystem of packages includes Spark connectors for a variety of external data sources, performance monitors, and more.

```
$ head /Users/hungnd/Code/data/flight-data/csv/2015-summary.csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344

# in Python
# `spark` is the SparkSession (predefined in the pyspark shell)
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("/Users/hungnd/Code/data/flight-data/csv/2015-summary.csv")

flightData2015.show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+

# sort by the "count" column and return the first two rows to the driver
flightData2015.sort("count").take(2)

# number of rows per destination country, largest first
dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()\
  .orderBy('count', ascending=False)

dataFrameWay.show(5)
+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|  125|
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
|          Senegal|    1|
+-----------------+-----+

# SQL
# register the DataFrame as a temporary view so it can be queried with SQL
flightData2015.createOrReplaceTempView("flight_data_2015")

sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1) as count
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY count DESC
""")

sqlWay.show(5)
+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|  125|
|         Anguilla|    1|
|           Russia|    1|
|         Paraguay|    1|
|          Senegal|    1|
+-----------------+-----+
```

## Spark's Execution

![image](https://hackmd.io/_uploads/B1ECFstL1g.png)

**Spark driver**

* The part of the Spark application responsible for instantiating a SparkSession/SparkContext. You use the SparkContext object to interact with the Spark driver for your job run.
* Generates execution plans: converts your Spark application into one or more Spark jobs, then transforms each job into a DAG
* Schedules tasks and sends them to Spark executors
* Manages task progress and recovery

**SparkSession**

The ***entry point into all functionality in Spark is the SparkSession class***.
```
from pyspark.sql import SparkSession

# Build a new session, or reuse the one that is already running
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
```

**Cluster Manager**

The cluster manager is responsible for managing and allocating resources for the cluster of nodes on which your Spark application runs. Spark supports four cluster managers: the built-in standalone cluster manager, Apache Hadoop YARN, Apache Mesos (deprecated since Spark 3.2), and Kubernetes.

**Spark Executor**

A worker process that holds data and runs the tasks passed to it by the Spark driver. A Spark executor has multiple slots so that multiple tasks can be processed in parallel. Spark supports one task for each virtual CPU (vCPU) core by default.

**Deployment modes**

![image](https://hackmd.io/_uploads/SySsijt81l.png)

![image](https://hackmd.io/_uploads/S1yPm35Uyl.png)

**Resilient distributed dataset**

Spark does the complex job of storing and tracking large data sets across Spark executors. When you write code for Spark jobs, you don't need to think about the details of storage. Spark provides the resilient distributed dataset (RDD) abstraction, which is a collection of elements that can be operated on in parallel and can be partitioned across the Spark executors of the cluster.

![store-data-memory](https://hackmd.io/_uploads/rystCBMvkl.png)

RDDs consist of data divided into multiple parts called partitions. Each Spark executor stores one or more partitions in memory, and the data is distributed across multiple executors.

RDDs are immutable, meaning they can't be changed after they're created. To derive changed data, you apply transformations (described in the Spark Application section), which return a new RDD or DataFrame.

RDDs are fault tolerant: Spark tracks the lineage of transformations that produced each partition, so a partition lost to a node failure can be recomputed automatically. Persisted RDDs can optionally also be replicated across nodes.
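The partitioning idea described above can be sketched in plain Python. This is a toy model, not Spark's API: `split_into_partitions`, `assign_to_executors`, and `run_task` are hypothetical helpers, the round-robin placement is an assumed policy rather than Spark's actual scheduler, and threads stand in for executor slots.

```python
# Toy model of RDD partitioning in plain Python (not Spark's API).
# Every function name here is hypothetical and exists only for illustration.
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split `data` into `num_partitions` contiguous chunks of near-equal size."""
    items = list(data)
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


def assign_to_executors(partitions, num_executors):
    """Place partitions on executors round-robin (an assumed policy)."""
    placement = {executor: [] for executor in range(num_executors)}
    for index, partition in enumerate(partitions):
        placement[index % num_executors].append(partition)
    return placement


def run_task(partition):
    """One task processes one partition and returns a NEW partition."""
    return [x * 2 for x in partition]


partitions = split_into_partitions(range(10), 4)
placement = assign_to_executors(partitions, 2)

# Each executor holds one or more partitions of the dataset.
for executor, held in placement.items():
    print(f"executor {executor}: {held}")

# Process the partitions concurrently; the input partitions are left untouched,
# mirroring the immutability of RDDs.
with ThreadPoolExecutor(max_workers=2) as pool:
    doubled = list(pool.map(run_task, partitions))

print(doubled)
```

The original `partitions` list is unchanged after the run; the "transformation" produces a separate result, which is the property the immutability paragraph describes.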
**Parallelism**

![distributed-processing-flow](https://hackmd.io/_uploads/SylYvIGDke.png)

# Install

Install PySpark with pip:

```
pip3 install pyspark
```

# Spark Application

**Application**

Based on a Spark session (Spark context). Identified by a unique ID such as `<application_XXX>`.

**Job**

A parallel computation consisting of multiple tasks that is spawned in response to a Spark action (e.g., `save()`, `collect()`).

**Stage**

Each job is divided into smaller sets of tasks called stages that depend on each other. Stage boundaries are determined by the shuffles required to compute an RDD.

**Task**

A task is the smallest unit of processing scheduled by Spark and sent to a Spark executor. One task is created for each RDD partition, so the number of tasks is the maximum number of simultaneous executions in the stage.

![spark-execution-plan](https://hackmd.io/_uploads/rk4g7IMPkl.png)

**Transformations, Actions, and Lazy Evaluation**

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

* ***Transformations***: transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of immutability. Put another way, an operation such as `select()` or `filter()` will not change the original DataFrame; instead, it returns the transformed results of the operation as a new DataFrame. All transformations in Spark are lazy: they don't compute their results right away. Instead, Spark remembers the series of transformations applied to some base dataset. The transformations are computed only when an action requires a result to be returned to the driver (***when asked for results***).
* ***Actions***: trigger the jobs. Using transformations, you build up your logical transformation plan. To initiate the computation, you run an action such as `write`, `count`, `show`, or `collect`.
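The split between lazy transformations and eager actions can be illustrated with a small plain-Python model. This is a sketch, not Spark's implementation: `LazyDataset` is a hypothetical class, and its method names merely echo the Spark operations mentioned above.

```python
# Toy model of lazy evaluation in plain Python (not Spark's API).
# `LazyDataset` is a hypothetical class used only for illustration.

class LazyDataset:
    def __init__(self, source, steps=()):
        self._source = source        # base data, never modified
        self._steps = tuple(steps)   # recorded transformations (the "plan")

    # Transformations: return a NEW dataset with one more recorded step.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    # Actions: run the recorded plan and return a value to the caller.
    def collect(self):
        rows = list(self._source)
        for kind, fn in self._steps:
            if kind == "map":
                rows = [fn(row) for row in rows]
            else:
                rows = [row for row in rows if fn(row)]
        return rows

    def count(self):
        return len(self.collect())


calls = []  # records every time the mapping function actually runs


def double(x):
    calls.append(x)
    return x * 2


base = LazyDataset([1, 2, 3, 4, 5])
plan = base.map(double).filter(lambda x: x > 4)

calls_before_action = len(calls)   # still 0: only the plan has been built
result = plan.collect()            # the action triggers the computation
calls_after_action = len(calls)

print(calls_before_action, result, calls_after_action)
print(base.collect())              # the base dataset is unchanged
```

Building `plan` runs nothing; `double` is only called once `collect()` asks for results. In this toy model every action re-runs the whole plan from the base data, which is a simplification chosen to keep the example short.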

![image](https://hackmd.io/_uploads/BkJnJP5Uke.png)

# Spark UI