# 5-2 Resilient Distributed Datasets - A Fault-Tolerant Abstraction for In-Memory Cluster Computing ## 1. Introduction Cluster computing frameworks * 廣泛被用於大規模數據分析 * 讓使用者可以使用一組高階的 operators (指令)來寫 parallel 計算 program,不需要擔心 work distribution 和 fault tolerance * 缺乏分佈式記憶體的支援 * 在處理需要在多個運算中 reuse intermediate result 的應用( iterative machine learning、graph algorithms、interactive data mining )效率不高 * reuse 兩個 computation 之間的數據的唯一方法是將其寫入外部的 stable storage 系統 * 大量的 data replication、disk I/O 和 serialization,可能佔據 application 大部分執行時間 解決: 專門的 framework 來應對數據 reuse 的應用 * Pregel 、 HaLoop * 僅支持特定的計算模式,這些模式自動的進行 data share * abstractions for in-memory storage * distributed shared memory、key-value stores、databases、Piccolo * 基於 fine-grained 介面更新 mutable state * fault-tolerant * 數據複製到多台機器上 or 在多台機器上 進行 log 紀錄和更新 * 需要在 cluster 上複製大量數據,bandwidth 遠低於 RAM,並會產生大量的存儲成本 論文提出: Resilient Distributed Datasets(RDDs) * 在一般性的應用中 reuse 數據,user 可以自主將 intermediate result 永久儲存在記憶體中、控制 data 的 partiton 以優化 data placement * 提供 fault-tolerant、parallel data structures(非常適合許多平行應用,因為這些應用通常需要對多個 data 應用相同的 operator) * 可以表達許多到目前為止被提議為獨立系統的 cluster programming models(MapReduce、DryadLINQ、SQL、Pregel、HaLoop) * abstractions for in-memory storage * 基於 coarse-grained transformations 的介面 * fault-tolerant * RDD 的 partition 擁有足夠的信息來從其他 RDDs 衍生並重新計算該 partition,不需要在 cluster 上複製大量數據 * Spark = RDDs + 一個類似於 DryadLINQ 的 language 整合 programming interface + Scala programming language(Scala interpreter 互動式地查詢大型 dataset) ## 2. Resilient Distributed Datasets (RDDs) ### 2.1 RDD Abstraction *一個 RDD 是一個 read-only、partitioned record 集合* * transformations * 只能透過 transformations 創建 RDDs * 定義: deterministic operation 對 data in stable storage 或者 other RDDs * map、filter、join * 一個 RDD 擁有足夠的訊息來了解它是如何從其他 dataset 中衍生出來的(其 lineage),以便從 stable storage 中計算出它的 partition * 如果一個程序在故障後無法重構一個 RDD,那麼這個程序就不能參考這個 RDD * user 可以控制 RDDs 的兩個方面:persistence 和 partitioning * user 可以指明將 reuse 哪些 RDDs 並為它們選擇一個 store 策略(例如,in-memory storage) * user 可以要求根據每條 record 中的 key 將一個 RDD 的 element 在機器間 partition(對於 placement 優化很有用,例如確保將要聯合的兩個 dataset 以相同的方式進行 hash partition) ### 2.2 Spark Programming Interface Spark 透過一個整合語言的 API 展示出 RDDs * 類似於 DryadLINQ、FlumeJava * 每個 data set 都被表示為一個 object、透過這些 obiect 的 method 調用來觸發 transformations programmer 設計程式過程 1. 通過對 stable storage 中的數據進行 transformations 來定義一個或多個 RDDs 2. 在 *actions* 中使用這些 RDDs,來向應用程式返回一個值/將數據導出到 storage system * action * operations * example: count(返回 dataset 中的 element 數量)、collect(返回element 本身)、 save(將 dataset 輸出到 storage system) * Spark 第一次在 action 中使用 RDDs 時會 lazily computes RDDs(只有實際呼叫到 action 時才會進行計算 RDDs),以便能夠進行 pipeline transformations persist method * user 用來指明希望在未來操作中 reuse 哪些 RDDs * persist 的 flags: 請求其他 persistence 策略,如僅將 RDD 存儲在 disk 上或在機器間複製 * Spark 預設將 persistent RDDs 保留在記憶體中,但如果 RAM 不足,可以將它們溢出到 disk * persistence priorit: 每個 RDD 持有,以指定哪些記憶體中的數據應該首先被溢出到 disk #### 2.2.1 Example: Console Log Mining 假設一個網路服務遇到 error,操作員想要在 Hadoop filesystem(HDFS)中搜尋 log 以找到原因,使用 Spark,可以僅將 log 中的 error message 加載到一組節點的 RAM 中並進行 interactively query: 1. ```lines = spark.textFile("hdfs://...")``` * 定義了一個由 HDFS 文件支持的 RDD(作為 text line 的集合) 2. ```errors = lines.filter(_.startsWith("ERROR"))``` * 衍生出一個過濾後的 RDD 3. ```errors.persist()``` * 要求將 *error* persist 於記憶體中,以便在不同查詢間共享 4. ```errors.count()``` * action 中使用 RDD(計算訊息的數量) 對 RDD 進行進一步的 transformations 並使用其結果: ```// Count errors mentioning MySQL: errors.filter(_.contains("MySQL")).count() // Return the time fields of errors mentioning // HDFS as an array (assuming time is field // number 3 in a tab-separated format): errors.filter(_.contains("HDFS")) .map(_.split(’\t’)(3)) .collect() ``` * 在執行 *collect* 之前應用了 *filter* 和 *map* * Spark 的 scheduler 將對後兩個 transformations 進行 pipeline 處理,並發送一組任務到持有 errors cached partitions 的節點上計算 * 如果某個 errors partitions 丟失,Spark 會通過只對相應的 lines partitions filter 來重建 ![image](https://hackmd.io/_uploads/Hy0_-jIW0.png) ### 2.3 Advantages of the RDD Model distributed shared memory(DSM) * 可以讀寫 grobal address 空間中的任意位置(fine-grained) * 傳統系統 RDDs vs DSM ![image](https://hackmd.io/_uploads/SkSIP4PW0.png) * write * RDDs * coarse-grained transformations * 只適用於執行批量寫入的應用程序 * DSM * 允許對每個記憶體位置進行讀寫 * fault tolerance * RDDs * 較好 * lineage 恢復(不需要承擔 checkpointing 的 overhead) * 只有在 fail 時 lost 的 RDD 需要重新計算,可以在不同節點上並行重新計算,無需回滾整個程序 * DSM * 較差 * checkpointing 恢復 * roll back 整個程序 * 減緩 slow nodes 影響 * RDDs * immutable 性質 * 夠通過運行 slow 任務的 backup 副本來減輕 slow node 的影響 * DSM * 困難 * 實現 back 任務時,兩個 task 副本會訪問相同的記憶體位置並互相干擾更新 * 調度任務 * RDDs * 可以根據數據的位置來調度任務,以提高性能 * RAM 不夠時 * RDDs * 只要是 scan-based 的操作,無法用於 RAM 的 partition 可以存儲在 disk 上,並將提供與當前數據並行系統類似的性能 ### 2.4 Applications Not Suitable for RDDs * RDDs 最適合於那些對 dataset 中所有元素應用相同操作的 batch 應用 * 不適用於 asynchronous fine-grained 更新 shared 狀態的應用 ## 3. Spark Programming Interface Spark 使用在 Scala 中的語言整合 API 提供 RDD 抽象 * 是一種針對 Java VM 的 statically functional programming language * 結合了 conciseness(用於 interactive)和 efficiency(因為 static type) * 將每個 closure(transform 的參數) 表示為一個 Java object * object 被序列化並在另一個節點上加載,以便通過網絡傳遞 closure * 將 closure 中綁定的任何變數保存為 object 中的 field * 例如 ```var x = 5; rdd.map(_ + x)```,將 5 加到 RDD 的每個元素上 * closure objects 有使用 reflection 的問題 * 需要更多的部分來使 Spark 能夠被 Scala interpreter 使用 * RDDs 本身是由元素類型參數化的 static type object * 例如,RDD[Int] 是一個整數的 RDD Spark 使用 ![image](https://hackmd.io/_uploads/B17ODDD-A.png =500x) * driver program * 該程序連接到一群 *workers*(long-lived processes,操作過程中將 RDD partition 存儲在 RAM 中) * 定義一個或多個 RDDs 並在 actions 中使用 * 追蹤 RDDs 的血統 ### 3.1 RDD Operations in Spark ![image](https://hackmd.io/_uploads/B15SG_v-0.png) 列出了 Spark 中可用的主要 RDD 轉換和操作 * 方括號中顯示類型參數 * Partitioner class * 代表 RDD 的 partition 順序 * 可以對另一個 dataset 進行 partition(groupByKey、reduceByKey、sort 這樣的操作自動導致 hash 或範圍 partition 的 RDD) ### 3.2 Example Applications #### 3.2.1 Logistic Regression 梯度下降:從隨機值開始設定 w,並在每次迭代中,對數據求和一個關於 w 的函數,以將 w 向改進的方向移動。 ``` val points = spark.textFile(...) .map(parsePoint).persist() var w = // random initial vector for (i <- 1 to ITERATIONS) { val gradient = points.map{ p => p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y }.reduce((a,b) => a+b) w -= gradient } ``` 1. 通過對 text file 進行 ```map``` transformarion,將每一行解析為 Point object,定義出了一個名為 points 的 persist RDD 2. 反覆對 points 執行 map 和 reduce #### 3.2.2 PageRank 通過加總從指向自己的 file 所收到的貢獻來更新每個 file 的 *rank* 在每次 iterative 中 1. 每個 file 都向其鄰居發送一個貢獻 r/n(r: 排名,n: 鄰居數量) 2. 將其排名更新為 α/N + (1 − α)∑ci(求和: 收到的貢獻,N: file 的總數) ``` val links = spark.textFile(...).map(...).persist() var ranks = // RDD of (URL, rank) pairs for (i <- 1 to ITERATIONS) { // Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap { (url, (links, rank)) => links.map(dest => (dest, rank/links.size)) } // Sum contributions by URL and get new ranks ranks = contribs.reduceByKey((x,y) => x+y) .mapValues(sum => a/N + (1-a)*sum) } ``` ![image](https://hackmd.io/_uploads/BJ2XTtDWA.png) 在每次 iteration 中,根據前一次的 contribe 和 rank 以及 static links dataset 創建一個新的 rank dataset * 需要可靠地複製某些版本的 ranks 以減少故障恢復時間: 使用帶有 RELIABLE flag 的 persist * link dataset 不需要被複製,因為可以通過重新運行輸入文件的區塊上的 map 高效地重建它的 partition * 通過控制 RDDs 的 partition 來優化 PageRank 中的通信 * 為 links 指定一個 patition(例如,按 URL 在節點之間的 link list 進行 hash patition),並以同樣的方式分區 ranks,確保 link 和 rank 之間的 join 操作不需要通信(因為每個 URL 的 rank 將與其 link list 位於同一台機器上) * 自定義的 Partitioner class 來將相互 link 的頁面分組在一起(例如,按域名 patition URLs) * 這兩種優化在定義 links 時調用 partitionBy 來表達 ``` links = spark.textFile(...).map(...) .partitionBy(myPartFunc).persist() ``` 在 initial call 之後,link 和 rank 之間的 join 將自動將每個 URL 的貢獻匯總到其 link list 所在的機器上,在那裡計算其新的 rank,然後將 rank 與其 link 結合 ## 4. Representing RDDs RDDs 的表示方式必須要能夠在非常多的 transforms 種類中追蹤血統 -> graph-based 且不需要對每個操作增加額外的功能 RDD 由4個部分組成: * partitions: atomic pieces of the dataset * dependencies: on parent RDDs * function: computing the dataset based on its parents * metadata: about RDD's partitioning scheme and data placement ![image](https://hackmd.io/_uploads/HJTM0cnfR.png) dependencies * narrow dependencies * parent RDD 的每個 partition 最多由 child RDD 的一個 partition 使用 * 得出 child partition: 允許在一個節點上進行pipelined execution,可以計算所有的 parent partitions * 節點故障後的恢復: 只需要重新計算丟失的 parent partitions,且這些 partitions 可以在不同節點上並行重新計算 * wide dependencies * 多個 child partition 可能依賴於一個 parent partition * 得出 child partition: 需要所有 parent partitions 的數據都可用,並且需要通過類似 MapReduce 的操作在節點間進行數據洗牌 * 節點故障後的恢復: 單一節點的故障可能丟失某些來自多個 ancestors RDD 的partitions,需要完全重新執行 ![image](https://hackmd.io/_uploads/rJCYCqnfR.png) RDD implementations Example: * HDFS files * partitions: 為 file 的每一個 block 返回一個 partition,每個 partition object 中存儲了該 block 的偏移量 * preferredLocations: 提供存放指定的 block 的節點信息 * iterator: 用來讀取這些 block 的數據 * map: 任何 RDD 調用 map 會返回一個 MappedRDD object * partitions、preferredLocations: 擁有與其 parent RDD 相同的 partition 和 prefered 位置 * iterator: 對 parent record 應用傳入的函數 * union: 對兩個 RDD 調用 union 會返回一個其 partition 是兩個父 RDD partition 聯合的 RDD * partitions: 每個 child partition 是透過 narrow dependency on the corresponding parent 計算得出的 * sample: similar to mapping,RDD 為每個 partitio 存儲一個隨機 number generator seed 以確定性地抽樣 parent records * join: 將兩個 RDD 進行 join 可能導致兩個 narrow dependencies(如果它們都使用相同的 partitioner 進行 hash/range partitioned),兩個 wide dependencies,或者混合(如果一個 parent RDD 有 partitioner 而另一個沒有) * partitioner: 輸出的 RDD 都有一個(繼承自 parent RDD/是默認的 hash partitioner) ## 5. Implementation ### 5.1 job scheduling scheduler * job: 為了要能夠執行 action 在 RDD 上,scheduler 會建出一個 RDD 的 lineage 圖(一個 DAG *stages*) -> assign task ![image](https://hackmd.io/_uploads/HkVNJlCMC.png) * stage * 一個 stage 包含透過 narrow dependencies 所能夠找出最多的 pipelined transformation * stage 透過 wide dependencies 或能縮短 parent RDD 的計算的已計算 partition * store * narrow dependencies 會將 RDD 的 partition 存在計算出該 partition node 的 memory * wide dependencies 會將 RDD 的中間計算紀錄放在 parent RDD 所在的 node memory 上 * lanch task * 當要找出 target RDD 時,scheduler 會發出 task 來計算每個 stage 的 missing partition * assign task: 透過 data localit (delay scheduling) * partition available in memory on a node * a node is partition's preferred locations * 當 task 呼叫 lookup operation(透過 key 來隨機的使用 element in hash-partitioned RDDs) 時,task 會告訴 schedular 計算它需要的 partition * fail * task fail: 只要 stage 的 parent 是可用的,會在其他 node 上重新執行 * stage fail: resubmit task * scheduler fail: 不允許 ### 5.2 Interpreter Integration interactive shell: 因為使用 in-memory 的低延遲,可以用來提供 interpreter 來 interactively 查詢大型資料庫 the interpreter in Spark * interpreter: 將 user 輸入的每一行作為一個 class -> load 到 JVM -> 產生一個包含該行的 variables 或 functions 的 object * EX: ```= var x = 5 ----> a class called Line1 containing x println(x) ----> println(Line1.getInstance().x) ``` * Class shipping: 為了讓 worker 節點獲取每行創建的 class 的 bytecode,讓 interpreter 通過 HTTP 服務這些 class * Modified code generation: 當使用的前行定義的變數時,worker 節點不會接收到該變數(因為 static method for object),所以將行的生成 code 邏輯修正 ![image](https://hackmd.io/_uploads/By4O6xAMR.png) ### 5.3 Memory Management Spark 提供3種儲存持久 RDD 方案 * 將數據作為反序列化的 Java object 存儲在 memory * fastest performance,因為 JVM 可以本地訪問每個 RDD 元素 * 數據作為序列化數據存儲在 memory * 讓用戶在空間有限的情況下選擇一種比 Java object graphs 更節省 memory 的表示方式,但代價是 performance 降低 * 存儲在 disk * 適用於那些過大而無法保留在 RAM 中且每次使用重新計算成本高昂的 RDDs Spark 選擇保留在 memory 的 RDDs * LRU * 當新的 partition 和與選出來替換的 partition 屬於相同的 RDD,保留舊 partition,以防止同一 RDD 的 partition 反復進出(因為大多數操作將在整個 RDD 上運行任務,所以 memory 中已有的 partition 很可能在未來需要使用) * 提供給 user 的 persistence priority ### 5.4 Support for Checkpointing * wide dependencies: checkpointing is useful for RDDs with long lineage graphs * narrow dependencies: 若是在 stable storage 存在 data,checkpointing 不需要 * node fail: 可以在其他 node 上平行重新計算 loss 的 RDD partition,其成本只是複製整個 RDD 的一小部分 Spark 提供了一個用於 checkpointing 的 API(一個 REPLICATE flag 來持久化),但將 checkpointing 哪些數據的決定權留給了 user (ps: RDD 的 read only 特性使得它們比一般的 shared memory更容易進行 checkpointing) ## 6. Discussion Expressing Existing Programming Models * MapReduce: flatMap、groupByKey、reduce-ByKey * DryadLINQ * SQL * Pregel * Iterative MapReduce * Batched Stream Processing * Explaining the Expressivity of RDDs # 5-2 Spark: Cluster Computing withWorking Sets ## 1. Introduction 目前存在 a model of cluster computing: * 自動提供 locality-aware scheduling, fault tolerance, load balancing, scalability, fault tolerance * 提供一種 programming model,user 創建 DAG of data stream,以通過一組 operator 傳遞輸入數據 * 不適用於在多個並行操作中 reuse 一組 working set 的應用: Iterative jobs、Interactive analytics Spark * RDDs 在表達性和可擴展性及可靠性之間找到了一個平衡點 * 是第一個允許使用 efficient, general-purpose programming language interactively 處理 cluster 上 large dataset 的系統 ## 2. Programming Model driver program: application 的 high-level control flow + operation parallel Spark parallel programming * resilient distributed datasets * parallel operations on datasets(function to apply on a dataset) * two types of shared variables ### 2.1 Resilient Distributed Datasets (RDDs) In Spark, each RDD is represented by a Scala object construct RDDs * a file in a shared file system * parallelizing a Scala collection * 將 data 分成很多片並配送到多個 node 裡 * transforming an existing RDD * flatMap(map by user function), map(map by function), filter(只選取符合條件的 element) * changing the persistence of an existing RDD * default RDD: 只用在使用的時候才會出現在 memory 裡 * cache action: 是一個告知使用完還要儲存在 memory 裡的 hint,當 memory 不夠時,還是會溢出到 disk * save action: write RDD to distributed filesystem 讓 user 可以自行評估 store cost, speed of access, probability of losing, recomputing ### 2.2 Parallel Operations * reduce: 合併 dataset element 後在 driver program 產生結果 * collect: 將所有 dataset element 發送到 driver program * foreach: 將每個 dataset element 應用一個 user function reduce 是利用 shuffle transformation 達成 ### 2.3 Shared Variables ++使用 closures(function) 來當成參數使用 map, filter, reduce++ >當 worker 使用這些 closures >>包含的變數會被複製一份到 worker (copy by value) >>允許 user 創造2種 shared variable (copy by reference) * Broadcast variables * 運用: large read-only data * Accumulators * 特色: worker 只能藉由特定 operator add 且 只能由 driver 讀取 * 定義: 具有 an add operation 和 a zero value 的 type 都可以添加此 shared type * 運用: counter ## 3. Examples 省略 variable types,因為 Scala 支持 type 推斷(動態) ### 3.1 Text Search 在 HDFS log file 中找到 error line ``` val file = spark.textFile("hdfs://...") // collection of line val errs = file.filter(_.contains("ERROR")) // line contain errors val ones = errs.map(_ => 1) // key: line, value: 1 val count = ones.reduce(_+_) // add up val cachedErrs = errs.cache() // cached RDD to store errs(RDD) ``` ### 3.2 Logistic Regression ```for(p <- points){body}``` = ```points.foreach(p => {body})``` ### 3.3 Alternating Least Squares 預測 u 名用戶對 m 部電影的評分 >有一個部分填滿的矩陣 R,其中包含了一些用戶-電影的已知評分 >ALS 將 R = M * U,這兩個矩陣的維度分別為 m × k 和 k × u 使用已知評分算出 M 和 U,然後計算 M × U 來預測未知的評分 1. 初始化 M 為一個隨機值 2. 在給定 M 的情況下優化 U,以最小化 R 上的誤差 3. 在給定 U 的情況下優化 M,以最小化 R 上的誤差 4. 重複步驟 2 和 3 直到收斂 由於所有步驟都使用 R,將 R 設置為 Broadcast variables 就不需要在每個步驟中重新將其發送到每個節點 ``` val Rb = spark.broadcast(R) for (i <- 1 to ITERATIONS) { U = spark.parallelize(0 until u) .map(j => updateUser(j, Rb, M)) .collect() M = spark.parallelize(0 until m) .map(j => updateUser(j, Rb, U)) .collect() } ``` ## 4. Implementation * 每個 dataset object 包含一個指向其 parent pointer,用來告知 parent RDD 如何 transform ``` val file = spark.textFile("hdfs://...") val errs = file.filter(_.contains("ERROR")) val ones = errs.map(_ => 1) val count = ones.reduce(_+_) val cachedErrs = errs.cache() ``` ![image](https://hackmd.io/_uploads/Hyr9cteXR.png) * RDD object interface * ``getPartitions``: returns a list of partition IDs * ``getIterator(partition)``: iterates over a partition * ``getPreferredLocations(partition)``: 用於 task scheduling 來達到 data locality * schedule 1. Spark 對每個平行的在 partition 上進行的 operator 創建 task 2. 傳送到 worker node 上,盡量以 task 的 preferred location 來傳送 3. 一旦 worker 接收到 task,呼叫 getIterator 讀取 partition * RDDs 的不同 type 只是代表 RDD 如何實作 interface * Hdfs-TextFile * partition: block IDs in HDFS * preferred location: block location * getIterator: 打開一個 stream 來讀取 block * MappedDataset * partitions: 和 parent 一樣 * preferred locations: 和 parent 一樣 * getIterator: 應用 map function 到一個 parent 的 element * CachedDataset * preferred locations: 最初與 parent 相同,但在某個節點上 cache partition 後會更新,以優先 reuse 該節點 * getIterator: 尋找一個 transform 後 partition 的 locally cache copy * fault easy to handle:如果一個節點失敗,其 partition 會從它們的 parent dataset 重新讀取,並最終在其他節點上 cache * 傳送 closure 到 worker node * 用來 * 定義 dataset * pass to operations(ex: reduce) * Java object: 可以透過 Java serialization 來序列化(在多個 machine 上較可以直覺計算) * Spark 內建 closure * 有問題: closure 會使用到不在他自己 body 的 outer scope * 解決: 通過對 closure class 的 bytecode 進行靜態分析,檢測未使用的 variables 並將相應的字段在 closure object 中設為 null * Shared Variables: 使用帶有自定義序列化格式的 class * broadcast variable * 創建一個帶有值 v 的 broadcast variable b: v 被保存到shared file system 中的一個文件中。b 的序列化形式是該文件的路徑 * 在 worker node: 查詢 b 的值,首先檢查 v 是否在 local cache 中,如果不在,則從 file system 中讀取它 * Accumulator: serialization 技巧 * 創建: 每個 accumulator 被給予 unique ID,其序列化形式包含其 ID 和對其 type 的 0 值 * 在 worker node: 為執行 task 的每個 thread 創建一個獨立的 accumulator copy 本地變數 * 在 task 開始時,重置為 0 * 在 task 開史後,worker 發送一條消息給 driver program,包含它對各個 accumulator 所做的更新 * driver program 只對每個 operation 的每個 partition 的更新應用一次,以防止因 task 因故障重新執行時發生重複計數 * Interpreter Integration * interpreter 將它定義的 class 輸出到一個 shared filesystem 中,worker 可以使用自定義的 Java class loader 從中加載這些 class