# 5-1: MapReduce: Simplified Data Processing on Large Clusters
## Abstract
* MapReduce是一個分散式計算的框架,它的特點是user只需要定義自己的map與reduce function就可以輕易的將程式轉變為分散式計算的形式,而不需要去在意分散式系統需要解決的問題(例如: consistency、scalability、machine failure等等)。
* MapReduce具有高度的scalability,可以同時管理數千台機器與高達數TB的資料。
## 1. Introduction
* 設計這個系統的原因:
* google有一系列邏輯非常簡單的任務要處理,但這些任務要處理的輸入規模都非常巨大,如果對於每個任務都分別implement關於分散式計算的邏輯的話,code會非常冗餘且難以維護。
* 因此,google想要設計一種抽象層面的框架,使得使用者可以不需要在乎分散式系統的細節,而是把它當成是一台電腦來使用。這樣一來即使是完全不懂分散式系統的使用者也可以輕鬆的將自己的code透過這個框架將其底層從一般的計算轉換成分散式計算。
* contributions:
* 設計一個分散式計算的interface。
* 將該interface implement在大型商用的電腦叢集上,並證明其效能。
## 2. Programming Model
* 使用者需要定義自己的map與reduce函式。
* map:
* 接收原始輸入並產生一系列的中間鍵值,reduce函式會使用這些中間鍵值並產生最終的結果。
* reduce:
* 會使用map產生的這些中間鍵值,並將同樣key的value進行合併變成一個list,並利用這個key與value list產生最終的輸出檔案。
* 實際上reduce的value list是一個iterator,原因是value list包含的元素可能很多,memory不一定可以裝的下,所以使用iterator來遍歷整個list。
### 2.1 Example
* 問題描述:
* 從大量的文檔當中統計每一個word出現的次數。
* 對應的map與reduce函式:

* 其中map的功能是計算出一個文檔中的每一個word的出現次數,reduce的功能則是對於每個key(某個word),合併其在各個文檔出現的次數,並輸出總出現次數。
### 2.2 Types
說明input的型別不需要與output一樣,只需要程式邏輯滿足下列的抽象就可以。

### 2.3 More Examples
給出一些example用MapReduce該如何撰寫。
> * 分佈式「grep」:如果一行文字符合給定的模板,那麼map函數會輸出該行。reduce作為一個恆等函數,它只將提供的中間資料複製到輸出。
> * URL存取頻率計數:map函數處理web網頁請求日誌,並依照$<URL,1>$輸出。reduce函數對$URL$相同的值求和,並輸出$<URL,Sum>$鍵值對。
> * 反轉web連結拓樸圖:map函數對名為$source$的頁面中每個名為$Target$的URL連結輸出一個$<Target,Source>$鍵值對。reduce函數依照所有$Target$相同的$Source$合併為一個列表,並與其對應的URL關聯,輸出$<Target,list(Source)>$鍵值對。
> * 每個主機的詞向量統計:詞向量是對是對一個或一系列文件中最重要的詞的總結,其形式為$<Word,Frequency>$鍵值對列表。map函數為每篇輸入文檔輸出一個$<HostName,Term\ Vector>$鍵值對(其中$HostName$由文檔的URL解析而來)。reduce函數會受到對於給定的主機上每篇文章的所有的字向量。其將這些詞向量加在一起,丟棄掉低頻詞,最終輸出$<HostName,Term\ Vector>$鍵值對。
> * 倒排索引:map函數對每篇文檔進行擷取,輸出一個$<Word, DocumentID>$的序列。reduce函數接受給定字的所有鍵值對,並依照$文件ID$排序。輸出一個$<Word, list(DocumentID)>$鍵值對。所有輸出的鍵值對的集合組成了一個簡單的倒排索引。如果需要持續追蹤詞的位置,僅需簡單的增量計算。
> * 分散式排序:map擷取每筆記錄中的鍵,輸出一個$<Key, Record>$的鍵值對。reduce函數不會對中間變數作修改直接輸出所有的鍵值對。排序計算依賴章節4.1介紹的分區機制和章節4.2介紹的排序屬性。
## 3. Implementation
* MapReduce的實現會受到不同的電腦叢集環境影響其效能,本篇論文實現的MapReduce適合用在大型商用叢集上面,環境設置如下:
:::info
1. 機器通常使用雙核心x86處理器,2-4GB內存,運行Linux系統。
2. 使用商用網路硬體:每台機器頻寬通常為100Mbps或1Gbps,但平均分到的頻寬通常為100Mbps或1Gbps,但平均分到的頻寬則小得多。 (譯註:可能受交換機間頻寬限制,每台機器平均分到的頻寬遠小於其單機頻寬。)
3. 一個集群由成百上千的機器組成,因此機器故障是常態。
4. 儲存由直接連接到獨立的機器上IDE(譯註:本文IDE指集成設備電路Intergated Drive Electronics)磁碟提供。我們為了管理這些磁碟上的數據,開發了一個內部的分散式檔案系統[8]。此檔案系統使用副本的方式在不可靠的硬體上提供了可用性和可靠性。
5. 使用者將工作(job)提交到一個調度系統。每個工作由一系列的任務(task)組成,這些任務被scheduler(調度器)映射到集群中一系列可用的機器上。
:::
### 3.1 Execution Overview
* 輸入資料會被分成 $M$ 個split,也就代表會有 $M$ 個map任務需要被執行,這樣就可以將輸入拆成可進行分散式運算的形式(每個map任務處理 $Data/M$ 這麼多的資料)。
* reduce也同樣被分為 $R$ 個任務,map任務產生的中間鍵值會透過partition function來決定其歸屬哪一個reduce任務,partition function的一個例子可以是 $hash(key)\ mod\ R$。
* 系統流程圖如下:

:::info
1. 使用者程式中的MapReduce函式庫首先將輸入檔劃分為$M$個分片,通常每個分片為16MB到64MB(使用者可透過可選參數控制)。隨後,庫會在叢集中的機器上啟動程式的一些副本。
2. 在這些程式的副本中,有一份很特殊,它是master副本。其他的副本是被master分配了任務的worker副本。總計要分配$M$個map任務和$R$個reduce任務。 master選取閒置的worker並為每個選取的worker指派map或reduce任務。
3. 被指派map任務的worker從輸入資料分片中讀取內容。其解析輸入資料中的鍵值對,並將每個鍵值對傳給使用者定義的map函數。map函數輸出的中間鍵值對在記憶體中快取。
4. 記憶體中快取的鍵值對會定期寫入本機磁碟,寫入的資料會被分區函數劃分為$R$個區域。這些在磁碟中快取的鍵值對的位置會被傳送給master,master會將這些位置資訊進一步傳遞給reduce worker。
5. 當master通知reduce worker中間鍵值對的位置資訊後,reduce worker會透過遠端程序呼叫(翻譯:即RPC。)的方式從map worker的本機磁碟中讀取快取的資料。當reduce worker讀取完所有中間資料後,它會對中間資料按照鍵進行排序,以便將所有鍵相同的鍵值對分成一組。因為通常來說,需對鍵不同的資料會被對應到同一個reduce任務中,所以需要對資料排序。如果中間資料總量過大以至於無法放入記憶體中,則會使用外排序演算法(external sort)。
6. reduce worker遍歷每一個遇到的中間鍵值對的,它會將鍵和該鍵對應的一系列值傳遞給使用者定義的reduce函數。reduce函數的輸出會被追加(append)到該reduce分區的最終輸出檔。
7. 當所有的map和reduce任務都執行完畢後,master會喚醒使用者程式。此時,呼叫MapReduce的調應用序會回到使用者程式碼中。
:::
* 幾個重點:
* 整個系統由master管理,master會schedule任務給worker,而worker在完成任務後也會將該任務的輸出檔案位置回傳給master。
* 如果**master死掉了系統無法自行偵測**,需要靠使用者去重新執行MapReduce。
* **中間鍵值是被儲存在執行map任務的那台機器的local disk上**,也就是如果那台worker死掉了,該map任務必須要被重新執行。
* **reduce的輸出是存放在stable file system上**,也就是在完成reduce任務後,就算該worker死掉也不需要重新執行該reduce任務。
* 整個MapReduce任務結束後,**會產生$R$個輸出檔案**,可以直接讀取,也可以當成下一個MapReduce的輸入。
* **無論是要執行map與reduce任務,都必須master去通知worker**,這裡有一個潛在的問題是當任務數量很多時,master可能會永遠處於busy狀態。
> * 當有map任務完成時,該worker會向master發送一條帶有$R$個臨時檔案名稱的訊息。如果master收到了一個已經完成過的map任務的完成訊息,master會忽略該訊息。否則,master會在其資料結構中記錄這$R$個檔案的檔案名。
> * 當有一個reduce任務完成時,該worker會自動地將其暫時輸出檔案重新命名為永久的檔案名稱。如果同一個reduce任務在多台機器中執行,會出現多個重命名呼叫將檔案重命名同一個永久檔案名稱的情況。我們依賴下層檔案系統提供了原子性重命名操作,來確保最終的檔案系統中僅包含來自一次reduce任務輸出的資料。
### 3.2 Master Data Structures
* 對於每個map和reduce任務,master會儲存其狀態,狀態有以下三種:
* 等待中 ( $idle$)
* 執行中($in\text{-}progress$)
* 完成($completed$ )
* 對於每個已完成的map任務,master會儲存其輸出的$R$個中間文件區域的位置,並將其傳給對應的reduce任務。
### 3.3 Fault Tolerance
#### 3.3.1 Worker Failure
* master會定期ping各個worker來確認該worker是否還活著。
* 如果worker死了,master會做出以下操作:
* 將該worker上執行的所有map任務標為$idle$,無論是已完成的還是未完成的。而reduce任務則是$in\text{-}progress$的才需要被標為$idle$。
* 被標示$idle$的任務會再被繼續schedule給空閒的worker執行。
* 任何使用到這些map任務的reduce任務在這些map任務被重新執行後,會被告知要從新的worker那裡拿取資料。
* 這個方式讓MapReduce處理worker failure變得很簡單,就只需要將該任務assign給新的worker並重新執行就好,在發生大規模的機器故障的時候可以很輕易地做到error recovery。
#### 3.3.2 Master Failure
* 會定期將master的資料保存在stable file system,作為checkpoint使用。
* 如果master故障需要依靠使用者自行偵測並重啟MapReduce。
#### 3.3.3 Semantics in the Presence of Failures
* 對於deterministic的map與reduce function,MapReduce保證其執行一定跟sequential執行得到一樣的結果。
* 但對於nondeterministic的map與reduce function,MapReduce只能提供較弱的語意保證。
* 考慮這個情境,假設有一個map任務$M1$與其對應的兩個reduce任務$R1、R2$,其中一個$R1$先完成,而另一個$R2$在還沒讀取完$M1$資料時$M1$死掉了,那另一個map任務$M2$會被執行,這時候如果map function是nondeterministic,$R1、R2$就可能就會讀到不同次map任務的結果。
### 3.4 Locality
* master會考量輸入檔案的位置,盡可能的將使用該檔案的map任務分配在該機器上,如果沒辦法,會盡可能的分配在離該機器近一點的機器上執行,以節省網路頻寬。
### 3.5 Task Granularity
* 每個MapReduce任務的$M$與$R$應該遠大於機器的數量,這樣才可以充分利用每台機器的效能。
> 翻譯:否則,考慮$M$小於worker機器數的情況,每個worker上只有一個任務,如果一個worker故障,那麼該worker中完成的任務只能在另一台worker機器上重跑,無法充分利用並行的性能)。
* 對於master本身的複雜度分析:
* 做出 $O(M + R)$ 個schedule。
* 保存 $O(M \times R)$ 個state在記憶體中,這邊應該是指對於每個map任務需要儲存$R$個中間鍵值檔的位置,所以整體空間複雜度才是 $O(M \times R)$。
* 最終選擇的參數:
> $2,000$台機器上選擇$M=200,000$、$R=5,000$的參數執行MapReduce計算。
### 3.6 Backup Tasks
* 造成MapReduce操作緩慢的其中一個原因是離群者( $straggler$ ),意思是有些task相對於其他正常的task執行異常的慢,原因可能如下:
> * CPU、記憶體、本地磁碟或網路頻寬
> * google遇到的罕見問題:
> 機器初始化程式碼中的一個bug,其導致了處理器快取被停用,受影響的機器上的計算慢了超過100倍。
* 解決方式是當該MapReduce快執行完時,系統針對還沒執行完的任務再schedule幾個worker去做,這些tasks被稱為backup tasks。當backup tasks與正常task其中之一執行完畢,該任務就被視為已完成。利用這些backup tasks來解決$straggler$的問題。
> 我們發現這個機制顯著地減少了完成大型MapReduce操作的時間。例如,章節5.3中的排序程序在停用任務副本機制時,完成時間延長了44%。
## 4. Refinements
### 4.1 Partitioning Function
* partition function的功能是用來決定中間鍵值應該要分配到哪個reduce任務,預設的partition function是 $hash(key)\ mod\ R$。
* 在某些情況指定特定的partition function會讓系統效能更好,或更符合使用者的需求。
* 假設中間鍵值是一連串的$<URL, Frequency>$,而使用者希望有相同hostname的url被分在同一個output file裡面,這時候使用者可以傳入自訂的partition function來達成這件事,以這個例子來說,partition function是 $hash(hostname(key))\ mod\ R$。
* 有一個潛在的風險是partition function選的不好可能會導致某一個reduce任務要處理大多數的資料,讓程式從分散式計算退化成sequntial。
### 4.2 Ordering Guarantees
* MapReduce保證在同一個reduce任務中,他拿到的資料是升序排序的,這同時也保證了最終的輸出文件會是照升序排序的。
> MapReduce框架保證在同一個中間結果分區內,即同一個reduce任務內,中間結果資料是按照鍵的升序處理的,因為reduce任務處理前會先將中間結果資料按照鍵進行排序。這樣在reduce任務處理完成後,最終結果文件內的資料也是按照鍵的順序排序的,這就有利於對最終結果文件按鍵進行高效的隨機查找,或方便其他需要排好序的資料的場景。
### 4.3 Combiner Function
* 可以定義一個combiner function,通常其內容會與reduce function一樣,該function的功能是可以先merge中間鍵值對,令其變成一個較小的檔案,以節省網路頻寬的使用。
* 假設有一個中間鍵值檔案包含了$10^9$個$<URL, 1>$,原始方法會將這個檔案直接傳送給reduce任務,那就會使用很大的網路頻寬,但如果是有了combiner function的話,combiner function會在本地將其merge成$<URL, 10^9>$再傳送給reduce任務,這大大減少了網路頻寬的使用,也加快了整體MapReduce任務的速度。
### 4.4 Input and Output Types
* 使用者可以寫自訂的Reader,讓輸入不再局限於檔案,可以從以下形式輸入:
* 資料庫
* 記憶體
* 輸出也是同理。
### 4.5 Side-effects
* MapReduce不支援在map與reduce任務執行時產生額外的輸出檔案,使用者必須自己去handle這件事情。
> 我們沒有對一個任務生產多個輸出檔提供原子性的兩段提交協議(two-phase commits,2PC)支援。因此,產生多個輸出檔案且有跨文件一致性需求的任務應該具有「確定性(譯註,如章節3.3.3)」。但在實際環境中,這項限制並不是什麼問題。
### 4.6 Skipping Bad Records
* 有些使用者程式的bug會導致其在處理某些紀錄上必定會失敗,因此MapReduce提供了一種模式可以跳過這些壞掉的紀錄。而通常在大數據的統計上,跳過少數紀錄對於結果而言無傷大雅。
* 具體實現:
> 每個worker程序會安裝一個捕捉段違規(segmentation violation)和匯流排錯誤(bus error)的處理器。再呼叫使用者的map或reduce操作之前,MapReduce函式庫會在全域變數中儲存參數的編號。如果使用者程式碼產生了一個訊號,訊號處理器會傳送一個含有該編號的「last gasp(奄奄一息)」UDP套件給master。當master在同一筆記錄上收到超過一個故障時,master會在下次重新執行相關map任務或reduce任務時指示跳過該記錄。
### 4.7 Local Execution
* 由於分散式系統本身的複雜性,該論文開發了一個可以在單台電腦上模擬MapReduce的執行環境,以幫助使用者更好的進行測試和除錯。
### 4.8 Status Information
* master上會有一個http的伺服器,該伺服器會展示當前MapReduce的狀況,包含:
* 多少個任務已經完成
* 多少個任務正在執行
* 輸入的位元組數
* 中間資料的位元組數
* 輸出的位元組數
* 處理速度
* 該監控頁面可以幫助使用者了解當前狀況與估算任務大約需要執行多長時間,我把它理解成雲端的監控頁面。
### 4.9 Counters
* > MapReduce函式庫提供了用來計數不同事件發生次數的計數器
* 考慮以下情境,想要計算一群document中,各個word的frequncy與首字大寫的word的出現次數,就可以使用以下的程式碼做到這件事。
```
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
```
* > 有些計數器的值會被MapReduce函式庫自動維護,例如處理過的輸入鍵值對的數量或產生的輸出鍵值對的數量。
* MapReduce會自己解決因為重複執行任務而導致的Counter重複計算,重復執行的原因可能是因為worker failure或backup tasks。
## 5. Performance
* 主要測試了兩個任務:
* 在1TB的資料中搜尋符合模板的紀錄。
* 對1TB的資料做排序。
### 5.1 Cluster Configuration
* 叢集資訊:
* 1800台機器。
* 每台機器有兩個開啟hyper threading的 $2$ Ghz Intel Xeon processors及4GB的memory與兩個160GB的硬碟。
* 100-200Gbps的網路。
* > 在4GB記憶體中,有大約1~1.5GB記憶體被叢集為了運行其他任務保留
* > 所有機器都在同一個中心託管,因此任何兩個機器間往返延遲(RTT)小於1ms
* 程式執行時CPU、磁碟與網路處於空閒狀態。
### 5.2 Grep
* 輸入: $10^{10}$ 筆100byte的record,$M = 15000, R = 1$
* 
> 圖2展示了計算進度隨時間的變化。 Y軸展示了輸入資料被掃描的速率。隨著分配給MapReduce計算的機器越來越多,其速度也逐漸提高。當有1764個worker被分配到該任務時,速率峰值超過了30GB/s。當map任務完成時,速率開始逐漸下降並在整個計算時間的大概第80s時下降到0。整個計算從開始到結束大概消耗了150s。這包括了大概一分鐘的啟動時間開銷。這項開銷的原因是程式需要傳播到所有worker機器與開啟1000個輸入檔並取得局部最佳化所需的資訊時與GFS互動的延遲。
### 5.3 Sort
* 輸入: $10^{10}$ 筆100byte的record,$M = 15000, R = 4000$
* 
* 左上角的圖表對應到的是map任務,可以觀察到map任務都在大概第200秒前完成。
* 左中則是代表資料從map任務轉移到reduce任務的過程,他會從第一個map任務完成就開始做,所有任務在大概第600秒時完成。
* 左下代表reduce任務將輸出檔寫入到stable file system的時間,所有任務在大概第850秒時完成,算上啟動開銷的話是891秒。
> 這邊會使用到網路頻寬的原因是因為底層的的檔案系統是GFS(Google File System),是google開發的分散式檔案系統,他提供data replica的功能,所以在寫入時會自動寫入另一份副本到不同的機器上,所以需要使用網路頻寬。
### 5.4 Effect of Backup Tasks
* 圖3(b)展示的是沒有做backup tasks的系統,總計算時間在1283秒,比有backup tasks的增加了44%的時間。
### 5.5 Machine Failures
* 圖3(c)是模擬機器故障的情況,故意kill掉200個task,總計算時間在933秒,只比正常的系統增加了5%的時間。
## 6. Experience
* 描述google在開發MapReduce的經驗,比較不重要可以跳過。
* 有興趣可以參考 https://blog.mrcroxx.com/posts/paper-reading/mapreduce-osdi04/#6-%E7%A0%94%E5%8F%91%E7%BB%8F%E5%8E%86
## 7. Related Work
* 描述前人的經歷,也可以跳過。
* 有興趣可以參考 https://blog.mrcroxx.com/posts/paper-reading/mapreduce-osdi04/#7-%E7%9B%B8%E5%85%B3%E5%B7%A5%E4%BD%9C
### 8. Conclusions
* MapReduce成功的原因:
* > 因為MapReduce框架隱藏了並行化、容錯、本地最佳化和負載均衡的細節。
* > 很多不同的問題都可以被表示為MapReduce計算。例如,MapReduce在Google的生產系統的web搜尋服務、排序、資料探勘、機器學習和許多其他系統中都被當作資料產生工具使用
* > 我們開發了一個適用於由上千台機器組成的大型叢集的MapReduce實作。此實作可以有效利用這些機器的資源,因此其非常適用於Google中的大型運算問題。
* 在開發過程學習到的經驗:
* 適當的限制可以讓開發分散式計算的程式更加簡單。
* 網路頻寬很重要,要把它當成是一個稀缺的資源來使用(對應到[3.4 Locality](https://hackmd.io/BNu_Tp0PRkud_1AvKttrpg?view#34-Locality))。
* backup tasks可以大幅加速任務的執行時間。
## Reference
1. https://static.googleusercontent.com/media/research.google.com/zh-TW//archive/mapreduce-osdi04.pdf 論文原文
2. https://blog.mrcroxx.com/posts/paper-reading/mapreduce-osdi04/ 論文中文翻譯
3. https://superlova.github.io/2021/05/04/%E3%80%90%E8%AE%BA%E6%96%87%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0%E3%80%91MapReduce-Simplified-Data-Processing-on-Large-Clusters/ 論文筆記
4. https://zhuanlan.zhihu.com/p/466882580 論文筆記
5. https://github.com/Vonng/ddia/blob/master/zh-tw/ch10.md $Designing\ Data\text{-}Intensive\ Applications$ 第十章