# 5-2 Resilient Distributed Datasets - A Fault-Tolerant Abstraction for In-Memory Cluster Computing
## 1. Introduction
Cluster computing frameworks
* 廣泛被用於大規模數據分析
* 讓使用者可以使用一組高階的 operators (指令)來寫 parallel 計算 program,不需要擔心 work distribution 和 fault tolerance
* 缺乏分佈式記憶體的支援
* 在處理需要在多個運算中 reuse intermediate result 的應用( iterative machine learning、graph algorithms、interactive data mining )效率不高
* reuse 兩個 computation 之間的數據的唯一方法是將其寫入外部的 stable storage 系統
* 大量的 data replication、disk I/O 和 serialization,可能佔據 application 大部分執行時間
解決: 專門的 framework 來應對數據 reuse 的應用
* Pregel 、 HaLoop
* 僅支持特定的計算模式,這些模式自動的進行 data share
* abstractions for in-memory storage
* distributed shared memory、key-value stores、databases、Piccolo
* 基於 fine-grained 介面更新 mutable state
* fault-tolerant
* 數據複製到多台機器上 or 在多台機器上 進行 log 紀錄和更新
* 需要在 cluster 上複製大量數據,bandwidth 遠低於 RAM,並會產生大量的存儲成本
論文提出: Resilient Distributed Datasets(RDDs)
* 在一般性的應用中 reuse 數據,user 可以自主將 intermediate result 永久儲存在記憶體中、控制 data 的 partiton 以優化 data placement
* 提供 fault-tolerant、parallel data structures(非常適合許多平行應用,因為這些應用通常需要對多個 data 應用相同的 operator)
* 可以表達許多到目前為止被提議為獨立系統的 cluster programming models(MapReduce、DryadLINQ、SQL、Pregel、HaLoop)
* abstractions for in-memory storage
* 基於 coarse-grained transformations 的介面
* fault-tolerant
* RDD 的 partition 擁有足夠的信息來從其他 RDDs 衍生並重新計算該 partition,不需要在 cluster 上複製大量數據
* Spark = RDDs + 一個類似於 DryadLINQ 的 language 整合 programming interface + Scala programming language(Scala interpreter 互動式地查詢大型 dataset)
## 2. Resilient Distributed Datasets (RDDs)
### 2.1 RDD Abstraction
*一個 RDD 是一個 read-only、partitioned record 集合*
* transformations
* 只能透過 transformations 創建 RDDs
* 定義: deterministic operation 對 data in stable storage 或者 other RDDs
* map、filter、join
* 一個 RDD 擁有足夠的訊息來了解它是如何從其他 dataset 中衍生出來的(其 lineage),以便從 stable storage 中計算出它的 partition
* 如果一個程序在故障後無法重構一個 RDD,那麼這個程序就不能參考這個 RDD
* user 可以控制 RDDs 的兩個方面:persistence 和 partitioning
* user 可以指明將 reuse 哪些 RDDs 並為它們選擇一個 store 策略(例如,in-memory storage)
* user 可以要求根據每條 record 中的 key 將一個 RDD 的 element 在機器間 partition(對於 placement 優化很有用,例如確保將要聯合的兩個 dataset 以相同的方式進行 hash partition)
### 2.2 Spark Programming Interface
Spark 透過一個整合語言的 API 展示出 RDDs
* 類似於 DryadLINQ、FlumeJava
* 每個 data set 都被表示為一個 object、透過這些 obiect 的 method 調用來觸發 transformations
programmer 設計程式過程
1. 通過對 stable storage 中的數據進行 transformations 來定義一個或多個 RDDs
2. 在 *actions* 中使用這些 RDDs,來向應用程式返回一個值/將數據導出到 storage system
* action
* operations
* example: count(返回 dataset 中的 element 數量)、collect(返回element 本身)、 save(將 dataset 輸出到 storage system)
* Spark 第一次在 action 中使用 RDDs 時會 lazily computes RDDs(只有實際呼叫到 action 時才會進行計算 RDDs),以便能夠進行 pipeline transformations
persist method
* user 用來指明希望在未來操作中 reuse 哪些 RDDs
* persist 的 flags: 請求其他 persistence 策略,如僅將 RDD 存儲在 disk 上或在機器間複製
* Spark 預設將 persistent RDDs 保留在記憶體中,但如果 RAM 不足,可以將它們溢出到 disk
* persistence priorit: 每個 RDD 持有,以指定哪些記憶體中的數據應該首先被溢出到 disk
#### 2.2.1 Example: Console Log Mining
假設一個網路服務遇到 error,操作員想要在 Hadoop filesystem(HDFS)中搜尋 log 以找到原因,使用 Spark,可以僅將 log 中的 error message 加載到一組節點的 RAM 中並進行 interactively query:
1. ```lines = spark.textFile("hdfs://...")```
* 定義了一個由 HDFS 文件支持的 RDD(作為 text line 的集合)
2. ```errors = lines.filter(_.startsWith("ERROR"))```
* 衍生出一個過濾後的 RDD
3. ```errors.persist()```
* 要求將 *error* persist 於記憶體中,以便在不同查詢間共享
4. ```errors.count()```
* action 中使用 RDD(計算訊息的數量)
對 RDD 進行進一步的 transformations 並使用其結果:
```// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split(’\t’)(3))
.collect()
```
* 在執行 *collect* 之前應用了 *filter* 和 *map*
* Spark 的 scheduler 將對後兩個 transformations 進行 pipeline 處理,並發送一組任務到持有 errors cached partitions 的節點上計算
* 如果某個 errors partitions 丟失,Spark 會通過只對相應的 lines partitions filter 來重建

### 2.3 Advantages of the RDD Model
distributed shared memory(DSM)
* 可以讀寫 grobal address 空間中的任意位置(fine-grained)
* 傳統系統
RDDs vs DSM

* write
* RDDs
* coarse-grained transformations
* 只適用於執行批量寫入的應用程序
* DSM
* 允許對每個記憶體位置進行讀寫
* fault tolerance
* RDDs
* 較好
* lineage 恢復(不需要承擔 checkpointing 的 overhead)
* 只有在 fail 時 lost 的 RDD 需要重新計算,可以在不同節點上並行重新計算,無需回滾整個程序
* DSM
* 較差
* checkpointing 恢復
* roll back 整個程序
* 減緩 slow nodes 影響
* RDDs
* immutable 性質
* 夠通過運行 slow 任務的 backup 副本來減輕 slow node 的影響
* DSM
* 困難
* 實現 back 任務時,兩個 task 副本會訪問相同的記憶體位置並互相干擾更新
* 調度任務
* RDDs
* 可以根據數據的位置來調度任務,以提高性能
* RAM 不夠時
* RDDs
* 只要是 scan-based 的操作,無法用於 RAM 的 partition 可以存儲在 disk 上,並將提供與當前數據並行系統類似的性能
### 2.4 Applications Not Suitable for RDDs
* RDDs 最適合於那些對 dataset 中所有元素應用相同操作的 batch 應用
* 不適用於 asynchronous fine-grained 更新 shared 狀態的應用
## 3. Spark Programming Interface
Spark 使用在 Scala 中的語言整合 API 提供 RDD 抽象
* 是一種針對 Java VM 的 statically functional programming language
* 結合了 conciseness(用於 interactive)和 efficiency(因為 static type)
* 將每個 closure(transform 的參數) 表示為一個 Java object
* object 被序列化並在另一個節點上加載,以便通過網絡傳遞 closure
* 將 closure 中綁定的任何變數保存為 object 中的 field
* 例如 ```var x = 5; rdd.map(_ + x)```,將 5 加到 RDD 的每個元素上
* closure objects 有使用 reflection 的問題
* 需要更多的部分來使 Spark 能夠被 Scala interpreter 使用
* RDDs 本身是由元素類型參數化的 static type object
* 例如,RDD[Int] 是一個整數的 RDD
Spark 使用

* driver program
* 該程序連接到一群 *workers*(long-lived processes,操作過程中將 RDD partition 存儲在 RAM 中)
* 定義一個或多個 RDDs 並在 actions 中使用
* 追蹤 RDDs 的血統
### 3.1 RDD Operations in Spark

列出了 Spark 中可用的主要 RDD 轉換和操作
* 方括號中顯示類型參數
* Partitioner class
* 代表 RDD 的 partition 順序
* 可以對另一個 dataset 進行 partition(groupByKey、reduceByKey、sort 這樣的操作自動導致 hash 或範圍 partition 的 RDD)
### 3.2 Example Applications
#### 3.2.1 Logistic Regression
梯度下降:從隨機值開始設定 w,並在每次迭代中,對數據求和一個關於 w 的函數,以將 w 向改進的方向移動。
```
val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}
```
1. 通過對 text file 進行 ```map``` transformarion,將每一行解析為 Point object,定義出了一個名為 points 的 persist RDD
2. 反覆對 points 執行 map 和 reduce
#### 3.2.2 PageRank
通過加總從指向自己的 file 所收到的貢獻來更新每個 file 的 *rank*
在每次 iterative 中
1. 每個 file 都向其鄰居發送一個貢獻 r/n(r: 排名,n: 鄰居數量)
2. 將其排名更新為 α/N + (1 − α)∑ci(求和: 收到的貢獻,N: file 的總數)
```
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}
```

在每次 iteration 中,根據前一次的 contribe 和 rank 以及 static links dataset 創建一個新的 rank dataset
* 需要可靠地複製某些版本的 ranks 以減少故障恢復時間: 使用帶有 RELIABLE flag 的 persist
* link dataset 不需要被複製,因為可以通過重新運行輸入文件的區塊上的 map 高效地重建它的 partition
* 通過控制 RDDs 的 partition 來優化 PageRank 中的通信
* 為 links 指定一個 patition(例如,按 URL 在節點之間的 link list 進行 hash patition),並以同樣的方式分區 ranks,確保 link 和 rank 之間的 join 操作不需要通信(因為每個 URL 的 rank 將與其 link list 位於同一台機器上)
* 自定義的 Partitioner class 來將相互 link 的頁面分組在一起(例如,按域名 patition URLs)
* 這兩種優化在定義 links 時調用 partitionBy 來表達
```
links = spark.textFile(...).map(...)
.partitionBy(myPartFunc).persist()
```
在 initial call 之後,link 和 rank 之間的 join 將自動將每個 URL 的貢獻匯總到其 link list 所在的機器上,在那裡計算其新的 rank,然後將 rank 與其 link 結合
## 4. Representing RDDs
RDDs 的表示方式必須要能夠在非常多的 transforms 種類中追蹤血統 -> graph-based 且不需要對每個操作增加額外的功能
RDD 由4個部分組成:
* partitions: atomic pieces of the dataset
* dependencies: on parent RDDs
* function: computing the dataset based on its parents
* metadata: about RDD's partitioning scheme and data placement

dependencies
* narrow dependencies
* parent RDD 的每個 partition 最多由 child RDD 的一個 partition 使用
* 得出 child partition: 允許在一個節點上進行pipelined execution,可以計算所有的 parent partitions
* 節點故障後的恢復: 只需要重新計算丟失的 parent partitions,且這些 partitions 可以在不同節點上並行重新計算
* wide dependencies
* 多個 child partition 可能依賴於一個 parent partition
* 得出 child partition: 需要所有 parent partitions 的數據都可用,並且需要通過類似 MapReduce 的操作在節點間進行數據洗牌
* 節點故障後的恢復: 單一節點的故障可能丟失某些來自多個 ancestors RDD 的partitions,需要完全重新執行

RDD implementations Example:
* HDFS files
* partitions: 為 file 的每一個 block 返回一個 partition,每個 partition object 中存儲了該 block 的偏移量
* preferredLocations: 提供存放指定的 block 的節點信息
* iterator: 用來讀取這些 block 的數據
* map: 任何 RDD 調用 map 會返回一個 MappedRDD object
* partitions、preferredLocations: 擁有與其 parent RDD 相同的 partition 和 prefered 位置
* iterator: 對 parent record 應用傳入的函數
* union: 對兩個 RDD 調用 union 會返回一個其 partition 是兩個父 RDD partition 聯合的 RDD
* partitions: 每個 child partition 是透過 narrow dependency on the corresponding parent 計算得出的
* sample: similar to mapping,RDD 為每個 partitio 存儲一個隨機 number generator seed 以確定性地抽樣 parent records
* join: 將兩個 RDD 進行 join 可能導致兩個 narrow dependencies(如果它們都使用相同的 partitioner 進行 hash/range partitioned),兩個 wide dependencies,或者混合(如果一個 parent RDD 有 partitioner 而另一個沒有)
* partitioner: 輸出的 RDD 都有一個(繼承自 parent RDD/是默認的 hash partitioner)
## 5. Implementation
### 5.1 job scheduling
scheduler
* job: 為了要能夠執行 action 在 RDD 上,scheduler 會建出一個 RDD 的 lineage 圖(一個 DAG *stages*) -> assign task

* stage
* 一個 stage 包含透過 narrow dependencies 所能夠找出最多的 pipelined transformation
* stage 透過 wide dependencies 或能縮短 parent RDD 的計算的已計算 partition
* store
* narrow dependencies 會將 RDD 的 partition 存在計算出該 partition node 的 memory
* wide dependencies 會將 RDD 的中間計算紀錄放在 parent RDD 所在的 node memory 上
* lanch task
* 當要找出 target RDD 時,scheduler 會發出 task 來計算每個 stage 的 missing partition
* assign task: 透過 data localit (delay scheduling)
* partition available in memory on a node
* a node is partition's preferred locations
* 當 task 呼叫 lookup operation(透過 key 來隨機的使用 element in hash-partitioned RDDs) 時,task 會告訴 schedular 計算它需要的 partition
* fail
* task fail: 只要 stage 的 parent 是可用的,會在其他 node 上重新執行
* stage fail: resubmit task
* scheduler fail: 不允許
### 5.2 Interpreter Integration
interactive shell: 因為使用 in-memory 的低延遲,可以用來提供 interpreter 來 interactively 查詢大型資料庫
the interpreter in Spark
* interpreter: 將 user 輸入的每一行作為一個 class -> load 到 JVM -> 產生一個包含該行的 variables 或 functions 的 object
* EX:
```=
var x = 5 ----> a class called Line1 containing x
println(x) ----> println(Line1.getInstance().x)
```
* Class shipping: 為了讓 worker 節點獲取每行創建的 class 的 bytecode,讓 interpreter 通過 HTTP 服務這些 class
* Modified code generation: 當使用的前行定義的變數時,worker 節點不會接收到該變數(因為 static method for object),所以將行的生成 code 邏輯修正

### 5.3 Memory Management
Spark 提供3種儲存持久 RDD 方案
* 將數據作為反序列化的 Java object 存儲在 memory
* fastest performance,因為 JVM 可以本地訪問每個 RDD 元素
* 數據作為序列化數據存儲在 memory
* 讓用戶在空間有限的情況下選擇一種比 Java object graphs 更節省 memory 的表示方式,但代價是 performance 降低
* 存儲在 disk
* 適用於那些過大而無法保留在 RAM 中且每次使用重新計算成本高昂的 RDDs
Spark 選擇保留在 memory 的 RDDs
* LRU
* 當新的 partition 和與選出來替換的 partition 屬於相同的 RDD,保留舊 partition,以防止同一 RDD 的 partition 反復進出(因為大多數操作將在整個 RDD 上運行任務,所以 memory 中已有的 partition 很可能在未來需要使用)
* 提供給 user 的 persistence priority
### 5.4 Support for Checkpointing
* wide dependencies: checkpointing is useful for RDDs with long lineage graphs
* narrow dependencies: 若是在 stable storage 存在 data,checkpointing 不需要
* node fail: 可以在其他 node 上平行重新計算 loss 的 RDD partition,其成本只是複製整個 RDD 的一小部分
Spark 提供了一個用於 checkpointing 的 API(一個 REPLICATE flag 來持久化),但將 checkpointing 哪些數據的決定權留給了 user
(ps: RDD 的 read only 特性使得它們比一般的 shared memory更容易進行 checkpointing)
## 6. Discussion
Expressing Existing Programming Models
* MapReduce: flatMap、groupByKey、reduce-ByKey
* DryadLINQ
* SQL
* Pregel
* Iterative MapReduce
* Batched Stream Processing
* Explaining the Expressivity of RDDs
# 5-2 Spark: Cluster Computing withWorking Sets
## 1. Introduction
目前存在 a model of cluster computing:
* 自動提供 locality-aware scheduling, fault tolerance, load balancing, scalability, fault tolerance
* 提供一種 programming model,user 創建 DAG of data stream,以通過一組 operator 傳遞輸入數據
* 不適用於在多個並行操作中 reuse 一組 working set 的應用: Iterative jobs、Interactive analytics
Spark
* RDDs 在表達性和可擴展性及可靠性之間找到了一個平衡點
* 是第一個允許使用 efficient, general-purpose programming language interactively 處理 cluster 上 large dataset 的系統
## 2. Programming Model
driver program: application 的 high-level control flow + operation parallel
Spark parallel programming
* resilient distributed datasets
* parallel operations on datasets(function to apply on a dataset)
* two types of shared variables
### 2.1 Resilient Distributed Datasets (RDDs)
In Spark, each RDD is represented by a Scala object
construct RDDs
* a file in a shared file system
* parallelizing a Scala collection
* 將 data 分成很多片並配送到多個 node 裡
* transforming an existing RDD
* flatMap(map by user function), map(map by function), filter(只選取符合條件的 element)
* changing the persistence of an existing RDD
* default RDD: 只用在使用的時候才會出現在 memory 裡
* cache action: 是一個告知使用完還要儲存在 memory 裡的 hint,當 memory 不夠時,還是會溢出到 disk
* save action: write RDD to distributed filesystem
讓 user 可以自行評估 store cost, speed of access, probability of losing, recomputing
### 2.2 Parallel Operations
* reduce: 合併 dataset element 後在 driver program 產生結果
* collect: 將所有 dataset element 發送到 driver program
* foreach: 將每個 dataset element 應用一個 user function
reduce 是利用 shuffle transformation 達成
### 2.3 Shared Variables
++使用 closures(function) 來當成參數使用 map, filter, reduce++
>當 worker 使用這些 closures
>>包含的變數會被複製一份到 worker (copy by value)
>>允許 user 創造2種 shared variable (copy by reference)
* Broadcast variables
* 運用: large read-only data
* Accumulators
* 特色: worker 只能藉由特定 operator add 且 只能由 driver 讀取
* 定義: 具有 an add operation 和 a zero value 的 type 都可以添加此 shared type
* 運用: counter
## 3. Examples
省略 variable types,因為 Scala 支持 type 推斷(動態)
### 3.1 Text Search
在 HDFS log file 中找到 error line
```
val file = spark.textFile("hdfs://...") // collection of line
val errs = file.filter(_.contains("ERROR")) // line contain errors
val ones = errs.map(_ => 1) // key: line, value: 1
val count = ones.reduce(_+_) // add up
val cachedErrs = errs.cache() // cached RDD to store errs(RDD)
```
### 3.2 Logistic Regression
```for(p <- points){body}``` = ```points.foreach(p => {body})```
### 3.3 Alternating Least Squares
預測 u 名用戶對 m 部電影的評分
>有一個部分填滿的矩陣 R,其中包含了一些用戶-電影的已知評分
>ALS 將 R = M * U,這兩個矩陣的維度分別為 m × k 和 k × u
使用已知評分算出 M 和 U,然後計算 M × U 來預測未知的評分
1. 初始化 M 為一個隨機值
2. 在給定 M 的情況下優化 U,以最小化 R 上的誤差
3. 在給定 U 的情況下優化 M,以最小化 R 上的誤差
4. 重複步驟 2 和 3 直到收斂
由於所有步驟都使用 R,將 R 設置為 Broadcast variables 就不需要在每個步驟中重新將其發送到每個節點
```
val Rb = spark.broadcast(R)
for (i <- 1 to ITERATIONS) {
U = spark.parallelize(0 until u)
.map(j => updateUser(j, Rb, M))
.collect()
M = spark.parallelize(0 until m)
.map(j => updateUser(j, Rb, U))
.collect()
}
```
## 4. Implementation
* 每個 dataset object 包含一個指向其 parent pointer,用來告知 parent RDD 如何 transform
```
val file = spark.textFile("hdfs://...")
val errs = file.filter(_.contains("ERROR"))
val ones = errs.map(_ => 1)
val count = ones.reduce(_+_)
val cachedErrs = errs.cache()
```

* RDD object interface
* ``getPartitions``: returns a list of partition IDs
* ``getIterator(partition)``: iterates over a partition
* ``getPreferredLocations(partition)``: 用於 task scheduling 來達到 data locality
* schedule
1. Spark 對每個平行的在 partition 上進行的 operator 創建 task
2. 傳送到 worker node 上,盡量以 task 的 preferred location 來傳送
3. 一旦 worker 接收到 task,呼叫 getIterator 讀取 partition
* RDDs 的不同 type 只是代表 RDD 如何實作 interface
* Hdfs-TextFile
* partition: block IDs in HDFS
* preferred location: block location
* getIterator: 打開一個 stream 來讀取 block
* MappedDataset
* partitions: 和 parent 一樣
* preferred locations: 和 parent 一樣
* getIterator: 應用 map function 到一個 parent 的 element
* CachedDataset
* preferred locations: 最初與 parent 相同,但在某個節點上 cache partition 後會更新,以優先 reuse 該節點
* getIterator: 尋找一個 transform 後 partition 的 locally cache copy
* fault easy to handle:如果一個節點失敗,其 partition 會從它們的 parent dataset 重新讀取,並最終在其他節點上 cache
* 傳送 closure 到 worker node
* 用來
* 定義 dataset
* pass to operations(ex: reduce)
* Java object: 可以透過 Java serialization 來序列化(在多個 machine 上較可以直覺計算)
* Spark 內建 closure
* 有問題: closure 會使用到不在他自己 body 的 outer scope
* 解決: 通過對 closure class 的 bytecode 進行靜態分析,檢測未使用的 variables 並將相應的字段在 closure object 中設為 null
* Shared Variables: 使用帶有自定義序列化格式的 class
* broadcast variable
* 創建一個帶有值 v 的 broadcast variable b: v 被保存到shared file system 中的一個文件中。b 的序列化形式是該文件的路徑
* 在 worker node: 查詢 b 的值,首先檢查 v 是否在 local cache 中,如果不在,則從 file system 中讀取它
* Accumulator: serialization 技巧
* 創建: 每個 accumulator 被給予 unique ID,其序列化形式包含其 ID 和對其 type 的 0 值
* 在 worker node: 為執行 task 的每個 thread 創建一個獨立的 accumulator copy 本地變數
* 在 task 開始時,重置為 0
* 在 task 開史後,worker 發送一條消息給 driver program,包含它對各個 accumulator 所做的更新
* driver program 只對每個 operation 的每個 partition 的更新應用一次,以防止因 task 因故障重新執行時發生重複計數
* Interpreter Integration
* interpreter 將它定義的 class 輸出到一個 shared filesystem 中,worker 可以使用自定義的 Java class loader 從中加載這些 class