MapReduce

contributed by <Sean1127>,<baimao8437>,<Lukechin>,<xdennisx>,<paul5566>,<chenweiii>


相關連結

GitHub
YouTube

預期目標

延續上學期 "MapReduce with POSIX Thread" 的成果,強化效能和應用案例

背景介紹

Mathias Brossard 在 5, Oct, 2016 完成的 "A simple C thread pool implementation" 為最原始的版本,包含threadpool.c,threadpool.h的實做程式碼與heavy.c,shutdown.c,thrdtest.c測試程式碼

threadpool 版本

GitHub

  • threadpool.[ch]

    • threadpool_create: create threadpool_t object
    • threadpool_add: add a new task in the queue of a threadpool
    • threadpool_destroy: stop and destroy a threadpool_t object
    • threadpool_free: free memory
    • threadpool_thread: worker thread, infinite loop that grabs a task from threadpool queue and executes it
  • test/

  • heavy.c
    64 thread pool,每個 pool 的 queue size 8192,每個 pool thread count 4

  • shutdown.c

  • thrdtest.c

此版本需要修正的地方有:

  1. test/ 4 個測試程式都有 memory leak
  2. threadpool queue full 情況未處理

mapreduce support 版本

git commit

mapreduce 是一種軟體框架(software framework),常用於巨量資料的平行計算

此版本在原有的 threadpool 加上 mapreduce 的功能架構

  • threadpool.[ch]

    • threadpool_map: call threadpool_add, blocks until all is done
    • threadpool_map_thread: map task added to threadpool, fixed partition method
    • threadpool_reduce: call threadpool_add, blocks until all is done
    • threadpool_reduce_thread: reduce task added to threadpool
  • mapreduce.c: 新的測試程式碼

此版本需要修正的地方有:

  1. threadpool_map_thread 資料分割的方式是直接把 DATASIZE 除以 THREAD_COUNT 去分配 task,此作法寫死且不能展示 mapreduce 的真正的功效
  2. mapreduce.c 的程式架構分層太多,詳細可以先偷看這裡

Calculate map and reduce time

git commit

經過老師整理之後,交給同學當作業的程式碼做修正
而此版本是同學開始研究的第 1 個 commit
之後的實驗會依循去年同學的改進流程

重現實驗結果

<Sean1127>

Queue size 的影響

  • 當 QUEUE \(< 7\) ,有時會出現錯誤的結果,但都可以順利執行結束,以下節錄自終端機執行輸出:

    ​​Pool started with 8 threads and queue size of 5
    ​​reduce result = 63395205
    ​​
    ​​Pool started with 8 threads and queue size of 5
    ​​reduce result = 130575149
    ​​
    ​​Pool started with 8 threads and queue size of 5
    ​​
    ​​Pool started with 8 threads and queue size of 5
    ​​reduce result = 21171191
    

    可以看到執行的結果明顯錯誤,而且有時根本無法顯示結果(shutdown)
    原因來自 threadpool_add 中對於 queue full 的處理,如果滿了將會直接回傳 -1,讓程式中止

  • 當 QUEUE \(\ge 8\) 的時候,程式可正常結束且結果也正確
    根據不同 QUEUE size,map 與 reduce 的執行時間(執行 10 次取平均,每次都先清空 cache):

    Image Not Showing Possible Reasons
    • The image file may be corrupted
    • The server hosting the image is unavailable
    • The image path is incorrect
    • The image format is not supported
    Learn More →

    • map 的時間遠大於 reduce(e.g. QUEUE = 265, map: 0.004424, reduce: 0.000018)
    • queue size 對時間的影響並不明顯,在 QUEUE \(< 7\) 的時間曲線應該是因為無法正確執行而提前中止程式
      故往後實驗皆以 QUEUE = 256 作為條件

改善質數演算法

兩次改進的內容請參考這裡

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

處理 threadpool queue full

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

詳細請看這裡

精簡 threadpool_create

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

詳細請看這裡

修正 threadpool_reduce

這個版本是從 "handle full queue exception" 開始分支的 branch,所以改良的地方跟計時比較難比較

詳細請看這裡

Implement lock free thread pool

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

詳細請看這裡

  • 這裡要注意的是," 2. tests/mapreduce.c 的運行結果不如預期,在 #define DATASIZE (2000000) 的情況下,執行時間如下可見不但沒有精進反而還有變慢的跡象。"(引用自上一組同學),雖然說跟原始版本沒有進步,但他們所指的原始版本沒有 lock free 的最新版本,而不是連質數演算法都沒有改的最原始版本
  • 事實上連續幾個版本的時間都非常接近,而改善的幅度也只有 5 % 左右,故這裡可以做結: 以上三個版本並沒有在時間上做出額外貢獻,但的確有改善程式的正確性以及可讀性

Use C11 atomic operation

git commit

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

此版本改寫的程式碼非常少,時間進步也非常少
atomic operation 用途並不只這邊,但卻是 lock free programming 的一項利器,改善的幅度不大可能是因為 data size 還太小(2,000,000),又或是其他 function 的 overhead 太大

Without using mutex

git commit

此版本的修改都在測試程式碼,將 critical section 的 mutex 改用 atomic 完成
雖然圖表顯示執行時間增加(4.25 %),但理論上是沒有影響,增加的幅度 < 5 % 亦可忽略

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

進階改善嘗試

1 加速 尋找質數的函式

<baimao8437>,<xdennisx>

可能可加速的地方

  • is_simple 的 for loop 裡面第二個 if 判斷式
if (x < 2) return; if ((x & ~3) && !(x & 1)) return; for (int i = 3; i * i <= x; i += 2) if (x % i == 0) return;

原執行時間:

[map] Total time: 0.561102
[is_simple] Total time: 2.219543
[reduce] Total time: 0.009626

改進方法:先把 2 跟 3 的倍數去掉之後,每個質數的組成都是 6k+1 或是 6k-1,因此把每次 loop 的迭代的間隔變為 +2+4

  • 將迴圈次數降低
​​ if (x == 2 || x == 3){ ​​ data[n] = x; ​​ return; ​​ } ​​ if(!(x % 2) || !(x % 3) || (x < 2)) return; ​​ long i = 5; ​​ long w = 2; ​​ while (i * i <= x){ ​​ if (x % i == 0) return; ​​ i += w; ​​ w = 6 - w; ​​ }

時間有些微下降!!

​​[map] Total time: 0.405432
​​[is_simple] Total time: 1.620451
​​[reduce] Total time: 0.008208

再改進:一次判斷兩個 +2+4

  • 將迴圈次數降低
​​ if(x<=1) ​​ return; ​​ else if (x <=3) ​​ { ​​ data[n] = x; ​​ return; ​​ } ​​ else if (!(x%2)||!(x%3)) ​​ return; ​​ long i = 5; ​​ while (i * i <= x) ​​ { ​​ if (!(x % i)||!(x%(i+2))) ​​ return; ​​ i=i+6; ​​ }

結果效率提升41.1%

​​[map] Total time: 0.347067
[is_simple] Total time: 1.307372
[reduce] Total time: 0.010900

2 queue full handling

暫時不做


3 調整 Map 切割問題的方式 (interleaving)

contributed by <chenweiii>

實驗前提

  • 從尚未開始改善的版本開始實驗
  • 調整問題大小,從 20000 加大成 200000,希望比較能看出改善成果
  • thread 數 = 8

改善內容

  • 原本的設計是直接將 200000 個數字切割成 8 塊分別 map 給 thread 去執行。
    • 1~25000
    • 175001~200000
  • cache-misses 高達 9%。
Performance counter stats for './mapreduce' (10 runs):

            4,2207      cache-misses              #    9.791 % of all cache refs      ( +- 14.32% )
           43,1070      cache-references                                              ( +-  1.87% )
     195,7357,9219      cycles                                                        ( +-  0.98% )
     171,4108,3879      instructions              #    0.88  insn per cycle           ( +-  0.00% )

       1.131522587 seconds time elapsed                                          ( +-  2.51% )
  • 若將切割的方式改為 interleave 的方式
    • 1, 9, 17,
    • 2, 10, 18,
    • 3, 11, 19,
static void threadpool_map_thread(void *arg) { int id = *(int *) arg; threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id); int start = id; int end = map->size; int delta = map->thread_count; for (; start < end; start += delta) map->routine(start, map->arg); sem_post(&map->done_indicator); }
  • cache-misses 則稍稍下降為 3.6%,花在 map 的時間卻沒有改善,反而增加

執行時間分析圖待補 Chen WeiThu, May 11, 2017 8:49 PM

  Performance counter stats for './mapreduce' (10 runs):

            3,2631      cache-misses              #    3.648 % of all cache refs      ( +- 11.34% )
           89,4584      cache-references                                              ( +-  1.62% )
     226,2217,9341      cycles                                                        ( +-  2.06% )
     216,9198,9598      instructions              #    0.96  insn per cycle           ( +-  0.00% )

       1.287776785 seconds time elapsed                                          ( +-  2.52% )

時間分析

  • 為了補齊時間的分析,跑了 DATASIZE 從 20000 到 100000

  • 雖然 cache misses 有所改善,但執行時間卻有增加的趨勢

  • 調整了一下 thread_count = 16 ,不知道會不會有比較好的表現

在這個版本,資料的切割仍是以 thread_count 處理 Chen WeiMon, May 15, 2017 3:08 AM

  • cache-misses 些微下降,但執行時間卻神奇地減少

仍不知道為什麼同樣 cache-misses 都有下降,卻會有不同的表現? Chen WeiMon, May 15, 2017 4:14 PM

 Performance counter stats for './mapreduce' (10 runs):

            4,4955      cache-misses              #    2.953 % of all cache refs      ( +- 13.58% )
          152,2247      cache-references                                              ( +-  2.32% )
     273,8929,5990      cycles                                                        ( +-  0.45% )
     216,9467,8206      instructions              #    0.79  insn per cycle           ( +-  0.00% )

       1.033093284 seconds time elapsed                                          ( +-  1.23% )
  • 為了觀察三組實驗的表現,我跑了 DATASIZE 從 20000 ~ 200000 的資料,觀察時間的變化

4 檢討 MapReduce 程式架構

contributed by <chenweiii>, <Lukechin>

回顧上一組對 MapReduce 實驗成果

  • 對論文進行摘要
  • 改進質數演算法
  • 使用 Yastopwatch
  • 使用 LFTPool 取代原本 ThreadPool implementation
    • 使用 mutrace, ThreadSanitizer 觀察原程式
    • 但沒有具體地改善效能
  • 使用 C11 atomic operation 改善原本 LFTPool 某些操作

原始 MapReduce Model







g



threadpool_map

threadpool_map



分割

分割



threadpool_map->分割





threadpool_add (1)

threadpool_add (1)



分割->threadpool_add (1)





threadpool_add (2)

threadpool_add (2)



分割->threadpool_add (2)





threadpool_add (3)

threadpool_add (3)



分割->threadpool_add (3)





map_thread (1)

map_thread (1)



threadpool_add (1)->map_thread (1)





map_thread (2)

map_thread (2)



threadpool_add (2)->map_thread (2)





map_thread (3)

map_thread (3)



threadpool_add (3)->map_thread (3)





routine

routine



map_thread (1)->routine





map_thread (2)->routine





map_thread (3)->routine





return

return



routine->return











g



threadpool_reduce

threadpool_reduce



threadpool_map

threadpool_map



threadpool_reduce->threadpool_map





分割

分割



threadpool_map->分割





threadpool_add (1)

threadpool_add (1)



分割->threadpool_add (1)





threadpool_add (2)

threadpool_add (2)



分割->threadpool_add (2)





threadpool_add (3)

threadpool_add (3)



分割->threadpool_add (3)





map_thread (1)

map_thread (1)



threadpool_add (1)->map_thread (1)





map_thread (2)

map_thread (2)



threadpool_add (2)->map_thread (2)





map_thread (3)

map_thread (3)



threadpool_add (3)->map_thread (3)





reduce_thread (1)

reduce_thread (1)



map_thread (1)->reduce_thread (1)





reduce_thread (2)

reduce_thread (2)



map_thread (2)->reduce_thread (2)





reduce_thread (3)

reduce_thread (3)



map_thread (3)->reduce_thread (3)





reduce

reduce



reduce_thread (1)->reduce





reduce_thread (2)->reduce





reduce_thread (3)->reduce





return

return



reduce->return





存在的問題

  • 切割問題的方式不彈性,threadpool 有多少 thread 便切成多少 map task。
  • 若 thread 做完自身的 map task 便閒置,等待其他 thread 做完。
    • 由於 threadpool 裡再包一層 threadpool_map,就喪失了原本 threadpool 的好處,先做完的 thread 沒辦法再找 map task 或是 reduce task 做。

預計改善項目

  • 改善 threadpool_map 運作模式
    • 目前情況: 直接將問題切割成 M 等份的 task 分配給 M 條 thread 去執行 Map。
    • 欲改善為: 將問題切割成 N 等份的 task,讓 M 條 thread 去執行。
  • 改善 MapReduce 的架構
    • 目前情況: 全部的 Map tasks 結束後,再統一丟給 Reduce tasks 處理。
    • 欲改善為: 建立 Map tasks 與 Reduce tasks 的對應關係,當某一 Reduce task 對應的 Map tasks 皆以處理完,便可直接將 Reduce task 放入 task queue 等待執行。

改善 threadpool_map 運作模式

commit 411f3c7

  • 原本程式碼的寫法,寫死了 task 必須切割成 thread_count 的個數
/* threadpool_map */ for (int i = 0; i < pool->thread_count; i++) { threadpool_error_t _err = threadpool_add(pool, threadpool_map_thread, &map.personal_pointers[i], flags); } /* threadpool_map_thread */ int end = map->size / map->thread_count; int additional_items = map->size - end * map->thread_count; int start = end * id;
  • 新增 threadpool_map 需要傳入的參數: task_num,user 可以指定問題可切成 task_num 的個數
/* threadpool_map */ for (int i = 0; i < task_num; i++) { threadpool_error_t _err = threadpool_add(pool, threadpool_map_thread, &map.personal_pointers[i], flags); } /* threadpool_map_thread */ int id = *(int *) arg; threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id); int task_num = map->task_num; int end = map->size / task_num; int additional_items = map->size - end * task_num; int start = end * id;

改善 MapReduce 的架構

commit c36f353

  • 將 threadpool_map 與 threadpool_reduce 整理成 mapreduce
  • 改良 reduce 架構,讓 threadpool_reduce_thread 不用再透過 threadpool_map_thread 去執行
  • 當某 map task 完成,master thread 便會將對應的 reduce task 放入 waiting queue

但目前實作上,是一對一的 mapping 關係,暫時想不到多對一的好處跟實作方法。 Chen WeiFri, May 12, 2017 3:37 PM

回應老師討論區問題,這邊指的 mapping 關係是指 map 與 reduce 之間,不是指 reduce task 與 worker thread 之間。 Chen WeiSun, May 14, 2017 11:33 PM

  • 因把 map task 和 reduce task 整合在一個 function,便不再個別測量 map 與 reduce time






g



mapreduce

mapreduce



分割

分割



mapreduce->分割





threadpool_add (1)

threadpool_add (1)



分割->threadpool_add (1)





threadpool_add (2)

threadpool_add (2)



分割->threadpool_add (2)





threadpool_add (3)

threadpool_add (3)



分割->threadpool_add (3)





threadpool_add (4)

threadpool_add (4)



分割->threadpool_add (4)





threadpool_add (5)

threadpool_add (5)



分割->threadpool_add (5)





threadpool_add (6)

threadpool_add (6)



分割->threadpool_add (6)





map_thread (1)

map_thread (1)



threadpool_add (1)->map_thread (1)





map_thread (2)

map_thread (2)



threadpool_add (2)->map_thread (2)





map_thread (3)

map_thread (3)



threadpool_add (3)->map_thread (3)





reduce_thread (1)

reduce_thread (1)



threadpool_add (4)->reduce_thread (1)





reduce_thread (2)

reduce_thread (2)



threadpool_add (5)->reduce_thread (2)





reduce_thread (3)

reduce_thread (3)



threadpool_add (6)->reduce_thread (3)





routine

routine



map_thread (1)->routine





map_thread (2)->routine





map_thread (3)->routine





reduce

reduce



reduce_thread (1)->reduce





reduce_thread (2)->reduce





reduce_thread (3)->reduce





return

return



routine->return





reduce->return





  • mapreduce 以 threadpool_map 為主體,加入 threadpool_reduce 的程式碼
    • 加入 sem_wait 等待其中 map task 完成,若完成則再尋找是哪一個 task 完成
    • 在這邊使用了 map.personal_pointers == -1 當作是否完成的標記,故當 map task 完成時要記得更改
    • 由於改成和 map task 一對一,也要順便修改 reduce 切割問題的模式,以和 map 一致
for (int i = 0; i < task_num; i++) { sem_wait(&map.done_indicator); for (int j = 0; j < task_num; j++) { if (map.personal_pointers[j] == -1) { /* add reduce task j */ threadpool_error_t _err = threadpool_add(pool, threadpool_reduce_thread, &info.personal_pointers[j], flags); map.personal_pointers[j] = -2; break; } } }
  • threadpool_reduce 便可以直接砍掉了,但要記得修改 threadpool_reduce_thread 的參數及裡面的程式碼,符合 threadpool_add 參數的格式
  • 由於最後在等待 reduce_thread 完成,故必須新增一個 info.done_indicator 的 semaphore,等待所有 reduce thread 確實結束

改善結果

  • 實驗環境
    • OS: Ubuntu 16.04.2 LTS (64 bit)
    • CPU: intel i7-6700
    • Cache:
      • L1d cache: 32K
      • L1i cache: 32K
      • L2 cache: 256K
      • L3 cache: 8192K
    • cash alignment: 64B (cache block size)
    • Memory: 32 GB
  • 實驗假設
    • THREAD 數目 = 8
    • TASK QUEUE 大小 = 256
    • TASK NUM = 16
    • 不同 DATASIZE 的執行時間,取樣 30 次作平均
    • 對照組 (orig) 為最一開始 support MapReduce 的版本
  • 在沒有清理 cache 情況下的時間表現
    • 能夠看出明顯的改善
  • 每次執行皆清理 cache 的時間表現 (opt1)
    • 在組員的提醒下,在每次執行 mapreduce 之前清理 cache
    • 結果沒有如上圖漂亮,略顯波折,但在某些區段仍看得出改善成果

5 整合成果

  • 在修改完 MapReduce 架構後,逐一整合下列兩項改善方法之成果
  • 實驗假設如同上一節所述,都有清理 cache 後再進行量測
  • 對照組依然為 orig,最後會在附上綜合的比較圖表

整合 interleaving map 的切割問題方式 (opt2)

整合 is_simple() 改善後的演算法 (opt3)

從 interleaving map 的版本整合

綜合的比較圖表


6 lockfree 加速

暫時不做

7 fault tolerance

暫時不做

8 改善 merge sort

contributed by <Sean1127>,<Lukechin>,<chenweiii>

實驗環境

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                4
On-line CPU(s) list:   0-3
Thread(s) per core:    2
Core(s) per socket:    2
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 78
Model name:            Intel(R) Core(TM) i5-6300U CPU @ 2.40GHz
Stepping:              3
CPU MHz:               399.932
CPU max MHz:           3000.0000
CPU min MHz:           400.0000
BogoMIPS:              4992.00
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              3072K
NUMA node0 CPU(s):     0-3

內容

將原本 threadpool 替換為我們改良後的 threadpool,並比較其效能。

thread_count = 1

thread_count = 2

thread_count = 4

thread_count = 8

thread_count = 16

thread_count = 32

thread_count = 64

綜合


不同應用案例

<paul5566>
用 merge sort 測試 map reduce 效能

結論

Reference

tags: sysprog