# Notes of << Spark: The Definitive Guide >>
[TOC]
# Chapter 01 - What is Apache Spark
## In one sentence
***UNIFIED*** ***COMPUTING ENGINE*** to process data in parallel on computer clusters with a set of ***LIBRARIES*** for different usage (e.g., SQL, streaming, ML)
### unified
- support a wide range of data tasks, from analyzing to ML and streaming over the same engine and consistent set of APIs
- provide consistent, composable APIs that users can use to build an application out of smaller pieces or out of existing libraries
### computing engine
- focus on loading data from storage systems and implement computation, meaning Spark does not store data by itself
- Spark can load data from a wide variety of storage systems, (1) object storage: Amazon S3, Azure Storage, (2)distributed file system: Apache Hadoop, (3) key-value store: Apache Cassandra, (4) message buses: Apache Kafka
- different from Apache Hadoop including storage (Hadoop file system) and process (MapReduce) which are closely integrated
- it is hard to run only one of the system, indicating a challenge to write apps that access data stored anywhere
### libraries
- provide a unified API for common data tasks
- Spark SQL: for SQL and structured data
- MLlib: for machine learning
- Spark stream: for streaming processing
- GraphX: for graph analytics
## The background of Spark: big data problem
- due to limitation in heat dissipation
→ engineers worked on adding parallel CPU cores instead of making one core faster
→ applications needed to add parallelism to run faster
→ new era of programming models, like Apache Spark
- the cost of storage and collecting decreases regularly
→ collecting data is extremely inexpensive
# Chapter 02 - A General Introduction to Spark
## Insufficient power of single computer
- there are somethings that a single computer is not powerful enough to perform
→ use a group of computers to pool the resources of many machines (**Cluster**)
→ need a **framework to coordinate** work across computers and execute tasks
(e.g., Spark standalone cluster manager, YARN, or Mose)
→ Spark applications are submitted to cluster managers, which grants resources
## Spark Application
- Spark applications = a driver + a set of executors(In this graph, the concept of cluster nodes is removed → users can specify how many executors should fall on each node)
- cluster manager
- keeps track of resources available
- driver
- runs main() function
- responsible for executing the program’s commands across executors
- maintain info about Spark applications
- respond to users’ program or input
- analyze, distribute and schedule work across executors
- executors
- carry out the work driver assigns to them
- execute code assigned to it by the driver
- report the state of the computations
## Spark Language APIs
- Scala
- Java
- Python
- R
- SQL
## Spark Session
- a driver process to control Spark Application
→ way for Spark to execute user-defined manipulations across clusters
- 1-to-1 correspondence between SparkSession and Spark Application
Spark’s API

## DataFrames
- most common Structured API
- schema: a list defines the columns and types within these columns
## Partitions
- to allow every executors perform work in parallel
→ Spark breaks up data into chunks
→ partition
- a partition = collection of rows that sit on one physical machine in cluster
→ partition represents how data is physically distributed across the cluster
- parallelism of only one
- 1 partition + 1000 executors
- 1000 partition + 1 executors
- with DataFrames, users do not (for most part) manipulate partitions manually
→ users simply specify high-level transformations of data
→ Spark determines how this work will actually execute on the cluster
## Transformations
- In Spark, core data structures are immutable
→ they cannot be changes after they’re created
→ Users need to provide instructions on how they want to modify the data
→ these instruction = Transformation
- Spark will not act on transformation until users call an action
- Transformation can be regarded as users’ business logic using Spark
- logical plan of transformation is important
→ it defines a lineage for the DataFrame
→ at any given time, Spark knows how to recompute any partition by performing all of the operations it had before on the same input data
- execution plan of transformation → directed acyclic graph (DAG)
- Two types:
- Narrow dependencies (narrow transformation)
- each input partition contribute to only one output partitions
- e.g., val divisBy2 = myRange.where("number % 2 = 0")
- Spark automatically perform and operation called pipelining
→ the one or multiple operations will be performed in memory
- Wide dependencies (wide transformation)
- each input partition contribute to many output partitions→ refer to Shuffle
→ Spark exchange partitions across the clusters
- when performing a shuffle, Spark writes the results to disk
- by default, after performing a shuffle, Spark outputs 200 shuffle partitions
- if we run spark in local mode, meaning there are not many executors
→ we can set config to further specify the number of partition to fit it
→ `spark.conf.set("spark.sql.shuffle.partitions", "5")`
## Lazy evaluation
- Spark will wait until the very last moment to execute the graph of computation instructions
→ in this way, Spark can compile plans from transformation instructed to run the transformation as efficiently as possible across the cluster
→ e.g., predicate pushdown
## Actions
- instruct Spark to compute a result from a series of transformations(e.g., count)
- three kinds
- view data in the console
- collect data to native objects in the respective language (e.g., collect)
- write to output data sources
## DataFrames and SQL
- With Spark SQL, user can register and DataFrame as a table or view (a temporary table) and query it using pure SQL
- there is no different between writing SQL queries or writing DataFrame code
→ they both compile to the same underlying plan
- execution plan is a directed acyclic graph (DAG) of transformations
→ each resulting in a new immutable DataFrame
## End-to-end example
```scala=
val spark = SparkSession.builder
.appName("Chapter02")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
val flightData2015: DataFrame = spark
.read
.option("inferSchema", "true")
.option("header", "true")
.csv("src/main/resources/data/flight-data/csv/2015-summary.csv")
// Use explain to see the physical plan
// (reading with 1 partition and sorting with 4 partitions)
flightData2015.sort("count").explain
// Both SQL and DataFrame will be compiled to the same plan
flightData2015.createOrReplaceTempView("flight_data_2015")
val sqlWay = spark.sql(
s"""
|SELECT DEST_COUNTRY_NAME, count(1)
|FROM flight_data_2015
|GROUP BY DEST_COUNTRY_NAME
|""".stripMargin)
sqlWay.explain
val dataFrameWay = flightData2015
.groupBy("DEST_COUNTRY_NAME")
.count()
dataFrameWay.explain
// Perform some complex operations
val sumSQLWay = spark.sql(
s"""
|SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
|FROM flight_data_2015
|GROUP BY DEST_COUNTRY_NAME
|ORDER BY destination_total DESC
|LIMIT 5
|""".stripMargin)
val sumDataFrameWay1 = flightData2015
.groupBy("DEST_COUNTRY_NAME")
.agg(sum("count").as("destination_total"))
.sort(col("destination_total").desc)
.limit(5)
val sumDataFrameWay2 = flightData2015
.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
```


# Chapter 03 - A Tour of Spark’s Toolset
## vast ecosystem of tools and libraries

## spark-submit
- let users send applications to a cluster and launch it to execute
- offer several controls with which users can specify the resources and detail
```bash!
./bin/spark-submit
--class org.apache.spark.examples.SparkPi
--master local \ ./examples/jars/spark-examples_2.11-2.2.0.jar 10
```
## Dataset: type-safe APIs for structure data
- only available for Java and Scala since they are statically typed language
(unavailable for Python and R for their dynamically typed feature)
- users can assign Java/Scala class with a DataFrame and manipulate it as a collection of typed objects (Java ArrayList & Scala Seq)
- type-safe means the class of the objects cannot be changed
→ especially useful for large project needing well-defined
- one great thing about Datasets is that users can use them only when they need
→ makes it easy to drop down to lower level, perform type-safe coding when necessary, and move higher up to SQL for more rapid analysis
- another advantage is that when users call collect or take on a Dataset, it will collect objects of the proper type in your Dataset, not DataFrame Rows
→ makes it easy to get type safety and securely perform manipulation in a distributed and a local manner without code changes
```scala=
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: Int)
val flightData2015Parquet = spark
.read
.parquet("2010-summary.parquet")
val flightData2015DataSet = flightData2015Parquet.as[Flight]
flightData2015DataSet
.filter(flightRow => flightRow.ORIGIN_COUNTRY_NAME != "Canada")
.show
```
## Structured streaming
- reduce latency and allow for incremental processing
- it is easy to modify batch code to streaming code
- the actions in straming are a bit different from static (batch) actions
- static action: call something like count → but it doesn’t make sense in streaming
- streaming action: populating data somewhere
→ output to an in-memory table
→ it will update after every trigger
```scala=
val streamingDataFrame = spark
.readStream
.schema(staticSchema)
.option("header", "true")
.option("maxFilesPerTrigger", "1")
.format("csv")
.csv("retail-data/by-day/*.csv")
val query = purchaseByCustomerPerHour
.writeStream
.format("console")
.queryName("customer_purchases")
.outputMode("complete")
.start()
```
# Chapter 04 - Structured API Overview
> Spark is a distributed programming model.
> User specifies transformations and they build up a DAG (directed acyclic graph).
> An action begins the process of executing the instructions
## Datasets & DataFrames
### Overview
- Spark uses an engine called **Catalyst** to maintain its own type information
→ users code written in Python or Scala
→ the majority of it will operate on Spark types
- Within the Structured APIs, there are two more APIs
- DataFrames:
- untyped → Spark only check the types specified in schema at runtime
- Datasets of Type Row (optimized in-memory format)
- by using Row, Spark can operate on its own internal format without incurring any cost on using JVM types, which can cuase high garbage-collection and object instantiation cost
- Datasets:
- typed→ Spark check whether types conform to schema at compile time
- only available to JVM-based language (Scale & Java)
→ specify types with case classes in Scala or Java beans
- for the most part, users likely to work with DataFrames
→ using DataFrame means taking advantages of Spark’s optimized internal format
### Common
- Both are (distributed) table-like collections with well-defined rows & columns
- Both represent immutable, lazily evaluated plans that specify the operations
### Schema
- defines the column names and types of a DataFrame
- users define schemas manually or read a schema from data sources (schema on read)
## Columns and Rows
- columns represent a type of information
- rows are a record of data
- each record in a DataFrame must be of type Row
- can create manually from SQL, from RDD, from data sources, or from scratch
## Structured API execution
### Steps overview
1. Write DataFrame/Dataset/SQL code
2. Spark convert code to logical plan if valid code
- logical plan = a set of abstract transformation
→ optimized version, not refer to executors and drivers
- unresolved logical plan = the existance of tables & columns is still unknown yet
- Spark uses catalog to check wheter tables & columns exist
- passed logical plan will be sent to Catalyst Optimizer to optimize it

4. Spark tranform logical plan to physical plan
- physical plan specifies how logicla plan will execute on cluster
- generate different physical execution strategies
- compare plans through a cost model
- results in a series of RDDs and transformation

5. Spark executes physical plan (RDD manipulation) on the cluster
- Spark runs all of this code over RDDs
- Spark performs further optimization at runtime
# Chapter 05 - Basic Structured Operations
## Some terms
- DataFrame: a series of records (Rows), and a number of columns
- Schema: defines name as well as the type of the data in each column
- Partition: defines layouts of DataFrame’s physical distribution across cluster
- parition scheme: defines how is partition is allocated
## Schema
- When using Spark for production ETL, it’s a good idea to define schema manually although schema-on-read (let the data source defines the schema) is available
- reason 1: schema-on-read might be slow with plain-text file (e.g., csv)
- reason 2: schema-on-read might lead to precission issues (long → int)
- Spark maintains its own data types so users cannot simply set types via the pre-langauge types
- a schema is a StructType made up of a number of fields (StructFields)
- name
- type
- Boolean flag specifies whether that column can contain missing or null values.
- Users can specify metadata with that column (sotring info about the columns)

## Columns
- columns are logical constructions that simply represent a value computed on a per-record basis by means of an expression
- specify columns with "col" or "column" methods
## Expressions
- expressions: a set of transformation on values in a record
- a function takes input to create a single value
- the output value can be a complex type like array
- create expressions via the expr function (a column reference) in single case
→ expr("someCol") = col("someCole")
- **columns are expressions**
→ col("someCol") - 5 = expr("someCol" - 5)
- if use expr(), the expr function can pase transformation and column references
- code in dataframe code or as SQL experssion compile to same logical tree
```scala=
expr("(((someCol + 5) * 200) - 6) < otherCol")
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
```
## Records and Rows
- only DataFrame have schema
→ rows do not have schema
→ when creating a Row manually, must specify the values in the same order as the schema of the DataFrame
- accessing data in rows is equally as easy: just specify the positio
- in Scala or Java, users must either use helper methods or explicitly coerce values
- in Python or R, the value will automatically be coerced into the correct type
## DataFrame transformation
- core operations
- add rows & columns
- remove rows & columns
- transform row into column (or vice versa)
- change order of rows based on values in columns

- Refer to columns
- expr is the most flexible reference that we can use.
→ it can refer to a plain column or a string manipulation of a column.
- selectExpr: a simple way to build up complex expressions to create DF
- ! Do not mix Column objects and strings → compiler error
→ df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")
```scala=
import spark.implicits._
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
df.select(
df.col("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"),
expr("DEST_COUNTRY_NAME"),
$"DEST_COUNTRY_NAME",
).show(2)
// Do not mix column objects and string names
df.select(
"DEST_COUNTRY_NAME"
).show(2)
// expr is more flexible reference than col
df.select(expr("DEST_COUNTRY_NAME AS destination")).show((2))
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME")).show((2))
// selectExpr is a single way to build up complex expressions
df.selectExpr(
"*",
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) AS withinCountry"
) show (2)
df.selectExpr(
"avg(count)", "count(distinct(DEST_COUNTRY_NAME))"
).show(2)
```
- Add columns: lit & withColumn
- lit: pass explicit values into Spark
- withColumn: more formal way to add new column
- take 2 arguments:
1. column name
2. expression that will create the value for the given row in the DF
```scala=
// Literal values: converting values from a given programming language to one that Spark can understand
df.select(expr("*"), lit(1).as("One")).show(2)
df.selectExpr("*", "1 as One").show(2)
// withColumn
df.withColumn("Destination", col("DEST_COUNTRY_NAME")).show(2)
```
- Rename columns: withColumnRenamed
- Reserve characters and keywords: backtick(`)
- Case sensitivity: default is false
→ user can change: set spark.sql.caseSensitive true
```scala=
// Rename columns
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").show(2)
val dfWithLongColName = df.withColumn("This Long Column-Name", expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`"
).show(2)
```
- Remove columns: drop
```scala=
// Remove columns
dfWithLongColName.drop("new col").show(2)
```
- Change colum’s type: cast
```scala=
// Change column type
df.withColumn("casted", col("count").cast("Int")).printSchema
```
- Filter rows: create expressions and filter with where or filter
- if we want to specify multiple AND filters, just chain them
→ Spark automatically perform all filtering operations at the same time
→ regardless of the filter ordering)
```scala=
// Filter rows
df.filter(col("count") < 2).show(2)
df.filter(expr("count < 2")).show(2)
df.where(col("count") < 2).show(2)
df.where(expr("count < 2")).show(2)
// Chain filters: Just chan the filter operations and let Spark handle the rest
df.where(col("count") < 2).filter(expr("ORIGIN_COUNTRY_NAME = 'Croatia'")).show(2)
```
- Get unique rows: distinct
```scala=
// Distinct Rows
val distinctOriginCountries = df.selectExpr("ORIGIN_COUNTRY_NAME").distinct.count
println(distinctOriginCountries)
```
- Random samples: sample
```scala=
// Sampling
val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).show(2)
```
- Random split: randomSplit
```scala=
// Random splits
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count > dataFrames(1).count
```
- Concatenating and appending rows (union)
- dataframes are immutable
→ to append data to a dataframe
→ union original dataframe with new dataframe but schema need to be the same)
```scala=
// Union
val schema = df.schema
val newRows = Seq(
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
)
val originalCounts = df.count
val parallelizedDf = spark.sparkContext.parallelize(newRows)
val newDf = spark.createDataFrame(parallelizedDf, schema)
val unionCounts = df.union(newDf).count
println(unionCounts - originalCounts)
df.union(newDf)
.where("count = 1")
.where($"ORIGIN_COUNTRY_NAME" =!= "United States") // =!=: compare to the evaluated value
.show
```
- Sorting rows: sort and orderBy + asc / descasc_null_first, desc_null_first, asc_null_last, desc_null_first
- for optimization purpose, try to use sortWithinPartitions to sort withing each partition before another set of transformation
```scala=
// Random splits
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count > dataFrames(1).count
```
- Limit: limit
```scala=
// Limit
df.limit(5)
```
- Repartition and Coalesce
- an important optimization opportunity is to partition the data according to some frequently filtered columns
- control the physical layout of data across the cluster
- repartition incurs a shuffle of the data
- only repartition when
- the future # of partition is greater than current # of partitions
- looking to partition by a set of columns
- users can specify the # of partitions
- coalesce: not incur a full shuffle→ try to conbine partitions
```scala=
df.rdd.getNumPartitions
df.repartition(2)
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(1)
```
- Collect rows:
- can be a very expensive operation
- collect data on clusters to manipulate it on local machine
- collect: gets all data from the entire dataframe
- take: select the first N rows
- show: print a number of rows
- toLocalIterator: collect partition to the driver as an iterator
```scala=
// Collect
val collectDF = df.limit(10)
collectDF.take(5)
collectDF.show
collectDF.show(5, false)
collectDF.collect
```
# Chapter 06 - Working with Different Types of Data
- Where to look for APIs (http://bit.ly/2rKkALY)
- DataFrame (DataSet) Methods
- Column Methods
- Convert to Spark Types
- lit: convert a type in other language to corresponding Spark representation
```scala=
df.select(lit(5), lit("five"), lit(5.0)).printSchema
```
- Working with Booleans
- Booleans are essential when it comes to data analysis since they are the foundation for all filtering
- four elements: and, or, true, false
- in Spark, we should always chain together and filters as a sequential filter
→ avoid to use and or or to connect multiple parts of Boolean expressions→ chain filters sequentially is more easily to understand
- if working with null data when creating Boolean expressions, we need to treat things a bit differently
```scala=
// Booleans: foundations for filtering
df.where(col("InvoiceNo").notEqual(536365))
.select("InvoiceNo", "StockCode", "Description")
.show(5)
df.where(col("InvoiceNo").equalTo(536365))
.select("InvoiceNo", "StockCode", "Description")
.show(5, truncate = false)
df.where(col("InvoiceNo") =!= 536365)
.select("InvoiceNo", "StockCode", "Description")
.show(5, truncate = false)
df.where(col("InvoiceNo") === 536365)
.select("InvoiceNo", "StockCode", "Description")
.show(5, truncate = false)
df.where("InvoiceNo <> 536365")
.select("InvoiceNo", "StockCode", "Description")
.show(5, truncate = false)
df.where("InvoiceNo = 536365")
.select("InvoiceNo", "StockCode", "Description")
.show(5, truncate = false)
// Chain multiple filter conditions (no need to add 'and' or 'or')
df.withColumn("isExpensive", not(col("UnitPrice").leq(250)))
.filter("isExpensive")
.select("Description", "UnitPrice")
.show(5, truncate = false)
df.withColumn("isExpensive", expr("UnitPrice > 250"))
.filter("isExpensive")
.select("Description", "UnitPrice")
.show(5, truncate = false)
df.withColumn("isExpensive", expr("Not UnitPrice <= 250"))
.filter("isExpensive")
.select("Description", "UnitPrice")
.show(5, truncate = false)
df.where(col("Description").eqNullSafe("hello")).show
```
* Working with numbers
```scala=
// power function
val fabricateQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), col("Quantity"), column("UnitPrice"), fabricateQuantity.alias("realQuantity"))
.show(2)
df.selectExpr(
"CustomerId",
"POWER(Quantity * UnitPrice, 2.0) + 5 as realQuantity")
.show(2)
// round function
df.select(round(lit(2.5)), bround(lit(2.5))).show(2)
// correlation
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show
// describe
df.describe().show
df.stat.crosstab("StockCode", "Quantity").show
// add unique id
df.select(monotonically_increasing_id()).show(2)
```
* Working with strings
```scala=
// uppercase and lowercase
df.select(initcap(col("Description"))).show(2, truncate = false)
df.select(col("Description"), lower(col("Description")), upper(col("Description"))).show(2, truncate = false)
// trim and pad
df.select(
ltrim(lit(" HELLO ")).as("ltrim"),
rtrim(lit(" HELLO ")).as("rtrim"),
trim(lit(" HELLO ")).as("trim"),
lpad(lit("HELLO "), 2, " ").as("lpad"),
rpad(lit("HELLO "), 6, " ").as("rpad")
).show(1, truncate = false)
//regex
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
df.select(
regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
col("Description")
).show(3, truncate = false)
// replace
df.select(translate(col("Description"), "LEWT", "1347"), col("Description")).show(3, truncate = false)
// extract
val regexString2 = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
println(regexString2)
df.select(
col("Description"),
regexp_extract(col("Description"), regexString2, 1).alias("color_clean"))
.show(3, truncate = false)
// check existence
val containsBlack = col("Description").contains("BLACK")
val containsWhite = col("Description").contains("WHITE")
df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))
.where("hasSimpleColor")
.select("Description")
.show(3, truncate = false)
val selectedColumns = simpleColors.map( color => {
col("Description").contains(color.toUpperCase).alias(s"is_$color")
}
):+ expr("*")
df.select(selectedColumns:_*)
.where(col("is_white").or(col("is_red")))
.select("Description")
.show(3, truncate = false)
```
* working with dates and timestamps
* Spark’s TimestampType class supports only **second-level** precision, which means that if you’re going to be working with milliseconds or microseconds, you’ll need to work around this problem by potentially operating on them as longs.
* Spark will not throw an error if it cannot parse the date; rather, it will just return null.
* Implicit type casting is an easy way to shoot yourself in the foot, especially when dealing with null values or dates in different timezones or formats.
→ it is recommend that we parse them explicitly instead of relying on implicit conversions.
```scala=
val dateDf = spark
.range(10)
.withColumn("today", current_date())
.withColumn("now", current_timestamp())
dateDf.createOrReplaceTempView("dateTable")
dateDf.printSchema
dateDf.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show
dateDf.withColumn("week_ago", date_sub(col("today"), 7))
.select("today", "week_ago")
.show(1)
dateDf.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))
.select(months_between(col("end"), col("start")))
.show(1)
val dateFormat = "yyyy-dd-MM"
val cleanDateDf = spark.range(1).select(
to_date(lit("2016-12-11"), dateFormat).alias("start"),
to_date(lit("2016-20-11"), dateFormat).alias("end"),
)
cleanDateDf.show()
cleanDateDf.createOrReplaceTempView("dateTable2")
cleanDateDf.select(to_timestamp(col("start")), to_date(col("end"))).show
```
* working with nulls in Data
* As a best practice, you should always use nulls to represent missing or empty data in your DataFrames→ Spark can optimize working with null values more than it can if you use empty strings or other values.
* when you define a schema in which all columns are declared to not have null values, Spark will not enforce that and will happily let null values into that column→ The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be difficult to debug.
* There are two things you can do with null values: you can explicitly drop nulls or you can fill them with a value (globally or on a per-column basis).
* specifying "any" as an argument drops a row if any of the values are null. Using “all” drops the row only if all values are null or NaN for that row
```scala=
// coalesce: returns the first non-null value
df.select(coalesce(col("Description"), col("InvoiceNo"))).show
df.na.drop
df.na.fill("Convert all nulls to this")
val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
df.na.fill(fillColValues)
df.na.replace("Description", Map("" -> "UNKNOWN"))
```
* working with complex types
* Structs: DataFrames within DataFrames
* Array: combine elements ino a row
* Map: key-value pairs of columns
```scala=
// complex types - struct
df.select(struct(col("Description"), col("InvoiceNo")).alias("complex"))
.selectExpr("complex")
.show(2, truncate = false)
df.select(struct(col("Description"), col("InvoiceNo")).alias("complex"))
.selectExpr("complex.Description", "complex.InvoiceNo")
.show(2, truncate = false)
df.select(struct(col("Description"), col("InvoiceNo")).alias("complex"))
.select(col("complex").getField("InvoiceNo"))
.show(2, truncate = false)
// complex types - array
df.select(split(col("Description"), " " ).alias("words"))
.selectExpr("words[0]")
.show(2, truncate = false)
df.select(size(split(col("Description"), " "))).show(2, truncate = false)
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2, truncate = false)
df.withColumn("splitted", split(col("Description"), " "))
.withColumn("exploded", explode(col("splitted")))
.select("Description", "splitted", "exploded")
.show(10, truncate = false)
// complex types - map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex")).show(5, truncate = false)
df.select(map(col("Description"), col("InvoiceNo")).alias("complex"))
.selectExpr("complex['WHITE METAL LANTERN']").show(5, truncate = false)
df.select(map(col("Description"), col("InvoiceNo")).alias("complex"))
.selectExpr("explode(complex)").show(5, truncate = false)
```
* working with json
```scala=
// json
val jsonDf = spark.range(1).selectExpr(
"""
|'{"myJSONKey" : {"myJSONValue" : [1, 2, 3] }}' as jsonString
|""".stripMargin)
jsonDf.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
json_tuple(col("jsonString"), "myJSONKey") as "column2"
).show(2, truncate = false)
df.selectExpr("(InvoiceNo, Description) as complex")
.select(to_json(col("complex")))
.show(2, truncate = false)
val parseSchema = new StructType(Array(
StructField("InvoiceNo", StringType, nullable = true),
StructField("Description", StringType, nullable = true)
))
df.selectExpr("(InvoiceNo, Description) as complex")
.select(to_json(col("complex")).alias("newJson"))
.select(from_json(col("newJson"), parseSchema))
.show(5, truncate = false)
```
* user-defined functions
* it is recommended to write UDF in Scala or Java→ since written UDF in Python can cause resources constrained
```scala=
// user defined functions
val udfExampleDf = spark.range(5).toDF("num")
def power3(num: Double): Double = num * num * num
val power3DF = udf(power3(_: Double):Double)
udfExampleDf.select(power3DF(col("num"))).show()
spark.udf.register("power3", power3(_: Double):Double)
udfExampleDf.selectExpr("power3(num)").show()
```
# Chapter 07 - Aggregations
* Aggregating is the act of collecting something together and is a cornerstone of big data analytics
* In an aggregation, you will specify a key or grouping and an aggregation function that specifies how you should transform one or more columns
* each grouping returns a RelationalGroupedDataset on aggregations
* group types
* dataframe level
* group by
* window
* grouping set
* rollup
* cube
* Aggregation functions
* count & count distinct & approx count
* when performing a count(*), Spark will count null values (including rows containing all nulls).
* However, when counting an individual column, Spark will not count the null values
```scala=
// count & count distinct & approx_count_distinct
df.select(count("StockCode")).show
df.select(countDistinct("StockCode")).show
df.select(approx_count_distinct("StockCode", 0.1)).show
```
* first and last & min and max
```scala=
// first and last
df.select(first("StockCode"), last("StockCode")).show
// min and max
df.select(min("Quantity"), max("Quantity")).show
```
* sum & sumDistinct
```scala=
// sum and sum distinct
df.select(sum("Quantity")).show
df.select(sum_distinct(col("Quantity"))).show
```
* avg
```scala=
// avg
df.select(
count("Quantity") as "Total_Quantity",
sum("Quantity") as "Total_Sales",
avg("Quantity") as "Avg_Sales",
expr("mean(Quantity)") as "Mean_Sales"
).selectExpr(
"Total_Sales / Total_Quantity",
"Avg_Sales",
"Mean_Sales"
).show(5)
```
* variance and standard deviation
```scala=
By default, Spark performs the formula for the sample standard deviation or variance if you use the variance or stddev functions.
df.select(
var_pop("Quantity"),
var_samp("Quantity"),
stddev_pop("Quantity"),
stddev_samp("Quantity")
).show
```
* skewness and kurtosis
```scala=
df.select(skewness("Quantity"), kurtosis("Quantity")).show
covariance and correlation
df.select(
corr("InvoiceNo", "Quantity"),
covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")
).show
```
* aggregation to complex types
```scala=
df.agg(
collect_set("Country"),
collect_list("Country")
).show
```
* grouping
* First we specify the column(s) on which we would like to group, and then we specify the aggregation(s).
→ The first step returns a RelationalGroupedDataset, and the second step returns a DataFrame
```scala=
df.groupBy("InvoiceNo", "CustomerId").count().show
df.groupBy("InvoiceNo").agg(
count("Quantity").as("total_quantity"),
expr("count(Quantity)")
).show
df.groupBy("InvoiceNo").agg(
"Quantity" -> "avg",
"Quantity" -> "sum",
"Quantity" -> "stddev_pop").show
```
* windowing
* carry out uniqueaggregation by either computing some aggregation on s specific "window" of data
* window specification determines which rows will be passed in to the function
* a window function calculates a return value for every input row of a table based on a group of rows, called a frame→ each row can fall into one or more frames
* Spark supports three kinds of window functions
* ranking
* analytical
* aggregate
```scala=
val dfWithDate = df
.withColumn("date", to_timestamp(col("InvoiceDate"), "MM/dd/yyyy HH:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
dfWithDate.where("CustomerId is not null").orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxQuantity")
).show
```
* grouping set
* an aggregation across multiple groups
* The GROUPING SETS operator is only available in SQL.
* To perform the same in DataFrames, you use the rollup and cube operators—which allow us to get the same results
```scala=
// grouping sets
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNotNull")
val groupingSetSQL: String =
"""
|SELECT CustomerId, stockCode, sum(Quantity) FROM dfNotNull
|GROUP BY CustomerId, stockCode GROUPING SETS ((CustomerId, stockCode))
|ORDER BY CustomerId DESC, stockCode DESC
|""".stripMargin
spark.sql(groupingSetSQL).show
```
* rollups
* A null in both rollup columns specifies the grand total across both of those columns
```scala=
val rolledUpDf = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
rolledUpDf.show
```
* cube
* Rather than treating elements hierarchically, a cube does the same thing across all dimensions
```scala=
// cube
val cubeDf = dfNoNull.cube("Date", "Country").agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
cubeDf.show
```
* ChatGPT’s explanation
* 差異總結:
* Grouping Sets:你可以自己指定想要的分組方式,手動選擇哪幾組需要聚合。
* Rollup:自動生成層級式的聚合,從最詳細到最總體。例如:按列A、A+B、A+B+C等層級進行彙總。
* Cube:自動生成所有可能的列組合,包括每個子集的聚合,適合多維度的數據分析。
* 關鍵點:
* Grouping Sets 給你最靈活的控制,讓你可以精確指定你想要的聚合組合。
* Rollup 主要用於層次式聚合,適合逐層查看數據。
* Cube 是最全面的,它會自動幫你生成所有可能的組合。
* 白話說明:
* Grouping Sets 就像點菜時,你自己挑選幾道特定的菜(彙總組合),只吃你點的那些。
* Rollup 就像是一層層品味,先吃最詳細的,再吃大菜,最後把所有東西混合吃一口。
* Cube 就是直接把所有菜都擺在桌上,任何可能的組合都幫你準備好了。
* grouping metadata
* query the aggregation level to easily filter→ using grouping_id
```scala=
// grouping metadata
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show
```
* pivot
* Pivots make it possible for you to convert a row into a column
```scala=
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
pivoted.show()
pivoted.where("date > '2011-12-05'").select("date", "USA_sum(Quantity)").show
```
* user-defined aggregatiom functions
# Chapter 08 - Joins
* A join brings together two sets of data, the left and the right, by comparing the value of one or more keys of the left and right and evaluating the result of a join expression that determines whether Spark should bring together the left set of data with the right set of data.
* the join type determines what should be in the result set. There are a variety of different join types available in Spark for you to use
## Some dataframes used to demonstrate different joins
* Person

* Graduate Program

* Spark Status

## Inner joins: keep rows w/ keys existing in both df
* Inner joins are the default join, so we just need to specify our left DataFrame and join the right in the JOIN expression
```scala=
// Inner Join
val innerJoinExpr = person.col("graduate_program") === graduateProgram.col("id")
var joinType = "inner"
person.join(graduateProgram, innerJoinExpr).show(truncate = false)
person.join(graduateProgram, innerJoinExpr, joinType).show(truncate = false)
val innerJoinQuery: String =
s"""
|select * from person as p
|join graduateProgram as g on p.graduate_program = g.id
|""".stripMargin
spark.sql(innerJoinQuery).show(truncate = false)
```



## Outer joins: keep rows w/ keys in either left or right df
* If there is no equivalent row in either the left or right DataFrame, Spark will insert null
```scala=
// Outer Join
val outerJoinExpr = person.col("graduate_program") === graduateProgram.col("id")
joinType = "outer"
person.join(graduateProgram, outerJoinExpr).show(truncate = false)
person.join(graduateProgram, outerJoinExpr, joinType).show(truncate = false)
val outerJoinQuery: String =
s"""
|select * from person as p
|full outer join graduateProgram as g on p.graduate_program = g.id
|""".stripMargin
spark.sql(outerJoinQuery).show(truncate = false)
```



## Left outer joins: keep rows w/ keys in left df
* If there is no equivalent row in the right DataFrame, Spark will insert null
```scala=
joinType = "left_outer"
person.join(graduateProgram, JoinExpr, joinType).show(truncate = false)
val leftOuterJoinQuery: String =
s"""
|select * from person as p
|left outer join graduateProgram as g on p.graduate_program = g.id
|""".stripMargin
spark.sql(leftOuterJoinQuery).show(truncate = false)
```



## Right outer joins: keep rows w/ keys in right df
* If there is no equivalent row in the left DataFrame, Spark will insert null
```scala=
joinType = "right_outer"
person.join(graduateProgram, JoinExpr, joinType).show(truncate = false)
val rightOuterJoinQuery: String =
s"""
|select * from person as p
|right outer join graduateProgram as g on p.graduate_program = g.id
|""".stripMargin
spark.sql(rightOuterJoinQuery).show(truncate = false)
```



## Left semi joins: keep rows in left, and only the left df where the key appears in right df
* Semi joins are a bit of a departure from the other joins.
→ They do not actually include any values from the right DataFrame.
* They only compare values to see if the value exists in the second DataFrame.
→ If the value does exist, those rows will be kept in the result, even if there are duplicate keys in the left DataFrame
```scala=
// Left Semi Join
logger.info("""Left Semi Join""")
joinType = "left_semi"
graduateProgram.join(person, JoinExpr, joinType).show(truncate = false)
val leftSemiJoinQuery: String =
s"""
|SELECT * FROM graduateProgram
|LEFT SEMI JOIN person ON graduateProgram.id = person.graduate_program
|""".stripMargin
spark.sql(leftSemiJoinQuery).show(truncate = false)
val gradProgram2 = graduateProgram.union(Seq(
(0, "Masters", "Duplicated Row", "Duplicated School")).toDF())
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.join(person, JoinExpr, joinType).show(truncate = false)
```



(Adding a duplicated row)


## Left anti joins: keep rows in left, and only the left df where they do not appear in right df
* Left anti joins are the opposite of left semi joins.
* They only compare values to see if the value exists in the second DataFrame.
→ keep only the values that do not have a corresponding key in the second DataFrame
* Think of anti joins as a NOT IN SQL-style filter
```scala=
// Left Anti Join
logger.info("""Left Anti Join""")
joinType = "left_anti"
graduateProgram.join(person, JoinExpr, joinType).show(truncate = false)
val leftAntiJoinQuery: String =
s"""
|SELECT * FROM graduateProgram
|LEFT ANTI JOIN person ON graduateProgram.id = person.graduate_program
|""".stripMargin
spark.sql(leftAntiJoinQuery).show(truncate = false)
```



## Natural joins: perform a join by implicitly matching the columns between the two df with same names
* Natural joins make implicit guesses at the columns on which you would like to join
## Cross (or Cartesian) joins: match every row in left df with every row in right df
* Cross joins will join every single row in the left DataFrame to ever single row in the right DataFram
```scala=
// Cross Join
logger.info("""Cross Join""")
joinType = "cross"
graduateProgram.join(person, JoinExpr, joinType).show(truncate = false)
graduateProgram.crossJoin(person).show(truncate = false)
val crossJoinQuery: String =
s"""
|SELECT * FROM graduateProgram
|CROSS JOIN person
|""".stripMargin
spark.sql(crossJoinQuery).show(truncate = false)
```



## Challenges when using Joins
### Joins on Complex Types
```scala=
joinType = "inner"
person
.withColumnRenamed("id", "personId")
.join(sparkStatus, expr("array_contains(spark_status, id)"))
.show(truncate = false)
```



### Handling Duplicate Column Names
* This can occur in two distinct situations
* The join expression that you specify does not remove one key from one of the input DataFrames and the keys have the same column name
* Two columns on which you are not performing the join have the same name
```scala=
// Duplicated columns names
val gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
val joinExpr = gradProgramDupe.col("graduate_program") === person.col("graduate_program")
gradProgramDupe.show(truncate = false)
person.join(gradProgramDupe, joinExpr).show()
person.join(gradProgramDupe, joinExpr).select("graduate_program").show(truncate = false)
```




```scala=
// this will throw an error
person.join(gradProgramDupe, joinExpr).select("graduate_program").show(truncate = false)
```

#### Approach 1: different join expression
* the easiest fix is to change the join expression from a Boolean expression to a string or sequence.
→ This automatically removes one of the columns for you during the join
```scala=
person.join(gradProgramDupe, "graduate_program").show()
person.join(gradProgramDupe,"graduate_program").select("graduate_program").show()
```

#### Approach 2: dropping the column after join
* Another approach is to drop the offending column after the join
→ When doing this, we need to refer to the column via the original source DataFrame
```scala=
person.join(gradProgramDupe, joinExpr).drop(person.col("graduate_program")).show()
person.join(gradProgramDupe, joinExpr).drop(person.col("graduate_program")).select("graduate_program").show()
```

#### Approach 3: Renaming a column before the join
```scala=
val gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
val joinExpr2 = person.col("graduate_program") === gradProgram3.col("grad_id")
person.join(gradProgram3, joinExpr2).show()
```

## How Spark performs Joins
* Spark approaches cluster communication in two different ways during joins
1. one incurs a shuffle join, which results in an all-to-all communication
2. a broadcast join
* Why this is important
* Comprehending how Spark performs joins can mean the difference between a job that completes quickly and one that never completes at all.
* Communication Strategies
* Big table–to–big table
* When you join a big table to another big table, you end up with a shuffle join
* In a shuffle join, every node talks to every other node and they share data according to which node has a certain key or set of keys (on which you are joining)
→ these joins are expensive because the network can become congested with traffic, especially if your data is not partitioned well

* Big table–to–small table
* When the table is small enough to fit into the memory of a single worker node, with some breathing room of course, we can optimize our join.
* it can often be more efficient to use a broadcast join
→ What this means is that we will replicate our small DataFrame onto every worker node in the cluster (be it located on one machine or many)
→ what this does is prevent us from performing the all-to-all communication during the entire join process
* With the DataFrame API, we can also explicitly give the optimizer a hint that we would like to use a broadcast join by using the correct function around the small DataFram
* The SQL interface also includes the ability to provide hints to perform joins→ MAPJOIN, BROADCAST, and BROAD CASTJOIN all do the same thing and are all supported

* Little table–to–little table
* it’s usually best to let Spark decide how to join them. You can always force a broadcast join if you’re noticing strange behavior.
# Chapter 09 - Data Sources
* Spark has six "core" data sources and hundreds of external data sources written by the community
* core: CSV, JSON, Parquet, ORC, JDBC/ODBC, plain-text
* community: Cassandra, HBase, MongoDB, Redshift, XML…
* In general, Spark will fail only at job execution time rather than DataFrame definition time—even if, for example, we point to a file that does not exist. This is due to lazy evaluation
## Data Source APIs
### Reading data
* The foundation for reading data in Spark is the DataFrameReader.
→ We access this through the SparkSession via the read attribute
```scala=
spark.read
DataFrameReader.format(...).option("key", "value").schema(...).load()
```
* After we have a DataFrame reader, we specify several values:
* format: optional because by default Spark will use the Parquet format
* schema: optional if the data source provides a schema or if you intend to use schema inference.
* read mode
* Reading data from an external source naturally entails encountering malformed data, especially when working with only semi-structured data sources.
* Read modes specify what will happen when Spark does come across malformed records.
→ default is permissive
### Writing data
```scala=
dataFrame.write
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
DataFrameWriter
.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save()
```
* Because we always need to write out some given data source, we access the DataFrameWriter on a per-DataFrame basis via the write attribute
* After we have a DataFrameWriter, we specify three values: the format, a series of options, and the save mode
* format is optional because by default Spark will use the Parquet format
* PartitionBy, bucketBy, and sortBy work only for file-based data sources
* Save modes specify what will happen if Spark finds data at the specified location (assuming all else equal)
## CSV
### reading
```scala=
spark.read.format("csv")
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.load("some/path/to/file.csv")
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
.show(5)
```
### writing
```scala=
val csvFile = spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
csvFile.write
.format("csv")
.mode("overwrite")
.option("sep", "\t")
.save("/tmp/my-tsv-file.tsv")
```
## JSON
* In Spark, when we refer to JSON files, we refer to line-delimited JSON files. This contrasts with files that have a large JSON object or array per file.
* The line-delimited versus multiline trade-off is controlled by a single option: multi Line. When you set this option to true, you can read an entire file as one json object and Spark will go through the work of parsing that into a DataFrame.
* Line-delimited JSON is actually a much more stable format because it allows you to append to a file with a new record (rather than having to read in an entire file and then write it out)

### reading
```scala=
spark.read.format("json")
spark
.read
.format("json")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/json/2010-summary.json")
.show(5)
```
### writing
```scala=
csvFile
.write
.format("json")
.mode("overwrite")
.save("/tmp/my-json-file.json")
```
## Parquet
* Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads.
* it provides columnar compression, which saves storage space and allows for reading individual columns instead of entire files.
* It is a file format that works exceptionally well with Apache Spark and is in fact the default file format
* with Parquet files, schema-on-read method is more powerful because the schema is built into the file itself (so no inference needed)
### read
```scala=
spark.read.format("parquet")
spark
.read
.format("parquet")
.load("/data/flight-data/parquet/2010-summary.parquet")
.show(5)
```
### write
```scala=
csvFile
.write
.format("parquet")
.mode("overwrite")
.save("/tmp/my-parquet-file.parquet")
```
## ORC
* ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads
* What is the difference between ORC and Parquet? For the most part, they’re quite similar; the fundamental difference is that Parquet is further optimized for use with Spark, whereas ORC is further optimized for Hive
### read
```scala=
spark
.read
.format("orc")
.load("/data/flight-data/orc/2010-summary.orc")
.show(5)
```
### write
```scala=
csvFile
.write
.format("orc")
.mode("overwrite")
.save("/tmp/my-json-file.orc")
```
## SQL Databases
* To read and write from these databases, you need to do two things: include the Java Database Connectivity (JDBC) driver for you particular database on the spark class path, and provide the proper JAR for the driver itself.
* Options


### read
```scala=
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark
.read
.format("jdbc")
.option("url", url)
.option("dbtable", tablename)
.option("driver", driver)
.option("user", "username")
.option("password","my-secret-password")
.load()
```
#### Query Pushdown
* Spark makes a best-effort attempt to filter data in the database itself before creating the DataFrame
* if we specify a filter on our DataFrame, Spark will push that filter down into the database
* Spark can’t translate all of its own functions into the functions available in the SQL database in which you’re working.
→ Therefore, sometimes you’re going to want to pass an entire query into your SQL that will return the results as a DataFrame
→ Rather than specifying a table name, you just specify a SQL query. Of course, you do need to specify this in a special way; you must wrap the query in parenthesis and rename it to something
```scala=
val pushdownQuery =
"""(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
AS flight_info"""
val dbDataFrame = spark
.read
.format("jdbc")
.option("url", url)
.option("dbtable", pushdownQuery)
.option("driver", driver)
.load()
```
#### Reading from databases in parallel
* Spark has an underlying algorithm that can read multiple files into one partition, or conversely, read multiple partitions out of one file, depending on the file size and the “splitability” of the file type and compression
* What you can configure, as seen in the previous options, is the ability to specify a maximum number of partitions to allow you to limit how much you are reading and writing in parallel
```scala=
val dbDataFrame = spark
.read
.format("jdbc")
.option("url", url)
.option("dbtable", tablename)
.option("driver", driver)
.option("numPartitions", 10)
.load()
```
* You can explicitly push predicates down into SQL databases through the connection itself.
* This optimization allows you to control the physical location of certain data in certain partitions by specifying predicates.
* If you specify predicates that are not disjoint, you can end up with lots of duplicate rows.
```scala=
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
spark.read.jdbc(url, tablename, predicates, props).show()
spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions
```
#### Partitioning based on a sliding window
```scala=
val colName = "count"
val lowerBound = 0L
val upperBound = 348113L
val numPartitions = 10
spark
.read
.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props)
.count()
```
### write
```scala=
val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)
```
## Text files
* Each line in the file becomes a record in the DataFrame
### read
* simply specify the type to be textFile.
* With textFile, partitioned directory names are ignored.
* To read and write text files according to partitions, you should use text, which respects partitioning on reading and writing
```scala=
spark
.read
.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows")
.show()
```
### write
* When you write a text file, you need to be sure to have only one string column; otherwise, the write will fail
```scala=
csvFile
.select("DEST_COUNTRY_NAME")
.write
.text("/tmp/simple-text-file.txt")
```
## Advanced I/O concepts
* How you store your data is of immense consequence when it comes to making your Spark jobs run smoothly
* Partitioning is a tool that allows you to control what data is stored (and where) as you write it
* When you write a file to a partitioned directory (or table), you basically encode a column as a folder. What this allows you to do is skip lots of data when you go to read it in later, allowing you to read in only the data relevant to your problem instead of having to scan the complete datase
* Bucketing is another file organization approach with which you can control the data that is specifically written to each file

→ avoid expensive shuffles when joining or aggregating
* When you’re writing lots of small files, there’s a significant metadata overhead that you incur managing all of those files. Spark especially does not do well with small files, although many file systems (like HDFS) don’t handle lots of small files well, either
→ You can use the maxRecordsPerFile option and specify a number of your choosing
## Additional Resources
* [Narrow v.s Wide transformation & PartitionBy v.s BucketBy](https://www.newsletter.swirlai.com/p/sai-26-partitioning-and-bucketing)
# Chapter 10 - Spark SQL
* Spark SQL is intended to operate as an online analytic processing (OLAP) database, not an online transaction processing (OLTP) database.
→ This means that it is not intended to perform extremely low-latency queries.
* The Hive metastore is the way in which Hive maintains table information for use across sessions.
→ Spark SQL has a great relationship with Hive because it can connect to Hive metastores.
* The highest level abstraction in Spark SQL is the Catalog.
→ the Catalog is an abstraction for the storage of metadata about the data stored in your tables as well as other helpful things like databases, tables, functions, and views.
→ The catalog is available in the org.apache.spark.sql.catalog.Catalog package and contains a number of helpful functions for doing things like listing tables, databases, and functions.
* The core difference between tables and DataFrames is this: you define DataFrames in the scope of a programming language, whereas you define tables within a database.
* One important note is the concept of managed versus unmanaged tables.
→ Tables store two important pieces of information. The data within the tables as well as the data about the tables; that is, the metadata.
* When you define a table from files on disk, you are defining an unmanaged table.
* When you use saveAsTable on a DataFrame, you are creating a managed table for which Spark will track of all of the relevant information. you will also notice that this writes to the default Hive warehouse location
* creating tables
* You can create tables from a variety of sources
* Spark lets you create one on the fly. You can even specify all sorts of sophisticated options when you read in a file
* These tables will be available in Spark even through sessions
* creating views
* To an end user, views are displayed as tables, except rather than rewriting all of the data to a new location, they simply perform a transformation on the source data at query time.
* temporary views that are available only during the current session and are not registered to a database
* Global temp views are resolved regardless of database and are viewable across the entire Spark application, but they are removed at the end of the session
* Databases
* Databases are a tool for organizing tables. As mentioned earlier, if you do not define one, Spark will use the default database→ The encoder maps the domain-specific type T to Spark’s internal type system.
# Chapter 11 - Datasets
* To efficiently support domain-specific objects, a special concept called an “Encoder” is required
→ The encoder maps the domain-specific type T to Spark’s internal type system
* For example, given a class Person with two fields, name (string) and age (int), an encoder directs Spark to generate code at runtime to serialize the Person object into a binary structure.
* When you use the Dataset API, for every row it touches, this domain specifies type, Spark converts the Spark Row format to the object you specified (a case class or Java class).
* This conversion slows down your operations but can provide more flexibility.
→ why should I use them at all? If we had to condense this down into a canonical list, here are a couple of reasons
1. When the operation(s) you would like to perform cannot be expressed using DataFrame manipulations
2. When you want or need type-safety, and you’re willing to accept the cost of performance to achieve it
3. Another potential time for which you might want to use Datasets is when you would like to reuse a variety of transformations of entire rows between single-node workloads and Spark workloads.
* Probably the most popular use case is to use DataFrames and Datasets in tandem, manually trading off between performance and type safety when it is most relevant for your workload.
* creating datasets
* To create Datasets in Scala, you define a Scala case class.
→ A case class is a regular class that has the following characteristics:
1. Immutable
2. Decomposable through pattern matching
3. Allows for comparison based on structure instead of reference
4. Easy to use and manipulate
* transformations on dataset
* Transformations on Datasets are the same as those that we saw on DataFrames
* Datasets also provide a more sophisticated method, the joinWith method. joinWith is roughly equal to a co-group (in RDD terminology) and you basically end up with two nested Datasets inside of one.
→ uery these as a Dataset or a DataFrame with complex types
* Of course, a “regular” join would work quite well, too, although you’ll notice in this case that we end up with a DataFrame (and thus lose our JVM type information).
→ We can always define another Dataset to gain this back.
* Grouping and aggregations follow the same fundamental standards that we saw in the previous aggregation chapter, so groupBy rollup and cube still apply, but these return DataFrames instead of Datasets (lose type information)
* Grouping and aggregations follow the same fundamental standards that we saw in the previous aggregation chapter, so groupBy rollup and cube still apply, but these return DataFrames instead of Datasets (you lose type information)
* An excellent example is the groupByKey method. This allows you to group by a specific key in the Dataset and get a typed Dataset in return→ This function, however, doesn’t accept a specific column name but rather a function. This makes it possible for you to specify more sophisticated grouping functions that are much more akin to something like:
```scala=
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()
```
* This should motivate using Datasets only with user-defined encoding surgically and only where it makes sense.
→ This might be at the beginning of a big data pipeline or at the end of one.
```scala=
// filtering
val flightSample = flights.filter(flightRow => originIsDest(flightRow)).first
println(flightSample)
// mapping
val destinations = flights.map(row => row.DEST_COUNTRY_NAME)
destinations.show(numRows = 5)
//joining
val flightMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
.withColumnRenamed("_1", "count")
.withColumnRenamed("_2", "randomData")
.as[FlightMetadata]
val flights2 = flights
.joinWith(flightMeta, flights.col("count") === flightMeta.col("count"))
// complex types from joinWith
flights2.show(2)
flights2.selectExpr("_1.DEST_COUNTRY_NAME", "_2.randomData").show(2)
```
# Chapter 20 - Stream Processing Fundamentals
* Stream processing is the act of continuously incorporating new data to compute a result.
→ In stream processing, the input data is unbounded and has no predetermined beginning or end.
→ in batch processing, the computation runs on a fixed-input dataset
* common cases depict why users might want to use streaming
1. notification and alerting
* Given some series of events, a notification or alert should be triggered if some sort of event or series of events occurs.
2. real-time reporting
* Many organizations use streaming systems to run real-time dashboards that any employee can look at.
3. incremental ETL
* One of the most common streaming applications is to reduce the latency companies must endure while retreiving information into a data warehouse
* Using Structured Streaming, these jobs can incorporate new data within seconds, enabling users to query it faster downstream
* it is critical that data is processed exactly once and in a fault-tolerant manner: we don’t want to lose any input data before it makes it to the warehouse, and we don’t want to load the same data twice
4. update date to serve in real time
* Streaming systems are frequently used to compute data that gets served interactively by another application.
5. real-time decision making
* Real-time decision making on a streaming system involves analyzing new inputs and responding to them automatically using business logic
6. online machine learning
* A close derivative of the real-time decision-making use case is online machine learning. In this scenario, you might want to train a model on a combination of streaming and historical data from multiple users.
* advantages of streaming
* stream processing enables lower latency
* stream processing can also be more efficient in updating a result than repeated batch jobs, because it automatically incrementalizes the computation
* challenges of streaming
* input records might arrive to our application out-of-order: due to delays and retransmissions
* Processing out-of-order data based on application timestamps (also called event time)
* Maintaining large amounts of state
* Supporting high-data throughput
* Processing each event exactly once despite machine failures
* Handling load imbalance and stragglers
* Responding to events at low latency
* Joining with external data in other storage systems
* Determining how to update output sinks as new events arrive
* Writing data transactionally to output systems
* Updating your application’s business logic at runtime
* stream processing design points
* Record-at-a-Time Versus Declarative APIs
* Record-at-a-Time: pass each event to the application and let it react using custom code
* declarative APIs: application specifies what to compute but not how to compute it in response to each new event and how to recover from failure.
* Event Time Versus Processing Time
* Event time: the idea of processing data based on timestamps inserted into each record at the source
* Processing time: the time when the record is received at the streaming application
* If your application collects data from remote sources that may be delayed, such as mobile phones or IoT devices, event-time processing is crucial
* if your application only processes local events (e.g., ones generated in the same data center), you may not need sophisticated event-time processing.
* if your application only processes local events (e.g., ones generated in the same data center), you may not need sophisticated event-time processing.
* Continuous Versus Micro-Batch Execution
* In continuous processing-based systems, each node in the system is continually listening to messages from other nodes and outputting new updates to its child nodes
→ Continuous processing has the advantage of offering the lowest possible latency when the total input rate is relatively low, because each node responds immediately to a new message. However, continuous processing systems generally have lower maximum throughput
* micro-batch systems wait to accumulate small batches of input data (say, 500 ms’ worth)
→ Micro-batch systems can often achieve high throughput per node because they leverage the same optimizations as batch systems (e.g., vectorized processing), and do not incur any extra per-record overhead→ icro-batch systems can also use dynamic load balancing techniques to handle changing workloads (e.g., increasing or decreasing the number of tasks)
→ The downside, however, is a higher base latency due to waiting to accumulate a micro-batch
* When choosing between these two execution modes, the main factors you should keep in mind are your desired latency and total cost of operation (TCO)
* Micro-batch systems can comfortably deliver latencies from 100 ms to a second, depending on the application. Within this regime, they will generally require fewer nodes to achieve the same throughput, and hence lower operational cost (including lower maintenance cost due to less frequent node failures).
* Spark includes two streaming APIs, as we discussed at the beginning of this chapter.
* The earlier DStream API in Spark Streaming is purely micro-batch oriented. It has a declarative (functional-based) API but no support for event time.
* The newer Structured Streaming API adds higher-level optimizations, event time, and support for continuous processing.
* Structured Streaming is also designed to make it easy to build end-to-end continuous applications using Apache Spark that combine streaming, batch, and interactive queries
* Structured Streaming does not use a separate API from DataFrames: you simply write a normal DataFrame (or SQL) computation and launch it on a stream
# Chapter 21 - Stream Processing Fundamentals
* The main idea behind Structured Streaming is to treat a stream of data as a table to which data is continuously appended
* Core concepts
* transformations and actions
* The transformations available in Structured Streaming are, with a few restrictions, the exact same transformations that we saw in batch
* input sources
* kafka
* files on HDFS or S3
* socket source
* sink (destination)
* kafka
* almost any file format
* output modes
* append: only add new records
* update: update changed records
* complete: rewrite full output
* triggers
* define when data is output
* event-time processing
* processing data based on timestamps included in thr record that may arrive out of orders
* event-time data: time fields taht are embedded in data→ process data according to the time reocrd was generated
* watermarks: a feature of streaming that allow users to specify how late they expect to see data in event time→ setting watermarks to limit how long they need to remember old data.
* structured streaming in action
* Structured Streaming does not let you perform schema inference without explicitly enabling it.
→ You can enable schema inference for this by setting the configuration spark.sql.streaming.schemaInference to true.
```scala=
val spark = SparkSessionProvider.spark
spark.conf.set("spark.sql.shuffle.partitions", "4")
import spark.implicits._
val staticDf = spark.read.json(
"/Users/user/Downloads/Spark-The-Definitive-Guide-master/data/activity-data")
val staticDfSchema = staticDf.schema
staticDf.printSchema()
val streamingDf = spark.readStream
.schema(staticDfSchema)
.option("maxFilesPerTrigger", 1)
.json("/Users/user/Downloads/Spark-The-Definitive-Guide-master/data/activity-data")
val activityCounts = streamingDf.groupBy("gt").count()
val activityQuery = activityCounts.writeStream
.queryName("activity_counts")
.format("memory")
.outputMode("complete")
.start()
for (i <- 1 to 10) {
spark.sql("select * from activity_counts").show
Thread.sleep(1000)
}
activityQuery.awaitTermination()
```
* transformations on streams
* selections and filtering
```scala=
val simpleTransform = streamingDf
.withColumn("stairs", expr("gt like '%stairs%'"))
.where("stairs")
.where("gt is not null")
.select("gt", "model", "arrival_time", "creation_time")
.writeStream
.queryName("simple_transform")
.format("memory")
.outputMode("append")
.start()
simpleTransform.awaitTermination()
```
* aggregations
```scala=
val deviceModelStats = streamingDf
.cube("gt", "model")
.avg()
.drop("avg(Arrival_Time)")
.drop("avg(Creation_Time)")
.drop("avg(Index)")
.writeStream
.queryName("device_counts")
.format("memory")
.outputMode("complete")
.start()
for (i <- 1 to 10) {
spark.sql("select * from device_counts").show
Thread.sleep(1000)
}
deviceModelStats.awaitTermination()
```
* joins
```scala=
val historicalAgg = staticDf.groupBy("gt", "model").avg()
val deviceModelStats = streamingDf
.drop("Arrival_Time", "Creation_Time", "Index")
.cube("gt", "model")
.avg()
.join(historicalAgg, Seq("gt", "model"))
.writeStream
.queryName("device_counts")
.format("memory")
.outputMode("complete")
.start()
for (i <- 1 to 10) {
spark.sql("select * from device_counts").show
Thread.sleep(1000)
}
deviceModelStats.awaitTermination()
```
* Source and Sinks
* File
* The only difference between using the file source/sink and Spark’s static file source is that with streaming, we can control the number of files that we read in during each trigger via the maxFilesPerTrigger option that we saw earlier.
* Keep in mind that any files you add into an input directory for a streaming job need to appear in it atomically. Otherwise, Spark will process partially written files before you have finished.
* On file systems that show partial writes, such as local files or HDFS, this is best done by writing the file in an external directory and moving it into the input directory when finished. On Amazon S3, objects normally only appear once fully written.
* Kafka
* read from Kafka
```scala=
val ds1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// Subscribe to multiple topics
val ds2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
// Subscribe to a pattern of topics
val ds3 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
```
* write to kafka
```scala=
ds1
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.option("topic", "topic1")
.start()
```
* foreach
* This operation allows arbitrary operations to be computed on a per-partition basis, in parallel
* To use the foreach sink, you must implement the ForeachWriter interface
* The writer must be Serializable, as it were a UDF or a Dataset map function.
* The three methods (open, process, close) will be called on each executor.
* The writer must do all its initialization, like opening connections or starting transactions only in the open method.
→ A common source of errors is that if initialization occurs outside of the open method (say in the class that you’re using), that happens on the driver instead of the executor
* Because the Foreach sink runs arbitrary user code, one key issue you must consider when using it is fault tolerance→ Therefore, the API provides some additional parameters to help you achieve exactly-once processing.
* First, the open call on your ForeachWriter receives two parameters that uniquely identify the set of rows that need to be acted on.
* The version parameter is a monotonically increasing ID that increases on a per-trigger basis, and partitionId is the ID of the partition of the output in your task.
* Source and sinks for testing
* socket
* console
* memory
* Output mode: the result Dataset will look when it gets to sink
* categories
* append:
* default
* each row is output once and only once
* complete
* output the enrite state of the resultto output sink
* update
* only rows that are different from previous write are written out
* Structured Streaming limits your use of each mode to queries where it makes sense.
* Trigger: when data is output
* processing time trigger
```scala=
activityCounts.writeStream
.trigger(Trigger.ProcessingTime("100 seconds"))
.format("console")
.outputMode("complete")
.start()
```
* specify duratuib as a string
* The ProcessingTime trigger will wait for multiples of the given duration in order to output data. For example, with a trigger duration of one minute, the trigger will fire at 12:00, 12:01, 12:02, and so on.
→ If a trigger time is missed because the previous processing has not yet completed, then Spark will wait until the next trigger point (i.e., the next minute), rather than firing immediately after the previous processing completes
* once trigger
```scala=
activityCounts.writeStream
.trigger(Trigger.AvailableNow())
.format("console")
.outputMode("complete")
.start()
```
* During production, the Once trigger can be used to run your job manually at a low rate (e.g., import new data into a summary table just occasionally).
→ Because Structured Streaming still fully tracks all the input files processed and the state of the computation, this is easier than writing your own custom logic to track this in a batch job
# Chapter 22 - Event-time and Stateful Processing
* in stream-processing systems there are effectively two relevant times for each event
* the time at which it actually occurred (event time)
* the time that it was processed or reached the stream-processing system (processing time)
* the order of the series of events in the processing system does not guarantee an ordering in event time.
→ we hope to compare events based on the time at which those events occurred
* event-time
* Event time is the time that is embedded in the data itself
* This is important to use because it provides a more robust way of comparing events against one another.
* The challenge here is that event data can be late or out of order. This means that the stream processing system must be able to handle out-of-order or late data.
* processing-time
* Processing time is the time at which the stream-processing system actually receives data
* stateful processing
* stateful processing is only necessary when you need to use or update intermediate information (state) over longer periods of time (in either a microbatch or a record-at-a-time approach)
→ This can happen when you are using event time or when you are performing an aggregation on a key, whether that involves event time or not.
* there are times when you need fine-grained control over what state should be stored, how it is updated, and when it should be removed, either explicitly or via a time-out.
→ this is called arbitrary (or custom) stateful processing and Spark allows you to essentially store whatever information you like over the course of the processing of a stream
* event-time basics
* The first step in event-time analysis is to convert the timestamp column into the proper Spark SQL timestamp type
* windows
* tumbling windows

* sliding windows: decouple the window from the starting time of the window

* handling late data with Watermark
* Spark is going to need to store that intermediate data forever because users never specified a watermark, or a time at which we don’t expect to see any more data.→ Users must specify this watermark in order to age-out data in the stream (and, therefore, state) so that we don’t overwhelm the system over a long period of time.
* Concretely, a watermark is an amount of time following a given event or set of events after which we do not expect to see any more data from that time.
```scala=
val staticDf = spark.read.json(
"/Users/raymond/Downloads/Spark-The-Definitive-Guide-master/data/activity-data")
val streamingDf = spark.readStream
.schema(staticDf.schema)
.option("maxFilesPerTrigger", 10)
.json("/Users/raymond/Downloads/Spark-The-Definitive-Guide-master/data/activity-data")
streamingDf.printSchema
val withEventTime = streamingDf.selectExpr(
"*",
"cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time"
)
// create a window not overlapping for every 10 minutes
withEventTime
.groupBy(window(col("event_time"), "10 minutes"))
.count()
.writeStream
.queryName("event_per_window")
.format("memory")
.outputMode("complete")
.start()
spark.sql("select * from event_per_window").printSchema()
// create a window overlapping for every 5 minutes and each window contains 10 minutes
withEventTime
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("event_per_window2")
.format("memory")
.outputMode("complete")
.start()
spark.sql("select * from event_per_window2").printSchema()
withEventTime
.withWatermark("event_time", "1 hour")
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("event_per_window3")
.format("memory")
.outputMode("complete")
.start()
spark.sql("select * from event_per_window3").printSchema()
withEventTime
.withWatermark("event_time", "1 hour")
.dropDuplicates("User", "event_time")
.groupBy("User")
.count()
.writeStream
.queryName("event_per_window4")
.format("memory")
.outputMode("complete")
.start()
spark.sql("select * from event_per_window4").printSchema()
```