# 5.2 Spark: Cluster Computing with Working Sets :::info 提出了一個名為**Spark**的新框架,被設計來解決 MapReduce 在處理某些類型計算(迭代任務和互動式分析)時的一些限制,並且保留MapReduce 的可擴展性和容錯性,引入了一種叫做 **彈性分散式數據集(RDDs)** 的抽象概念,允許數據在多個不同的操作之間被重用和快速迭代。 ::: >Spark主要是用在資料量沒那麼大的data(因為會將資料存在memory,資料量過大會out of memory),而MapReduce最大的優勢是能處理非常大量的資料,因為它將data存在disk上。 >所以在實務上,通常會是通過MapReduce將大量的資料處理過後再將少部分資料丟給Spark做機器學習之類需要iterative的工作。 ## 1. Introduction MapReduce有兩個方面的問題: 1. **Iterative jobs:** 雖然每次迭代可以都可以表示一個MapReduce任務,但每個作業(每一次迭代)都必須從磁碟重新載入數據,導致明顯的效能損失。 2. **Interactive analytics:** Hadoop通常用於Pig和Hive等SQL在大型資料集上執行查詢,但使用Hadoop 查詢都會有嚴重的延遲(幾十秒),因為 Hadoop 將查詢作為一個獨立的 MapReduce 作業,從磁碟中讀取數據。 因此本篇提出一個新的cluster computing框架Spark,支援應用平行處理作業,同時提供了與MapReduce類似的可擴展性和容錯性,並引入一個重要概念RDD(resilient distribution dataset,彈性分散式資料集),用來解決迭代作業的問題,可以分佈在一組機器上的相關物件集合,如果其中一個分割區遺失了同樣可以重建。 ## 2. Programming Model 為了使用Spark,開發人員需要編寫Driver,作用是控制應用程式的執行流程並在並行的環境中執行一系列的平行操作,Spark主要提供了兩類抽象:Resilient Distributed Datasets (RDD) 和parallel operation,此外還支援兩種受限的共享變量。 ### 2.1 Resilient Distributed Datasets (RDDs) 可參考: [RDD論文筆記](https://hackmd.io/@11220CS542600/Sywc9LmZ0) RDD 是 Spark 中用於分佈式數據處理的一種核心數據結構,Spark 是用 Scala 語言寫成的。 * RDD特性: * 一組跨電腦間的可分割的read-only物件集合。 * 分割區遺失可以重建 (因為RDD不需要物化在實體儲存上,可以透過實體儲存上的資料來建構RDD)。 * 每個RDD都由一個Scala物件表示。 * 可持久化RDD,供後續計算使用。 >**什麼是可持久化RDD?** >當 RDD 被標記為可持久化時,Spark 會將該 RDD 的第一次計算結果存儲起來,當後續操作需要使用到這個 RDD 時,Spark 可以直接使用已經計算並存儲的數據,而無需再次進行計算。 * 如何建立一個RDD: 1. 從HDFS這樣的共享檔案系統中建構。 2. 透過在一個驅動程式中**parallelizing** 讀取Scala集合,將它分成多個slices發送到多個node上。 3. 從另一個RDD**transform**而來。 4. 透過更改現有RDD的**persistence**。 >**RDD預設是lazy和ephemeral的**,只有當它們在一個操作中被使用了,計算才會實際執行,但是可以透過特定的操作來改變其持久性。 >**如何透過特定的操作來改變RDD persistence?** >1. **cache action:** 將資料保存在記憶體中,讓之後重複使用時可以快速的使用。 >2. **save action:** 將資料持久化到像HDFS這樣的分散式檔案存統上,這個被保存的版本也可以在後期的操作中重複使用。 ### 2.2 Parallel Operations RDD 上可以執行一系列的平行操作: 1. **reduce:** 透過關聯函數整合資料集中的要素 2. **collect:** 將資料集中所有的元素傳送給驅動程式 3. **foreach:** 透過使用者提供的函數來遍歷每個元素,轉換資料的要素。 ### 2.3 Shared Variables Spark允許programmer建立兩種受限類型的共享變量,以下是兩種簡單常見的使用模式: 1. **Broadcast variables:** 如果有一個很大的read-only資料片段(如lookup table),需要用於多個並行的操作,最好是把它一次分發給每個工作空間,而不是每一個閉包裡面都去做一個封裝。 2. **Accumulators:**:可以在Worker節點間共用該變量,可以用來作為計數器。這些變數只能進行「add」操作,並且只能被驅動程式讀取,可以被定義成任意類型,只要該類型具有「add」操作、能賦值 0 就好。 ## 3. Examples 下面會列舉三個範例來顯示如何使用上述特性,省略變數類型,因為Scala支援type inference。 ### 3.1 Text Search 假設在HDFS上面保存有一批超大數量日誌文件,需要尋找其中錯誤的row,可以透過一個檔案資料集物件(file dataset object)來實現。 ``` java= val file = spark.textFile("hdfs://...") //建立一個叫做「file」的多個資料集,將HDFS檔案表示成一個行集合 val errs = file.filter(_.contains("ERROR"))//轉換資料集建立一個包含錯誤的行集合errs val ones = errs.map(_ => 1)//把每一行都映射為1(找到一行,就計數為1) val count = ones.reduce(_+_)//採用聚合函數將這些資料累積起來。 ``` 假設需要對儲存在HDFS中的大型日誌檔案中包含的錯誤行進行統計,以透過一個檔案資料集物件(file dataset object)來實,上面的程式碼範例使用Spark的方式實作了MapReduce操作,與MapReduce的操作不同的是,Spark可以保存中間資料,如果我們想要保存errs數據,就可以使用以下方式建立一個快取的RDD: <!-- 因為RDDS是延遲執行的,所以所errs和ones在初始化的時候沒有進行實作,當reduce被呼叫的時候,每個工作節點都去掃描輸入模組,以流的方式來讀取數據,並將計數累加到驅動程式。 --> ``` java= val cachedErrs = errs.cache() ``` 這樣如果後續我們需要讀errs資料進行更多的操作,就會大大的提高執行效率。 ### 3.2 Logistic Regression 邏輯迴歸是一種反覆迭代的分類演算法,該演算法試圖找到一個超參數來更好地劃分類別。 ```java= // Read points from a text file and cache them val points = spark.textFile(...).map(parsePoint).cache() // Initialize w to random D-dimensional vector var w = Vector.random(D) // Run multiple iterations to update w for (i <- 1 to ITERATIONS) { val grad = spark.accumulator(new Vector(D)) for (p <- points) { // Runs in parallel val s = (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y grad += s * p.x } w -= grad.value } ``` Logistic Regression是一種迭代演算法,因此可將迭代資料快取在記憶體中從而提高執行效率(迭代ITERATIONS次,每次points都是從cache到記憶體的資料來讀)。將梯度設定成累加器變量,這樣其就可以在並行的環境下進行累加了。 ### 3.3 Alternating Least Squares (ALS) ALS演算法是用來**處理良好過濾的問題**,例如我們要透過使用者對電影觀看歷史和評分來預測他們喜歡的電影。 **ALS 的基本過程**:假設我們想要預測使用者對電影的評分,並且我們有一個已經填充了部分R的矩陣,這個矩陣包含了已知的使用者對電影的評分,ALS將R作為兩個矩陣M、U的積,M和U矩陣分別以m×k和k×u來表示,這表示每個使用者和每部電影都有k維特徵來描述它們的特點,使用者對電影的評分即為使用者特徵分配和電影特徵分配的積數,ALS透過已知的評分來獲得M和U矩陣,然後透過計算M×U來預測未知的評分。演算法主要透過以下的迭代過程來實現: 1. 隨機初始化矩陣M 2. 透過給定的M'來優化U,使得R上的托盤最小 3. 透過U來優化M,實現R上的托盤最小 4. 重複步驟2和步驟3直到收斂 ```java= val Rb = spark.broadcast(R) for (i <- 1 to ITERATIONS) { U = spark.parallelize(0 until u).map(j => updateUser(j, Rb, M))collect() M = spark.parallelize(0 until m).map(j => updateUser(j, Rb, U)).collect() } ``` 計算U和M時,都是透過並行化的方式進行計算的,而計算的過程中每一次循環,都需要數據集R,因此我們可以把數據集R設置成廣播變量,在程序啟動之後,數據集R只會被driver傳送一次給所有參與計算的worker節點。 ## 4. Implementation Spark 建立在 Mesos 之上,Mesos 是一個cluster operating system(集群作業系統),允許多個集群程式以一種細粒度(非常精確和靈活)的方式來共享一個集群,同時為應用程式提供在集群上啟動任務的 API,使得 Spark 能夠與現有的一些cluster運算框架並行運行,如Hadoop、MPI 的Mesos 連接埠。 Spark的核心是彈性分佈資料集(RDD)的實作。例如:假設我們定義了一個叫做cachedErrs 的快取資料集,它用來表示一個日誌檔案中的錯誤訊息,我們透過使用 map和reduce操作來統計元素的個數,code如3.1所示。 ```java= val file = spark.textFile("hdfs://...") val errs = file.filter(_.contains("ERROR")) val cachedErrs = errs.cache() val ones = cachedErrs.map(_ => 1) val count = ones.reduce(_+_) ``` 這些資料集將儲存為保存有每個RDD關係的物件鏈,如下圖所示,每個資料集物件包含指向父節點的指標以及如何轉換得到父節點的資訊。 ![image](https://hackmd.io/_uploads/BJihpRNZ0.png) 在內部,每個 RDD 物件都實作了相同的簡單接口,這些接口包含了以下三個操作: 1. getPartitions : 回傳partition ID列表 2. getIterator(partition) : 遍歷一個分區 3. getPreferredLocations(partition) : 用於任務調度以實現資料局部性 不同類型RDD 的差異只在於它們實作RDD 介面的方式,以下是一些例子 * HdfsTextFile RDD * 分區:基於 HDFS 中的block ID。 * 首選位置:與 HDFS 上的block位置相對應,有助於減少讀取時的網路開銷。 * getIterator:打開流來讀取 HDFS 塊的內容。 * MappedDataset RDD: * 分區和首選位置:與其父節點相同,保留了父數據集的分佈特性。 * getIterator:對父節點的元素應用 map 函數來生成新的元素。 * CachedDataset RDD: * 分區的首選位置:剛開始時與父節點一樣,但是一旦數據被分佈到其他節點並被缓存,這些首選位置會更新以反映新的存儲位置。 * getIterator:從本地存儲中找到轉換後的分區副本來讀取數據,有助於提高數據處理速度。 這種設計使得錯誤很容易處理,如果一個節點出現錯誤,可以從父數據集中重新讀取並將數據分散到其他節點,或是從HDFS上其他節點的副本中重新讀取丟失的數據分區。 ### 共享變數 有broadcast variables、accumulators 這兩種共享變量,它們是透過自訂序列化的類別來實現的。 * **broadcast variables:** 當建立廣播變數b,並賦值為v時,v保存在共用檔案系統中的一個檔案中,b的序列化格式即為該檔案的路徑,當b在一個工作節點中被使用時,Spark首先會檢查v是否是本地緩存,如果不是則從檔案系統中讀取它。最初是使用HDFS來創建廣播變量,後來開發出一個更有效率的串流廣播系統。 * **accumulators:** 透過使用不同的「序列化技巧」來實現,每個累加器在創建時都會有訓練的id,當累加器被保存時,它的序列化包含了它的id以及零值。 ### 解譯器整合 在Spark中整合了Scala的解譯器,對Scala的解譯器做了以下兩點的改變。 1. 將解釋器定義的類別輸出到共享檔案系統中,worker可以透過自訂的java類別載入器載入它們。 2. 改變Scala產生的程式碼,讓每行程式碼對應的單例物件直接引用前面行的單例對象,而不是透過getInstance方法取得前面的行單例物件,這樣就可以使得:無論行單例物件何時被序列化發送給worker,都可以捕獲其引用的單例物件的當前狀態。 ## 5. Result ### Logistic Regression 將3.2中邏輯回歸任務的效能與Hadoop的邏輯回歸實作進行比較,結果如下圖。 ![image](https://hackmd.io/_uploads/B1ta6RV-R.png) Hadoop每次迭代都需要127s,因為它作為獨立的MapReduce作業運行,Spark第一次迭代需要174s(可能是因為使用Scala而不是Java),但後續迭代只需要6s,因為每次迭代都會重複使用快取資料,所以隨著迭代次數增加,Spark比Hadoop速度快更多。 ### Alternating Least Squares 發現在不使用廣播變數的情況下,每次迭代重新發送rating matrix R的時間佔據了大部分的任務運行時間,通過使用 HDFS 或 NFS 進行的簡單廣播實現,廣播時間隨著節點數量的增長而線性增長,從而限制了任務的可擴展性。我們實現了應用級多播系統來緩解這種情況。然而,即使使用快速廣播,在每次迭代時重新發送R也是昂貴的。 ### Interactive Spark 使用Spark interpreter在15個「m1.xlarge」EC2機器的記憶體中載入39 GB的維基百科數據,並以互動方式查詢,第一次查詢資料集時,大約需要35秒,與在其上執行Hadoop作業相當,但是後續查詢只需0.5到1秒,即使它們掃描所有資料也是如此。 ## 6. Related Work ### Distributed Shared Memory Spark 的彈性分佈式數據集(RDDs)可以被視為distributed shared memory(DSM)的一種抽象,RDD 和 DSM interfaces 在兩個方面有所不同,不僅優化了性能和可擴展性,還增強了容錯能力: 1. **限制的編程模型:** RDDs提供了一個更加限制的編程模型,允許在集群節點失敗時高效地重建數據。一些 DSM 系統透過 checkpointing 來實現容錯,而 Spark 則是利用捕捉在 RDD 對象中的lineage information來重構丟失的 RDD 分區,也就是只有丟失的分區需要被重新計算,且這些分區可以在不同節點上並行重計算,程式不需回到檢查點。此外,如果沒有節點失敗,則不會有額外開銷。 2. **push computation to the data:** 與 MapReduce 一樣,RDDs 將計算推向數據,而不是讓任意節點訪問全局地址空間,這樣可以提高數據處理的效率和效能。 ### Cluster Computing Frameworks ### Language Integration ### Lineage ## 7. Discussion and Future Work Spark為programming clusters提供了三種簡單的資料抽象,能夠運用在迭代計算和交互式計算中。 * resilient distributed datasets (RDDs) * two restricted types of shared variables * broadcast variables * accumulators **未來工作** * 正式界定 RDDs 及 Spark 其他抽象的特性,以及它們適用於各類應用和工作負載的適宜性。 * 增強 RDD 抽象,以允許programmers在存儲成本和重建成本之間進行權衡。 * 定義新的操作來轉換 RDDs。包括一個按給定的key重新劃分 RDD 的 "shuffle" 操作,允許我們實現group-bys 和 joins)。 * 在 Spark interpreter上提供更高級別的交互式界面。 # Reference 1. [筆記]https://www.jianshu.com/p/846efcd407ba 2. [翻譯]https://www.cnblogs.com/wwzyy/p/16539671.html