# MapReduce
contributed by <`Sean1127`>,<`baimao8437`>,<`Lukechin`>,<`xdennisx`>,<`paul5566`>,<`chenweiii`>
---
# 相關連結
[GitHub](https://github.com/Sean1127/mapreduce)
[YouTube]()
# 預期目標
延續上學期 ["MapReduce with POSIX Thread"](https://hackmd.io/s/Hkb-lXkyg) 的成果,強化效能和應用案例
# 背景介紹
Mathias Brossard 在 5, Oct, 2016 完成的 "A simple C thread pool implementation" 為最原始的版本,包含`threadpool.c`,`threadpool.h`的實做程式碼與`heavy.c`,`shutdown.c`,`thrdtest.c`測試程式碼
## threadpool 版本
[GitHub](https://github.com/Sean1127/mapreduce/tree/169d20f326772492a836c0d2acd6d1de985f002d)
* `threadpool.[ch]`
* `threadpool_create`: create threadpool_t object
* `threadpool_add`: add a new task in the queue of a threadpool
* `threadpool_destroy`: stop and destroy a threadpool_t object
* `threadpool_free`: free memory
* `threadpool_thread`: worker thread, infinite loop that grabs a task from threadpool queue and executes it
* [`test/`](https://hackmd.io/s/Hkb-lXkyg#test)
* [`heavy.c`](https://hackmd.io/s/Hkb-lXkyg#heavyc)
64 thread pool,每個 pool 的 queue size 8192,每個 pool thread count 4
* [`shutdown.c`](https://hackmd.io/s/Hkb-lXkyg#shutdownc)
* [`thrdtest.c`](https://hackmd.io/s/Hkb-lXkyg#thrdtestc)
此版本需要修正的地方有:
1. `test/` 4 個測試程式都有 memory leak
2. threadpool queue full 情況未處理
## mapreduce support 版本
[git commit](https://github.com/Sean1127/mapreduce/commit/b9675af0fd2022d4ebe50e5de6b6dfb5088cb1f3)
mapreduce 是一種軟體框架(software framework),常用於巨量資料的平行計算
* [詳細](https://hackmd.io/s/Hkb-lXkyg#what-is-mapreduce)
此版本在原有的 threadpool 加上 mapreduce 的功能架構
* `threadpool.[ch]`
* `threadpool_map`: call `threadpool_add`, blocks until all is done
* `threadpool_map_thread`: map task added to threadpool, fixed partition method
* `threadpool_reduce`: call `threadpool_add`, blocks until all is done
* `threadpool_reduce_thread`: reduce task added to threadpool
* [`mapreduce.c`](https://hackmd.io/s/Hkb-lXkyg#mapreducec): 新的測試程式碼
此版本需要修正的地方有:
1. `threadpool_map_thread` 資料分割的方式是直接把 DATASIZE 除以 THREAD_COUNT 去分配 task,此作法寫死且不能展示 mapreduce 的真正的功效
2. [`mapreduce.c`](https://hackmd.io/s/Hkb-lXkyg#mapreducec) 的程式架構分層太多,詳細可以先偷看[這裡](https://hackmd.io/s/HJNu64sJZ#4-檢討-mapreduce-程式架構)
## Calculate map and reduce time
[git commit](https://github.com/Sean1127/mapreduce/commit/003b64275ad051f0f0429b088e4656f7e11e2eb3)
經過老師整理之後,交給同學當作業的程式碼做修正
而此版本是同學開始研究的第 1 個 commit
之後的實驗會依循去年同學的改進流程
# 重現實驗結果
<`Sean1127`>
## Queue size 的影響
* 當 QUEUE $< 7$ ,有時會出現錯誤的結果,但都可以順利執行結束,以下節錄自終端機執行輸出:
```shell
Pool started with 8 threads and queue size of 5
reduce result = 63395205
Pool started with 8 threads and queue size of 5
reduce result = 130575149
Pool started with 8 threads and queue size of 5
Pool started with 8 threads and queue size of 5
reduce result = 21171191
```
可以看到執行的結果明顯錯誤,而且有時根本無法顯示結果(shutdown)
原因來自 `threadpool_add` 中對於 queue full 的處理,如果滿了將會直接回傳 -1,讓程式中止
* 當 QUEUE $\ge 8$ 的時候,程式可正常結束且結果也正確
根據不同 QUEUE size,map 與 reduce 的執行時間(執行 10 次取平均,每次都先清空 cache):
![](https://i.imgur.com/GeCI93p.png)
* `map` 的時間遠大於 `reduce`(e.g. QUEUE = 265, map: 0.004424, reduce: 0.000018)
* queue size 對時間的影響並不明顯,在 QUEUE $< 7$ 的時間曲線應該是因為無法正確執行而提前中止程式
故往後實驗皆以 QUEUE = 256 作為條件
## 改善質數演算法
兩次改進的內容請參考[這裡](https://hackmd.io/s/Hkb-lXkyg#改進質數演算法)
![](https://i.imgur.com/lL3t8zo.png)
## 處理 threadpool queue full
![](https://i.imgur.com/YElC4iv.png)
詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#處理-queue-full-的情況)
## 精簡 `threadpool_create`
![](https://i.imgur.com/NZDqEEG.png)
詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#精簡程式碼)
## 修正 `threadpool_reduce`
這個版本是從 "handle full queue exception" 開始分支的 branch,所以改良的地方跟計時比較難比較
詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#修正-mapreduce-的行為)
## Implement lock free thread pool
![](https://i.imgur.com/KjyZ6hj.png)
詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#實作-lock-free)
* 這裡要注意的是,"... 2. `tests/mapreduce.c` 的運行結果不如預期,在 `#define DATASIZE (2000000)` 的情況下,執行時間如下...可見不但沒有精進反而還有變慢的跡象。..."(引用自上一組同學),雖然說跟原始版本沒有進步,但他們所指的**原始版本**是**沒有 lock free 的最新版本**,而不是連質數演算法都沒有改的最原始版本
* 事實上連續幾個版本的時間都非常接近,而改善的幅度也只有 5 % 左右,故這裡可以做結: 以上三個版本並沒有在時間上做出額外貢獻,但的確有改善程式的正確性以及可讀性
## Use C11 atomic operation
[git commit](https://github.com/Sean1127/mapreduce/commit/a38b437fb66808bab083c7630d5bf9c150a50ab4)
![](https://i.imgur.com/ehmzqa5.png)
此版本改寫的程式碼非常少,~~時間進步也非常少~~
atomic operation 用途並不只這邊,但卻是 lock free programming 的一項利器,改善的幅度不大可能是因為 data size 還太小(2,000,000),又或是其他 function 的 overhead 太大
## Without using mutex
[git commit](https://github.com/Sean1127/mapreduce/commit/3c1150e7dd2cd1fb981a1cd03e6f438606f551e6)
此版本的修改都在測試程式碼,將 critical section 的 mutex 改用 atomic 完成
雖然圖表顯示執行時間增加(4.25 %),但理論上是沒有影響,增加的幅度 < 5 % 亦可忽略
![](https://i.imgur.com/6mT3Bxd.png)
# 進階改善嘗試
## 1 加速 尋找質數的函式
<`baimao8437`>,<`xdennisx`>
### 可能可加速的地方
- 在 `is_simple` 的 for loop 裡面第二個 if 判斷式
```c=
if (x < 2) return;
if ((x & ~3) && !(x & 1)) return;
for (int i = 3; i * i <= x; i += 2)
if (x % i == 0) return;
```
原執行時間:
```shell
[map] Total time: 0.561102
[is_simple] Total time: 2.219543
[reduce] Total time: 0.009626
```
改進方法:先把 2 跟 3 的倍數去掉之後,每個質數的組成都是 `6k+1` 或是 `6k-1`,因此把每次 loop 的迭代的間隔變為 `+2`、`+4`
- 將迴圈次數降低
```c=
if (x == 2 || x == 3){
data[n] = x;
return;
}
if(!(x % 2) || !(x % 3) || (x < 2)) return;
long i = 5;
long w = 2;
while (i * i <= x){
if (x % i == 0) return;
i += w;
w = 6 - w;
}
```
時間有些微下降!!
```shell
[map] Total time: 0.405432
[is_simple] Total time: 1.620451
[reduce] Total time: 0.008208
```
再改進:一次判斷兩個 `+2`、`+4`
- 將迴圈次數降低
```c=
if(x<=1)
return;
else if (x <=3)
{
data[n] = x;
return;
}
else if (!(x%2)||!(x%3))
return;
long i = 5;
while (i * i <= x)
{
if (!(x % i)||!(x%(i+2)))
return;
i=i+6;
}
```
結果效率提升41.1%
```shell
[map] Total time: 0.347067
[is_simple] Total time: 1.307372
[reduce] Total time: 0.010900
```
## 2 queue full handling
暫時不做
<br>
## 3 調整 Map 切割問題的方式 (interleaving)
contributed by <`chenweiii`>
### 實驗前提
* 從尚未開始改善的版本開始實驗
* 調整問題大小,從 20000 加大成 200000,希望比較能看出改善成果
* thread 數 = 8
### 改善內容
* 原本的設計是直接將 200000 個數字切割成 8 塊分別 map 給 thread 去執行。
* 1~25000
* ...
* 175001~200000
* cache-misses 高達 9%。
```
Performance counter stats for './mapreduce' (10 runs):
4,2207 cache-misses # 9.791 % of all cache refs ( +- 14.32% )
43,1070 cache-references ( +- 1.87% )
195,7357,9219 cycles ( +- 0.98% )
171,4108,3879 instructions # 0.88 insn per cycle ( +- 0.00% )
1.131522587 seconds time elapsed ( +- 2.51% )
```
* 若將切割的方式改為 interleave 的方式
* 1, 9, 17, ...
* 2, 10, 18, ...
* 3, 11, 19, ...
```clike=
static void threadpool_map_thread(void *arg)
{
int id = *(int *) arg;
threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id);
int start = id;
int end = map->size;
int delta = map->thread_count;
for (; start < end; start += delta)
map->routine(start, map->arg);
sem_post(&map->done_indicator);
}
```
* cache-misses 則稍稍下降為 3.6%,花在 map 的時間卻沒有改善,反而增加
> ~~執行時間分析圖待補...。~~ [name=Chen Wei] [time=Thu, May 11, 2017 8:49 PM]
```
Performance counter stats for './mapreduce' (10 runs):
3,2631 cache-misses # 3.648 % of all cache refs ( +- 11.34% )
89,4584 cache-references ( +- 1.62% )
226,2217,9341 cycles ( +- 2.06% )
216,9198,9598 instructions # 0.96 insn per cycle ( +- 0.00% )
1.287776785 seconds time elapsed ( +- 2.52% )
```
### 時間分析
* 為了補齊時間的分析,跑了 DATASIZE 從 20000 到 100000
* 雖然 cache misses 有所改善,但執行時間卻有增加的趨勢
![](https://i.imgur.com/GLq0uHq.png)
* 調整了一下 thread_count = 16 ,不知道會不會有比較好的表現
> 在這個版本,資料的切割仍是以 thread_count 處理 [name=Chen Wei][time=Mon, May 15, 2017 3:08 AM]
* cache-misses 些微下降,但執行時間卻神奇地減少
> 仍不知道為什麼同樣 cache-misses 都有下降,卻會有不同的表現? [name=Chen Wei][time=Mon, May 15, 2017 4:14 PM]
```
Performance counter stats for './mapreduce' (10 runs):
4,4955 cache-misses # 2.953 % of all cache refs ( +- 13.58% )
152,2247 cache-references ( +- 2.32% )
273,8929,5990 cycles ( +- 0.45% )
216,9467,8206 instructions # 0.79 insn per cycle ( +- 0.00% )
1.033093284 seconds time elapsed ( +- 1.23% )
```
* 為了觀察三組實驗的表現,我跑了 DATASIZE 從 20000 ~ 200000 的資料,觀察時間的變化
![](https://i.imgur.com/fMdyOfW.png)
<br>
## 4 檢討 MapReduce 程式架構
contributed by <`chenweiii`>, <`Lukechin`>
### 回顧上一組對 MapReduce 實驗成果
* 對論文進行摘要
* 改進質數演算法
* 使用 Yastopwatch
* 使用 LFTPool 取代原本 ThreadPool implementation
* 使用 mutrace, ThreadSanitizer 觀察原程式
* 但沒有具體地改善效能
* 使用 C11 atomic operation 改善原本 LFTPool 某些操作
### 原始 MapReduce Model
```graphviz
digraph g {
node [color=black,fontname=Courier,shape=box]
edge [color=black]
threadpool_map -> 分割;
分割 -> <threadpool_add (1)>
分割 -> <threadpool_add (2)>
分割 -> <threadpool_add (3)>
<threadpool_add (1)> -> <map_thread (1)>
<threadpool_add (2)> -> <map_thread (2)>
<threadpool_add (3)> -> <map_thread (3)>
<map_thread (1)>, <map_thread (2)>, <map_thread (3)> -> routine
routine -> return
}
```
```graphviz
digraph g {
node [color=black,fontname=Courier,shape=box]
edge [color=black]
threadpool_reduce -> threadpool_map
threadpool_map -> 分割;
分割 -> <threadpool_add (1)>
分割 -> <threadpool_add (2)>
分割 -> <threadpool_add (3)>
<threadpool_add (1)> -> <map_thread (1)>
<threadpool_add (2)> -> <map_thread (2)>
<threadpool_add (3)> -> <map_thread (3)>
<map_thread (1)> -> <reduce_thread (1)>
<map_thread (2)> -> <reduce_thread (2)>
<map_thread (3)> -> <reduce_thread (3)>
<reduce_thread (1)>, <reduce_thread (2)>, <reduce_thread (3)> -> reduce
reduce -> return
}
```
### 存在的問題
* 切割問題的方式不彈性,threadpool 有多少 thread 便切成多少 map task。
* 若 thread 做完自身的 map task 便閒置,等待其他 thread 做完。
* 由於 threadpool 裡再包一層 threadpool_map,就喪失了原本 threadpool 的好處,先做完的 thread 沒辦法再找 map task 或是 reduce task 做。
### 預計改善項目
* 改善 threadpool_map 運作模式
* 目前情況: 直接將問題切割成 M 等份的 task 分配給 M 條 thread 去執行 Map。
* 欲改善為: 將問題切割成 N 等份的 task,讓 M 條 thread 去執行。
* 改善 MapReduce 的架構
* 目前情況: 全部的 Map tasks 結束後,再統一丟給 Reduce tasks 處理。
* 欲改善為: 建立 Map tasks 與 Reduce tasks 的對應關係,當某一 Reduce task 對應的 Map tasks 皆以處理完,便可直接將 Reduce task 放入 task queue 等待執行。
### 改善 threadpool_map 運作模式
commit ```411f3c7```
* 原本程式碼的寫法,寫死了 task 必須切割成 thread_count 的個數
```clike=
/* threadpool_map */
for (int i = 0; i < pool->thread_count; i++) {
threadpool_error_t _err = threadpool_add(pool, threadpool_map_thread,
&map.personal_pointers[i], flags);
}
/* threadpool_map_thread */
int end = map->size / map->thread_count;
int additional_items = map->size - end * map->thread_count;
int start = end * id;
```
* 新增 threadpool_map 需要傳入的參數: task_num,user 可以指定問題可切成 task_num 的個數
```clike=
/* threadpool_map */
for (int i = 0; i < task_num; i++) {
threadpool_error_t _err = threadpool_add(pool, threadpool_map_thread,
&map.personal_pointers[i], flags);
}
/* threadpool_map_thread */
int id = *(int *) arg;
threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id);
int task_num = map->task_num;
int end = map->size / task_num;
int additional_items = map->size - end * task_num;
int start = end * id;
```
### 改善 MapReduce 的架構
commit ```c36f353```
* 將 threadpool_map 與 threadpool_reduce 整理成 mapreduce
* 改良 reduce 架構,讓 threadpool_reduce_thread 不用再透過 threadpool_map_thread 去執行
* 當某 map task 完成,master thread 便會將對應的 reduce task 放入 waiting queue
> 但目前實作上,是一對一的 mapping 關係,暫時想不到多對一的好處跟實作方法。 [name=Chen Wei][time=Fri, May 12, 2017 3:37 PM]
>> 回應老師討論區問題,這邊指的 mapping 關係是指 map 與 reduce 之間,不是指 reduce task 與 worker thread 之間。 [name=Chen Wei][time=Sun, May 14, 2017 11:33 PM]
* 因把 map task 和 reduce task 整合在一個 function,便不再個別測量 map 與 reduce time
```graphviz
digraph g {
node [color=black,fontname=Courier,shape=box]
edge [color=black]
mapreduce -> 分割;
分割 -> <threadpool_add (1)>
分割 -> <threadpool_add (2)>
分割 -> <threadpool_add (3)>
分割 -> <threadpool_add (4)>
分割 -> <threadpool_add (5)>
分割 -> <threadpool_add (6)>
<threadpool_add (1)> -> <map_thread (1)>
<threadpool_add (2)> -> <map_thread (2)>
<threadpool_add (3)> -> <map_thread (3)>
<threadpool_add (4)> -> <reduce_thread (1)>
<threadpool_add (5)> -> <reduce_thread (2)>
<threadpool_add (6)> -> <reduce_thread (3)>
<map_thread (1)>, <map_thread (2)>, <map_thread (3)> -> routine
<reduce_thread (1)>, <reduce_thread (2)>, <reduce_thread (3)> -> reduce
routine, reduce -> return
}
```
* mapreduce 以 threadpool_map 為主體,加入 threadpool_reduce 的程式碼
* 加入 sem_wait 等待其中 map task 完成,若完成則再尋找是哪一個 task 完成
* 在這邊使用了 map.personal_pointers == -1 當作是否完成的標記,故當 map task 完成時要記得更改
* 由於改成和 map task 一對一,也要順便修改 reduce 切割問題的模式,以和 map 一致
```clike=
for (int i = 0; i < task_num; i++) {
sem_wait(&map.done_indicator);
for (int j = 0; j < task_num; j++) {
if (map.personal_pointers[j] == -1) {
/* add reduce task j */
threadpool_error_t _err = threadpool_add(pool, threadpool_reduce_thread,
&info.personal_pointers[j], flags);
map.personal_pointers[j] = -2;
break;
}
}
}
```
* threadpool_reduce 便可以直接砍掉了,但要記得修改 threadpool_reduce_thread 的參數及裡面的程式碼,符合 threadpool_add 參數的格式
* 由於最後在等待 reduce_thread 完成,故必須新增一個 info.done_indicator 的 semaphore,等待所有 reduce thread 確實結束
### 改善結果
* 實驗環境
* OS: Ubuntu 16.04.2 LTS (64 bit)
* CPU: intel i7-6700
* Cache:
* L1d cache: 32K
* L1i cache: 32K
* L2 cache: 256K
* L3 cache: 8192K
* cash alignment: 64B (cache block size)
* Memory: 32 GB
* 實驗假設
* THREAD 數目 = 8
* TASK QUEUE 大小 = 256
* TASK NUM = 16
* 不同 DATASIZE 的執行時間,取樣 30 次作平均
* 對照組 (orig) 為最一開始 support MapReduce 的版本
* 在沒有清理 cache 情況下的時間表現
* 能夠看出明顯的改善
![](https://i.imgur.com/sdTwSo7.png)
* 每次執行皆清理 cache 的時間表現 (opt1)
* 在組員的提醒下,在每次執行 mapreduce 之前清理 cache
* 結果沒有如上圖漂亮,略顯波折,但在某些區段仍看得出改善成果
![](https://i.imgur.com/Zn9HbWs.png)
<br>
## 5 整合成果
* 在修改完 MapReduce 架構後,逐一整合下列兩項改善方法之成果
* 實驗假設如同上一節所述,都有清理 cache 後再進行量測
* 對照組依然為 orig,最後會在附上綜合的比較圖表
### 整合 interleaving map 的切割問題方式 (opt2)
![](https://i.imgur.com/YfrhqIj.png)
### 整合 is_simple() 改善後的演算法 (opt3)
從 interleaving map 的版本整合
![](https://i.imgur.com/0t5FYfG.png)
### 綜合的比較圖表
![](https://i.imgur.com/YhfvPWm.png)
<br>
## 6 lockfree 加速
暫時不做
## 7 fault tolerance
暫時不做
## 8 改善 merge sort
contributed by <`Sean1127`>,<`Lukechin`>,<`chenweiii`>
### 實驗環境
```
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 2
Core(s) per socket: 2
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 78
Model name: Intel(R) Core(TM) i5-6300U CPU @ 2.40GHz
Stepping: 3
CPU MHz: 399.932
CPU max MHz: 3000.0000
CPU min MHz: 400.0000
BogoMIPS: 4992.00
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 3072K
NUMA node0 CPU(s): 0-3
```
### 內容
將原本 threadpool 替換為我們改良後的 threadpool,並比較其效能。
#### thread_count = 1
![](https://i.imgur.com/bihUuOJ.png)
#### thread_count = 2
![](https://i.imgur.com/WmsCIv9.png)
#### thread_count = 4
![](https://i.imgur.com/tRWhMz8.png)
#### thread_count = 8
![](https://i.imgur.com/Xg1HLR9.png)
#### thread_count = 16
![](https://i.imgur.com/jpMkboI.png)
#### thread_count = 32
![](https://i.imgur.com/XNFKCAk.png)
#### thread_count = 64
![](https://i.imgur.com/LODXBh4.png)
#### 綜合
![](https://i.imgur.com/VeCloej.png)
<br>
# 不同應用案例
<`paul5566`>
用 merge sort 測試 map reduce 效能
# 結論
# Reference
- [Find prime algorithm](http://stackoverflow.com/questions/1801391/what-is-the-best-algorithm-for-checking-if-a-number-is-prime)
###### tags: `sysprog`