contributed by <HaoTse
>, <shelly4132
>, <HahaSula
>, <nekoneko
>, <Fzzzz
>, <abba123
>
Misco-distributed computing framework designed for mobile devices
ln -sf ../../scripts/pre-commit.hook .git/hooks/pre-commit
對一些獨立元素組成的概念上的列表的每一個元素進行指定的操作。
例如一個成績列表,有人發現所有學生的成績都被高估了一分,他可以定義一個「減一」的映射函數,用來修正這個錯誤。
事實上,每個元素都是被獨立操作的,而原始列表沒有被更改,因為這裡創建了一個新的列表來保存新的答案,這就是說,Map操作是可以高度並行的,這對高性能要求的應用以及並行計算領域的需求非常有用。
應用案例呢? jserv
對一個列表的元素進行適當的合併。
例如想知道班級的平均分,可以定義一個歸納函數,通過讓列表中的奇數(odd)或偶數(even)元素跟自己的相鄰的元素相加的方式把列表減半,如此遞歸運算直到列表只剩下一個元素,然後用這個元素除以人數,就得到了平均分。
利用特定的 map function 將一組 key/value 轉換成一些 intermediate key/value。
map (in_key, in_value) -> list(out_key, intermediate_value)
而 reduce function 則是將擁有同樣 intermediate key 的 value 合併在一起。
reduce (out_key, list(intermediate_value)) -> list(out_value)
許多現實世界的工作都可以套用這個model。
這邊有一個蠻生動的比喻:http://www.aprilzephyr.com/blog/04242016/我是如何向老婆解釋MapReduce的?(轉)/ gnitnaw
MapReduce提供:
這部份分兩部份,前半段筆者簡述一下在某些使用情境下,需要更高階、抽象的框架去隱藏處理大量資料所面臨到的複雜問題,後半段摘要後面每個章節所要介紹的內容。
以在大量的檔案中尋找每個單字出現的次數為例:
map function 在每一個單字出現時賦予那個單字1,也就是他出現一次,reduce function 則是將每個單字各自出現的次數加在一起。
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
<key, record>
,其中 key 是用 records 產生。identy function
。斜體部份為論文中未提及
喚起 Map 的時候會自動將 input data 分成 M 個 split,分散在多台機器上,這些 splits 可以被不同機器平行地處理。透過使用者自己定義的 partitioning function 可以將 intermediate key 分成 R 個片段,讓 Reduce 分散去處理。
上圖為執行 MapReduce 時整體的流程,以下將透過他們編號去分別解釋:
在一切都完整的結束後,MapReduce 的結果可以在那 R 個 output file 裡面找到,通常使用者不需要將那些結果再合併成一個檔案,而是直接將這些結果再傳入另一個 MapReduce call 裡,或是將他們放入另一個分散式系統裡去處理。
idle
, in-progress
, completed
in-progress
的 reduce tasks 的 worker machine 。因為 MapReduce 是設計來處理大量的資料在數百或數千的機器上,因此他必須能在錯誤發生時好好的處理。
Master 會定期去向 workers 要回應,如果在一定的時間內 worker 沒有回應的話,那個 worker 就會被標示為 failed。而被這些標示為 failed 的 worker 所完成或正在進行的 map 與正在進行的 reduce task 則會被標示為 idle,重新進入排程。
可以定期設置 checkpoint 紀錄 Master 的狀態,當 Master 出問題時便可從上一個 checkpoint 的狀態開始執行。但論文裡提到當只有一個 Master 的時候,幾乎不太會有 Master 出問題的時候,因此他們選擇當 Master failed 的時候直接終止整個 MapReduce。
deterministic function
,則 map/reduce framework 輸出的結果( output )與原本單一循序執行的程式的結果是相同的 。atomic commits
。
每個 in-progress
state 的 task 都會將結果寫入私自的暫存檔( private temporary file ) 。
map task 產生 R 個暫存檔, reduce task 產生 1 個
當 map task 執行完畢時,傳訊息給 master ,其中包含 R 個暫存檔的檔名。當 master 從執行完畢的 map task 收到訊息時,不會儲存傳來的 R 個暫存檔檔名。相反的,則會紀錄檔名。
不了解原文提到的
Otherwise, it records the names of R files in a master data structure.
的otherwise
是指何種情況下。nekoneko
reduce task 執行完畢時,會將暫存檔檔名改為最終的輸出檔名。但是為了防止同時會有多個 reduce task 更改檔名為最終輸出檔檔名,map/reduce 上的檔案系統支援 atomic rename operation
。
non-deterministic function
的情況下,仍可以使整個程式符合一個 weaker semantic
。non-deterministic function
的情況下,reduce task R1 產生的結果等價於另一個 non-deterministic
程式的結果;而 reduce task R2 產生的結果卻等價於另一個不同的 non-deterministic
程式的結果。non-determistic function
傳入相同的值時,可能會得到不同的結果,例如亂數產生的函式 rand()
GFS
(Google File System
) ,減少佔用網路頻寬。netwrok switch
上且擁有副本檔案的 worker machine在 MapReduce 中讓整體執行時間上升的就是「Straggler」,指的就是一台機器花了比預期時間還要長很多的時間去完成剩下的幾個 map 或 reduce task。會造成這樣的情形有很多原因,比如說較差的 disk 可能會讓讀取效率變差,如果這台機器上又剛好有其他 task 在執行,那資源搶奪的情況就會讓 MapReduce 的執行時間變得更久。
在論文中提到他們使用一個機制去處理這樣的情況,當 MapReduce 已經接近完結時,Master 將還正在執行的 task 進行備份,當不管是原本的還是備份的執行完成時,那個 task 就會被標記為已完成。論文的後面也有舉出實際的例子證明這個機制確實能有效提升 MapReduce 的執行時間。
partition function
和傳入的 key 完成。partition function
為 hash(key) mod R
。partition function
更合適的選擇。例如︰當 key 為 URLs ,最後希望能將擁有相同 host name 的 URLs 輸出到同一個檔案, partition function
適合用 hash(Hostname(urlkey)) mod R
。每個 partion 裡的 intermediate pairs 會依據 key 值排序成升冪或降冪
在一些情況下,map function 所產生出來的 intermediate key 具有很高的重複性,且 reduce function 具有交換律與結合律。以前面提到的計算單字出現次數的例子來說,每個 map task 可能會輸出數百數千個 <the, 1>
這樣形式的東西,這些結果會透過網路被送到一個 reduce task 裡面去做總和。可以讓使用者透過自己定義的 combine function 先做部份的結合,再將 map 的結果傳到 reduce,這樣可以有效的提升執行速度。
Section 4 一堆關於refinement的內容怎麼只剩這個? gnitnaw
Map/Reduce Library 提供讀取不同格式資料的 API 和 輸出不同格式資料的 API
Map/Reduce Library 本身有內建幾個 reading interface
, 論文裡舉例以下兩種
text
mode : 從檔案一行一行都資料,產生 key 為該行在檔案的位移量和 value 為該行的內容的 key/value pairs使用者是可以自訂 reading interface
支援讀取新的資料格式(new input format )
reading interface
( reader
) 並沒有限制一定是要從檔案讀取資料,也可以是其他讀取形式,例如︰從資料庫讀取 records
或是程式裡存在記憶體的某個資料結構。
writer
同理,如上面所述。
參考資料
GoogleCloudPlatform/appengine-mapreduce –- 3.4 Readers and Writers
Identity
,不更動 intermediate pair 直接輸出成 output pair 。a. 一般 MapReduce 的情況,總體執行時間為 891 秒。
b. 未始用 backup task 的情況,在 960 秒之後幾乎所有 task 都已經完成了,剩下 5 個 reduce task 還沒做完,一直到 1283 秒之後才結束整個 MapReduce,比一般情況下的時間多了 44%。
c. machine failure 的情況,總體執行時間為 933 秒,因為那些被 killed 的工作很快地就被重新執行所以執行時間只比一般情況上升了 5%。
該篇論文有無不了解的地方?可以列出來,雖然我不一定能解答,不過有人看就能集思廣益。gnitnaw
threadpool.[ch]
threadpool.h
中定義了 MAX_THREADS
,又在 threadpool.c
中定義 THREADS_MAX
,不太了解兩者的作用差別。MAX_THREADS
只有在 threadpool.c
裡的 threadpool_create
函式使用到,是用來限制輸入的 thread_count
要介於 0 到 MAX_THREADS
之間。THREADS_MAX
是用來定義 threadpool_map_t
的成員 personal_pointers
的陣列長度和 reduce_t_internal
的成員 elements
的陣列長度。sem_wait
block 住的執行緒,叫醒的執行緒會重新鎖上 semaphore。還有一個 function 是 sem_trywait() 可能也要研究一下,他能讓你的 semaphore 更有彈性。Yen-Kuan Wu
一個建議:不要假設這個程式的作者一開始就知道自己在寫啥,你如果覺得他的作法莫名其妙就勇敢把它改掉。
有時候老師會故意挑有很大改進空間的code給你們玩。gnitnaw
test/*
test裡面的4個程式都有memory leak的問題,請用valgrind檢查。gnitnaw
感謝提醒,已修正,請見該次 commit
另外對於我有找到 stackoverflow上的 memory leak 的討論,除了mapreduce.c沒有正確釋放需修正外,其他的都屬 reachablefzzz
thrdtest.c
看起來是普通檢視 thread_pool 運作正不正常,task 中只做簡單的 done++
紀錄做了幾次。
$ ./thrdtest
Pool started with 32 threads and queue size of 256
Added 264 tasks
Did 185 tasks
如果將
while((tasks / 2) > done) {
usleep(10000);
}
拿掉的話就會發現每次 did 的數量都只有 32,也就是 thread 的數量,這裡可以發現 threadpool_destroy
不會等 queue 的完成。
thrdtest.c 的第33行,有需要使用到lock? 變數task都只有在main函式呼叫
while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) { pthread_mutex_lock(&lock); tasks++; pthread_mutex_unlock(&lock); }nekoneko
是的,拿掉後用 ThreadSanitizer 檢查沒有出現 data race 的訊息!
賴劭芊你source code裏面沒更新喔。
還有本程式裡唯一一個被thread更改的變數為done,此變數為計數器性質,
為何不考慮用原子操作(atomic operation)取代mutex exclusion?
原子操作快很多,不過函式中usleep那麼久你大概也看不出來有快多少。gnitnaw
shutdown.c
檢測下列兩者是否正常運作。
threadpool_destroy(pool, 0)
threadpool_destroy(pool, threadpool_graceful)
heavy.c
開了64個 thread pool 每個 pool 的 queue 的 size 是8192,應該只是在測試 thread 的極限。
mapreduce.c
計算小於 DATASIZE 所有質數的總和。
mapreduce.c (is_simple)
紀錄所有小於 DATASIZE 的質數於 data 裡
void is_simple(int n, void *_data)
{
int *data = (int *) _data;
int x = data[n];
data[n] = 0;
if (x < 2) return;
for (int i = 2; i < x; i++)
if (x % i == 0) return;
data[n] = x;
}
mapreduce.c (my_reduce)
將 left 與 right 的值相加後存到 left 裡
void my_reduce(void *self, void *left, void *right)
{
*((int *) left) += *((int *) right);
}
threadpool.c (threadpool_map_thread(void *arg))
int id = *(int *) arg;
threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id);
參照 mapreduce.c
中的 threadpool_map_t
typedef struct {
int personal_pointers[THREADS_MAX];
void(*routine)(int n, void *);
void *arg;
int size;
int thread_count;
sem_t done_indicator;
} threadpool_map_t;
threadpool_map_t.personal_pointers[i]
((int *) arg - id)
表示 threadpool_map_t.personal_pointers[i - id]
的位置,也就是 threadpool_map_t.personal_pointers[0]
的位置 ( struct 的起始位置)。threadpool.c (threadpool_reduce)
char *
,這裡參考 Pointer subtraction confusion 才了解 pointer 相減的真正意義,真慚愧。在論文的 Fault Tolerance 中曾提及失敗的 map task 仍需被重新放進 queue 裡等待重新執行,而在原本的程式碼中卻直接 return error,經過測試將 QUEUE
設到 5 以下程式就會死掉,因此在這裡判斷 _err
是否為 threadpool_queue_full
的情況,是的話就讓這個 task 等到 queue 有位置時重新 add。
將 queue 的大小設成 256 與 2 時的時間比較,可以發現 queue size 比較小並沒有讓時間變的比較長。
底下是 gprof 的結果,可看出變化不大
queue size = 256:
Each sample counts as 0.01 seconds.
% cumulative self self total
time seconds seconds calls ns/call ns/call name
95.18 0.38 0.38 1455810 261.52 261.52 is_simple
5.01 0.40 0.02 threadpool_reduce_thread
0.00 0.40 0.00 186523 0.00 0.00 my_reduce
0.00 0.40 0.00 8 0.00 0.00 my_free
0.00 0.40 0.00 6 0.00 0.00 my_alloc_neutral
0.00 0.40 0.00 1 0.00 0.00 my_finish
queue size = 2:
Each sample counts as 0.01 seconds.
% cumulative self self total
time seconds seconds calls ns/call ns/call name
91.08 0.40 0.40 1328197 301.74 301.74 is_simple
2.28 0.41 0.01 273410 36.65 36.65 my_reduce
2.28 0.42 0.01 main
2.28 0.43 0.01 threadpool_map_thread
2.28 0.44 0.01 threadpool_reduce_thread
0.00 0.44 0.00 8 0.00 0.00 my_free
0.00 0.44 0.00 7 0.00 0.00 my_alloc_neutral
0.00 0.44 0.00 1 0.00 0.00 my_finish
threadpool.c
的 threadpool_create
函式中,判斷傳入的 thread_count
和 queue_size
的 if 敘述可以再精簡
goto err
,因為還未呼叫到 pool = (threadpool_t *)malloc(sizeof(threadpool_t)
,不用處理 pool
記憶體釋放。
if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) {
return NULL;
}
if(thread_count <= 0 || thread_count > THREADS_MAX) {
goto err;
}
if(thread_count <= 0 || thread_count > MAX_THREADS \
|| thread_count > THREADS_MAX \
|| queue_size <= 0 || queue_size > MAX_QUEUE) {
return NULL;
}
對整個程式稍微做了一點調整,在閱讀原本程式碼的時候,發現到這裡真的很奇怪
int err = threadpool_map(pool, info.thread_count, threadpool_reduce_thread, &info, 0);
if (err) return err;
for (int i = 1; i < info.thread_count; i++) {
info.reduce_data->reduce(info.reduce_data->additional,
info.elements[0], info.elements[i]);
info.reduce_data->reduce_free(info.reduce_data->additional, info.elements[i]);
}
info.reduce_data->reduce_finish(info.reduce_data->additional, info.elements[0]);
info.reduce_data->reduce_free(info.reduce_data->additional, info.elements[0]);
threadpool_merge_reduce(&info);
return 0;
}
出現了兩次的 reduce ,而且兩個 reduce 都是同一種行為
雖然這不影響最後結果及程式撰寫,但對於使用這份程式的人來說,整體的彈性會因為卡在 true reduce 完後的 merge 仍然和原本的 reduce 共用 function 而導致難已撰寫。所以將 for 迴圈裡進行 merge 動作的部分獨立出來,並將原本沒有特別用處的 reduce_finish 重新調用給他。(merge 和 reduce 兩者相似,但不是相等,應該獨立出來處理。)
static void threadpool_merge_reduce(reduce_t_internal *info)
{
// merge all reduce result into global one(in master)
for (int i = 0; i < info->thread_count; i++) {
info->reduce_data->reduce_finish(info->reduce_data->additional,
info->reduce_data->result , info->elements[i]);
info->reduce_data->reduce_free(info->reduce_data->additional, info->elements[i]);
}
}
在新的 static function 裡面,不再使用 reduce ,而是改為使用 reduce_finish
本次修正請見 commit
另外又發現到,在 test/mapreduce.c 中,如果 threadpool_reduce_t 裡的 function 給 NULL 的話,會造成 core dump ,所以另外增設一個階段為 threadpool_mapreduce_setup,用來設置 mapreduce 會用到的 user-defined function 或 specification,並在裡面做好 error handling,讓使用者可以在外部呼叫的時候知道自己漏了哪邊。
另外增加 mapreduce_err_t 的 enumeration
typedef enum{
reduce_reduce_not_set = -1,
reduce_alloc_not_set = -2,
reduce_free_not_set = -3,
reduce_finish_not_set = -4
} mapreduce_err_t;
這部分如果以後有新增 map 階段的 error handling 的話需要重新新增
原本程式執行順序為
改為
最後修正命名問題,並額外在 threadpool_reduce_t 裡新增 void *result ,讓原本結果都是輸出在 elements[0],改為使用者自定變數去獲取結果值。
修正後的 struct threadpool_reduce_t
typedef struct {
void (*reduce)(void *addional, void *result, void *data);
void (*reduce_finish)(void *additional, void *result , void *local);
void *(*reduce_alloc_neutral)(void *additional);
void (*reduce_free)(void *additional, void *node);
int object_size;
void *begin;
void *end;
void *result;
void *additional;
} threadpool_reduce_t;
主要更動為更改易混淆的命名,像是原本 reduce function 中的 left , right , self,這幾個因為操作上的不同,而重新命名成其他更容易懂得變數名稱。
修正後的版本的 example commit,移除 mapreduce.c 以避免命名上的誤會 (未來仍可能將 mapreduce 額外拉出做成一份檔,所以不該在 test 裡面使用容易混淆的命名)
想要先將 CPU user time 降下來,因此先對質數演算法 mapreduce.c(is_simple)
做改進,原本的 for loop 的時間複雜度是 O(n),因為一直呼叫它,所以先來改進它的效能。
將 is_simple
for loop 部份改為
if (x > 2 && x % 2 == 0) return;
for (int i = 3; i * i <= x; i += 2)
if (x % i == 0) return;
減少 for loop iteration 的次數。
參考老師的建議,將判斷式使用 bitwise operator 簡化,如下
if ((x & ~3) && !(x & 1)) return;
如何將判斷式更加簡化和 for loop 的簡化還在想鄭皓澤
(x & ~3) && !(x & 1)
仍可化簡為 bitwise 操作 jserv
接著可以發現
$ make test
map : 0.000554
reduce : 0.000251
$ time ./tests/mapreduce
./tests/mapreduce 0.00s user 0.00s system 37% cpu 0.011 total
map 的時間下降了,執行時間也下降了。
The API contains addtional unused 'flags' parameters that would allow some additional options:
對照 mutrace 和 ThreadSanitizer 的輸出,先找到 lock contention 和潛在的執行緒操作議題 jserv
利用 clock_gettime
去計算 map 跟 reduce 的時間
map : 0.044008 sec
reduce : 0.000240 sec
Yastopwatch 很好用,只是原本的程式碼不是用
clock_gettime
,需要改寫再來整合 jserv精度應該夠?
get_usec
是gettimeofday
, 還有一個get_tsc
是直接用rdtsc
,DEFINE_TSC_CLOCK
應該很夠用了 louielu
利用 time
指令觀察程式執行時間
$ time ./tests/mapreduce
./tests/mapreduce 0.18s user 0.00s system 307% cpu 0.058 total
因為使用 thread 的關係,可以發現 CPU 使用率高於 100%,並且 CPU user time 大於 system time,詳細可以參考 Homework1 (compute-pi)。
看到老師的建議後,研究了如何使用 Yastopwatch ,並將其實做方式從 gettimeoftoday()
改為 clock_gettime()
先使用上一個作業學到的工具 mutrace
來觀察程式。
mutrace: Showing statistics for process mapreduce (pid 11976).
mutrace: 1 mutexes used.
Mutex #0 (0x0x1723300) first referenced by:
/usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7fedd58f34b2]
./tests/mapreduce(threadpool_create+0x10a) [0x402055]
./tests/mapreduce(main+0x3b) [0x4017f8]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fedd532a830]
mutrace: Showing 1 most contended mutexes:
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
0 56 42 2 0.138 0.002 0.073 M-.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
mutrace: Total runtime is 6.375 ms.
mutrace: Results for SMP with 4 processors.
用來察看 data race 的工具
先安裝 llvm 還有 clang
$ sudo apt-get install llvm
$ sudo apt-get install clang
在 Makefile
中使用 clang
編譯,並加上 -fsanitize=thread -g -O1
,結果顯示4個測試檔只有 thrdtest
有 data race 的問題。
$ make test
Execute tests/thrdtest...
Pool started with 32 threads and queue size of 256
Added 288 tasks
==================
WARNING: ThreadSanitizer: data race (pid=15153)
Write of size 4 at 0x0000014ade3c by thread T1 (mutexes: write M0):
#0 dummy_task /home/chien/mapreduce/tests/thrdtest.c:17 (thrdtest+0x0000004a2059)
#1 threadpool_thread threadpool.c (thrdtest+0x0000004a2da2)
Previous read of size 4 at 0x0000014ade3c by main thread:
#0 main /home/chien/mapreduce/tests/thrdtest.c:39 (thrdtest+0x0000004a21b6)
As if synchronized via sleep:
#0 usleep <null> (thrdtest+0x000000433852)
#1 dummy_task /home/chien/mapreduce/tests/thrdtest.c:15 (thrdtest+0x0000004a2037)
#2 threadpool_thread threadpool.c (thrdtest+0x0000004a2da2)
Location is global '<null>' at 0x000000000000 (thrdtest+0x0000014ade3c)
Mutex M0 (0x0000014ade40) created at:
#0 pthread_mutex_init <null> (thrdtest+0x00000043db35)
#1 main /home/chien/mapreduce/tests/thrdtest.c:25 (thrdtest+0x0000004a2092)
Thread T1 (tid=15155, running) created by main thread at:
#0 pthread_create <null> (thrdtest+0x000000422336)
#1 threadpool_create <null> (thrdtest+0x0000004a23dc)
#2 __libc_start_main /build/glibc-GKVZIf/glibc-2.23/csu/../csu/libc-start.c:291 (libc.so.6+0x00000002082f)
SUMMARY: ThreadSanitizer: data race /home/chien/mapreduce/tests/thrdtest.c:17 in dummy_task
根據 ThreadSanitizer 的訊息,出現 race condition 的應該是 thrdtest.c
裡 data 這個變數,嘗試做了以下修改:
原本:
while((tasks / 2) > done) {
usleep(10000);
}
修改後:
將取得 done 值的部分放在 crtical section 裡
pthread_mutex_lock(&lock);
tmp_done = done;
pthread_mutex_unlock(&lock);
while ((tasks / 2) > tmp_done) {
usleep(10000);
pthread_mutex_lock(&lock);
tmp_done = done;
pthread_mutex_unlock(&lock);
}
參考 LFTPool
tpool_init
後,做完一連串的設定後,會呼叫 spawn_new_thread
,在 spawn_new_thread
會做 pthread_create
的動作,接下來進入 wait_for_thread_registration
等待所有 thread create 完成。tpool_thread
後,其他 thread 首先會對 global_num_thread 加 1,以讓 main thread 可以知道是否所有的 thread 都已經 create 完。get_work_concurrently
拿 work,否則就 pthread_exit
。__sync_* 、signal 與 ring-buffer 的操作後面會解釋鄭皓澤
tpool_add_work
這裡 add work 的機制為,首先先根據任務排程演算法去取得相對應的 thread,這裡實作的方法有 Round-Robin 和 Least-Load 演算法,Round-Robin 即輪詢式地分配工作,Least-Load 即選擇目前具有最少工作的 worker thread 放入。
並在 dispatch_work2thread
中加入 work,並且判斷如果只有這個新加的 work 就直接叫醒 thread。
signal 的使用方法可見 man page,首先在 tpool_init
中初始化 SIGUSR1 的處理函式為 sig_do_nothing
,並在 tpool_thread
中以以下方法達到 conditional variable 的作用。
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGXX);
sigprocmask(SIG_BLOCK, &newmask, &oldmask) ;
while (!CONDITION)
sigsuspend(&zeromask);
sigprocmask(SIG_SETMASK, &oldmask, NULL);
sigsuspend 會讓呼叫的 thread 睡著,參考資料鄭皓澤
跟 process 一樣,每個 thread 都有一個 signal mask,用來指定那些非同步 signal 會被 thread 處理,稱為 unblocked signals,這裡就是用這個原理來運作的,而送出 signal 的函式為 pthread_kill(pthread_t thread, int sig)
。
pthread_kill 的作用並不是 kill thread
gcc 從 4.1.2 開始提供了 __sync_*
系列的 built-in 函式,程式碼中有多處使用到,例如 __sync_val_compare_and_swap
、__sync_bool_compare_and_swap
、__sync_fetch_and_add
等等。
此類 api 稱為 Built-in functions for atomic memory access,atomic operation 是一種不會被中斷的操作,透過他來對共享變數做修改,可以減少 mutex 的使用,達到 lock-free 的目標,進而提升效率。
這句話不精確,不要搞錯 atomic operation 和 mutex 的行為!請重新描述 jserv
抱歉,語意表達上不好,參考 Is there any difference between “mutex” and “atomic operation”? 後修正了。鄭皓澤
詳細使用方法請見參考資料。
你前面說要用clang結果這邊用gcc?雖然clang可能多少support,不過現在有現成的C11 atomic operation可以用阿。gnitnaw
這邊是研究LFTPool的過程,研究過程我也有注意到 C11 已經有 atomic operation 可以用,計畫之後用 C11 改寫 LFTPool。鄭皓澤
環狀佇列長度為 2 的整數次方,out 和 in 下標一直遞增至越界後迴轉,其類型為 unsigned int,即 out 指標一直追趕 in 指標,out 和 in 映射至 FiFo 的對應下標處,其間的元素即為隊列元素。
LFTPool 中還有實作
tpool_inc_threads
、tpool_dec_threads
、balance_thread_load
可以達到 lazy initional 的目標,待之後深入研究。鄭皓澤
先初步的將 LFTPool 結合原本的 threadpool.[ch] 後,結果不如預期。遇到以下幾點問題
LFTPool 是個效率較差的 lock-free thread pool,裡頭用到 UNIX signal,後者是很大的效能開銷,應該把 LFTPool 看作「lock-free 的嘗試」,降低 lock contention 但引入更高成本的系統呼叫。關鍵是避開 contention,這部份要善用 atomics jserv
tests/heavy.c
跑不動,程式跑到這裡我的電腦會直接當機,目前還在尋找問題。
嘗試後發現好像是我的電腦記憶體不足,試過幾個數字後發現我的電腦只能跑 QUEUES = 4,建議先將 threadpool.c 中的 WORK_QUEUE_POWER 設小一點鄭皓澤
heavy.c 的話我的是 Segmentation fault (core dumped) fzzzz
這裡研究後還是覺得 SIZE 跟 QUEUE 的質不能定義的太大,會 core dumped 的原因應該是在於 SIZE 太大,導致工作無法消耗,這部分還需要去改善 lock free 版本中對於 queue full 等等情況的 error handle鄭皓澤
tests/mapreduce.c
的運行結果不如預期,在 #define DATASIZE (2000000)
的情況下,執行時間如下
[map] Total time: 0.230074
[is_simple] Total time: 0.776775
[reduce] Total time: 0.012306
原始的版本執行時間如下
[map] Total time: 0.210654
[is_simple] Total time: 0.708091
[reduce] Total time: 0.009638
可見不但沒有精進反而還有變慢的跡象。
各執行 100 次的比較結果如下圖。
但使用 mutrace
觀察下,mapreduce.c
中的 mutex 確實消失了。
is_simple會拖比較長多半是因為當is_simple要判斷比較大的n是否為質數時,它得try過每個單數(3,5,7…)。
有個方法可以嘗試(不確定有用,可能在DATASIZE小的時候反而更糟):當你用is_simple判斷出某數是質數時,順手把此質數的倍數都mark成0。然後is_simple遇到已經為0的數就直接return。不過不保證會比較好,要是我直接在threadpool_map前把很簡單就能確認的mark掉。
不過話說回來,你們使用mapReduce找質數的動機是啥?應該是讓一堆Core可以一起計算分擔工作讓計算時間縮短,可以比用一個Core算快很多。
可是從你們的code看起來,分配工作(map)耗的成本就比工作本身耗的成本(is_simple)多了。
換句話說,你拜託別人工作,結果你得花更多時間把工作分配給他,那你不如自己做算了,還省時省力。
所以你們該做的,是先讓分配工作的成本<完成被分配工作的成本。
提示:每個queue不一定只能處理一個數,你也不是只能用一個map/reduce(分配工作這種事本來就可以分批做),你處理一個數可以順便把相關的數處理掉,後面就不用重複同樣的動作了。gnitnaw
會用 mapreduce 來找質數的原因只是僅僅因為原本的 example 是用他來做實驗,這部份的 code 與想法的確問題還有很多,我們似乎執著於找質數這部份的效能太久了,的確是應該先從根本的 map 與 reduce 改善著手,撰寫 sort 的部份時也發現我們原本的 mapreduce 問題很多,分配工作的部份的確是很耗時,會儘快修正,謝謝提示。鄭皓澤那個is_simple找質數的函式本身就是很笨的方法(老師應該是故意的),google一下應該可以發現快很多的。
還有你對於is_simple的計時有點問題,有些queue沒有計入,請修正。
對於不同DATASIZE的計算時間記得也做一下(請順便解決DATASIZE太大時overflow的問題)。
gnitnaw
使用 C11 內建的 atomic operation 取代 LETPool 中使用的 GCC 提供的 __sync_*
built-in function。
使用方法可見 cppreference.com。
可使用 C11 提供的 atomic operatihttps 去解決 tests/*.c 中的 mutex。鄭皓澤
Terasort 是 yahoo 在 hadoop 上實作的一個排序,在 2008 年 Hadoop 1TB 排序基準評估中獲得第1名。
為了提高 reduce 的並行度,在 map 做了 sample 和 partion
論文參考方向
- Yahoo Terasort
- 網友整理 yahoo terasort 的論文,其中還參考了 yahoo developer – hadoop tutorial 的介紹
- TeraSort SlideShare page 11
- " two level trie "
- 了解 trie tree 資料結構
- trie
- Yahoo Hadoop – MapReduce
- Other Reference
使用自訂的 map
, reduce
, reduce_alloc_neutral
來實做 sort
目前使用原有 有 lock 的版本 實做
參考
觀察 threadpool.[ch]
.reduce_finish
和 變數 .self
將 elements[0] 的結果取出由程式碼的觀察
llist_t *
的陣列,每個元素都是有一個 node 的 link list objectllist_t **
, 是為了保持 reduce(void *self, void *left, void *right)
中傳入 left 和 right 的 type 一致先不說結果,請問使用linked list或array對於效能上會有啥差異?不理解為啥你們要用linked list。gnitnaw
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 2
Core(s) per socket: 2
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 37
Model name: Intel(R) Core(TM) i3 CPU M 330 @ 2.13GHz
Stepping: 2
CPU MHz: 1066.000
CPU max MHz: 2133.0000
CPU min MHz: 933.0000
BogoMIPS: 4255.69
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 3072K
NUMA node0 CPU(s): 0-3
8 條 threads 和 256 長的 task queue
[map] Total time: 0.000191
[reduce] Total time: 0.043792
[map] Total time: 0.000797
[reduce] Total time: 4.488939
threadpool_map
)threadpool_reduce
)這部份只有先做圖出來,目前還沒想到如何藉由其他效能測試工具,解釋影響執行時間的關鍵效能
nekoneko這邊應該要加上一個 base_line 的 single thread program 做比較,看看使用 map/reduce framework 實做是否真的有加快。
nekoneko
reduce time的y軸可以改用log scale,方便看。還有請附上實驗環境(測試用電腦的資料)
void my_map 是空的?那你map time計時有啥意義?gnitnaw以補上測試環境,謝謝提醒!nekoneko
關於 map time 計時的部份,的確原本是有思考圖表的意義。目前我認為測試 empty 的 my_map 函式的圖表,可以作為思考 threadpool_map 本身的 overhead (因為 my_map 為空的)。否則,如 gnitnaw 大所說的一樣,其實是沒有意義的。nekoneko
MapReduce: Simplified Data Processing on Large Clusters(內含pdf與slides連結)
Misco-distributed computing framework designed for mobile devices
source code:
a clean version of script: http://www.cs.ucr.edu/~jdou/misco/misco_clean.tar.gz
A more complicated (includes testing framework and advanced schedulers) is here:
http://www.cs.ucr.edu/~jdou/misco/misco_exp.tar.gz
from Jdou
HaoTse
shelly4132
HahaSula
nekoneko
Fzzzz
abba123
team7
sysprog21
mapreduce
LFTPool
TeraSort