contributed by <TempoJiJi
>
先來測試程式效能跟檢查結果:
$ (for i in {1..8}; do echo $RANDOM; done) | ./sort 4 8
input unsorted data line-by-line
[11562] [17074] [20028] [20439] [20600] [21083] [29465] [32236]
大致上閱讀程式碼後,先來設計自動測試的實驗,方便日後修改程式碼時能夠做對比:
先準備產生亂數的程式intput_generator.c
接着修改main.c,將FIX ME
的部分都改成檔案輸入出
最後在Makefile中加入Bash腳本:
bench:
for i in `seq 100000 10000 500000`; do \
./input_generator $$i; \
for j in 1 2 4 8 16 32 64; do \
./sort $$j $$i; \
done \
done
plot: bench
gnuplot runtime.gp
執行
$ make plot
執行結果:
這個結果有點出乎意料,執行緒越多,時間抖動的越厲害,而且sorting的時間比一個執行緒還來的慢
首先爲了確保每次更改後,sorting後的結果是正確的,我多加了一個CHECK的機制,每次只需利用$ make check
就能確認結果有沒有正確:
check: input_generator
for i in `seq 1 1 100`; do\
./input_generator $$i; \
./sort 4 $$i; \
sort -n input > sorted; \
diff output sorted; \
done
參考老師的解說:「你所不知道的 C 語言」:物件導向篇 ,首先在list_add()
中,它並沒有回傳任何的address(list_t),加上傳入的參數只是副本,因此這裏需要改成回傳一個成功list_add的一個address,直接看code吧(這裏也可以使用pointer to pointer,避免傳入的只是副本),這部分不太清楚需不需要避免重復的值,我就直接連重復的值也加進去了:
list_t *list_add(list_t *e, val_t val)
{
e->next = malloc(sizeof(list_t));
e = e->next;
e->data = val;
e->next = NULL;
return e;
}
還有在tpool_init
時,也需要避免這個問題
接着我把node_t這個結構給刪掉,改成只有一個結構,因爲這樣的改動,所以list_new()
已經不需要了,所有跟size有關的部分都需要更動:
typedef struct _LIST_T {
val_t data;
struct _LIST_T *next;
} list_t;
再來size這個變數已經沒有了,加上list_nth
這個function的名字有點怪怪的,所以我把它改名成getMiddle
也改變了裏面的行爲,就只是單純找出list的中間位置,不需要用到size:
list_t *getMiddle(list_t *list)
{
if(!list)
return list;
list_t *slow = list;
list_t *fast = list;
while(fast->next && fast->next->next){
slow = slow->next;
fast = fast->next->next;
}
return slow;
}
Wow,我沒有想過可以這樣找中間的 element XD Kun-Yi Li
再來是修改merge_sort
跟merge_list
的部分,我把它們都寫的比較簡單了一點,這個應該會比之前的容易理解很多:
list_t *merge_list(list_t *a, list_t *b)
{
list_t *head = malloc(sizeof(list_t));
list_t *cur = head;
while (a && b) {
if(a->data <= b->data){
cur->next = a;
a = a->next;
}
else if(a->data > b->data){
cur->next = b;
b = b->next;
}
cur = cur->next;
}
cur->next = (!a) ? b : a;
return head->next;
}
list_t *merge_sort(list_t *list)
{
if (!list || !list->next)
return list;
list_t *mid = getMiddle(list);
list_t *half = mid->next;
mid->next = NULL;
return merge_list(merge_sort(list), merge_sort(half));
}
排序的部分大致上都處理完畢後,來看一看關於lock contention的問題,觀察cut_func
,其實這個這個部分並不需要lock,因爲各個thread都在處理各自的list,並不會影響到對方,所以我把這個部分的mutex_lock給拿掉了,這樣就減少了很多lock contention的問題:
void cut_func(void *data)
{
list_t *list = (list_t *) data;
if (list && list->next && cut_count <= max_cut) {
cut_count++;
/* cut list */
list_t *mid = getMiddle(list);
list_t *half = mid->next;
mid->next = NULL;
....
....
接着task_run
的部分我將它簡化了一點,不是利用NULL
來終止,而是設立一個變數shutdown
:
static void *task_run(void *data)
{
(void) data;
while (!shutdown) {
task_t *task = tqueue_pop(pool->queue);
if (task)
task->func(task->arg);
}
pthread_exit(NULL);
}
最後在task_run
這個部分其實有點疑問,如果沒有block着會不會一直佔着CPU的資源,所以我就想說沒有task時就block着,空出CPU的資源,有task時才進行,不知道這樣的效率會不會比較好,原本想說要做出來比較看看,可是一直遇到bug…(背景知識還不夠熟悉)
最後查看執行結果:
這裏雖然簡化了程式碼,但結果並沒有好很多,還有部分需要改進,還需要更加了解關於Thread pool跟synchronization object的背景知識才行…
提示: 試著透過 phonebook-concurrent 提到的 SuperMalloc,比照執行結果 jserv
經過老師提點後,就來嘗試看看,在phonebook-concurrent中有提到的SuperMalloc,從SuperMalloc上下載程式碼安裝後,就可以開始使用,只要在SuperMalloc/release
這個路徑底下輸入指令,就能使用:
$ LD_PRELOAD=libsupermalloc_pthread.so 程式路徑
利用SuperMalloc的執行結果:
這裏可以從橫軸中很明顯的看到,最大值只到1.6sec,跟之前的比起來效能快了很多
Original vs SuperMalloc:
由於還在啃論文中,所以還需要一點時間來理解關於SuperMalloc的原理跟行爲
參考資料:
SuperMalloc: A Super Fast Multithreaded Malloc for 64-bit Machines
TempoJiJi
mergesort-concurrent
sysprog21