# 2017q1 team1 mergesort-concurrent
contributed by <`zmke`、`petermouse`、 `tina0405`、`heathcliffYang`>
## 檢討之前的程式碼
### 從原版來想
在原始的版本中,會將輸入的 list 分割給每個 thread 去做 merge ,而最後 thread 之間 list 的 merge 則是一個不能並行執行的部分,無法隨著 thread 數增加而提高執行效率,是增加 scalability 會遇到的瓶頸。
![](https://i.imgur.com/aW4KeUJ.png)
這讓我們想到一個在計組教過的 [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law)
![](https://i.imgur.com/KZDdaid.png)
不能並行的部分則讓核心數增加帶來的加速效果存在極限。
![](https://i.imgur.com/o9AVbcR.png)
所以我們把其中一個目標放在 thread 之間 list 的 merge 也要讓它可以並行處理,實作方法在下面的部分會詳細介紹。
另外,在與新夥伴溝通時,發現他們參考[ Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.research.ibm.com/people/m/michael/podc-1996.pdf) 嘗試過 lock-free threadpool。
1. tqueue_pop()
```c=
task_t *tqueue_pop(tqueue_t * const the_queue)
{
task_t *head = NULL, *tail = NULL, *next = NULL, *ret = NULL;
while(1) {
head = the_queue->head;
tail = the_queue->tail;
next = head->next;
if(head == the_queue->head) {
if(head == tail) {
if(next == NULL) {
return NULL;
}
atomic_compare_exchange_weak(&(the_queue->tail), &tail, next);
} else {
ret = next;
if(atomic_compare_exchange_weak(&(the_queue->head), &head, next)) {
break;
}
}
}
}
atomic_fetch_sub(&(the_queue->size), 1);
atomic_fetch_add(&(the_queue->num_of_consumed), 1);
return ret;
}
```
2. tqueue_push()
```c=
int tqueue_push(tqueue_t * const the_queue, task_t *task)
{
task_t *tail = NULL, *tail_next = NULL;
task->next = NULL;
while(1) {
tail = the_queue->tail;
tail_next = tail->next;
if(tail == the_queue->tail) {
if(tail_next == NULL) {
if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) {
}
break;
}
} else {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) {
}
}
}
}
atomic_fetch_add(&(the_queue->size), 1);
return 0;
}
```
3. 論文中沒能實踐的部分
中介的資料結構,這個資料結構去指向真正的 node (node_t) 以及存放一個變數名稱 count 代表 reference counting 的次數,可以用來避免 ABA 問題
```c=
structure pointer_t { ptr: pointer to node_t, count: unsigned integer }
```
### 實驗結果
以下 2 、 3 版本都有包含 lock-free threadpool,且都是以 `./sort 16 test_data/input.txt` 進行測驗。
1. Original
原版在最後 thread 之間的 list 做 merge 時,因為工作不能多執行緒一起執行,所以其他 thread 會持續要求工作直到結束。
![](https://i.imgur.com/zv6d9NS.png)
2. Condition variable
這個版本是 lock-free thread pool 與 condition variable 一起的實作,而 condition variable 則會在 monitor 的部分做介紹。
因為 condition variable 的關係,當 queue 裡面沒有工作時,空閒的 thread 不會佔用到 CPU 的時間,`tqueue_pop` 的次數有減少,但花費時間卻增加了。
![](https://i.imgur.com/FkibzUW.png)
此外,lock-free thread pool 會導致在 thread 多時,跑不完。
3. Condition variable + merge concurrent
這個版本是 lock-free thread pool 與 condition variable ,再加上 thread 之間的 list 做 merge 的工作是 concurrent。
![](https://i.imgur.com/kk39cru.png) `tqueue_pop` 的次數減少一半,後期 merge 的工作已可以允許多個 thread 一起執行;其中工作可以容納的 thread 數有限制: `arg->task_cnt = arg->source->size / local_size * 2` ,這部分需要再思考如何訂定最有效益的辦法;這是初版的 concurrent merge ,改變了所有 list 的資料結構,同時也可以看到 list_search 與 list_insert 有相當的成本,也是一個可以考慮改進的方向。
* 三種實作的效能比較
三種方法都在 thread 數為 4 時表現較好,而無可避免地都在 thread 數繼續往上增加時,讓花費時間變多,其中加入 concurrent merge 之後卻不盡理想的原因在之後的部份會詳細說明。
![](https://i.imgur.com/WnnZJAj.png)
>> 這張圖忘記改 label 了
* 另外的問題
其實每次測試,時間的浮動非常大,所以測試時應該要多次量測、使用統計的方法來取得較真實的實驗數據。
## Monitor
* 讓 thread 可以保持 mutual exclution 並等待直到特定的 condition 成立
* monitor 包含 mutex 和 conditional variable
* `pthread_cond_wait(cond, mutex)`
* release mutex
* block on the condition variable
* sleep the thread
* 當被其他 thread 透過 signal() 或 broadcast() 喚醒之後,會再獲得 mutex
* `pthread_cond_signal(cond)`
* unblock threads blocked on a conditional variable
* 至少喚醒一條 thread ,就算沒有 thread 在等待也會成功 return
* 如果有多條 thread 同時在等待同一個 conditional variable ,會由排程的優先順序決定哪條 thread 優先被喚醒
* 在 multi processor 的環境下,當 condition variable 同時被不同 processor signal 時,可能會喚醒多條 thread
* `pthread_cond_broadcast(cond)`
* 喚醒所有在等待該 condtional variable 的 thread
* 當 thread 被喚醒之後,應該要重新評估是否該繼續執行或是再等待一次,所以 condition wait 常被實作在 while loop 內,來確保執行的安全性
### Monitor 實作
原本的程式在 task queue 沒東西的時候,也會一直嘗試 tqueue_pop() ,造成 CPU time 的浪費,尤其是在 thread 數量多的時候影響會更大
我們想要利用 condition variable 讓空閒的 thread 在 task queue 裡面沒有東西的時候 sleep ,等到有 task push 到 task queue 裡面之後,再喚醒他們起來要工作,希望減少 pop 空的 task queue 造成的浪費
#### task_run()
一開始就拿 mutex ,之後檢查 task queue 內是否是空的,如果是空的就等待並釋放 mutex ,被喚醒之後會拿到 mutex 但是 pool->queue->size 可能已經被改變,所以要再檢查一次,如果沒問題就釋放 mutex ,否則就是再等待一次
```clike=
static void *task_run(void *data __attribute__ ((__unused__)))
{
while (1) {
/*Get the mutex lock*/
pthread_mutex_lock(&(pool->queue->mutex));
/*Check wether the number of task in the thread queue is zero*/
/*If the task queue is empty, block the thread and release the mutex lock*/
while(pool->queue->size == 0)
pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex));
/*When the conditional variable is signaled, wake up the thread and get the mutex lock*/
/*Release the mutex lock*/
pthread_mutex_unlock(&(pool->queue->mutex));
task_t *_task = tqueue_pop(pool->queue);
if (_task) {
if (!_task->func) {
tqueue_push(pool->queue, task_new(NULL, NULL));
free(_task);
break;
} else {
_task->func(_task->arg);
free(_task);
}
}
}
pthread_exit(NULL);
}
```
#### tqueue_push()
在 push 的最後加上 pthread_cond_signal() ,藉此通知被 block 的 thread 已經有工作可以拿
```clike=
int tqueue_push(tqueue_t * const the_queue, task_t *task)
{
task_t *tail = NULL, *tail_next = NULL;
task->next = NULL;
while(1) {
tail = the_queue->tail;
tail_next = tail->next;
if(tail == the_queue->tail) {
if(tail_next == NULL) {
if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) {
}
break;
}
} else {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) {
}
}
}
}
atomic_fetch_add(&(the_queue->size), 1);
/*Signal the condtional variable*/
pthread_cond_signal(&(the_queue->cond));
return 0;
}
```
### 實驗結果
![](https://i.imgur.com/vmdpuSo.png)
## 論文閱讀
[Lock-free linkedlist implementation of Harris' algorithm](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf)
* 原本的數字串如下:
![](https://i.imgur.com/4CT1jNs.png)
* 如果想要 insert Node_20 , 我們會這樣做:
![](https://i.imgur.com/USDNKRB.png)
* 如果想要 delete Node_10 , 我們會這樣做:
![](https://i.imgur.com/oqEMsns.png)
* 但如果現在 insert Node_20 和 delete Node_10 同時發生,就有可能出現以下情況:
![](https://i.imgur.com/yybhpPk.png)
* 但我們不樂見,因為我們希望他會走訪 Node_20
* 解決方法:
* 步驟1:左邊的圖是先將刪除點 mark 起來
* 步驟2:右邊的圖是真的刪掉 node_10
![](https://i.imgur.com/R9nrnba.png)
* logically deleted : 完成步驟1
* phisically deleted : 完成步驟2
* 雖然這邊沒有將 node_10 的 right node 斷掉,但整個list 的運行並不走訪 node_10
* 好處:
* 被標記的的 node 仍然能夠通過,但必須要能分變出與原來未標記狀態的不同。
* 此時當訊號 concurrent 時,以上面為例,要插入 node_20 和發現 node_10 是 logically deleted 同事發生時,必須先強制將 10 做 phisically deleted 才可插入,如此一來就可以避免以上情形發生。
### insert 的部份:
* 一開始本來對,如果要序列: 1,4 中間同時插入 2 跟 3 的話可能會發生,兩個 2 跟 3 同時指到4,此時序列就不是我們要的 1,2,3,4 了 ,但以下程式碼用 CAS 插入做了檢查的動作:
* 文中利用 CAS(addr,o,n) 去比較 addr 和舊的內容 o,有沒有一樣,有的話就寫入,沒有的話就一直重複做,他 return 的 boolean 就是用來提醒相鄰的結點如果沒有一樣的話就要update(重做)。
~~~ likec=
public boolean List::insert (KeyType key) {
//先將 key 建立節點叫 new_node
Node *new_node = new Node(key);
//建立左右節點
Node *right_node, *left_node;
do {
right_node = search (key, &left_node);
//右節點不是尾巴且和我們要插入的值一樣就不插入
//這段我前一篇在改時有拿掉
//因為他讓序列 1,2,2,3 變成 1,2,3
if ((right_node != tail) && (right_node.key == key)) /*T1*/
return false;
//右節點是我們即將插入節點的下一個
new_node.next = right_node;
//如果你的左邊節點的下一個是你的右邊節點,這樣就可以插入
//確保中途沒有其他人改過這段資料
if (CAS (&(left_node.next), right_node, new_node)) /*C2*/
return true;
} while (true); /*B3*/
}
~~~
### find 的部份:
* 一開始先傳進去 search_key 找完存在 right_node ,如果right_node 已經在 tail 的位置或跟傳進去的不一樣就不用找了回傳 false ,一樣才傳 true 。
~~~ clike=
public boolean List::find (KeyType search_key) {
Node *right_node, *left_node;
//把要找的 search_key 放置
right_node = search (search_key, &left_node);
//如果一樣,而且不是 tail 才回傳 true
if ((right_node == tail) || (right_node.key != search_key))
return false;
else
return true;
}
~~~
### search 的部份:
* 分成3個部份:
* part 1
* 這組迴圈得先判斷是否有被做記號,如果沒有的話,那就是 t 的值小於搜尋值,把 t 的值存在 *left_node , t_next 存在 left_node_next,然後將 t_next unmark 存進去 t,而 t_next 也會等於 t.next ,如此一來就可以一直往下比
* 如果有 mark 就不會存他,直接跳到他的 next
* 直到看到沒 mark 而且 t 的值又大於搜尋值的時候。
~~~ clike=
/* 1: Find left_node and right_node */
do {
if (!is_marked_reference(t_next)) {
(*left_node) = t;
left_node_next = t_next;
}
t = get_unmarked_reference(t_next);
if (t == tail) break;
t_next = t.next;
//
} while (is_marked_reference(t_next) || (t.key<search_key)); /*B1*/
right_node = t;
~~~
* part 2
* 這組是在 check 左邊的點和右邊的點之間,有沒有突然插入點或刪除點,因為每個執行序都再做事,很可能會不小心動到當前 search 的執行序在使用的點,所以要不停確認。
~~~ clike=
/* 2: Check nodes are adjacent */
if (left_node_next == right_node)
//如果你右邊的 node 的下一個還有做記號,就要再 search 一次
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G1*/
//沒有就可以回傳值了!
else
return right_node; /*R1*/
~~~
* part 3
* 因為沒通過上方 if 的 check 所以必須要有以下 CAS 把 right_node 蓋回來。
~~~ clike=
/* 3: Remove one or more marked nodes */
if (CAS (&(left_node.next), left_node_next, right_node)) /*C1*/
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G2*/
else
return right_node; /*R2*/
} while (true); /*B2*/
}
~~~
### delete 的部份:
* 再發現未被標記的值即將刪除,會用 CAS 的標記值蓋過去 , break 出去,再重新刪除,這部份就像論文中說的 update 。
~~~ clike=
public boolean List::delete (KeyType search_key) {
Node *right_node, *right_node_next, *left_node;
do {
right_node = search (search_key, &left_node);
//找不到回傳false
if ((right_node == tail) || (right_node.key!= search_key)) /*T1*/
return false;
right_node_next = right_node.next;
//如果未被標記的話,會用 CAS 的標記值蓋過去 , break 出去
if (!is_marked_reference(right_node_next))
if (CAS (&(right_node.next), /*C3*/
right_node_next, get_marked_reference (right_node_next)))
break;
} while (true); /*B4*/
//如果左邊的下一個是右節點的話,表示沒人更動過資料
//可以進行刪除
if (!CAS (&(left_node.next), right_node, right_node_next)) /*C4*/
right_node = search (right_node.key, &left_node);
return true;
}
~~~
### 關於正確性
證明的想法:
We will take a fairly direct approach to outlining the linearizability of the operations by identifying particular instants during their execution at which the complete operation appears to occur atomically
* 操作執行時,在特定的過程,尤其以 atomic function 來實現,去證明結果線性化
#### A.Conditions Maintained by Search
* 在 search 維護的情況下,使用以下 loop 確保是升序狀態:
~~~
search_key <= right_node.key
left_node.key < search.key
~~~
* 在 search 函式返回值分成 2 個,確保左右相鄰
~~~
/*程式碼再上面*/
/*R1*/ 如果確認完左右還是相鄰
/*R2*/ 原本的左右不相鄰就用 CAS 的 compare_and_swap 值蓋掉
~~~
* 對於右節點的標記狀態,通過前3個回傳條件都是true後,觀察兩個返回路徑都要確認右結點未被標記,而右結點從未變成 unmarked ,我們推斷起初右節點沒有標記。
* CAS 指令 C1 成功後,通過從 list 中 unlinking 標記的節點
#### B.Linearization points
* 這裡是在說明 op(i,m) 是和操作次數 (m) , processor 數 (i) 有關,也就是,在i個 processor 下,執行m次的表現
* 而 d(i,m) 則是執行時 search 在後置條件被滿足時的 final real-time
* 我們一直希望利用多執行緒下去做的排序,如果 thread 愈多理論上是愈快
![](https://i.imgur.com/aJb6eG8.png)
* 如果能成功算得 d(i,m),就表示我們右節點沒有被 mark 且包含 search_key
* 如果利用 Find(k) 去搜尋,發現在 search_key 右半部的nodes 的 key 是嚴格遞減,左半部的nodes 的 key 是嚴格遞增,可證其正確性。
#### C.Progress
* We will show that the concurrent implementation is non-blocking.
* 每次成功的插入都會產生一個更新
* 每次成功的刪除產生最多兩次更新
### 結果
* 如果單看這篇論文的結果拿 NEW 和 MUTEX 比,是這篇 (NEW) 的花的CPU time(user+system)比 MUTEX 少。
![](https://i.imgur.com/JEhoIKL.png)
### 初步想法
* 拿兩條 thread 舉例,之後數目會往上加
* 因為在 cut 時已經能分工,是 merge 的部份需要改進
* source 和 destination 是兩個 sort 好的linked list
* 我們想把 source 一個個元素安插進 destination,將他們 merge 起來
```sequence
Note left of source:thread1
source->destination: insert(CAS)
Note right of destination:紀錄thread1插入位置
Note left of source:thread 2
source->destination: insert(CAS)
Note right of destination:紀錄thread2插入位置
```
* 每條 thread 工作:(以 thread1 為例)
STEP1 : thread1 insert a element to destination
STEP2 : insert 成功
STEP3 : 紀錄 thread1 插入位置
STEP4 : 下次由 thread1 本身紀錄的位置下去 search
## concurrent merge
merge sort 在最後會將兩個已經排序好的資料重新 merge ,合併成一筆排序好的資料,但是如果兩筆資料量很大時,只有一個 thread 去處理會相當得慢。
因此我們想要將這部份以多個 thread 下去 lock-free 的方式處理,在看了論文之後,我們的想法大概是這樣:
1. 有兩個排序好的 linked list ,我們根據 linked list 的大小,分成 source 與 destination ,目標是將 source 合併至 destination
2. 根據 source 的大小,決定需要多少個 thread 參與其中
3. 每個 thread 會從 source 取得一個 node,並試著插入到 destination
4. 為了避免每次都要從頭 ( destination 的 `head` ) 尋找,每個 thread 會紀錄上一次的插入位置,等取得下一個 node 時,直接從上一次的插入位置尋找
這是因為 source 資料已經排序好,已經確定下一個 node 插入的位置必定在上一個 node 後面
5. 最後一個 thread 把完成的 linked list 再次開啟一個新的 task 再 push 入 task queue 之中
參考了原本的論文實作出類似的架構
* 對於 source ,我們實作出 `list_pop()` 從 source 取出一個 node
* 對於 destination ,實作出 `list_insert()` , `list_insert()` 會使用 `list_search()` 協助找尋插入點
以上的操作比起論文還要來的單純,一加一減的 linked list 使得我們不用去考慮 marked 以及 unmarked 的問題
這是 concurrent merge 所需的參數,除了上述提到的 `source` 以及 `dest` ,還加入了 `task_cnt` 作為 counter,最後會遞減至 0 ,代表所有 thread 已經做完
這不代表一定要所有 thread 進入才開始,只要有一個 thread 進入就開始 merge 的動作,但一定會有`task_cnt` 初始值的 thread 進入,即使資料已經處理完也會先進入爾後離開
```clike=
typedef struct {
llist_t *source;
llist_t *dest;
int task_cnt;
} merge_list_arg_t;
```
在每個 thread 排序完自身的 linked list 時會丟入 `tmp_list` ,或是因為 `tmp_list` 已有 list 存在開啟新的 task ,設定好相關的參數,決定應進入 thread 數量,開啟等量的 task ,並行 merge
```clike=
void merge_thread_lists(void *data)
{
llist_t *_list = (llist_t *) data;
if (_list->size < (uint32_t) data_count) {
pthread_mutex_lock(&(data_context.mutex));
llist_t *_t = tmp_list;
if (!_t) {
tmp_list = _list;
pthread_mutex_unlock(&(data_context.mutex));
} else {
tmp_list = NULL;
pthread_mutex_unlock(&(data_context.mutex));
merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t));
if (_t->size < _list->size) {
arg->source = _t;
arg->dest = _list;
} else {
arg->source = _list;
arg->dest = _t;
}
arg->task_cnt = arg->source->size / local_size * 2;
int task_cnt = arg->task_cnt;
for (int i = 0; i < task_cnt; i++)
tqueue_push(pool->queue, task_new(concurrent_merge, arg));
}
} else {
the_list = _list;
tqueue_push(pool->queue, task_new(NULL, NULL));
}
}
```
每個 thread 的工作就是不斷地
拿取 node >> 插入 node
當再也 pop 不出任何東西時,會去將 `task_cnt` 減 1 離開,當最後一個 thread 也準備離開時(line 13: `if (old_cnt == 1)`):
* 更改 source 與 dest 的大小
> 其實要 `list_pop` `list_insert` 就即時更新了 ... :sweat_smile:
* 釋放 source 的 linked list
* 將完成的 list 丟入task queue 繼續做後續處理
```clike=
void concurrent_merge(void *data)
{
merge_list_arg_t *arg = (merge_list_arg_t *)data;
llist_t *source = arg->source;
llist_t *dest = arg->dest;
node_t *cur = NULL, *node = NULL;
int old_cnt = 0;
cur = dest->head;
while (1) {
node = list_pop(source);
if (!node) {
old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1);
if (old_cnt == 1) {
dest->size += source->size;
source->size = 0;
list_free_nodes(source);
tqueue_push(pool->queue, task_new(merge_thread_lists, dest));
}
break;
}
list_insert(dest, &cur, node);
}
}
```
`list_pop` 、 `list_insert` 、 `list_push`
```clike=
node_t *list_pop(llist_t *list)
{
node_t *head = list->head;
node_t *tail = list->tail;
node_t *node, *node_next;
do {
node = head->next;
if (node == tail)
return NULL;
node_next = node->next;
if (atomic_compare_exchange_weak(&(head->next), &node, node_next))
break;
} while (1);
return node;
}
node_t *list_search(node_t *cur, node_t *tail, val_t data, node_t** left_node)
{
node_t *right_node;
node_t *t = cur;
node_t *t_next = cur->next;
do {
*(left_node) = t;
t = t_next;
if (t == tail)
break;
t_next = t->next;
} while (strcmp((char *)t->data, (char *)data) <= 0);
right_node = t;
return right_node;
}
void list_insert(llist_t *list, node_t **cur, node_t * node)
{
node_t *left_node, *right_node;
do {
right_node = list_search(*cur, list->tail, node->data, &left_node);
node->next = right_node;
if (atomic_compare_exchange_weak(&(left_node->next), &right_node, node)) {
*cur = node;
return;
} else
*cur = left_node;
} while (1);
}
```
### 修改資料結構
上面沒有提到的問題點是關於 linked list 的資料結構,因為論文的資料結構與原本的不一樣, 所以需要一些改變
原本 list 的資料結構,且初始化 head 指向 `NULL`
```clike
typedef struct {
node_t *head; /**< The head of the linked list */
uint32_t size; /**< The size of the linked list */
} llist_t;
```
```clike
llist_t *list_new()
{
/* allocate list */
llist_t *list = malloc(sizeof(llist_t));
list->head = NULL;
list->size = 0;
return list;
}
```
但現在新增了 `tail` 且 `head` 與 `tail` 會各指向一個不會使用到、配置空間的 node (這個 node 稱為 [sentinel node](https://en.wikipedia.org/wiki/Linked_list#Sentinel_nodes)) 來使演算法正確運作
```clike
typedef struct {
node_t *head; /**< The head of the linked list */
node_t *tail; /**< The tail of the linked list */
uint32_t size; /**< The size of the linked list */
} llist_t;
```
```clike
llist_t *list_new()
{
/* allocate list */
llist_t *list = malloc(sizeof(llist_t));
list->tail = node_new((val_t)NULL, NULL);
list->head = node_new((val_t)NULL, list->tail);
list->size = 0;
return list;
}
```
這樣的改動連帶牽扯了其他相關的 function 都要修正
包含 `mergesort.c` 中的 `split_n_merge()` 與 `sort_n_merge()`
`main.c` 中的 `build_list_from_file()` 與 `cut_local_list()`
有些 function 需要頻繁的使用 `list_get` 來取得最後一個 node ,也就是說整個 list 都遍歷了
...結果這樣使得效率整個降低
1 個 thread,純修改資料結構的結果
```
#Total_tasks_consumed: 3
#Elapsed_time: 378.183 ms
#Throughput: 7 (per sec)
```
```
#Total_tasks_consumed: 3
#Elapsed_time: 475.714 ms
#Throughput: 6 (per sec)
```
光是如此已經在時間上拉出差距
### 重新調整資料結構
因為調整資料結構對於 merge sort 帶來的衝擊很大,最後是不改變原始的資料結構,只有當有需要的時候再改變
為此當要並行 merge 時,我們需要 sentinel nodes,我們在執行前加入了 sentinel head
```clike
void list_add_dummy_head(llist_t *list)
{
node_t *new_head = node_new((val_t)NULL, list->head);
list->head = new_head;
}
void list_remove_dummy_head(llist_t *list)
{
node_t *new_head = list->head->next;
free(list->head);
list->head = new_head;
}
```
至於 sentinal tail 是不一定要的:在我們的程式碼中所有的 `list->tail` 都可以用 `NULL` 取代
`merge_thread_list()` 第 13、14 行針對 src 與 dest 做這樣的處理,再丟入 task 中執行 `concurrent_merge()`
```clike=
void merge_thread_lists(void *data)
{
llist_t *_list = (llist_t *) data;
if (_list->size < (uint32_t) data_count) {
pthread_mutex_lock(&(data_context.mutex));
llist_t *_t = tmp_list;
if (!_t) {
tmp_list = _list;
pthread_mutex_unlock(&(data_context.mutex));
} else {
tmp_list = NULL;
pthread_mutex_unlock(&(data_context.mutex));
list_add_dummy_head(_t);
list_add_dummy_head(_list);
merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t));
if (_t->size < _list->size) {
arg->source = _t;
arg->dest = _list;
} else {
arg->source = _list;
arg->dest = _t;
}
arg->task_cnt = arg->source->size / local_size * 2;
int task_cnt = arg->task_cnt;
for (int i = 0; i < task_cnt; i++)
tqueue_push(pool->queue, task_new(concurrent_merge, arg));
}
} else {
the_list = _list;
tqueue_push(pool->queue, task_new(NULL, NULL));
}
}
```
`concurrent_merge()` 的最後一個 thread 再把 dest 的 sentinel head 移除 (第 17 行)
```clike=
void concurrent_merge(void *data)
{
merge_list_arg_t *arg = (merge_list_arg_t *)data;
llist_t *source = arg->source;
llist_t *dest = arg->dest;
node_t *cur = NULL, *node = NULL;
int old_cnt = 0;
cur = dest->head;
while (1) {
node = list_pop(source);
if (!node) {
old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1);
if (old_cnt == 1) {
dest->size += source->size;
source->size = 0;
list_free_nodes(source);
list_remove_dummy_head(dest);
tqueue_push(pool->queue, task_new(merge_thread_lists, dest));
}
break;
}
list_insert(dest, &cur, node);
}
}
```
### 實驗結果
因為 lock-free 很容易爆炸,這邊以維持 mutex 做測試
![](https://i.imgur.com/kdKPZ5V.png)
時間確實有下降一點!不過當 thread 數目變多,時間會再往上增長,主要原因是因為
```clike
arg->task_cnt = arg->source->size / local_size * 2;
```
這是決定 thread 參與數量的方式, `local_size` 代表 thread 當初分割的local linked list 資料量 ,也就是使用與處理這些資料同等數量的 thread 做 merge ,但是每個 thread 的參與需要遍歷整個 linked list , thread 多於核心數等於過度浪費時間在 `list_search()` 上,反而降低效益
### 限制 merge 參與的 thread 數
最後將能夠參與的數量限制在核心數,這裡使用到了 [get_nprocs()](http://man7.org/linux/man-pages/man3/get_nprocs.3.html)
執行環境在雙核且支援 hyper-threading 的 intel 處理器,`get_nprocs()` 回傳了 4
文中有特別指出這個 function 成本昂貴,因為要去開啟 /sys 底下的檔案解析,因此只使用一次存下來
```clike
int main()
{
/* ... */
num_of_procs = get_nprocs();
/* ... */
}
```
```clike
arg->task_cnt = MIN(num_of_procs, arg->source->size / local_size * 2);
```
![](https://i.imgur.com/o5r5pDP.png)
## 討論改進效果不顯著的原因
我們的目標是要改善程式碼的 scalability,但可以由上述的比較圖看出,程式在 thread 數為 4 的時候表現最佳,thread 數繼續變多的時候,無法更快甚至有變慢的情形。所以我們決定驗證這個現象的原因。
我們歸納出的原因有兩個,第一個是程式本身不夠好,concurrent merge 的機制不夠好,因為愈多個 thread 要一起 merge ,
每個 thread 都要拜訪 list 的所有 node ,或者是 thread pool 的機制也尚待改進;第二個是 thread 之間的 context switch 所花的時間,若 context switch 所花的時間遠小於我們程式碼所花的時間,就代表 scalability 還有增加的空間。
以下數據以 64 threads 的 case 來比較
* 程式碼花的時間(平均): 436 ms
* context switch 的次數
```
Performance counter stats for './sort 64 test_data/input.txt':
1,519 cs
1.180769370 seconds time elapsed
```
* petermouse 的電腦的 context switch 花的時間
* [contextswitch](https://github.com/tsuna/contextswitch)
* [lmbench-next](https://github.com/jserv/lmbench-next)
```
./cpubench.sh
model name : Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz
1 physical CPUs, 2 cores/CPU, 2 hardware threads/core = 4 hw threads total
-- No CPU affinity --
10000000 system calls in 454871760ns (45.5ns/syscall)
2000000 process context switches in 1997589690ns (998.8ns/ctxsw)
2000000 thread context switches in 2240424384ns (1120.2ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000 thread context switches in 61020406ns (30.5ns/ctxsw)
-- With CPU affinity --
10000000 system calls in 426586426ns (42.7ns/syscall)
2000000 process context switches in 1962945555ns (981.5ns/ctxsw)
2000000 thread context switches in 1661305925ns (830.7ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000 thread context switches in 271151922ns (135.6ns/ctxsw)
-- With CPU affinity to CPU 0 --
10000000 system calls in 424558129ns (42.5ns/syscall)
2000000 process context switches in 2258118701ns (1129.1ns/ctxsw)
2000000 thread context switches in 2034381771ns (1017.2ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000 thread context switches in 58380978ns (29.2ns/ctxsw)
```
436 ms >> 1519 * 830.7 ns ~ 1 ms
### 上課筆記
* 老師提到可以在 merge 的過程中重新建立記憶體連續的資料 (array)
* [CFS -- linux 2.6.23](https://en.wikipedia.org/wiki/Completely_Fair_Scheduler)
* context switch 時間 : https://github.com/jserv/lmbench-next
[Quantifying The Cost of Context Switch∗](http://www.cs.rochester.edu/u/cli/research/switch.pdf)
### 關於原版 context switch 不可行的原因
附上原版 [github](https://github.com/tsuna/contextswitch)
~~~ clike=
static void* thread(void*ctx) {
(void)ctx;
for (int i = 0; i < iterations; i++)
sched_yield();
return NULL;
}
~~~
#### 資料來源 [sched_yield()](http://man7.org/linux/man-pages/man2/sched_yield.2.html#top_of_page) , [CFS](https://en.wikipedia.org/wiki/Completely_Fair_Scheduler)
* DESCRIPTION
~~~
sched_yield() causes the calling thread to relinquish the CPU.The thread is moved
to the end of the queue for its static priority and a new thread gets to run.
~~~
* RETURN VALUE
~~~
On success, sched_yield() returns 0. On error, -1 is returned,and errno is
set appropriately.
~~~
* 本身電腦版本Linux version 4.4.0
~~~ clike=
tina@tina-X550VB:~$ cat /proc/version
Linux version 4.4.0-78-generic (buildd@lgw01-11) (gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4) ) #99-Ubuntu SMP Thu Apr 27 15:29:09 UTC 2017
~~~
* CFS was merged into the 2.6.23
~~~
The Completely Fair Scheduler (CFS) is a process scheduler which was merged into the 2.6.23 (October 2007) release of the
Linux kernel and is the default scheduler.
~~~
* CFS 相較於 2.6 版本以前的 kernel
* In contrast to the previous O(1) scheduler used in older Linux 2.6 kernels, the CFS scheduler implementation is not based on run queues. Instead, a red–black tree implements a "timeline" of future task execution.
* CFS uses a concept called "sleeper fairness", which considers sleeping or waiting tasks equivalent to those on the runqueue.
* 參考[文件](http://people.redhat.com/mingo/cfs-scheduler/sched-design-CFS.txt)中提到
~~~
( another detail: due to nanosec accounting and timeline sorting,sched_yield()
support is very simple under CFS, and in fact under CFS sched_yield() behaves
much better than under any other scheduler i have tested so far.
~~~
### jserv's lmbench-next
#### [lmbench](http://lmbench.sourceforge.net/cgi-bin/man?keyword=lmbench§ion=8)
一種 system benchmark
[benchmp](http://manpages.ubuntu.com/manpages/precise/timing.3.html)
## 重新建立連續記憶體
### 先從 `sort_n_merge` 著手
連續的記憶體結構可以使 cache-miss 有機會降低
在排序 local linked list 的過程中 ,嘗試在 merge 的時候先分配一塊準備要塞排序好的資料空間,當作是一個陣列的資料結構來 merge,但內部還是存在 linked list 的結構,所以 next 單純指向下一個陣列元素,方便在之後可以 concurrent merge 時有幫助
```clike=
llist_t *sort_n_merge(llist_t *a, llist_t *b)
{
llist_t *_list = list_new();
_list->size += a->size + b->size;
node_t* node_list = malloc(sizeof(node_t) * (a->size + b->size));
int idx = 0;
while (a->size && b->size) {
/* Choose the linked list whose data of first node is small. */
llist_t *small;
int cmp = strcmp((char*)(a->head->data), (char*)(b->head->data));
small = (llist_t *)((intptr_t) a * (cmp <= 0) + (intptr_t) b * (cmp > 0));
/* Extract first node of selected list and put to the new list. */
node_list[idx].data = small->head->data;
node_list[idx].next = node_list + idx + 1;
idx++;
small->head = small->head->next;
--small->size;
}
/* Append the remaining nodes */
llist_t *remaining = (llist_t *) ((intptr_t) a * (a->size > 0) +
(intptr_t) b * (b->size > 0));
while(remaining->head) {
node_list[idx].data = remaining->head->data;
remaining->head = remaining->head->next;
node_list[idx].next = node_list + idx + 1;
idx++;
}
node_list[idx - 1].next = NULL;
free(a->head);
free(b->head);
free(a);
free(b);
_list->head = node_list;
return _list;
}
```
實驗結果
![](https://i.imgur.com/qXpXy7S.png)
目前時間還沒有顯著差異
### 從連續的記憶體改善 `concurrent_merge`
先前 `concurrent_merge` 會使得每個 merge 的 thread 幾乎遍歷過整個 linked list 來找尋適合插入的點並插入 node
但是現在建立在連續記憶體的結構下,我們可以
* 利用 binary search 找尋接近的點,下去 insert node
* 而且每插入一個 node ,就縮小 binary search 的邊界 (因為插入點之前的 node 不會再由同個 thread 插入)
* 分配好 source linked list 的切割 ,不用每個 thread 去爭奪
#### 重新調整資料結構
待補
![](https://i.imgur.com/tdeLmKm.png)
#### binary search
to be continued...
## 參考資料
[pthread_cond_signal](https://linux.die.net/man/3/pthread_cond_signal)
[pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait)
[](https://www.ibm.com/developerworks/library/l-completely-fair-scheduler/)
[](http://man7.org/linux/man-pages/man7/sched.7.html)
[](http://www.cs.montana.edu/~chandrima.sarkar/AdvancedOS/CSCI560_Proj_main/)