## CH10 Batch data process with Mapreduce Unix 工具最大的限制就是你只能在單台機器上執行,而 Hadoop 有點像 Unix 的分散式版本,其中 HDFS 是檔案系統,而 MapReduce 是 Unix 程序的實現。 其實 MapReduce 的技術並不新奇,過去也有 **大規模並行處理(MPP, massively parallel processing)** 來處理類似的事情。 ![](https://i.imgur.com/GuGcrUS.gif) 最大的區別是,MPP 資料庫專注於在一組機器上並行執行分析 SQL 查詢,而 MapReduce 配合 Hadoop 的組合,則更像是一個可以執行任意程式的通用作業系統。 ### 儲存多樣性 不同於 MPP 架構下的資料庫,會需要使用者根據特定的模型(例如關係或文件)來構造資料。 而分散式檔案系統中的檔案只是位元組序列,可以使用任何資料模型和編碼來編寫。它們可能是資料庫記錄的集合,但同樣可以是文字、影象、影片、感測器讀數、稀疏矩陣、特徵向量、基因組序列或任何其他型別的資料。 說白了,Hadoop 開放了將資料不加區分地轉儲到 HDFS 的可能性,允許後續再研究如何進一步處理。相比之下,在將資料匯入資料庫專有儲存格式之前,MPP 資料庫通常需要對資料和查詢模式進行仔細的前期建模。 相較於過往,透過仔細的建模和匯入,可以讓資料庫的使用者有更高質量的資料來處理。但實踐經驗表明,簡單地使資料快速可用,通常要比事先決定理想資料模型要更有價值,原因如以下兩點 1. MPP 資料庫所要求的謹慎模式設計拖慢了集中式資料收集速度 2. 保持資料的靈活性,符合**壽司原則(sushi principle)**,讓不同團隊能各自針對 raw data 去處理與解讀,減少 overcooked 的情況發生 因此,Hadoop 經常被用於實現 ETL 過程,事務處理系統中的資料以某種原始形式轉儲到分散式檔案系統中,然後編寫 MapReduce 作業來清理資料,將其轉換為關係形式,並將其匯入 MPP 資料倉庫以進行分析。資料建模仍然在進行,但它在一個單獨的步驟中進行,與資料收集相解耦。這種解耦是可行的,因為分散式檔案系統支援以任何格式編碼的資料。 ### 處理模型的多樣性 MPP 簡單來說,就是將 SQL query 分散到不同的 data node 去處理,其針對的資料庫,還是 RDBMS 此特定資料庫,因此會針對 SQL query process 進行調整和最佳化,以獲得非常好的效能。讓業務分析師,使用的視覺化工具(例如 Tableau)快速地獲得 result。 但並非所有型別的處理都可以合理地表達為 SQL 查詢。例如,如果要構建機器學習和推薦系統,或者使用相關性排名模型的全文搜尋索引,或者執行影象分析,則很可能需要更一般的資料處理模型。因此它們不可避免地需要編寫程式碼,而不僅僅是查詢。 MapReduce 使工程師能夠輕鬆地在大型資料集上執行自己的程式碼。如果你有 HDFS 和 MapReduce,不僅可以建立一個 SQL 查詢執行引擎,也可以編寫許多其他形式的批處理,這些批處理不必非要用 SQL 查詢表示。 在 Hadoop 架構下,不同的處理模型都可以在共享的單個機器叢集上執行,所有這些機器都可以訪問分散式檔案系統上的相同檔案,不需要將資料匯入到幾個不同的專用系統中進行不同型別的處理:系統足夠靈活,可以支援同一個叢集內不同的工作負載。不需要移動資料,使得從資料中挖掘價值變得容易得多,也使採用新的處理模型容易的多。 像是以下兩種 OLTP 的資料庫 1. HBase 2. Impala (類似 MPP 架構的資料格式) 兩者都不使用 MapReduce,但都使用 HDFS 進行儲存。它們是不一樣的資料訪問與處理方法,但他們可以共存並被整合到同一個系統中。 ### 針對頻繁故障設計 #### MPP 如果一個節點在執行查詢時崩潰,大多數 MPP 資料庫會中止整個查詢,並讓使用者重新提交查詢或自動重新執行它。由於查詢通常最多執行幾秒鐘或幾分鐘,所以這種錯誤處理的方法是可以接受的,因為重試的代價不是太大。MPP 資料庫還傾向於在記憶體中保留儘可能多的資料(例如,使用雜湊連線)以避免從磁碟讀取的開銷。 #### Mapreduce 另一方面,MapReduce 可以容忍單個 Map 或 Reduce 任務的失敗,而不會影響作業的整體,透過以單個任務的粒度重試工作。它也會非常急切地將資料寫入磁碟,一方面是為了容錯,另一部分是因為假設資料集太大而不能適應記憶體。 MapReduce 方式更適用於較大的作業:要處理如此之多的資料並執行很長時間的作業,以至於在此過程中很可能至少遇到一個任務故障。在這種情況下,由於單個任務失敗而重新執行整個作業將是非常浪費的。即使以單個任務的粒度進行恢復引入了使得無故障處理更慢的開銷,但如果任務失敗率足夠高,這仍然是一種合理的權衡。 ### MapReduce 物化中間狀態 如前所述,每個 MapReduce 作業都獨立於其他任何作業。作業與世界其他地方的主要連線點是分散式檔案系統上的輸入和輸出目錄。如果希望一個作業的輸出成為第二個作業的輸入,則需要將第二個作業的輸入目錄配置為第一個作業輸出目錄,且外部工作流排程程式必須在第一個作業完成後再啟動第二個。 如果第一個作業的輸出是要在組織內廣泛釋出的資料集,則這種配置是合理的。在這種情況下,你需要透過名稱引用它,並將其重用為多個不同作業的輸入(包括由其他團隊開發的作業)。將資料釋出到分散式檔案系統中眾所周知的位置能夠帶來 **鬆耦合**,這樣作業就不需要知道是誰在提供輸入或誰在消費輸出 但在很多情況下,你知道一個作業的輸出只能用作另一個作業的輸入,這些作業由同一個團隊維護。在這種情況下,分散式檔案系統上的檔案只是簡單的 **中間狀態(intermediate state)**:一種將資料從一個作業傳遞到下一個作業的方式。在一個用於構建推薦系統的,由 50 或 100 個 MapReduce 作業組成的複雜工作流中,存在著很多這樣的中間狀態【29】。 將這個中間狀態寫入檔案的過程稱為 **物化(materialization)**。 如同前幾天討論的那般,每一個 MapReduce 的 job 都是獨立的,有依賴關係之 job 的唯一接觸點是資料位置,第 2 個 job 的輸入是第 1 個 job 的輸出,所以有些工程師會仰賴外部工具來安排所有 MapReduce job 的調度。 然而在大多數的情況下,job 的輸出只會被另一個 job 來使用,所以分散式檔案系統上很多資料都處於 中介狀態 (intermediate state),在一個使用 50 ~ 100 個 MapReduce job 的推薦引擎 workflow 下,其中介狀態資料的數量可以很驚人。 相比之下,Day 23 示範的用 Unix 工具分析 log 的例子就沒有這問題,管線命令 (pipe) 不會完整的實體化 (materialize) 中介狀態,而是以逐漸增量的方式,將輸出串流到輸入,而僅使用一個小的記憶體緩衝區。 跟 Unix 工具相比,MapReduce 這種完全實體化中介狀態的方法有以下缺點: 1. 一個 MapReudce job 需要等前面的 job 執行完才會開始,如果發生 Day 25 講的 傾斜 (skew) 就會拖慢 job 處理速度。 2. mapper 通常是多餘的,在某些案例下,你直接把 reducer 的輸出當做下一次 reudcer 的輸入會更快,省得用 mapper 再讀入一次,然後再做一次一樣的排序跟分區。 3. 中介狀態的資料還是會被分散式檔案系統做副本到多個節點上,但其實不需要。 ### 資料流引擎 (討論) 為了解決 MapReduce 的這些問題,幾種用於分散式批處理的新執行引擎被開發出來,其中最著名的是 Spark、Tez、Flink。它們的設計方式有很多區別,但有一個共同點:把整個工作流作為單個作業來處理,而不是把它分解為獨立的子作業。 ![](https://i.imgur.com/To3WNtT.png) 由於它們將工作流顯式建模為資料從幾個處理階段穿過,所以這些系統被稱為 **資料流引擎(dataflow engines)**。像 MapReduce 一樣,它們在一條線上透過反覆呼叫使用者定義的函式來一次處理一條記錄。 與 MapReduce 不同,這些函式不需要嚴格扮演交織的 Map 與 Reduce 的角色,而是可以以更靈活的方式進行組合。我們稱這些函式為 **運算元(operators)**,資料流引擎提供了幾種不同的選項來將一個運算元的輸出連線到另一個運算元的輸入: - 一種選項是對記錄按鍵重新分割槽並排序,就像在 MapReduce 的混洗階段一樣。這種功能可以用於實現排序合併連線和分組,就像在 MapReduce 中一樣。 - 另一種可能是接受多個輸入,並以相同的方式進行分割槽,但跳過排序。當記錄的分割槽重要但順序無關緊要時,這省去了分割槽雜湊連線的工作,因為構建散列表還是會把順序隨機打亂。 - 對於廣播雜湊連線,可以將一個運算元的輸出,傳送到連線運算元的所有分割槽。 這種型別的處理方式,它有幾個優點: - 像是 Sort 等高耗能的工作只需要在實際需要的地方執行,而不是預設地在每個 Map 和 Reduce 階段之間出現。 - 沒有不必要的 Map 任務,因為 Mapper 所做的工作通常可以合併到前面的 Reduce 運算元中(因為 Mapper 不會更改資料集的分割槽)。 - 由於工作流中的所有連線和資料依賴都是顯式宣告的,因此排程程式能夠總覽全域性,知道哪裡需要哪些資料,因而能夠利用區域性性進行最佳化。例如,它可以嘗試將消費某些資料的任務放在與生成這些資料的任務相同的機器上,從而資料可以透過共享記憶體緩衝區傳輸,而不必透過網路複製。 - 通常,運算元間的中間狀態足以儲存在記憶體中或寫入本地磁碟,這比寫入 HDFS 需要更少的 I/O(必須將其複製到多臺機器,並將每個副本寫入磁碟)。 MapReduce 已經對 Mapper 的輸出做了這種最佳化,但資料流引擎將這種思想推廣至所有的中間狀態。 - 運算元可以在輸入就緒後立即開始執行;後續階段無需等待前驅階段整個完成後再開始。 - 與 MapReduce(為每個任務啟動一個新的 JVM)相比,現有 Java 虛擬機器(JVM)程序可以重用來執行新運算元,從而減少啟動開銷。 ### 容錯 完全物化中間狀態至分散式檔案系統的一個優點是,它具有永續性,這使得 MapReduce 中的容錯相當容易:如果一個任務失敗,它可以在另一臺機器上重新啟動,並從檔案系統重新讀取相同的輸入。 ![](https://i.imgur.com/n1bC5G1.png) Spark 避免將中間狀態寫入 HDFS,因此它們採取了不同的方法來容錯:如果一臺機器發生故障,並且該機器上的中間狀態丟失,則它會從其他仍然可用的資料重新計算(在可行的情況下是先前的中間狀態,要麼就只能是原始輸入資料,通常在 HDFS 上)。 為了實現這種重新計算,框架必須跟蹤一個給定的資料是如何計算的 —— 使用了哪些輸入分割槽?應用了哪些運算元? Spark 使用 **彈性分散式資料集(RDD,Resilient Distributed Dataset)** 的抽象來跟蹤資料的譜系。 > RDD有三個特性: > > * 不可更動(Immutable):每個RDD都是不能被改變的(可以像Java的String一樣),想要更新的?從既有中再建立另一個吧。這樣的作法乍看下可能感覺怪怪的,但仔細想想,要讓資料容易用於分散式系統,Immutable是關鍵的一環,因為每個RDD都保證不會被更動。 > * 彈性(Resilient):分散式環境中忽然有節點失效是正常的(而且要平常心XD),那上面Spark正在使用或建立的RDD怎麼辦?沒關係,Spark會想辦法幫你重建。這與之後會提到的RDD lineage概念有關。 > * 分散式(Distributed):資料集可跨多個節點,並儲存在每個節點的記憶體內,所以Spark是記憶體怪獸,優點當然就是執行速度較快,但要小心網路資料交換(shuffling)這類昂貴操作。 在重新計算資料時,重要的是要知道計算是否是 **確定性的**:也就是說,給定相同的輸入資料,運算元是否始終產生相同的輸出?如果一些丟失的資料已經發送給下游運算元,這個問題就很重要。如果運算元重新啟動,重新計算的資料與原有的丟失資料不一致,下游運算元很難解決新舊資料之間的矛盾。對於不確定性運算元來說,解決方案通常是殺死下游運算元,然後再重跑新資料。 不過因為每次計算時的分群有所不同,透過重算資料來從故障中恢復並不總是正確的答案:如果中間狀態資料要比源資料小得多,或者如果計算量非常大,那麼將中間資料物化為檔案可能要比重新計算廉價的多。 ### 專業化的不同領域 儘管能夠執行任意程式碼的可擴充套件性是很有用的,但是也有很多常見的例子,不斷重複著標準的處理模式。因而這些模式值得擁有自己的可重用通用構建模組實現。傳統上,MPP 資料庫滿足了商業智慧分析和業務報表的需求,但這只是許多使用批處理的領域之一。 另一個越來越重要的領域是統計和數值演算法,它們是機器學習應用所需要的(例如分類器和推薦系統)。可重用的實現正在出現:例如,Mahout 在 MapReduce、Spark 和 Flink 之上實現了用於機器學習的各種演算法。 批處理引擎正被用於分散式執行日益廣泛的各領域演算法。隨著批處理系統獲得各種內建功能以及高階宣告式運算元,且隨著 MPP 資料庫變得更加靈活和易於程式設計,兩者開始看起來相似了:最終,它們都只是儲存和處理資料的系統。 ## 本章小結 在本章中,我們探索了批處理的主題。我們首先看到了諸如 awk、grep 和 sort 之類的 Unix 工具,然後我們看到了這些工具的設計理念是如何應用到 MapReduce 和更近的資料流引擎中的。一些設計原則包括:輸入是不可變的,輸出是為了作為另一個(仍未知的)程式的輸入,而複雜的問題是透過編寫 “做好一件事” 的小工具來解決的。 在 Unix 世界中,允許程式與程式組合的統一介面是檔案與管道;在 MapReduce 中,該介面是一個分散式檔案系統。我們看到資料流引擎添加了自己的管道式資料傳輸機制,以避免將中間狀態物化至分散式檔案系統,但作業的初始輸入和最終輸出通常仍是 HDFS。 分散式批處理框架需要解決的兩個主要問題是: * 分割槽 Partition 在 MapReduce 中,Mapper 根據輸入檔案塊進行分割槽。Mapper 的輸出被重新分割槽、排序併合併到可配置數量的 Reducer 分割槽中。這一過程的目的是把所有的 **相關** 資料(例如帶有相同鍵的所有記錄)都放在同一個地方。 * 容錯 MapReduce 經常寫入磁碟,這使得從單個失敗的任務恢復很輕鬆,無需重新啟動整個作業,但在無故障的情況下減慢了執行速度。資料流引擎更多地將中間狀態儲存在記憶體中,更少地物化中間狀態,這意味著如果節點發生故障,則需要重算更多的資料。確定性運算元減少了需要重算的資料量。 我們也討論了幾種 MapReduce 的連線演算法,它們也很好地演示了分割槽演算法是如何工作的: * 排序合併連線 Sort-merge joins 每個參與連線的輸入都透過一個提取連線鍵的 Mapper。透過分割槽、排序和合並,具有相同鍵的所有記錄最終都會進入相同的 Reducer 呼叫。這個函式能輸出連線好的記錄。 ![](https://i.imgur.com/uX7gef2.png) 一組 mapper 會走訪活動事件取出 userID 為 key 活動事件為 value ㄧ組 mapper 會走訪使用者資料庫取出 userID 為 key , 使用者生日作為 value * 廣播雜湊連線 Broadcast hash joins 兩個連線輸入之一很小,所以它並沒有分割槽,而且能被完全載入進一個雜湊表中。因此,你可以為連線輸入大端的每個分割槽啟動一個 Mapper,將輸入小端的散列表載入到每個 Mapper 中,然後掃描大端,一次一條記錄,併為每條記錄查詢散列表。 ![](https://i.imgur.com/y8kJRMk.png) * 分割槽雜湊連線 Partition hash joins 如果兩個連線輸入以相同的方式分割槽(使用相同的鍵,相同的雜湊函式和相同數量的分割槽),則可以獨立地對每個分割槽應用散列表方法。 ![](https://i.imgur.com/EEdHZZ2.png) 在批處理作業中的程式碼無需操心實現容錯機制:框架可以保證作業的最終輸出與沒有發生錯誤的情況相同,雖然實際上也許不得不重試各種任務。比起線上服務一邊處理使用者請求一邊將寫入資料庫作為處理請求的副作用,批處理提供的這種可靠性語義要強得多。 批處理作業的一大特點是,它讀取一些(immutable)輸入資料,來產生輸出資料,換言之,輸出是從輸入衍生出的。最關鍵的是,輸入資料是 **有界的(bounded)**:它有一個已知的資料。因為它是有界的,一個作業知道自己什麼時候完成了整個輸入的讀取,所以一個工作即便中間有幾個 task 是失敗的,但其**有界的(bounded)**的特性,讓 Batch Processing 最終總是會完成的。 https://ithelp.ithome.com.tw/articles/10273485 https://ithelp.ithome.com.tw/articles/10270750?sc=rss.iron