# 2017q1 team1 mergesort-concurrent contributed by <`zmke`、`petermouse`、 `tina0405`、`heathcliffYang`> ## 檢討之前的程式碼 ### 從原版來想 在原始的版本中,會將輸入的 list 分割給每個 thread 去做 merge ,而最後 thread 之間 list 的 merge 則是一個不能並行執行的部分,無法隨著 thread 數增加而提高執行效率,是增加 scalability 會遇到的瓶頸。 ![](https://i.imgur.com/aW4KeUJ.png) 這讓我們想到一個在計組教過的 [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law) ![](https://i.imgur.com/KZDdaid.png) 不能並行的部分則讓核心數增加帶來的加速效果存在極限。 ![](https://i.imgur.com/o9AVbcR.png) 所以我們把其中一個目標放在 thread 之間 list 的 merge 也要讓它可以並行處理,實作方法在下面的部分會詳細介紹。 另外,在與新夥伴溝通時,發現他們參考[ Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.research.ibm.com/people/m/michael/podc-1996.pdf) 嘗試過 lock-free threadpool。 1. tqueue_pop() ```c= task_t *tqueue_pop(tqueue_t * const the_queue) { task_t *head = NULL, *tail = NULL, *next = NULL, *ret = NULL; while(1) { head = the_queue->head; tail = the_queue->tail; next = head->next; if(head == the_queue->head) { if(head == tail) { if(next == NULL) { return NULL; } atomic_compare_exchange_weak(&(the_queue->tail), &tail, next); } else { ret = next; if(atomic_compare_exchange_weak(&(the_queue->head), &head, next)) { break; } } } } atomic_fetch_sub(&(the_queue->size), 1); atomic_fetch_add(&(the_queue->num_of_consumed), 1); return ret; } ``` 2. tqueue_push() ```c= int tqueue_push(tqueue_t * const the_queue, task_t *task) { task_t *tail = NULL, *tail_next = NULL; task->next = NULL; while(1) { tail = the_queue->tail; tail_next = tail->next; if(tail == the_queue->tail) { if(tail_next == NULL) { if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) { } break; } } else { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) { } } } } atomic_fetch_add(&(the_queue->size), 1); return 0; } ``` 3. 論文中沒能實踐的部分 中介的資料結構,這個資料結構去指向真正的 node (node_t) 以及存放一個變數名稱 count 代表 reference counting 的次數,可以用來避免 ABA 問題 ```c= structure pointer_t { ptr: pointer to node_t, count: unsigned integer } ``` ### 實驗結果 以下 2 、 3 版本都有包含 lock-free threadpool,且都是以 `./sort 16 test_data/input.txt` 進行測驗。 1. Original 原版在最後 thread 之間的 list 做 merge 時,因為工作不能多執行緒一起執行,所以其他 thread 會持續要求工作直到結束。 ![](https://i.imgur.com/zv6d9NS.png) 2. Condition variable 這個版本是 lock-free thread pool 與 condition variable 一起的實作,而 condition variable 則會在 monitor 的部分做介紹。 因為 condition variable 的關係,當 queue 裡面沒有工作時,空閒的 thread 不會佔用到 CPU 的時間,`tqueue_pop` 的次數有減少,但花費時間卻增加了。 ![](https://i.imgur.com/FkibzUW.png) 此外,lock-free thread pool 會導致在 thread 多時,跑不完。 3. Condition variable + merge concurrent 這個版本是 lock-free thread pool 與 condition variable ,再加上 thread 之間的 list 做 merge 的工作是 concurrent。 ![](https://i.imgur.com/kk39cru.png) `tqueue_pop` 的次數減少一半,後期 merge 的工作已可以允許多個 thread 一起執行;其中工作可以容納的 thread 數有限制: `arg->task_cnt = arg->source->size / local_size * 2` ,這部分需要再思考如何訂定最有效益的辦法;這是初版的 concurrent merge ,改變了所有 list 的資料結構,同時也可以看到 list_search 與 list_insert 有相當的成本,也是一個可以考慮改進的方向。 * 三種實作的效能比較 三種方法都在 thread 數為 4 時表現較好,而無可避免地都在 thread 數繼續往上增加時,讓花費時間變多,其中加入 concurrent merge 之後卻不盡理想的原因在之後的部份會詳細說明。 ![](https://i.imgur.com/WnnZJAj.png) >> 這張圖忘記改 label 了 * 另外的問題 其實每次測試,時間的浮動非常大,所以測試時應該要多次量測、使用統計的方法來取得較真實的實驗數據。 ## Monitor * 讓 thread 可以保持 mutual exclution 並等待直到特定的 condition 成立 * monitor 包含 mutex 和 conditional variable * `pthread_cond_wait(cond, mutex)` * release mutex * block on the condition variable * sleep the thread * 當被其他 thread 透過 signal() 或 broadcast() 喚醒之後,會再獲得 mutex * `pthread_cond_signal(cond)` * unblock threads blocked on a conditional variable * 至少喚醒一條 thread ,就算沒有 thread 在等待也會成功 return * 如果有多條 thread 同時在等待同一個 conditional variable ,會由排程的優先順序決定哪條 thread 優先被喚醒 * 在 multi processor 的環境下,當 condition variable 同時被不同 processor signal 時,可能會喚醒多條 thread * `pthread_cond_broadcast(cond)` * 喚醒所有在等待該 condtional variable 的 thread * 當 thread 被喚醒之後,應該要重新評估是否該繼續執行或是再等待一次,所以 condition wait 常被實作在 while loop 內,來確保執行的安全性 ### Monitor 實作 原本的程式在 task queue 沒東西的時候,也會一直嘗試 tqueue_pop() ,造成 CPU time 的浪費,尤其是在 thread 數量多的時候影響會更大 我們想要利用 condition variable 讓空閒的 thread 在 task queue 裡面沒有東西的時候 sleep ,等到有 task push 到 task queue 裡面之後,再喚醒他們起來要工作,希望減少 pop 空的 task queue 造成的浪費 #### task_run() 一開始就拿 mutex ,之後檢查 task queue 內是否是空的,如果是空的就等待並釋放 mutex ,被喚醒之後會拿到 mutex 但是 pool->queue->size 可能已經被改變,所以要再檢查一次,如果沒問題就釋放 mutex ,否則就是再等待一次 ```clike= static void *task_run(void *data __attribute__ ((__unused__))) { while (1) { /*Get the mutex lock*/ pthread_mutex_lock(&(pool->queue->mutex)); /*Check wether the number of task in the thread queue is zero*/ /*If the task queue is empty, block the thread and release the mutex lock*/ while(pool->queue->size == 0) pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex)); /*When the conditional variable is signaled, wake up the thread and get the mutex lock*/ /*Release the mutex lock*/ pthread_mutex_unlock(&(pool->queue->mutex)); task_t *_task = tqueue_pop(pool->queue); if (_task) { if (!_task->func) { tqueue_push(pool->queue, task_new(NULL, NULL)); free(_task); break; } else { _task->func(_task->arg); free(_task); } } } pthread_exit(NULL); } ``` #### tqueue_push() 在 push 的最後加上 pthread_cond_signal() ,藉此通知被 block 的 thread 已經有工作可以拿 ```clike= int tqueue_push(tqueue_t * const the_queue, task_t *task) { task_t *tail = NULL, *tail_next = NULL; task->next = NULL; while(1) { tail = the_queue->tail; tail_next = tail->next; if(tail == the_queue->tail) { if(tail_next == NULL) { if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) { } break; } } else { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) { } } } } atomic_fetch_add(&(the_queue->size), 1); /*Signal the condtional variable*/ pthread_cond_signal(&(the_queue->cond)); return 0; } ``` ### 實驗結果 ![](https://i.imgur.com/vmdpuSo.png) ## 論文閱讀 [Lock-free linkedlist implementation of Harris' algorithm](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf) * 原本的數字串如下: ![](https://i.imgur.com/4CT1jNs.png) * 如果想要 insert Node_20 , 我們會這樣做: ![](https://i.imgur.com/USDNKRB.png) * 如果想要 delete Node_10 , 我們會這樣做: ![](https://i.imgur.com/oqEMsns.png) * 但如果現在 insert Node_20 和 delete Node_10 同時發生,就有可能出現以下情況: ![](https://i.imgur.com/yybhpPk.png) * 但我們不樂見,因為我們希望他會走訪 Node_20 * 解決方法: * 步驟1:左邊的圖是先將刪除點 mark 起來 * 步驟2:右邊的圖是真的刪掉 node_10 ![](https://i.imgur.com/R9nrnba.png) * logically deleted : 完成步驟1 * phisically deleted : 完成步驟2 * 雖然這邊沒有將 node_10 的 right node 斷掉,但整個list 的運行並不走訪 node_10 * 好處: * 被標記的的 node 仍然能夠通過,但必須要能分變出與原來未標記狀態的不同。 * 此時當訊號 concurrent 時,以上面為例,要插入 node_20 和發現 node_10 是 logically deleted 同事發生時,必須先強制將 10 做 phisically deleted 才可插入,如此一來就可以避免以上情形發生。 ### insert 的部份: * 一開始本來對,如果要序列: 1,4 中間同時插入 2 跟 3 的話可能會發生,兩個 2 跟 3 同時指到4,此時序列就不是我們要的 1,2,3,4 了 ,但以下程式碼用 CAS 插入做了檢查的動作: * 文中利用 CAS(addr,o,n) 去比較 addr 和舊的內容 o,有沒有一樣,有的話就寫入,沒有的話就一直重複做,他 return 的 boolean 就是用來提醒相鄰的結點如果沒有一樣的話就要update(重做)。 ~~~ likec= public boolean List::insert (KeyType key) { //先將 key 建立節點叫 new_node Node *new_node = new Node(key); //建立左右節點 Node *right_node, *left_node; do { right_node = search (key, &left_node); //右節點不是尾巴且和我們要插入的值一樣就不插入 //這段我前一篇在改時有拿掉 //因為他讓序列 1,2,2,3 變成 1,2,3 if ((right_node != tail) && (right_node.key == key)) /*T1*/ return false; //右節點是我們即將插入節點的下一個 new_node.next = right_node; //如果你的左邊節點的下一個是你的右邊節點,這樣就可以插入 //確保中途沒有其他人改過這段資料 if (CAS (&(left_node.next), right_node, new_node)) /*C2*/ return true; } while (true); /*B3*/ } ~~~ ### find 的部份: * 一開始先傳進去 search_key 找完存在 right_node ,如果right_node 已經在 tail 的位置或跟傳進去的不一樣就不用找了回傳 false ,一樣才傳 true 。 ~~~ clike= public boolean List::find (KeyType search_key) { Node *right_node, *left_node; //把要找的 search_key 放置 right_node = search (search_key, &left_node); //如果一樣,而且不是 tail 才回傳 true if ((right_node == tail) || (right_node.key != search_key)) return false; else return true; } ~~~ ### search 的部份: * 分成3個部份: * part 1 * 這組迴圈得先判斷是否有被做記號,如果沒有的話,那就是 t 的值小於搜尋值,把 t 的值存在 *left_node , t_next 存在 left_node_next,然後將 t_next unmark 存進去 t,而 t_next 也會等於 t.next ,如此一來就可以一直往下比 * 如果有 mark 就不會存他,直接跳到他的 next * 直到看到沒 mark 而且 t 的值又大於搜尋值的時候。 ~~~ clike= /* 1: Find left_node and right_node */ do { if (!is_marked_reference(t_next)) { (*left_node) = t; left_node_next = t_next; } t = get_unmarked_reference(t_next); if (t == tail) break; t_next = t.next; // } while (is_marked_reference(t_next) || (t.key<search_key)); /*B1*/ right_node = t; ~~~ * part 2 * 這組是在 check 左邊的點和右邊的點之間,有沒有突然插入點或刪除點,因為每個執行序都再做事,很可能會不小心動到當前 search 的執行序在使用的點,所以要不停確認。 ~~~ clike= /* 2: Check nodes are adjacent */ if (left_node_next == right_node) //如果你右邊的 node 的下一個還有做記號,就要再 search 一次 if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G1*/ //沒有就可以回傳值了! else return right_node; /*R1*/ ~~~ * part 3 * 因為沒通過上方 if 的 check 所以必須要有以下 CAS 把 right_node 蓋回來。 ~~~ clike= /* 3: Remove one or more marked nodes */ if (CAS (&(left_node.next), left_node_next, right_node)) /*C1*/ if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G2*/ else return right_node; /*R2*/ } while (true); /*B2*/ } ~~~ ### delete 的部份: * 再發現未被標記的值即將刪除,會用 CAS 的標記值蓋過去 , break 出去,再重新刪除,這部份就像論文中說的 update 。 ~~~ clike= public boolean List::delete (KeyType search_key) { Node *right_node, *right_node_next, *left_node; do { right_node = search (search_key, &left_node); //找不到回傳false if ((right_node == tail) || (right_node.key!= search_key)) /*T1*/ return false; right_node_next = right_node.next; //如果未被標記的話,會用 CAS 的標記值蓋過去 , break 出去 if (!is_marked_reference(right_node_next)) if (CAS (&(right_node.next), /*C3*/ right_node_next, get_marked_reference (right_node_next))) break; } while (true); /*B4*/ //如果左邊的下一個是右節點的話,表示沒人更動過資料 //可以進行刪除 if (!CAS (&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search (right_node.key, &left_node); return true; } ~~~ ### 關於正確性 證明的想法: We will take a fairly direct approach to outlining the linearizability of the operations by identifying particular instants during their execution at which the complete operation appears to occur atomically * 操作執行時,在特定的過程,尤其以 atomic function 來實現,去證明結果線性化 #### A.Conditions Maintained by Search * 在 search 維護的情況下,使用以下 loop 確保是升序狀態: ~~~ search_key <= right_node.key left_node.key < search.key ~~~ * 在 search 函式返回值分成 2 個,確保左右相鄰 ~~~ /*程式碼再上面*/ /*R1*/ 如果確認完左右還是相鄰 /*R2*/ 原本的左右不相鄰就用 CAS 的 compare_and_swap 值蓋掉 ~~~ * 對於右節點的標記狀態,通過前3個回傳條件都是true後,觀察兩個返回路徑都要確認右結點未被標記,而右結點從未變成 unmarked ,我們推斷起初右節點沒有標記。 * CAS 指令 C1 成功後,通過從 list 中 unlinking 標記的節點 #### B.Linearization points * 這裡是在說明 op(i,m) 是和操作次數 (m) , processor 數 (i) 有關,也就是,在i個 processor 下,執行m次的表現 * 而 d(i,m) 則是執行時 search 在後置條件被滿足時的 final real-time * 我們一直希望利用多執行緒下去做的排序,如果 thread 愈多理論上是愈快 ![](https://i.imgur.com/aJb6eG8.png) * 如果能成功算得 d(i,m),就表示我們右節點沒有被 mark 且包含 search_key * 如果利用 Find(k) 去搜尋,發現在 search_key 右半部的nodes 的 key 是嚴格遞減,左半部的nodes 的 key 是嚴格遞增,可證其正確性。 #### C.Progress * We will show that the concurrent implementation is non-blocking. * 每次成功的插入都會產生一個更新 * 每次成功的刪除產生最多兩次更新 ### 結果 * 如果單看這篇論文的結果拿 NEW 和 MUTEX 比,是這篇 (NEW) 的花的CPU time(user+system)比 MUTEX 少。 ![](https://i.imgur.com/JEhoIKL.png) ### 初步想法 * 拿兩條 thread 舉例,之後數目會往上加 * 因為在 cut 時已經能分工,是 merge 的部份需要改進 * source 和 destination 是兩個 sort 好的linked list * 我們想把 source 一個個元素安插進 destination,將他們 merge 起來  ```sequence Note left of source:thread1 source->destination: insert(CAS) Note right of destination:紀錄thread1插入位置 Note left of source:thread 2 source->destination: insert(CAS) Note right of destination:紀錄thread2插入位置 ``` * 每條 thread 工作:(以 thread1 為例) STEP1 : thread1 insert a element to destination STEP2 : insert 成功 STEP3 : 紀錄 thread1 插入位置 STEP4 : 下次由 thread1 本身紀錄的位置下去 search ## concurrent merge merge sort 在最後會將兩個已經排序好的資料重新 merge ,合併成一筆排序好的資料,但是如果兩筆資料量很大時,只有一個 thread 去處理會相當得慢。 因此我們想要將這部份以多個 thread 下去 lock-free 的方式處理,在看了論文之後,我們的想法大概是這樣: 1. 有兩個排序好的 linked list ,我們根據 linked list 的大小,分成 source 與 destination ,目標是將 source 合併至 destination 2. 根據 source 的大小,決定需要多少個 thread 參與其中 3. 每個 thread 會從 source 取得一個 node,並試著插入到 destination 4. 為了避免每次都要從頭 ( destination 的 `head` ) 尋找,每個 thread 會紀錄上一次的插入位置,等取得下一個 node 時,直接從上一次的插入位置尋找 &nbsp;&nbsp;&nbsp;&nbsp;這是因為 source 資料已經排序好,已經確定下一個 node 插入的位置必定在上一個 node 後面 5. 最後一個 thread 把完成的 linked list 再次開啟一個新的 task 再 push 入 task queue 之中 參考了原本的論文實作出類似的架構 * 對於 source ,我們實作出 `list_pop()` 從 source 取出一個 node * 對於 destination ,實作出 `list_insert()` , `list_insert()` 會使用 `list_search()` 協助找尋插入點 以上的操作比起論文還要來的單純,一加一減的 linked list 使得我們不用去考慮 marked 以及 unmarked 的問題 這是 concurrent merge 所需的參數,除了上述提到的 `source` 以及 `dest` ,還加入了 `task_cnt` 作為 counter,最後會遞減至 0 ,代表所有 thread 已經做完 這不代表一定要所有 thread 進入才開始,只要有一個 thread 進入就開始 merge 的動作,但一定會有`task_cnt` 初始值的 thread 進入,即使資料已經處理完也會先進入爾後離開 ```clike= typedef struct { llist_t *source; llist_t *dest; int task_cnt; } merge_list_arg_t; ``` 在每個 thread 排序完自身的 linked list 時會丟入 `tmp_list` ,或是因為 `tmp_list` 已有 list 存在開啟新的 task ,設定好相關的參數,決定應進入 thread 數量,開啟等量的 task ,並行 merge ```clike= void merge_thread_lists(void *data) { llist_t *_list = (llist_t *) data; if (_list->size < (uint32_t) data_count) { pthread_mutex_lock(&(data_context.mutex)); llist_t *_t = tmp_list; if (!_t) { tmp_list = _list; pthread_mutex_unlock(&(data_context.mutex)); } else { tmp_list = NULL; pthread_mutex_unlock(&(data_context.mutex)); merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t)); if (_t->size < _list->size) { arg->source = _t; arg->dest = _list; } else { arg->source = _list; arg->dest = _t; } arg->task_cnt = arg->source->size / local_size * 2; int task_cnt = arg->task_cnt; for (int i = 0; i < task_cnt; i++) tqueue_push(pool->queue, task_new(concurrent_merge, arg)); } } else { the_list = _list; tqueue_push(pool->queue, task_new(NULL, NULL)); } } ``` 每個 thread 的工作就是不斷地 拿取 node >> 插入 node 當再也 pop 不出任何東西時,會去將 `task_cnt` 減 1 離開,當最後一個 thread 也準備離開時(line 13: `if (old_cnt == 1)`): * 更改 source 與 dest 的大小 > 其實要 `list_pop` `list_insert` 就即時更新了 ... :sweat_smile: * 釋放 source 的 linked list * 將完成的 list 丟入task queue 繼續做後續處理 ```clike= void concurrent_merge(void *data) { merge_list_arg_t *arg = (merge_list_arg_t *)data; llist_t *source = arg->source; llist_t *dest = arg->dest; node_t *cur = NULL, *node = NULL; int old_cnt = 0; cur = dest->head; while (1) { node = list_pop(source); if (!node) { old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1); if (old_cnt == 1) { dest->size += source->size; source->size = 0; list_free_nodes(source); tqueue_push(pool->queue, task_new(merge_thread_lists, dest)); } break; } list_insert(dest, &cur, node); } } ``` `list_pop` 、 `list_insert` 、 `list_push` ```clike= node_t *list_pop(llist_t *list) { node_t *head = list->head; node_t *tail = list->tail; node_t *node, *node_next; do { node = head->next; if (node == tail) return NULL; node_next = node->next; if (atomic_compare_exchange_weak(&(head->next), &node, node_next)) break; } while (1); return node; } node_t *list_search(node_t *cur, node_t *tail, val_t data, node_t** left_node) { node_t *right_node; node_t *t = cur; node_t *t_next = cur->next; do { *(left_node) = t; t = t_next; if (t == tail) break; t_next = t->next; } while (strcmp((char *)t->data, (char *)data) <= 0); right_node = t; return right_node; } void list_insert(llist_t *list, node_t **cur, node_t * node) { node_t *left_node, *right_node; do { right_node = list_search(*cur, list->tail, node->data, &left_node); node->next = right_node; if (atomic_compare_exchange_weak(&(left_node->next), &right_node, node)) { *cur = node; return; } else *cur = left_node; } while (1); } ``` ### 修改資料結構 上面沒有提到的問題點是關於 linked list 的資料結構,因為論文的資料結構與原本的不一樣, 所以需要一些改變 原本 list 的資料結構,且初始化 head 指向 `NULL` ```clike typedef struct { node_t *head; /**< The head of the linked list */ uint32_t size; /**< The size of the linked list */ } llist_t; ``` ```clike llist_t *list_new() { /* allocate list */ llist_t *list = malloc(sizeof(llist_t)); list->head = NULL; list->size = 0; return list; } ``` 但現在新增了 `tail` 且 `head` 與 `tail` 會各指向一個不會使用到、配置空間的 node (這個 node 稱為 [sentinel node](https://en.wikipedia.org/wiki/Linked_list#Sentinel_nodes)) 來使演算法正確運作 ```clike typedef struct { node_t *head; /**< The head of the linked list */ node_t *tail; /**< The tail of the linked list */ uint32_t size; /**< The size of the linked list */ } llist_t; ``` ```clike llist_t *list_new() { /* allocate list */ llist_t *list = malloc(sizeof(llist_t)); list->tail = node_new((val_t)NULL, NULL); list->head = node_new((val_t)NULL, list->tail); list->size = 0; return list; } ``` 這樣的改動連帶牽扯了其他相關的 function 都要修正 包含 `mergesort.c` 中的 `split_n_merge()` 與 `sort_n_merge()` `main.c` 中的 `build_list_from_file()` 與 `cut_local_list()` 有些 function 需要頻繁的使用 `list_get` 來取得最後一個 node ,也就是說整個 list 都遍歷了 ...結果這樣使得效率整個降低 1 個 thread,純修改資料結構的結果 ``` #Total_tasks_consumed: 3 #Elapsed_time: 378.183 ms #Throughput: 7 (per sec) ``` ``` #Total_tasks_consumed: 3 #Elapsed_time: 475.714 ms #Throughput: 6 (per sec) ``` 光是如此已經在時間上拉出差距 ### 重新調整資料結構 因為調整資料結構對於 merge sort 帶來的衝擊很大,最後是不改變原始的資料結構,只有當有需要的時候再改變 為此當要並行 merge 時,我們需要 sentinel nodes,我們在執行前加入了 sentinel head ```clike void list_add_dummy_head(llist_t *list) { node_t *new_head = node_new((val_t)NULL, list->head); list->head = new_head; } void list_remove_dummy_head(llist_t *list) { node_t *new_head = list->head->next; free(list->head); list->head = new_head; } ``` 至於 sentinal tail 是不一定要的:在我們的程式碼中所有的 `list->tail` 都可以用 `NULL` 取代 `merge_thread_list()` 第 13、14 行針對 src 與 dest 做這樣的處理,再丟入 task 中執行 `concurrent_merge()` ```clike= void merge_thread_lists(void *data) { llist_t *_list = (llist_t *) data; if (_list->size < (uint32_t) data_count) { pthread_mutex_lock(&(data_context.mutex)); llist_t *_t = tmp_list; if (!_t) { tmp_list = _list; pthread_mutex_unlock(&(data_context.mutex)); } else { tmp_list = NULL; pthread_mutex_unlock(&(data_context.mutex)); list_add_dummy_head(_t); list_add_dummy_head(_list); merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t)); if (_t->size < _list->size) { arg->source = _t; arg->dest = _list; } else { arg->source = _list; arg->dest = _t; } arg->task_cnt = arg->source->size / local_size * 2; int task_cnt = arg->task_cnt; for (int i = 0; i < task_cnt; i++) tqueue_push(pool->queue, task_new(concurrent_merge, arg)); } } else { the_list = _list; tqueue_push(pool->queue, task_new(NULL, NULL)); } } ``` `concurrent_merge()` 的最後一個 thread 再把 dest 的 sentinel head 移除 (第 17 行) ```clike= void concurrent_merge(void *data) { merge_list_arg_t *arg = (merge_list_arg_t *)data; llist_t *source = arg->source; llist_t *dest = arg->dest; node_t *cur = NULL, *node = NULL; int old_cnt = 0; cur = dest->head; while (1) { node = list_pop(source); if (!node) { old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1); if (old_cnt == 1) { dest->size += source->size; source->size = 0; list_free_nodes(source); list_remove_dummy_head(dest); tqueue_push(pool->queue, task_new(merge_thread_lists, dest)); } break; } list_insert(dest, &cur, node); } } ``` ### 實驗結果 因為 lock-free 很容易爆炸,這邊以維持 mutex 做測試 ![](https://i.imgur.com/kdKPZ5V.png) 時間確實有下降一點!不過當 thread 數目變多,時間會再往上增長,主要原因是因為 ```clike arg->task_cnt = arg->source->size / local_size * 2; ``` 這是決定 thread 參與數量的方式, `local_size` 代表 thread 當初分割的local linked list 資料量 ,也就是使用與處理這些資料同等數量的 thread 做 merge ,但是每個 thread 的參與需要遍歷整個 linked list , thread 多於核心數等於過度浪費時間在 `list_search()` 上,反而降低效益 ### 限制 merge 參與的 thread 數 最後將能夠參與的數量限制在核心數,這裡使用到了 [get_nprocs()](http://man7.org/linux/man-pages/man3/get_nprocs.3.html) 執行環境在雙核且支援 hyper-threading 的 intel 處理器,`get_nprocs()` 回傳了 4 文中有特別指出這個 function 成本昂貴,因為要去開啟 /sys 底下的檔案解析,因此只使用一次存下來 ```clike int main() { /* ... */ num_of_procs = get_nprocs(); /* ... */ } ``` ```clike arg->task_cnt = MIN(num_of_procs, arg->source->size / local_size * 2); ``` ![](https://i.imgur.com/o5r5pDP.png) ## 討論改進效果不顯著的原因 我們的目標是要改善程式碼的 scalability,但可以由上述的比較圖看出,程式在 thread 數為 4 的時候表現最佳,thread 數繼續變多的時候,無法更快甚至有變慢的情形。所以我們決定驗證這個現象的原因。 我們歸納出的原因有兩個,第一個是程式本身不夠好,concurrent merge 的機制不夠好,因為愈多個 thread 要一起 merge , 每個 thread 都要拜訪 list 的所有 node ,或者是 thread pool 的機制也尚待改進;第二個是 thread 之間的 context switch 所花的時間,若 context switch 所花的時間遠小於我們程式碼所花的時間,就代表 scalability 還有增加的空間。 以下數據以 64 threads 的 case 來比較 * 程式碼花的時間(平均): 436 ms * context switch 的次數 ``` Performance counter stats for './sort 64 test_data/input.txt': 1,519 cs 1.180769370 seconds time elapsed ``` * petermouse 的電腦的 context switch 花的時間 * [contextswitch](https://github.com/tsuna/contextswitch) * [lmbench-next](https://github.com/jserv/lmbench-next) ``` ./cpubench.sh model name : Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz 1 physical CPUs, 2 cores/CPU, 2 hardware threads/core = 4 hw threads total -- No CPU affinity -- 10000000 system calls in 454871760ns (45.5ns/syscall) 2000000 process context switches in 1997589690ns (998.8ns/ctxsw) 2000000 thread context switches in 2240424384ns (1120.2ns/ctxsw) sched_setscheduler(): Operation not permitted 2000000 thread context switches in 61020406ns (30.5ns/ctxsw) -- With CPU affinity -- 10000000 system calls in 426586426ns (42.7ns/syscall) 2000000 process context switches in 1962945555ns (981.5ns/ctxsw) 2000000 thread context switches in 1661305925ns (830.7ns/ctxsw) sched_setscheduler(): Operation not permitted 2000000 thread context switches in 271151922ns (135.6ns/ctxsw) -- With CPU affinity to CPU 0 -- 10000000 system calls in 424558129ns (42.5ns/syscall) 2000000 process context switches in 2258118701ns (1129.1ns/ctxsw) 2000000 thread context switches in 2034381771ns (1017.2ns/ctxsw) sched_setscheduler(): Operation not permitted 2000000 thread context switches in 58380978ns (29.2ns/ctxsw) ``` 436 ms >> 1519 * 830.7 ns ~ 1 ms ### 上課筆記 * 老師提到可以在 merge 的過程中重新建立記憶體連續的資料 (array) * [CFS -- linux 2.6.23](https://en.wikipedia.org/wiki/Completely_Fair_Scheduler) * context switch 時間 : https://github.com/jserv/lmbench-next [Quantifying The Cost of Context Switch∗](http://www.cs.rochester.edu/u/cli/research/switch.pdf) ### 關於原版 context switch 不可行的原因 附上原版 [github](https://github.com/tsuna/contextswitch) ~~~ clike= static void* thread(void*ctx) { (void)ctx; for (int i = 0; i < iterations; i++) sched_yield(); return NULL; } ~~~ #### 資料來源 [sched_yield()](http://man7.org/linux/man-pages/man2/sched_yield.2.html#top_of_page) , [CFS](https://en.wikipedia.org/wiki/Completely_Fair_Scheduler) * DESCRIPTION ~~~ sched_yield() causes the calling thread to relinquish the CPU.The thread is moved to the end of the queue for its static priority and a new thread gets to run. ~~~ * RETURN VALUE ~~~ On success, sched_yield() returns 0. On error, -1 is returned,and errno is set appropriately. ~~~ * 本身電腦版本Linux version 4.4.0 ~~~ clike= tina@tina-X550VB:~$ cat /proc/version Linux version 4.4.0-78-generic (buildd@lgw01-11) (gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4) ) #99-Ubuntu SMP Thu Apr 27 15:29:09 UTC 2017 ~~~ * CFS was merged into the 2.6.23 ~~~ The Completely Fair Scheduler (CFS) is a process scheduler which was merged into the 2.6.23 (October 2007) release of the Linux kernel and is the default scheduler. ~~~ * CFS 相較於 2.6 版本以前的 kernel * In contrast to the previous O(1) scheduler used in older Linux 2.6 kernels, the CFS scheduler implementation is not based on run queues. Instead, a red–black tree implements a "timeline" of future task execution. * CFS uses a concept called "sleeper fairness", which considers sleeping or waiting tasks equivalent to those on the runqueue. * 參考[文件](http://people.redhat.com/mingo/cfs-scheduler/sched-design-CFS.txt)中提到 ~~~ ( another detail: due to nanosec accounting and timeline sorting,sched_yield() support is very simple under CFS, and in fact under CFS sched_yield() behaves much better than under any other scheduler i have tested so far. ~~~ ### jserv's lmbench-next #### [lmbench](http://lmbench.sourceforge.net/cgi-bin/man?keyword=lmbench&section=8) 一種 system benchmark [benchmp](http://manpages.ubuntu.com/manpages/precise/timing.3.html) ## 重新建立連續記憶體 ### 先從 `sort_n_merge` 著手 連續的記憶體結構可以使 cache-miss 有機會降低 在排序 local linked list 的過程中 ,嘗試在 merge 的時候先分配一塊準備要塞排序好的資料空間,當作是一個陣列的資料結構來 merge,但內部還是存在 linked list 的結構,所以 next 單純指向下一個陣列元素,方便在之後可以 concurrent merge 時有幫助 ```clike= llist_t *sort_n_merge(llist_t *a, llist_t *b) { llist_t *_list = list_new(); _list->size += a->size + b->size; node_t* node_list = malloc(sizeof(node_t) * (a->size + b->size)); int idx = 0; while (a->size && b->size) { /* Choose the linked list whose data of first node is small. */ llist_t *small; int cmp = strcmp((char*)(a->head->data), (char*)(b->head->data)); small = (llist_t *)((intptr_t) a * (cmp <= 0) + (intptr_t) b * (cmp > 0)); /* Extract first node of selected list and put to the new list. */ node_list[idx].data = small->head->data; node_list[idx].next = node_list + idx + 1; idx++; small->head = small->head->next; --small->size; } /* Append the remaining nodes */ llist_t *remaining = (llist_t *) ((intptr_t) a * (a->size > 0) + (intptr_t) b * (b->size > 0)); while(remaining->head) { node_list[idx].data = remaining->head->data; remaining->head = remaining->head->next; node_list[idx].next = node_list + idx + 1; idx++; } node_list[idx - 1].next = NULL; free(a->head); free(b->head); free(a); free(b); _list->head = node_list; return _list; } ``` 實驗結果 ![](https://i.imgur.com/qXpXy7S.png) 目前時間還沒有顯著差異 ### 從連續的記憶體改善 `concurrent_merge` 先前 `concurrent_merge` 會使得每個 merge 的 thread 幾乎遍歷過整個 linked list 來找尋適合插入的點並插入 node 但是現在建立在連續記憶體的結構下,我們可以 * 利用 binary search 找尋接近的點,下去 insert node * 而且每插入一個 node ,就縮小 binary search 的邊界 (因為插入點之前的 node 不會再由同個 thread 插入) * 分配好 source linked list 的切割 ,不用每個 thread 去爭奪 #### 重新調整資料結構 待補 ![](https://i.imgur.com/tdeLmKm.png) #### binary search to be continued... ## 參考資料 [pthread_cond_signal](https://linux.die.net/man/3/pthread_cond_signal) [pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait) [](https://www.ibm.com/developerworks/library/l-completely-fair-scheduler/) [](http://man7.org/linux/man-pages/man7/sched.7.html) [](http://www.cs.montana.edu/~chandrima.sarkar/AdvancedOS/CSCI560_Proj_main/)