# 5.2 Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing :::info **RDD設計背景** 改善MapReduce缺點: 1. 不夠高效,有大量資料複製、磁碟IO和序列化開銷。 2. 只適用批次資料處理。 3. 把中間結果寫入HDFS中,對於迭代式資料(機器學習、圖演算法、互動式資料探勘工具)處理效能比較差。 **RDD提供了一個抽象的資料架構**,我們不必擔心底層資料的分散特性,只需將具體的應用邏輯表達為一系列轉換處理,不同RDD之間的轉換操作形成依賴關係,可以實現管道化,從而**避免了中間結果的存儲**,大大降低了資料複製、磁碟IO和序列化開銷。 <!-- RDD適合對資料集中所有元素進行相同操作的批次應用 --> <!-- 在Spark廣泛使用以前,業界主要使用MapReduce 來對大量資料進行分散式處理,但因為**MapReduce**有一些限制,像是**不夠高效、只適用批次資料處理、對於迭代式資料處理效能比較差**等等,**Spark提出了一種新的資料抽像模式稱為RDD(彈性分散式資料集)**,讓使用者將資料保存在記憶體中,並且可以控制他們的分區來優化資料以及提供了一系列高級的操作介面。 --> ::: ## 1. Introduction 現在的框架在不同計算階段之間重用資料(如在兩個MapReduce的job之間)的唯一方式是將其寫入外部穩定儲存系統中,由於資料的複製、硬碟I/O和序列化,導致了大量的成本開銷,並佔據了應用程式運行的大部分時間。因此在這篇論文中提出了一種能夠廣泛運用用於各種應用中高效的資料重用抽象,**彈性分佈式資料集(RDDs)**,RDDs是一個**容錯的、並行的資料結構**,能夠**讓使用者明確地在記憶體中持久化中間結果**,控制其分區以優化資料的放置和使用豐富的操作符對其進行處理。 ## 2. Resilient Distributed Datasets (RDDs) ### 2.1 RDD Abstraction RDD只能透過**操作實體儲存的資料**或是**操作其他RDD**來產生,這種操作稱為**轉換(*transformation*)**,包含*map*, *join*, *filter*。 RDDs是**不可變**的,一旦創建就無法修改,所有轉換操作都會產生一個新的RDD,而不是修改現有的RDD。透過多個RDDs來表示不同版本的資料集以實現多狀態。 **可以從*lineage*了解自己是如何從其他資料集產生的,以及如何由持久化儲存中的來源資料計算得到的,也能因此實現高效恢復數據,不需要數據備份。** >**lineage:** 記錄了一個 RDD 從一個或多個其他 RDDs 透過一系列轉換操作(如 map、filter、join 等)生成的完整歷史。 使用者可以對RDDs進行2方面的控制: * persistence: 決定要重複使用的RDDs並為其選擇儲存策略(如in-memory)。 * partitioning: 將RDDs的元素透過特定key value進行分區。 ### 2.2 Spark Programming Interface Programmers透過將實體儲存上的資料集進行**transform**(如*map*和*filter*)來定義一個或多個RDDs,並用**actions**對這些RDDs進行操作。 *actions*包括: * *count*(傳回資料集中元素的數目) * *collect*(傳回元素本身) * *save*(將資料集輸出到外部儲存系統) Spark會將RDD的實例化延後到真正使用到action的時候,這樣中間環節的多個RDD轉換就可能會進行串列化轉換操作,所以可以對transform進行pipeline transformations。 Programmers也可以呼叫*persist*方法來宣告他們想重複使用的RDDs,為每個RDD設定優先權來表示當沒有足夠RAM的情況,那些資料能繼續存在記憶體中。 ### 2.3 Advantages of the RDD Model * 提供了一個粗粒度(coarse-grained)轉換的API,對每個資料項應用相同的操作。 * **使用日誌記錄資料集是如何產生的而不是去記錄資料本身**,使得容錯變得有效率,一旦某個partition 遺失,透過日誌可以找出該partition 是如何透過其他的RDD 經過什麼操作得到的,透過重新計算即可進行恢復,不需要資料的複製,也不需要任何回滾操作。 * 可以使用MapReduce 中的備份任務機制來緩解Straggler。 >Straggler: 有些task相對於其他正常的task執行異常的慢 ![image](https://hackmd.io/_uploads/SkyS6CEZC.png) ### 2.4 Applications Not Suitable for RDDs RDDs不太適合那些需要對共享狀態進行非同步、細粒度更新的應用程序,例如Web應用程式的儲存系統或Incremental Web Crawler。 >原因: >1. RDD 的不可變性,無法在運行時修改數據。 >2. RDD 的操作是粗粒度的,不適合需要對單個數據項目進行頻繁和獨立更新的情況。 >3. RDD 的計算是懶惰執行的,只有在行動操作(如 count、collect)被觸發時才進行,無法提供足夠的響應速度。 >4. 非同步共享狀態更新需要能夠快速且靈活地反映多個節點的操作影響,但RDD不直接支持共享狀態, >what is Incremental Web Crawler? >和傳統網頁爬蟲相比不需要從頭開始重新爬取整個網站,而是能夠識別出自上次爬取以來已經變更或新增的內容,只對這些部分進行爬取。 ## 3. Spark Programming Interface Spark透過Scala提供整合API的RDDs抽象。 >Scala是基於Java VM的靜態型別函數程式語言。 怎麼使用Spark? * Programmer寫一個連接到一組*workers*的*driver program*,如Figure 2,driver定義一個或多個RDDs並呼叫*action*操作,driver上的Spark程式碼也會追蹤RDDs的lineage。 workers是持久運行的process,它們能夠在多個操作之間,在RAM中儲存 RDD的分區,也就是說一旦某個RDD的分區被計算出來並被分配到某個工作節點上,這個分區就可以留在該節點的內存中,供後續的操作重用,而無需在每次操作中重新計算。 ![image](https://hackmd.io/_uploads/HkM8aC4bC.png) ### 3.1 RDD Operations in Spark ![image](https://hackmd.io/_uploads/SJ1DTR4-C.png) ### 3.2 Example Applications #### Logistic Regression 許多機器學習演算法本質上是迭代的,透過迭代來最佳化過程,例如梯度下降。 下面程式用於找到一個hyperplane $w$ 能最好的分出兩類,該分類演算法使用梯度下降:它以隨機值開始w,並且在每次迭代時,它將$w$的函數與數據相加以沿著改善它的方向移動。 ```java = val points = spark.textFile(...).map(parsePoint).persist() var w = // random initial vector for (i <- 1 to ITERATIONS) { val gradient = points.map{ p => p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y }.reduce((a,b) => a+b) w -= gradient } ``` #### PageRank 展示如何控制RDD的分區以提高效能。 透過累加在文件中對每個文件的應用次數迭代地更新每個文件的rank,在每次迭代時,每個檔案都會向其鄰居發送r/n的貢獻值,r是其排名,n是其鄰居的數量,然後透過 $α/N + (1 − α)∑c_i$式子更新排名,其中求和是它所收到的貢獻值,N是文件的總數。 ```java = val links = spark.textFile(...).map(...).persist() var ranks = // RDD of (URL, rank) pairs for (i <- 1 to ITERATIONS) { // Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap { (url, (links, rank)) =>links.map(dest => (dest, rank/links.size)) } // Sum contributions by URL and get new ranks ranks = contribs.reduceByKey((x,y) => x+y) .mapValues(sum => a/N + (1-a)*sum) } ``` 上面程式可以產生Fig 3的RDD lineage圖 ![image](https://hackmd.io/_uploads/H1hv604-C.png) ## 4. Representing RDDs 透過一個公共介面來表示每個RDD,這個介面包含五個資訊,如Table 3。 ![image](https://hackmd.io/_uploads/H1A_TAN-C.png) * 如何表示RDDs之間的依賴關係? (Figure 4) * **narrow dependencies(窄依賴)**: 父RDD每個分區最多由子RDD的一個分區使用,函數包括$map$、$filter$、$union$、$join$(父RDD 是hash-partitioned)等 * **wide dependencies(寬依賴)**: 父節點的每個分片,被多個子節點依賴,函數包括$groupByKey$、$join$(父RDD 不是hash-partitiond)等。 ![image](https://hackmd.io/_uploads/S1ZcTRNbR.png) * 窄依賴的優點 * 串行化允許多個RDD操作在一個機器上串行化處理(因為它不依賴其他RDD),可以避免RDD傳輸的網路開銷,以及各個操作之間的同步等待。 * 恢復效率更高,當一個RDD失效後,只需要將父RDD 中的對應分區重新計算即可,還能將恢復任務調度到不同的節點上並行進行。 ## 5. Implementation 大約用14000行scala code實作了Spark ### 5.1 Job Scheduling ![image](https://hackmd.io/_uploads/r1TqpANZC.png) * schedule是被一個*action*觸發的 * scheduler檢查RDD的*lineage*生成DAG * scheduler根據DAG劃分為不同的stage,遇到寬依賴斷開,遇到窄依賴就將當前RDD 加入到Stage 中;將窄依賴盡量劃分到同一個Stage 中,劃分的原則是每個階段包含盡可能多的具有窄依賴的管線轉換。(Figure 5) * stage的邊界一般寬依賴所需的shuffle操作,或任何已經計算過的分區。 * scheduler開始建立缺少資料分片的RDD,直到最後建立了目標(執行*action*操作的)RDD。 scheduler使用延遲調度方法根據數據位置將任務分配給機器,也就是scheduler在決定將任務發送到哪個節點時,會優先考慮數據已經存在的地方。 * 如果一個task處理中需要的資料已經在某個節點的內存中,那麼將task送到那個節點處理(而不是將資料拉過來)。 * 如果一個task處理的資料分片包含在地化訊息或是沒有在節點的內存忠,會將task傳送到本地化訊息中(或利用與數據相關的位置信息)認為最優的那個節點(最接近數據的節點)。 ### 5.2 Interpreter Integration Scala解釋器通常會透過為使用者輸入的每一行編譯一個類,將其載入到JVM中,並在其上呼叫函數來操作,該類別包含一個單例對象,該對象包含該行上的變數或函數,並在初始化方法中運行行程式碼。例如,如果使用者輸入程式碼$var$ $x = 5$,接著又輸入$println(x)$,則解譯器會定義一個包含x的Line1類,並將第2行編譯為$println(Line1.getInstance().x)$。 Spark做的改變: * Class shipping: 使用HTTP協定讓work能取得每行定義的class的bytecode。 * Modified code generation: 引用了前面程式碼行中變量,例如上面的範例Line1.x, 目的是避免JVM不把靜態成員序列化的特點。 ### 5.3 Memory Management Spark提供了三種儲存persistent RDDs的選項 * 反序列化Java物件的記憶體存儲: 效能最強,Java VM 可以直接存取RDD。 * 序列化資料的記憶體儲存: 效能次之,當記憶體不足時,可以更有效的保存數據。 * 儲存在硬碟上: 效能最差,對於資料量特別大時較為適用。 記憶體不足時, 使用LRU淘汰使用較少的RDD,但如果一個RDD被新創建的RDD使用,則不淘汰(避免循環淘汰) ### 5.4 Support for Checkpointing 為了避免RDD有過多的依賴資訊,導致恢復很耗時,將一些RDDs保存到 stable storage(也就是checkpoint)可能會有所幫助。 ## 6. Evaluation * 在迭代機器學習和圖形應用程式中,Spark的效能比Hadoop高出20倍。因為透過將資料儲存為Java物件在記憶體中來避免I/O和反序列化成本。 * 使用Spark分析報表比在Hadoop上運行快40倍。 * 當節點發生故障時,Spark可以透過僅重建遺失的RDD分割區來快速恢復。 * Spark可用於交互查詢1 TB資料集,延遲僅5-7秒。 ## 7. Discussion RDDs可以有效地表達迄今為止獨立提出的許多集群程式設計模型,代表RDDs不僅可以用於產生與這些模型中編寫的程式相同的輸出,還可以實現這些框架具有的最佳化,例如將特定資料保存在記憶體中,將其分區為最大限度地減少通信,並有效地從故障中恢復。 使用RDD表現的模型: 1. MapReduce 2. DryadLINQ 3. SQL 4. Pregel 5. Iterative MapReduce 6. Batched Stream Processing: ### Explaining the Expressivity of RDDs * 為什麼RDD能夠表達這些不同的程式設計模型? RDD的限制對許多平行應用程式幾乎沒有影響。 雖然RDDs只能透過批量轉換來創建,但許多並行程式本質上就是將相同的操作應用於許多記錄。 ## 8. Related Work ## 9. Conclusion 彈性分散式資料集(RDDs)是一種高效、通用和容錯的,用於在cluster applications中共享資料的抽象化,提供基於粗粒度轉換的API,使其能夠使用lineage來有效地恢復資料。 ## Reference [翻譯] - https://fangmiao97.github.io/2019/04/13/tanslate-Resilient-Distributed-Datasets-A-Fault-Tolerant-Abstraction%E2%80%93for-In-Memory-Cluster-Computing/ [筆記] - https://www.cnblogs.com/zwCHAN/p/4231419.html [筆記] - https://zhuanlan.zhihu.com/p/88472330