# MapReduce contributed by <`Sean1127`>,<`baimao8437`>,<`Lukechin`>,<`xdennisx`>,<`paul5566`>,<`chenweiii`> --- # 相關連結 [GitHub](https://github.com/Sean1127/mapreduce) [YouTube]() # 預期目標 延續上學期 ["MapReduce with POSIX Thread"](https://hackmd.io/s/Hkb-lXkyg) 的成果,強化效能和應用案例 # 背景介紹 Mathias Brossard 在 5, Oct, 2016 完成的 "A simple C thread pool implementation" 為最原始的版本,包含`threadpool.c`,`threadpool.h`的實做程式碼與`heavy.c`,`shutdown.c`,`thrdtest.c`測試程式碼 ## threadpool 版本 [GitHub](https://github.com/Sean1127/mapreduce/tree/169d20f326772492a836c0d2acd6d1de985f002d) * `threadpool.[ch]` * `threadpool_create`: create threadpool_t object * `threadpool_add`: add a new task in the queue of a threadpool * `threadpool_destroy`: stop and destroy a threadpool_t object * `threadpool_free`: free memory * `threadpool_thread`: worker thread, infinite loop that grabs a task from threadpool queue and executes it * [`test/`](https://hackmd.io/s/Hkb-lXkyg#test) * [`heavy.c`](https://hackmd.io/s/Hkb-lXkyg#heavyc) 64 thread pool,每個 pool 的 queue size 8192,每個 pool thread count 4 * [`shutdown.c`](https://hackmd.io/s/Hkb-lXkyg#shutdownc) * [`thrdtest.c`](https://hackmd.io/s/Hkb-lXkyg#thrdtestc) 此版本需要修正的地方有: 1. `test/` 4 個測試程式都有 memory leak 2. threadpool queue full 情況未處理 ## mapreduce support 版本 [git commit](https://github.com/Sean1127/mapreduce/commit/b9675af0fd2022d4ebe50e5de6b6dfb5088cb1f3) mapreduce 是一種軟體框架(software framework),常用於巨量資料的平行計算 * [詳細](https://hackmd.io/s/Hkb-lXkyg#what-is-mapreduce) 此版本在原有的 threadpool 加上 mapreduce 的功能架構 * `threadpool.[ch]` * `threadpool_map`: call `threadpool_add`, blocks until all is done * `threadpool_map_thread`: map task added to threadpool, fixed partition method * `threadpool_reduce`: call `threadpool_add`, blocks until all is done * `threadpool_reduce_thread`: reduce task added to threadpool * [`mapreduce.c`](https://hackmd.io/s/Hkb-lXkyg#mapreducec): 新的測試程式碼 此版本需要修正的地方有: 1. `threadpool_map_thread` 資料分割的方式是直接把 DATASIZE 除以 THREAD_COUNT 去分配 task,此作法寫死且不能展示 mapreduce 的真正的功效 2. [`mapreduce.c`](https://hackmd.io/s/Hkb-lXkyg#mapreducec) 的程式架構分層太多,詳細可以先偷看[這裡](https://hackmd.io/s/HJNu64sJZ#4-檢討-mapreduce-程式架構) ## Calculate map and reduce time [git commit](https://github.com/Sean1127/mapreduce/commit/003b64275ad051f0f0429b088e4656f7e11e2eb3) 經過老師整理之後,交給同學當作業的程式碼做修正 而此版本是同學開始研究的第 1 個 commit 之後的實驗會依循去年同學的改進流程 # 重現實驗結果 <`Sean1127`> ## Queue size 的影響 * 當 QUEUE $< 7$ ,有時會出現錯誤的結果,但都可以順利執行結束,以下節錄自終端機執行輸出: ```shell Pool started with 8 threads and queue size of 5 reduce result = 63395205 Pool started with 8 threads and queue size of 5 reduce result = 130575149 Pool started with 8 threads and queue size of 5 Pool started with 8 threads and queue size of 5 reduce result = 21171191 ``` 可以看到執行的結果明顯錯誤,而且有時根本無法顯示結果(shutdown) 原因來自 `threadpool_add` 中對於 queue full 的處理,如果滿了將會直接回傳 -1,讓程式中止 * 當 QUEUE $\ge 8$ 的時候,程式可正常結束且結果也正確 根據不同 QUEUE size,map 與 reduce 的執行時間(執行 10 次取平均,每次都先清空 cache): ![](https://i.imgur.com/GeCI93p.png) * `map` 的時間遠大於 `reduce`(e.g. QUEUE = 265, map: 0.004424, reduce: 0.000018) * queue size 對時間的影響並不明顯,在 QUEUE $< 7$ 的時間曲線應該是因為無法正確執行而提前中止程式 故往後實驗皆以 QUEUE = 256 作為條件 ## 改善質數演算法 兩次改進的內容請參考[這裡](https://hackmd.io/s/Hkb-lXkyg#改進質數演算法) ![](https://i.imgur.com/lL3t8zo.png) ## 處理 threadpool queue full ![](https://i.imgur.com/YElC4iv.png) 詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#處理-queue-full-的情況) ## 精簡 `threadpool_create` ![](https://i.imgur.com/NZDqEEG.png) 詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#精簡程式碼) ## 修正 `threadpool_reduce` 這個版本是從 "handle full queue exception" 開始分支的 branch,所以改良的地方跟計時比較難比較 詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#修正-mapreduce-的行為) ## Implement lock free thread pool ![](https://i.imgur.com/KjyZ6hj.png) 詳細請看[這裡](https://hackmd.io/s/Hkb-lXkyg#實作-lock-free) * 這裡要注意的是,"... 2. `tests/mapreduce.c` 的運行結果不如預期,在 `#define DATASIZE (2000000)` 的情況下,執行時間如下...可見不但沒有精進反而還有變慢的跡象。..."(引用自上一組同學),雖然說跟原始版本沒有進步,但他們所指的**原始版本**是**沒有 lock free 的最新版本**,而不是連質數演算法都沒有改的最原始版本 * 事實上連續幾個版本的時間都非常接近,而改善的幅度也只有 5 % 左右,故這裡可以做結: 以上三個版本並沒有在時間上做出額外貢獻,但的確有改善程式的正確性以及可讀性 ## Use C11 atomic operation [git commit](https://github.com/Sean1127/mapreduce/commit/a38b437fb66808bab083c7630d5bf9c150a50ab4) ![](https://i.imgur.com/ehmzqa5.png) 此版本改寫的程式碼非常少,~~時間進步也非常少~~ atomic operation 用途並不只這邊,但卻是 lock free programming 的一項利器,改善的幅度不大可能是因為 data size 還太小(2,000,000),又或是其他 function 的 overhead 太大 ## Without using mutex [git commit](https://github.com/Sean1127/mapreduce/commit/3c1150e7dd2cd1fb981a1cd03e6f438606f551e6) 此版本的修改都在測試程式碼,將 critical section 的 mutex 改用 atomic 完成 雖然圖表顯示執行時間增加(4.25 %),但理論上是沒有影響,增加的幅度 < 5 % 亦可忽略 ![](https://i.imgur.com/6mT3Bxd.png) # 進階改善嘗試 ## 1 加速 尋找質數的函式 <`baimao8437`>,<`xdennisx`> ### 可能可加速的地方 - 在 `is_simple` 的 for loop 裡面第二個 if 判斷式 ```c= if (x < 2) return; if ((x & ~3) && !(x & 1)) return; for (int i = 3; i * i <= x; i += 2) if (x % i == 0) return; ``` 原執行時間: ```shell [map] Total time: 0.561102 [is_simple] Total time: 2.219543 [reduce] Total time: 0.009626 ``` 改進方法:先把 2 跟 3 的倍數去掉之後,每個質數的組成都是 `6k+1` 或是 `6k-1`,因此把每次 loop 的迭代的間隔變為 `+2`、`+4` - 將迴圈次數降低 ```c= if (x == 2 || x == 3){ data[n] = x; return; } if(!(x % 2) || !(x % 3) || (x < 2)) return; long i = 5; long w = 2; while (i * i <= x){ if (x % i == 0) return; i += w; w = 6 - w; } ``` 時間有些微下降!! ```shell [map] Total time: 0.405432 [is_simple] Total time: 1.620451 [reduce] Total time: 0.008208 ``` 再改進:一次判斷兩個 `+2`、`+4` - 將迴圈次數降低 ```c= if(x<=1) return; else if (x <=3) { data[n] = x; return; } else if (!(x%2)||!(x%3)) return; long i = 5; while (i * i <= x) { if (!(x % i)||!(x%(i+2))) return; i=i+6; } ``` 結果效率提升41.1% ```shell [map] Total time: 0.347067 [is_simple] Total time: 1.307372 [reduce] Total time: 0.010900 ``` ## 2 queue full handling 暫時不做 <br> ## 3 調整 Map 切割問題的方式 (interleaving) contributed by <`chenweiii`> ### 實驗前提 * 從尚未開始改善的版本開始實驗 * 調整問題大小,從 20000 加大成 200000,希望比較能看出改善成果 * thread 數 = 8 ### 改善內容 * 原本的設計是直接將 200000 個數字切割成 8 塊分別 map 給 thread 去執行。 * 1~25000 * ... * 175001~200000 * cache-misses 高達 9%。 ``` Performance counter stats for './mapreduce' (10 runs): 4,2207 cache-misses # 9.791 % of all cache refs ( +- 14.32% ) 43,1070 cache-references ( +- 1.87% ) 195,7357,9219 cycles ( +- 0.98% ) 171,4108,3879 instructions # 0.88 insn per cycle ( +- 0.00% ) 1.131522587 seconds time elapsed ( +- 2.51% ) ``` * 若將切割的方式改為 interleave 的方式 * 1, 9, 17, ... * 2, 10, 18, ... * 3, 11, 19, ... ```clike= static void threadpool_map_thread(void *arg) { int id = *(int *) arg; threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id); int start = id; int end = map->size; int delta = map->thread_count; for (; start < end; start += delta) map->routine(start, map->arg); sem_post(&map->done_indicator); } ``` * cache-misses 則稍稍下降為 3.6%,花在 map 的時間卻沒有改善,反而增加 > ~~執行時間分析圖待補...。~~ [name=Chen Wei] [time=Thu, May 11, 2017 8:49 PM] ``` Performance counter stats for './mapreduce' (10 runs): 3,2631 cache-misses # 3.648 % of all cache refs ( +- 11.34% ) 89,4584 cache-references ( +- 1.62% ) 226,2217,9341 cycles ( +- 2.06% ) 216,9198,9598 instructions # 0.96 insn per cycle ( +- 0.00% ) 1.287776785 seconds time elapsed ( +- 2.52% ) ``` ### 時間分析 * 為了補齊時間的分析,跑了 DATASIZE 從 20000 到 100000 * 雖然 cache misses 有所改善,但執行時間卻有增加的趨勢 ![](https://i.imgur.com/GLq0uHq.png) * 調整了一下 thread_count = 16 ,不知道會不會有比較好的表現 > 在這個版本,資料的切割仍是以 thread_count 處理 [name=Chen Wei][time=Mon, May 15, 2017 3:08 AM] * cache-misses 些微下降,但執行時間卻神奇地減少 > 仍不知道為什麼同樣 cache-misses 都有下降,卻會有不同的表現? [name=Chen Wei][time=Mon, May 15, 2017 4:14 PM] ``` Performance counter stats for './mapreduce' (10 runs): 4,4955 cache-misses # 2.953 % of all cache refs ( +- 13.58% ) 152,2247 cache-references ( +- 2.32% ) 273,8929,5990 cycles ( +- 0.45% ) 216,9467,8206 instructions # 0.79 insn per cycle ( +- 0.00% ) 1.033093284 seconds time elapsed ( +- 1.23% ) ``` * 為了觀察三組實驗的表現,我跑了 DATASIZE 從 20000 ~ 200000 的資料,觀察時間的變化 ![](https://i.imgur.com/fMdyOfW.png) <br> ## 4 檢討 MapReduce 程式架構 contributed by <`chenweiii`>, <`Lukechin`> ### 回顧上一組對 MapReduce 實驗成果 * 對論文進行摘要 * 改進質數演算法 * 使用 Yastopwatch * 使用 LFTPool 取代原本 ThreadPool implementation * 使用 mutrace, ThreadSanitizer 觀察原程式 * 但沒有具體地改善效能 * 使用 C11 atomic operation 改善原本 LFTPool 某些操作 ### 原始 MapReduce Model ```graphviz digraph g { node [color=black,fontname=Courier,shape=box] edge [color=black] threadpool_map -> 分割; 分割 -> <threadpool_add (1)> 分割 -> <threadpool_add (2)> 分割 -> <threadpool_add (3)> <threadpool_add (1)> -> <map_thread (1)> <threadpool_add (2)> -> <map_thread (2)> <threadpool_add (3)> -> <map_thread (3)> <map_thread (1)>, <map_thread (2)>, <map_thread (3)> -> routine routine -> return } ``` ```graphviz digraph g { node [color=black,fontname=Courier,shape=box] edge [color=black] threadpool_reduce -> threadpool_map threadpool_map -> 分割; 分割 -> <threadpool_add (1)> 分割 -> <threadpool_add (2)> 分割 -> <threadpool_add (3)> <threadpool_add (1)> -> <map_thread (1)> <threadpool_add (2)> -> <map_thread (2)> <threadpool_add (3)> -> <map_thread (3)> <map_thread (1)> -> <reduce_thread (1)> <map_thread (2)> -> <reduce_thread (2)> <map_thread (3)> -> <reduce_thread (3)> <reduce_thread (1)>, <reduce_thread (2)>, <reduce_thread (3)> -> reduce reduce -> return } ``` ### 存在的問題 * 切割問題的方式不彈性,threadpool 有多少 thread 便切成多少 map task。 * 若 thread 做完自身的 map task 便閒置,等待其他 thread 做完。 * 由於 threadpool 裡再包一層 threadpool_map,就喪失了原本 threadpool 的好處,先做完的 thread 沒辦法再找 map task 或是 reduce task 做。 ### 預計改善項目 * 改善 threadpool_map 運作模式 * 目前情況: 直接將問題切割成 M 等份的 task 分配給 M 條 thread 去執行 Map。 * 欲改善為: 將問題切割成 N 等份的 task,讓 M 條 thread 去執行。 * 改善 MapReduce 的架構 * 目前情況: 全部的 Map tasks 結束後,再統一丟給 Reduce tasks 處理。 * 欲改善為: 建立 Map tasks 與 Reduce tasks 的對應關係,當某一 Reduce task 對應的 Map tasks 皆以處理完,便可直接將 Reduce task 放入 task queue 等待執行。 ### 改善 threadpool_map 運作模式 commit ```411f3c7``` * 原本程式碼的寫法,寫死了 task 必須切割成 thread_count 的個數 ```clike= /* threadpool_map */ for (int i = 0; i < pool->thread_count; i++) { threadpool_error_t _err = threadpool_add(pool, threadpool_map_thread, &map.personal_pointers[i], flags); } /* threadpool_map_thread */ int end = map->size / map->thread_count; int additional_items = map->size - end * map->thread_count; int start = end * id; ``` * 新增 threadpool_map 需要傳入的參數: task_num,user 可以指定問題可切成 task_num 的個數 ```clike= /* threadpool_map */ for (int i = 0; i < task_num; i++) { threadpool_error_t _err = threadpool_add(pool, threadpool_map_thread, &map.personal_pointers[i], flags); } /* threadpool_map_thread */ int id = *(int *) arg; threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id); int task_num = map->task_num; int end = map->size / task_num; int additional_items = map->size - end * task_num; int start = end * id; ``` ### 改善 MapReduce 的架構 commit ```c36f353``` * 將 threadpool_map 與 threadpool_reduce 整理成 mapreduce * 改良 reduce 架構,讓 threadpool_reduce_thread 不用再透過 threadpool_map_thread 去執行 * 當某 map task 完成,master thread 便會將對應的 reduce task 放入 waiting queue > 但目前實作上,是一對一的 mapping 關係,暫時想不到多對一的好處跟實作方法。 [name=Chen Wei][time=Fri, May 12, 2017 3:37 PM] >> 回應老師討論區問題,這邊指的 mapping 關係是指 map 與 reduce 之間,不是指 reduce task 與 worker thread 之間。 [name=Chen Wei][time=Sun, May 14, 2017 11:33 PM] * 因把 map task 和 reduce task 整合在一個 function,便不再個別測量 map 與 reduce time ```graphviz digraph g { node [color=black,fontname=Courier,shape=box] edge [color=black] mapreduce -> 分割; 分割 -> <threadpool_add (1)> 分割 -> <threadpool_add (2)> 分割 -> <threadpool_add (3)> 分割 -> <threadpool_add (4)> 分割 -> <threadpool_add (5)> 分割 -> <threadpool_add (6)> <threadpool_add (1)> -> <map_thread (1)> <threadpool_add (2)> -> <map_thread (2)> <threadpool_add (3)> -> <map_thread (3)> <threadpool_add (4)> -> <reduce_thread (1)> <threadpool_add (5)> -> <reduce_thread (2)> <threadpool_add (6)> -> <reduce_thread (3)> <map_thread (1)>, <map_thread (2)>, <map_thread (3)> -> routine <reduce_thread (1)>, <reduce_thread (2)>, <reduce_thread (3)> -> reduce routine, reduce -> return } ``` * mapreduce 以 threadpool_map 為主體,加入 threadpool_reduce 的程式碼 * 加入 sem_wait 等待其中 map task 完成,若完成則再尋找是哪一個 task 完成 * 在這邊使用了 map.personal_pointers == -1 當作是否完成的標記,故當 map task 完成時要記得更改 * 由於改成和 map task 一對一,也要順便修改 reduce 切割問題的模式,以和 map 一致 ```clike= for (int i = 0; i < task_num; i++) { sem_wait(&map.done_indicator); for (int j = 0; j < task_num; j++) { if (map.personal_pointers[j] == -1) { /* add reduce task j */ threadpool_error_t _err = threadpool_add(pool, threadpool_reduce_thread, &info.personal_pointers[j], flags); map.personal_pointers[j] = -2; break; } } } ``` * threadpool_reduce 便可以直接砍掉了,但要記得修改 threadpool_reduce_thread 的參數及裡面的程式碼,符合 threadpool_add 參數的格式 * 由於最後在等待 reduce_thread 完成,故必須新增一個 info.done_indicator 的 semaphore,等待所有 reduce thread 確實結束 ### 改善結果 * 實驗環境 * OS: Ubuntu 16.04.2 LTS (64 bit) * CPU: intel i7-6700 * Cache: * L1d cache: 32K * L1i cache: 32K * L2 cache: 256K * L3 cache: 8192K * cash alignment: 64B (cache block size) * Memory: 32 GB * 實驗假設 * THREAD 數目 = 8 * TASK QUEUE 大小 = 256 * TASK NUM = 16 * 不同 DATASIZE 的執行時間,取樣 30 次作平均 * 對照組 (orig) 為最一開始 support MapReduce 的版本 * 在沒有清理 cache 情況下的時間表現 * 能夠看出明顯的改善 ![](https://i.imgur.com/sdTwSo7.png) * 每次執行皆清理 cache 的時間表現 (opt1) * 在組員的提醒下,在每次執行 mapreduce 之前清理 cache * 結果沒有如上圖漂亮,略顯波折,但在某些區段仍看得出改善成果 ![](https://i.imgur.com/Zn9HbWs.png) <br> ## 5 整合成果 * 在修改完 MapReduce 架構後,逐一整合下列兩項改善方法之成果 * 實驗假設如同上一節所述,都有清理 cache 後再進行量測 * 對照組依然為 orig,最後會在附上綜合的比較圖表 ### 整合 interleaving map 的切割問題方式 (opt2) ![](https://i.imgur.com/YfrhqIj.png) ### 整合 is_simple() 改善後的演算法 (opt3) 從 interleaving map 的版本整合 ![](https://i.imgur.com/0t5FYfG.png) ### 綜合的比較圖表 ![](https://i.imgur.com/YhfvPWm.png) <br> ## 6 lockfree 加速 暫時不做 ## 7 fault tolerance 暫時不做 ## 8 改善 merge sort contributed by <`Sean1127`>,<`Lukechin`>,<`chenweiii`> ### 實驗環境 ``` Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 4 On-line CPU(s) list: 0-3 Thread(s) per core: 2 Core(s) per socket: 2 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 78 Model name: Intel(R) Core(TM) i5-6300U CPU @ 2.40GHz Stepping: 3 CPU MHz: 399.932 CPU max MHz: 3000.0000 CPU min MHz: 400.0000 BogoMIPS: 4992.00 Virtualization: VT-x L1d cache: 32K L1i cache: 32K L2 cache: 256K L3 cache: 3072K NUMA node0 CPU(s): 0-3 ``` ### 內容 將原本 threadpool 替換為我們改良後的 threadpool,並比較其效能。 #### thread_count = 1 ![](https://i.imgur.com/bihUuOJ.png) #### thread_count = 2 ![](https://i.imgur.com/WmsCIv9.png) #### thread_count = 4 ![](https://i.imgur.com/tRWhMz8.png) #### thread_count = 8 ![](https://i.imgur.com/Xg1HLR9.png) #### thread_count = 16 ![](https://i.imgur.com/jpMkboI.png) #### thread_count = 32 ![](https://i.imgur.com/XNFKCAk.png) #### thread_count = 64 ![](https://i.imgur.com/LODXBh4.png) #### 綜合 ![](https://i.imgur.com/VeCloej.png) <br> # 不同應用案例 <`paul5566`> 用 merge sort 測試 map reduce 效能 # 結論 # Reference - [Find prime algorithm](http://stackoverflow.com/questions/1801391/what-is-the-best-algorithm-for-checking-if-a-number-is-prime) ###### tags: `sysprog`