# Abstract
MapReduce 是一種程式設計模型及其實現,用於處理和生成大規模數據集。使用者指定一個Map函數,該函數處理key/value對以生成一組中間key/value對,還有一個Reduce函數,用於合併與相同中間key相關聯的所有中間值。許多現實世界的任務可以用這種模型表達
用這種函數式風格編寫的程序會自動在大型集群上的多台普通機器上並行執行。運行時系統處理輸入數據的分區、程序在多台機器上的排程、故障處理和跨機器通信管理。這使得沒有並行和分散式系統經驗的程式設計師也能輕鬆利用大型分散式系統的資源。
我們的MapReduce實現運行在大型集群上的普通機器上,並具有很高的可擴展性:一個典型的MapReduce計算處理許多TB的數據,並在數千台機器上執行。程式設計師發現這個系統易於使用:已有數百個MapReduce程序被實現,每天在Google的集群上執行超過一千個MapReduce作業。
# Introduction
過去五年來,作者及Google的許多人實現了數百個處理大規模原始數據的專用計算任務,例如抓取的文件、網頁請求日誌等,這些數據用來計算各種衍生數據,如倒排索引(inverted indices)、網頁文件圖結構的各種表示形式、每個主機抓取頁面的數量摘要、每日最頻繁的查詢集合等。大多數此類計算在概念上相對簡單,但由於輸入數據通常非常龐大,計算必須分布在數百或數千台機器上才能在合理的時間內完成。因此,如何並行化計算、分配數據以及處理故障的問題,使得原本簡單的計算變得複雜且充滿大量處理這些問題的代碼。
作為對這種複雜性的反應,我們設計了一種新的,使我們能夠表達我們試圖執行的簡單計算,但隱藏了並行化、容錯、數據分配和負載平衡的複雜細節。
我們的抽象受到 Lisp 和其他許多函數式語言中 map 和 reduce 原語的啟發。我們意識到,我們的大多數計算涉及對輸入的每個邏輯“記錄”應用 map 操作,以計算一組中間 key/value 對,然後對共享相同 key 的所有值應用 reduce 操作,以適當地合併衍生數據。我們使用用戶指定的 map 和 reduce 操作的函數式模型,使我們能夠輕鬆地並行化大型計算,並使用重新執行作為主要的容錯機制。
這項工作的主要貢獻是提供了一個簡單而強大的接口,使得大規模計算的自動並行化和分散式成為可能,並且我們的接口實現能在由商用PC組成的大型集群上實現高性能。
# Programming Model
這個計算模型接收一組輸入的 key/value 對,並生成一組輸出的 key/value 對。使用者通過兩個函數來表達計算:Map 和 Reduce。
- Map 函數:由使用者編寫,接收一個輸入對,並生成一組中間的 key/value 對。MapReduce 庫將所有與相同中間 key I 相關的中間值集合在一起,並將它們傳遞給 Reduce 函數。
- Reduce 函數:由使用者編寫,接收一個中間 key I 及其相關的一組值,將這些值合併成一個可能較小的值集合。通常,每次 Reduce 調用會生成零個或一個輸出值。中間值通過迭代器提供給使用者的 Reduce 函數,這允許處理超過記憶體大小的值列表。
## 2.1 Example
考慮計算大量文檔中每個單詞出現次數的問題。使用者可以編寫類似於以下的代碼:
```
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
```
```
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
```
Map 函數輸出每個單詞及其出現次數(在此簡單示例中為'1')。Reduce 函數將所有為特定單詞輸出的次數相加。
此外,使用者還需要編寫代碼來填寫 MapReduce 規範對象(specification object),其中包括輸入和輸出文件的名稱以及可選的調優參數。然後,使用者調用 MapReduce 函數,傳遞這個規範對象。使用者的代碼與用 C++ 實現的 MapReduce 庫連接。附錄 A 包含此示例的完整程序文本。
## 2.2 Types
雖然前面的偽代碼是用字串輸入和輸出來編寫的,但概念上,使用者提供的 Map 和 Reduce 函數具有相關的類型:
```
map (k1, v1) → list (k2, v2)
reduce (k2, list (v2)) → list (v2)
```
即,輸入鍵和值來自於與輸出鍵和值不同的域。此外,中間鍵和值來自於與輸出鍵和值相同的域。
我們的 C++ 實現將字串傳遞給用戶定義的函數,並讓用戶代碼在字串和適當的類型之間進行轉換。
## 2.3 More Examples
以下是幾個簡單的示例,展示了如何使用 MapReduce 來實現有趣的計算。
1. 分散式 grep:Map 函數輸出匹配給定模式的行。Reduce 函數是一個身份函數,將中間數據複製到輸出中。
2. URL 訪問頻率統計:Map 函數處理網頁請求日誌,輸出〈URL, 1〉。Reduce 函數將相同 URL 的值相加,並輸出〈URL, 總次數〉對。
3. 反向網頁連結圖:Map 函數輸出〈target, source〉對於每個指向 target URL 的連結。Reduce 函數將與給定 target URL 相關的所有 source URL 列表合併,並輸出〈target, list(source)〉。
4. 每個主機的術語向量:術語向量總結了在文檔或文檔集中的重要詞彙及其頻率。Map 函數對每個輸入文檔輸出〈hostname, term vector〉對。Reduce 函數合併所有文檔的術語向量,丟棄不頻繁的術語,並輸出最終的〈hostname, term vector〉對。
5. 倒排索引:Map 函數解析每個文檔,輸出〈word, document ID〉對。Reduce 函數接受給定詞的所有對,對應的 document ID,並輸出〈word, list(document ID)〉對,這組輸出對形成簡單的倒排索引。
6. 分散式排序:Map 函數提取每條記錄的鍵,並輸出〈key, record〉對。Reduce 函數將所有對不變地輸出。這依賴於分區和排序功能。
# Implementation
可以有多種不同的 MapReduce 接口實現,具體選擇取決於環境。例如,一種實現可能適合小型共享記憶體機器,另一種適合大型 NUMA 多處理器,而另一種則適合更大規模的網路機器集合。
本節描述了針對 Google 廣泛使用的計算環境(由多個商用電腦組成的大型集群)的實現。在我們的環境中:
1. 機器通常是運行 Linux 的雙處理器 x86 處理器,每台機器有 2-4 GB 記憶體。
2. 使用商用網路硬體——通常每台機器的速度為 100 Mbps 或 1 Gbps,但整體雙向帶寬平均值顯著較低。
3. 集群由數百到數千台機器組成,因此機器故障很常見。
4. 儲存由直接附加到單個機器的廉價 IDE 硬盤提供。內部開發的分散式文件系統管理這些磁盤上存儲5. 的數據。該文件系統使用複製來在不可靠的硬件上提供可用性和可靠性。
6. 用戶將作業提交給調度系統。每個作業由一組任務組成,並由調度程序映射到集群內的一組可用機器上。
## 3.1 Execution Overview
Map 調用通過自動分割輸入數據來分配給多台機器處理。輸入分割可以由不同的機器並行處理。Reduce 調用通過將中間 key 空間分割成 R 塊來分配(例如,hash(key) mod R)。用戶指定分區數(R)和分區函數。

圖 1 顯示了我們實現中 MapReduce 操作的整體流程。當用戶程式調用 MapReduce 函數時,會發生以下一系列操作(圖中的編號標籤對應於下面列表中的編號):
1. MapReduce 庫首先將輸入文件分割成 M 塊,每塊通常為 16 到 64 MB(用戶可選的參數控制)。然後在集群中的多台機器上啟動多個程式副本。
2. 其中一個程式副本是特別的——即主伺服器。其餘的是工作節點,由主伺服器分配工作。有 M 個 map 任務和 R 個 reduce 任務需要分配。主伺服器選擇空閒工作節點,並分配給每個工作節點一個 map 任務或 reduce 任務。
3. 被分配 map 任務的工作節點會讀取相應的輸入分割的內容,解析輸入數據中的 key/value 對,並將每對傳遞給用戶定義的 Map 函數。Map 函數生成的中間 key/value 對會緩存在記憶體中。
4. 週期性地,緩存的對會寫入本地磁盤,並根據分區函數劃分成 R 區域。這些緩存對在本地磁盤上的位置會返回給主伺服器,主伺服器負責將這些位置轉發給 reduce 工作節點。
5. 當 reduce 工作節點收到主伺服器通知的這些位置時,它會使用遠程程序調用從 map 工作節點的本地磁盤讀取緩存數據。當 reduce 工作節點讀取了所有中間數據後,會按中間鍵排序,以便所有相同鍵的出現都被組合在一起。由於通常許多不同的鍵映射到相同的 reduce 任務,排序是必要的。如果中間數據量太大,無法放入記憶體,則使用外部排序。
6. reduce 工作節點遍歷排序後的中間數據,對於遇到的每個唯一中間鍵,將鍵和相應的一組中間值傳遞給用戶的 Reduce 函數。Reduce 函數的輸出會附加到最終輸出文件中。
7. 當所有 map 任務和 reduce 任務完成後,主伺服器喚醒用戶程式。此時,MapReduce 函數在用戶程式中返回結果。
成功完成後,MapReduce 執行的輸出在 R 個輸出文件中可用(每個 reduce 任務一個文件,文件名由用戶指定)。通常,用戶不需要將這些 R 個輸出文件合併成一個文件——他們通常將這些文件作為另一個 MapReduce 調用的輸入,或者從能夠處理分割成多個文件的分散式應用程序中使用它們。
## 3.2 Master Data Structures
主伺服器維護幾個資料結構。對於每個 map 任務和 reduce 任務,它儲存狀態(空閒、進行中或完成)以及工作機器的身份(對於非空閒任務)。
主伺服器是中間文件區域位置從 map 任務傳播到 reduce 任務的管道。因此,對於每個完成的 map 任務,主伺服器存儲由 map 任務生成的 R 個中間文件區域的位置和大小。這些位置和大小信息會在 map 任務完成時接收更新。信息會逐步推送給有進行中 reduce 任務的工作節點。
## 3.3 Fault Tolerance
由於 MapReduce 庫旨在利用數百或數千台機器處理非常大規模的數據,因此必須能夠優雅地處理機器故障。
- **工作節點故障**:主伺服器會定期 ping 每個工作節點。如果在一定時間內沒有收到工作節點的響應,主伺服器會將其標記為故障。該工作節點完成的任何 map 任務都會被重置為初始空閒狀態,從而可以重新調度到其他工作節點。類似地,故障工作節點上的任何進行中 map 任務或 reduce 任務也會被重置為空閒,並可以重新調度。
- **主伺服器故障**:可以讓主伺服器定期寫入數據結構的檢查點。如果主伺服器任務失敗,可以從最後的檢查點狀態重新啟動新副本。但是,由於只有一個主伺服器,其故障不太可能,因此我們當前的實現是當主伺服器失敗時中止 MapReduce 計算。用戶可以檢查這種情況並重試 MapReduce 操作。
## 3.4 Locality
在我們的計算環境中,網路帶寬是一種相對稀缺的資源。我們通過利用輸入數據(由 GFS 管理)存儲在集群機器的本地磁盤上來節約網路帶寬。GFS 將每個文件分割成 64 MB 的塊,並在不同的機器上存儲多個副本(通常是 3 副本)。MapReduce 主伺服器考慮到輸入文件的位置信息,並嘗試在包含相應輸入數據副本的機器上調度 map 任務。如果無法在包含數據的機器上調度,則嘗試在數據副本所在機器的網路交換機上調度 map 任務。當在集群中的大部分工作節點上運行大型 MapReduce 操作時,大部分輸入數據是從本地讀取的,消耗的網路帶寬為零。
## 3.5 Task Granularity
我們將 map 階段分割成 M 塊,reduce 階段分割成 R 塊,如上所述。理想情況下,M 和 R 應該遠大於工作節點的數量。讓每個工作節點執行許多不同的任務有助於動態負載平衡,並且在工作節點失敗時能夠更快地恢復:已完成的許多 map 任務可以分散到所有其他工作節點上。
在我們的實現中,M 和 R 的大小有實際限制,因為主伺服器必須做出 *O(M + R)* 的調度決策,並在記憶體中保留 O(M ∗ R) 的狀態,如上所述。(內存使用的常數因子很小:*O(M ∗ R)* 狀態部分大約由每個 map 任務/reduce 任務對應大約一字節的資料組成。)
此外,用戶常常會限制 R 的大小,因為每個 reduce 任務的輸出最終會放入一個單獨的輸出文件中。實際上,我們通常選擇 M 使得每個任務處理的輸入數據大約為 16 MB 到 64 MB(這樣可以最大限度地發揮本地化優化的作用),並使 R 成為我們預期使用的工作機器數量的幾倍。我們經常進行的 MapReduce 計算中,M = 200,000,R = 5,000,並使用 2,000 台工作機器。
## 3.6 Backup Tasks
一個常見的問題是,一台機器需要很長時間才能完成計算中的最後幾個 map 或 reduce 任務,我們稱之為“拖尾者”(straggler)。拖尾者可能由多種原因引起,例如機器上的硬碟問題、計算節點上過多的任務競爭 CPU、記憶體、本地硬碟或網路頻寬等。
我們有一種通用機制來減輕拖尾者問題的影響。當一個 MapReduce 操作接近完成時,主伺服器會為剩餘的進行中任務安排備份執行。當主執行或備份執行之一完成時,該任務標記為完成。我們已經調整了這個機制,使其通常只增加操作的計算資源使用量不超過幾個百分點。我們發現這顯著減少了完成大型 MapReduce 操作所需的時間。例如,在禁用備份任務機制的情況下,排序程式(見第 5.3 )完成時間增加了 44%。
# 4 Refinements
雖然僅編寫 Map 和 Reduce 函數的基本功能已經能滿足大多數需求,但我們發現了一些有用的擴展。在本節中進行描述。
## 4.1 Partitioning Function
MapReduce 使用者指定所需的 reduce 任務/輸出文件數量(R)。數據通過中間鍵上的分區函數分區到這些任務上。默認的分區函數使用hash(例如 **hash(key) mod R**),這通常會產生相當平衡的分區。在某些情況下,按鍵的其他函數分區數據是有用的。例如,有時候輸出鍵是 URL,我們希望所有來自單個主機的條目最終在同一輸出文件中。為了支持這種情況,MapReduce Library允許使用者提供特定的分區函數。例如,使用 **hash(Hostname(urlkey)) mod R** 作為分區函數會使來自同一主機的所有 URL 最終在同一輸出文件中。
## 4.2 Ordering Guarantees
我們保證在給定分區內,中間的 key/value 對按鍵的增序順序處理。這個順序保證使得生成每個分區的排序輸出文件變得容易,這在需要支援鍵的高效隨機訪問查詢的輸出文件格式中很有用,或者當輸出資料排序時更方便。
## 4.3 Combiner Function
在某些情況下,每個 map 任務生成的中間鍵有顯著的重複,而使用者指定的 Reduce 函數是可交換和結合的。這種情況下,Combiner 函數可以在網路傳輸之前進行部分資料合併,從而減少網路負載。
Combiner 函數在執行 map 任務的每台機器上運行。通常,同一段代碼用於實現 Combiner 和 Reduce 函數。Combiner 函數的輸出寫入中間文件,將發送到 reduce 任務,而 Reduce 函數的輸出寫入最終輸出文件。
部分合併顯著加快某些類別的 MapReduce 操作。附錄 A 包含一個使用 Combiner 的示例。
## 4.4 Input and Output Types
MapReduce Library提供了支持以多種格式讀取輸入資料的功能。例如,“文本”模式將每行視為一個 key/value 對:鍵是文件中的偏移量,值是行的內容。另一種常見的支持格式存儲按鍵排序的一系列 key/value 對。每種輸入類型實現都知道如何將自身分割成有意義的範圍以便作為單獨的 map 任務處理(例如,文本模式的範圍分割確保分割僅發生在行邊界)。使用者可以通過提供一個簡單的讀取器接口的實現來添加對新輸入類型的支援,但大多數使用者只使用少數pre-define的輸入類型之一。
讀取器不一定需要從文件讀取資料。例如,可以輕鬆定義一個從資料庫或記憶體映射資料結構中讀取記錄的讀取器。
同樣地,我們提供一組輸出類型來以不同格式生成資料,並且用戶代碼可以輕鬆添加對新輸出類型的支援。
## 4.5 Side-effects
在某些情況下,MapReduce 的使用者發現生成輔助文件作為 map 和/或 reduce 操作的附加輸出是方便的。我們依賴應用程序編寫者使這些副作用是原子的且可重複的。通常,應用程序會寫入臨時文件,並在完全生成後原子地重命名該文件。
我們不支持單個任務生成的多個輸出文件的原子性二階段提交。因此,具有跨文件一致性要求的任務應該是確定性的。這種限制在實踐中從未成為問題。
## 4.6 Skipping Bad Records
有時,使用者代碼中存在錯誤,導致 Map 或 Reduce 函數在某些記錄上崩潰。這些錯誤會阻止 MapReduce 操作完成。通常的做法是修復錯誤,但有時這不可行;例如,錯誤可能在無法獲得source code 的第三方庫中。此外,有時忽略少量記錄是可以接受的,例如在對大數據進行統計分析時。我們提供了一種可選的執行模式,其中 MapReduce 庫檢測導致崩潰的記錄,並跳過這些記錄以繼續前進。
每個工作過程安裝一個信號處理程序來捕捉段違規和總線錯誤。在調用使用者 Map 或 Reduce 操作之前,MapReduce 庫會將參數的序列號存儲在全局變數中。如果使用者代碼生成訊號,訊號處理程序會發送一個包含序列號的“最後一口氣”UDP包給 MapReduce 主伺服器。當主伺服器在特定記錄上看到多次失敗時,它會指示在下次重新執行相應的 Map 或 Reduce 任務時跳過該記錄。
## 4.7 Local Execution
在分散式系統中調試 Map 或 Reduce 函數可能很棘手,因為實際計算在數千台機器上發生,並且工作分配決策由主伺服器動態做出。為了幫助調試、分析和小規模測試,我們開發了一個替代的 MapReduce 庫實現,該實現順序執行 MapReduce 操作的所有工作。用戶可以通過特殊標誌調用他們的程序,然後輕鬆使用任何他們覺得有用的調試或測試工具(例如 gdb)。
## 4.8 Status Information
主伺服器運行內部 HTTP 伺服器,並導出一組狀態頁面供人類查看。這些狀態頁面顯示計算的進度,例如已完成的任務數量、正在進行的任務數量、輸入字節數、中間數據字節數、輸出字節數、處理速率等。這些頁面還包含指向每個任務生成的標準錯誤和標準輸出文件的鏈接。用戶可以使用這些數據來預測計算需要多長時間,以及是否應該向計算中添加更多資源。這些頁面還可以用來確定計算比預期慢很多時的原因。
此外,頂層狀態頁面顯示哪些工作節點失敗了,以及它們在失敗時正在處理的 map 和 reduce 任務。這些信息在試圖診斷使用者代碼中的錯誤時很有用。
## 4.9 Counters
MapReduce 庫提供了一個計數器功能,用於計數各種事件的發生次數。例如,用戶代碼可能希望計數處理的單詞總數或索引的德語文件數量等。
要使用此功能,用戶代碼創建一個命名計數器對象,然後在 Map 和/或 Reduce 函數中適當地增加計數。例如:
```
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
```
個別工作節點上的計數器值會定期傳遞給主伺服器(隨 ping 回應一起)。主伺服器聚合來自成功的 map 和 reduce 任務的計數器值,並在 MapReduce 操作完成時將其返回給用戶代碼。當前的計數器值也顯示在主伺服器狀態頁面上,以便人類可以觀察實時計算的進度。在聚合計數器值時,主伺服器消除了同一 map 或 reduce 任務的重複執行影響,以避免重複計數。(重複執行可能來自我們使用的備份任務和因故障重新執行的任務。)
一些計數器值由 MapReduce 庫自動維護,例如處理的輸入 key/value 對數量和生成的輸出 key/value 對數量。
使用者發現計數器功能對於檢查 MapReduce 操作的行為是否正常很有用。例如,在某些 MapReduce 操作中,用戶代碼可能希望確保生成的輸出對數量與處理的輸入對數量完全相等,或者處理的德語文件的比例在某個容忍範圍內。
# 5 Performance
我們測量了 MapReduce 在大型集群上運行的兩個計算的性能。這兩個計算包括:搜索大約1TB資料以查找特定模式,以及排序大約1TB的資料。
這兩個程式代表了 MapReduce 使用者編寫的真實程式中的一大部分——一類程式將資料從一種表示轉換為另一種表示,另一類程式從大資料集中提取少量有趣資料。
## 5.1 Cluster Configuration
所有程式在一個由約 1800 台機器組成的集群上執行。每台機器配備兩個 2GHz Intel Xeon 處理器(啟用了超執行緒技術),4GB 記憶體,兩個 160GB IDE 磁碟,以及一個giga bits Ethernet link。這些機器排列成two-level tree-shaped switch 網路,根部提供約 100-200 Gbps 的總頻寬。所有機器都位於同一托管設施內,因此任何兩台機器之間的往返時間小於一毫秒。
在 4GB 記憶體中,大約 1-1.5GB 保留給集群中運行的其他任務。程式在周六下午執行,此時 CPU、磁碟和網路大多數處於空閒狀態。
## 5.2 Grep
grep 程式掃描 1010 個 100 Bytes的記錄,搜索一個相對罕見的三字符模式(該模式出現在 92,337 個記錄中)。輸入資料分割成大約 64MB 的塊(M = 15000),整個輸出放入一個文件(R = 1)。

圖 2 顯示了計算隨時間的進度。Y 軸顯示了掃描輸入資料的速率。隨著更多機器分配給這個 MapReduce 計算,速率逐漸上升,當 1764 個工作節點被分配時,速率達到峰值超過 30 GB/s。隨著 map 任務完成,速率開始下降,大約 80 秒後達到零。整個計算從開始到結束大約需要 150 秒,包括約一分鐘的啟動開銷。這些開銷主要是將程式傳播到所有工作機器以及與 GFS 互動以打開一組 1000 個輸入文件並獲取所需的本地化優化訊息所需的延遲。
## 5.3 Sort
sort 程式對 1010 個 100 字節的記錄(大約 1 兆字節資料)進行排序。該程式模仿 TeraSort 基準測試。
排序程式包含不到 50 行使用者代碼。三行 Map 函數從文本行中提取 10 字節的排序鍵,並輸出該鍵和原始文本行作為中間的 key/value 對。我們使用內建的 Identity 函數作為 Reduce 運算元。這個函數將中間的 key/value 對不變地作為輸出 key/value 對。最終的排序輸出寫入一組 2-way複製的 GFS 文件(即程式輸出為 2 terabytes)。
如前所述,輸入資料分割成 64MB 的大小(M = 15000)。我們將排序輸出分成 4000 個文件(R = 4000)。分區函數使用鍵的初始字節將其分割成 R 塊。
我們的基準測試分區函數具有鍵分佈的內部知識。在一般排序程式中,我們會添加預通過的 MapReduce 操作,該操作會收集鍵的樣本,並使用樣本鍵的分佈來計算最終排序階段的分割點。

圖 3(a)顯示了正常執行 sort 程式的進度。左上圖顯示了讀取輸入的速率。速率在約 200 秒內達到約 13 GB/s 的峰值並迅速下降,因為所有 map 任務在 200 秒內完成。請注意,輸入速率低於 grep,因為 sort 的 map 任務花費約一半的時間和 I/O 頻寬將中間輸出寫入本地磁碟。grep 的相應中間輸出大小可以忽略不計。
左中圖顯示了從 map 任務到 reduce 任務的資料通過網路傳輸的速率。當第一個 map 任務完成時,資料交換開始。圖中的第一個峰值對應於大約 1700 個 reduce 任務(整個 MapReduce 被分配到大約 1700 台機器上,每台機器同時執行一個 reduce 任務)。大約 300 秒後,第一批 reduce 任務完成,我們開始傳輸剩餘的 reduce 任務數據。所有數據傳輸在計算約 600 秒時完成。
左下圖顯示了 reduce 任務將排序數據寫入最終輸出文件的速率。第一批數據傳輸結束與寫入開始之間有一段延遲,因為機器忙於排序中間數據。寫入以大約 2-4 GB/s 的速率持續進行。一切寫入在計算約 850 秒時完成。包括啟動開銷,整個計算需要 891 秒。這與當前最佳報告的 TeraSort 基準測試結果(1057 秒)相似。
需要注意幾點:輸入速率高於資料交換速率和輸出速率,這是因為我們的本地化優化——大部分資料從本地磁碟讀取,繞過了頻寬受限的網路。資料交換速率高於輸出速率,因為輸出階段寫入了排序數據的兩個副本(我們為了可靠性和可用性做了兩個副本)。我們寫入兩個副本是因為底層文件系統提供的可靠性和可用性機制。寫入資料的網路頻寬需求如果底層文件系統使用清除編碼而不是複製會降低。
## 5.4 Effect of Backup Tasks
圖 3(b)顯示了禁用備份任務執行 sort 程式的進度。執行流與圖 3(a)類似,除了有一個很長的尾部,幾乎沒有寫入活動。960 秒後,除了 5 個 reduce 任務外,所有任務都完成。然而,這最後幾個拖尾者花了 300 秒才完成。整個計算需要 1283 秒,增加了 44% 的時間。
## 5.5 Machine Failures
圖 3(c)顯示了我們故意在計算中殺死 200 個工作程序(從 1746 個工作程序中選出)的 sort 程式執行。底層集群調度系統立即在這些機器上重新啟動新的工作程序(因為只是程序被殺死,機器仍在正常工作)。
工作程序的死亡表現為負輸入速率,因為一些先前完成的 map 工作消失了(對應的 map 工作程序被殺死),需要重新執行。重新執行這些 map 工作相對較快。整個計算在 933 秒內完成,包括啟動開銷(只比正常執行時間增加了 5%)。
# 結論
MapReduce 程式設計模型在 Google 被成功地應用於許多不同的用途。我們將這種成功歸因於以下幾個原因。首先,該模型易於使用,即使對於沒有平行和分散式系統經驗的程式設計師也是如此,因為它隱藏了平行化、容錯、本地化優化和負載平衡的細節。其次,各種各樣的問題都可以輕鬆地表達為 MapReduce 計算。例如,MapReduce 被用於生成 Google 生產網路搜尋服務的資料、排序、資料挖掘、機器學習以及許多其他系統。第三,我們開發了一種實現,這種實現可以擴展到由數千台機器組成的大型集群。這種實現有效地利用了這些機器資源,因此適用於 Google 遇到的許多大型計算問題。
我們從這項工作中學到了幾件事。首先,限制程式設計模型使得平行化和分散計算變得容易,並使這些計算具有容錯能力。其次,網路頻寬是一種稀缺資源。因此,我們系統中的許多優化措施旨在減少跨網路傳輸的資料量:本地化優化允許我們從本地磁碟讀取數據,而將中間數據的單個副本寫入本地磁碟則節省了網路頻寬。第三,冗餘執行可以用來減少慢速機器的影響,並處理機器故障和資料丟失。
關鍵點
- 易於使用:隱藏了平行化、容錯、本地化優化和負載平衡的細節,即使對於沒有相關經驗的程式設計師也是如此。
- 廣泛應用:適用於多種問題,如生成網路搜尋服務資料、排序、資料挖掘、機器學習等。
- 高擴展性:能夠擴展到由數千台機器組成的大型集群,有效利用機器資源。
- 限制模型的好處:使平行化和分散計算變得容易,並具備容錯能力。
- 網路頻寬優化:針對減少跨網路傳輸的數據量進行優化,如本地化優化和將中間數據寫入本地磁碟。
- 冗餘執行:用來減少慢速機器的影響,並處理機器故障和數據丟失。