Try   HackMD

2017q1 team1 mergesort-concurrent

contributed by <zmkepetermousetina0405heathcliffYang>

檢討之前的程式碼

從原版來想

在原始的版本中,會將輸入的 list 分割給每個 thread 去做 merge ,而最後 thread 之間 list 的 merge 則是一個不能並行執行的部分,無法隨著 thread 數增加而提高執行效率,是增加 scalability 會遇到的瓶頸。

這讓我們想到一個在計組教過的 Amdahl's law

不能並行的部分則讓核心數增加帶來的加速效果存在極限。

所以我們把其中一個目標放在 thread 之間 list 的 merge 也要讓它可以並行處理,實作方法在下面的部分會詳細介紹。

另外,在與新夥伴溝通時,發現他們參考 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 嘗試過 lock-free threadpool。

  1. tqueue_pop()
task_t *tqueue_pop(tqueue_t * const the_queue) { task_t *head = NULL, *tail = NULL, *next = NULL, *ret = NULL; while(1) { head = the_queue->head; tail = the_queue->tail; next = head->next; if(head == the_queue->head) { if(head == tail) { if(next == NULL) { return NULL; } atomic_compare_exchange_weak(&(the_queue->tail), &tail, next); } else { ret = next; if(atomic_compare_exchange_weak(&(the_queue->head), &head, next)) { break; } } } } atomic_fetch_sub(&(the_queue->size), 1); atomic_fetch_add(&(the_queue->num_of_consumed), 1); return ret; }
  1. tqueue_push()
int tqueue_push(tqueue_t * const the_queue, task_t *task) { task_t *tail = NULL, *tail_next = NULL; task->next = NULL; while(1) { tail = the_queue->tail; tail_next = tail->next; if(tail == the_queue->tail) { if(tail_next == NULL) { if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) { } break; } } else { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) { } } } } atomic_fetch_add(&(the_queue->size), 1); return 0; }
  1. 論文中沒能實踐的部分
    中介的資料結構,這個資料結構去指向真正的 node (node_t) 以及存放一個變數名稱 count 代表 reference counting 的次數,可以用來避免 ABA 問題
structure pointer_t { ptr: pointer to node_t, count: unsigned integer }

實驗結果

以下 2 、 3 版本都有包含 lock-free threadpool,且都是以 ./sort 16 test_data/input.txt 進行測驗。

  1. Original
    原版在最後 thread 之間的 list 做 merge 時,因為工作不能多執行緒一起執行,所以其他 thread 會持續要求工作直到結束。

  2. Condition variable
    這個版本是 lock-free thread pool 與 condition variable 一起的實作,而 condition variable 則會在 monitor 的部分做介紹。
    因為 condition variable 的關係,當 queue 裡面沒有工作時,空閒的 thread 不會佔用到 CPU 的時間,tqueue_pop 的次數有減少,但花費時間卻增加了。

    此外,lock-free thread pool 會導致在 thread 多時,跑不完。

  3. Condition variable + merge concurrent
    這個版本是 lock-free thread pool 與 condition variable ,再加上 thread 之間的 list 做 merge 的工作是 concurrent。
    tqueue_pop 的次數減少一半,後期 merge 的工作已可以允許多個 thread 一起執行;其中工作可以容納的 thread 數有限制: arg->task_cnt = arg->source->size / local_size * 2 ,這部分需要再思考如何訂定最有效益的辦法;這是初版的 concurrent merge ,改變了所有 list 的資料結構,同時也可以看到 list_search 與 list_insert 有相當的成本,也是一個可以考慮改進的方向。

  • 三種實作的效能比較
    三種方法都在 thread 數為 4 時表現較好,而無可避免地都在 thread 數繼續往上增加時,讓花費時間變多,其中加入 concurrent merge 之後卻不盡理想的原因在之後的部份會詳細說明。

這張圖忘記改 label 了

  • 另外的問題
    其實每次測試,時間的浮動非常大,所以測試時應該要多次量測、使用統計的方法來取得較真實的實驗數據。

Monitor

  • 讓 thread 可以保持 mutual exclution 並等待直到特定的 condition 成立

  • monitor 包含 mutex 和 conditional variable

  • pthread_cond_wait(cond, mutex)

    • release mutex
    • block on the condition variable
    • sleep the thread
    • 當被其他 thread 透過 signal() 或 broadcast() 喚醒之後,會再獲得 mutex
  • pthread_cond_signal(cond)

    • unblock threads blocked on a conditional variable
    • 至少喚醒一條 thread ,就算沒有 thread 在等待也會成功 return
    • 如果有多條 thread 同時在等待同一個 conditional variable ,會由排程的優先順序決定哪條 thread 優先被喚醒
    • 在 multi processor 的環境下,當 condition variable 同時被不同 processor signal 時,可能會喚醒多條 thread
  • pthread_cond_broadcast(cond)

    • 喚醒所有在等待該 condtional variable 的 thread
  • 當 thread 被喚醒之後,應該要重新評估是否該繼續執行或是再等待一次,所以 condition wait 常被實作在 while loop 內,來確保執行的安全性

Monitor 實作

原本的程式在 task queue 沒東西的時候,也會一直嘗試 tqueue_pop() ,造成 CPU time 的浪費,尤其是在 thread 數量多的時候影響會更大
我們想要利用 condition variable 讓空閒的 thread 在 task queue 裡面沒有東西的時候 sleep ,等到有 task push 到 task queue 裡面之後,再喚醒他們起來要工作,希望減少 pop 空的 task queue 造成的浪費

task_run()

一開始就拿 mutex ,之後檢查 task queue 內是否是空的,如果是空的就等待並釋放 mutex ,被喚醒之後會拿到 mutex 但是 pool->queue->size 可能已經被改變,所以要再檢查一次,如果沒問題就釋放 mutex ,否則就是再等待一次

static void *task_run(void *data __attribute__ ((__unused__))) { while (1) { /*Get the mutex lock*/ pthread_mutex_lock(&(pool->queue->mutex)); /*Check wether the number of task in the thread queue is zero*/ /*If the task queue is empty, block the thread and release the mutex lock*/ while(pool->queue->size == 0) pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex)); /*When the conditional variable is signaled, wake up the thread and get the mutex lock*/ /*Release the mutex lock*/ pthread_mutex_unlock(&(pool->queue->mutex)); task_t *_task = tqueue_pop(pool->queue); if (_task) { if (!_task->func) { tqueue_push(pool->queue, task_new(NULL, NULL)); free(_task); break; } else { _task->func(_task->arg); free(_task); } } } pthread_exit(NULL); }

tqueue_push()

在 push 的最後加上 pthread_cond_signal() ,藉此通知被 block 的 thread 已經有工作可以拿

int tqueue_push(tqueue_t * const the_queue, task_t *task) { task_t *tail = NULL, *tail_next = NULL; task->next = NULL; while(1) { tail = the_queue->tail; tail_next = tail->next; if(tail == the_queue->tail) { if(tail_next == NULL) { if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) { } break; } } else { if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) { } } } } atomic_fetch_add(&(the_queue->size), 1); /*Signal the condtional variable*/ pthread_cond_signal(&(the_queue->cond)); return 0; }

實驗結果

論文閱讀

Lock-free linkedlist implementation of Harris' algorithm

  • 原本的數字串如下:

  • 如果想要 insert Node_20 , 我們會這樣做:

  • 如果想要 delete Node_10 , 我們會這樣做:

  • 但如果現在 insert Node_20 和 delete Node_10 同時發生,就有可能出現以下情況:

  • 但我們不樂見,因為我們希望他會走訪 Node_20

  • 解決方法:

    • 步驟1:左邊的圖是先將刪除點 mark 起來

    • 步驟2:右邊的圖是真的刪掉 node_10

    • logically deleted : 完成步驟1

    • phisically deleted : 完成步驟2

      • 雖然這邊沒有將 node_10 的 right node 斷掉,但整個list 的運行並不走訪 node_10
  • 好處:

    • 被標記的的 node 仍然能夠通過,但必須要能分變出與原來未標記狀態的不同。
    • 此時當訊號 concurrent 時,以上面為例,要插入 node_20 和發現 node_10 是 logically deleted 同事發生時,必須先強制將 10 做 phisically deleted 才可插入,如此一來就可以避免以上情形發生。

insert 的部份:

  • 一開始本來對,如果要序列: 1,4 中間同時插入 2 跟 3 的話可能會發生,兩個 2 跟 3 同時指到4,此時序列就不是我們要的 1,2,3,4 了 ,但以下程式碼用 CAS 插入做了檢查的動作:

    • 文中利用 CAS(addr,o,n) 去比較 addr 和舊的內容 o,有沒有一樣,有的話就寫入,沒有的話就一直重複做,他 return 的 boolean 就是用來提醒相鄰的結點如果沒有一樣的話就要update(重做)。
public boolean List::insert (KeyType key) { //先將 key 建立節點叫 new_node Node *new_node = new Node(key); //建立左右節點 Node *right_node, *left_node; do { right_node = search (key, &left_node); //右節點不是尾巴且和我們要插入的值一樣就不插入 //這段我前一篇在改時有拿掉 //因為他讓序列 1,2,2,3 變成 1,2,3 if ((right_node != tail) && (right_node.key == key)) /*T1*/ return false; //右節點是我們即將插入節點的下一個 new_node.next = right_node; //如果你的左邊節點的下一個是你的右邊節點,這樣就可以插入 //確保中途沒有其他人改過這段資料 if (CAS (&(left_node.next), right_node, new_node)) /*C2*/ return true; } while (true); /*B3*/ }

find 的部份:

  • 一開始先傳進去 search_key 找完存在 right_node ,如果right_node 已經在 tail 的位置或跟傳進去的不一樣就不用找了回傳 false ,一樣才傳 true 。
public boolean List::find (KeyType search_key) { Node *right_node, *left_node; //把要找的 search_key 放置 right_node = search (search_key, &left_node); //如果一樣,而且不是 tail 才回傳 true if ((right_node == tail) || (right_node.key != search_key)) return false; else return true; }

search 的部份:

  • 分成3個部份:
  • part 1
    • 這組迴圈得先判斷是否有被做記號,如果沒有的話,那就是 t 的值小於搜尋值,把 t 的值存在 *left_node , t_next 存在 left_node_next,然後將 t_next unmark 存進去 t,而 t_next 也會等於 t.next ,如此一來就可以一直往下比
    • 如果有 mark 就不會存他,直接跳到他的 next
    • 直到看到沒 mark 而且 t 的值又大於搜尋值的時候。
/* 1: Find left_node and right_node */ do { if (!is_marked_reference(t_next)) { (*left_node) = t; left_node_next = t_next; } t = get_unmarked_reference(t_next); if (t == tail) break; t_next = t.next; // } while (is_marked_reference(t_next) || (t.key<search_key)); /*B1*/ right_node = t;
  • part 2
    • 這組是在 check 左邊的點和右邊的點之間,有沒有突然插入點或刪除點,因為每個執行序都再做事,很可能會不小心動到當前 search 的執行序在使用的點,所以要不停確認。
/* 2: Check nodes are adjacent */ if (left_node_next == right_node) //如果你右邊的 node 的下一個還有做記號,就要再 search 一次 if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G1*/ //沒有就可以回傳值了! else return right_node; /*R1*/
  • part 3
    • 因為沒通過上方 if 的 check 所以必須要有以下 CAS 把 right_node 蓋回來。
/* 3: Remove one or more marked nodes */ if (CAS (&(left_node.next), left_node_next, right_node)) /*C1*/ if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G2*/ else return right_node; /*R2*/ } while (true); /*B2*/ }

delete 的部份:

  • 再發現未被標記的值即將刪除,會用 CAS 的標記值蓋過去 , break 出去,再重新刪除,這部份就像論文中說的 update 。
public boolean List::delete (KeyType search_key) { Node *right_node, *right_node_next, *left_node; do { right_node = search (search_key, &left_node); //找不到回傳false if ((right_node == tail) || (right_node.key!= search_key)) /*T1*/ return false; right_node_next = right_node.next; //如果未被標記的話,會用 CAS 的標記值蓋過去 , break 出去 if (!is_marked_reference(right_node_next)) if (CAS (&(right_node.next), /*C3*/ right_node_next, get_marked_reference (right_node_next))) break; } while (true); /*B4*/ //如果左邊的下一個是右節點的話,表示沒人更動過資料 //可以進行刪除 if (!CAS (&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search (right_node.key, &left_node); return true; }

關於正確性

證明的想法:

We will take a fairly direct approach to outlining the linearizability of the operations by identifying particular instants during their execution at which the complete operation appears to occur atomically

  • 操作執行時,在特定的過程,尤其以 atomic function 來實現,去證明結果線性化
  • 在 search 維護的情況下,使用以下 loop 確保是升序狀態:
       search_key <= right_node.key
       left_node.key < search.key
  • 在 search 函式返回值分成 2 個,確保左右相鄰
/*程式碼再上面*/
/*R1*/ 如果確認完左右還是相鄰

/*R2*/ 原本的左右不相鄰就用 CAS 的 compare_and_swap 值蓋掉
  • 對於右節點的標記狀態,通過前3個回傳條件都是true後,觀察兩個返回路徑都要確認右結點未被標記,而右結點從未變成 unmarked ,我們推斷起初右節點沒有標記。

  • CAS 指令 C1 成功後,通過從 list 中 unlinking 標記的節點

B.Linearization points

  • 這裡是在說明 op(i,m) 是和操作次數 (m) , processor 數 (i) 有關,也就是,在i個 processor 下,執行m次的表現

  • 而 d(i,m) 則是執行時 search 在後置條件被滿足時的 final real-time

  • 我們一直希望利用多執行緒下去做的排序,如果 thread 愈多理論上是愈快

  • 如果能成功算得 d(i,m),就表示我們右節點沒有被 mark 且包含 search_key

  • 如果利用 Find(k) 去搜尋,發現在 search_key 右半部的nodes 的 key 是嚴格遞減,左半部的nodes 的 key 是嚴格遞增,可證其正確性。

C.Progress

  • We will show that the concurrent implementation is non-blocking.
  • 每次成功的插入都會產生一個更新
  • 每次成功的刪除產生最多兩次更新

結果

  • 如果單看這篇論文的結果拿 NEW 和 MUTEX 比,是這篇 (NEW) 的花的CPU time(user+system)比 MUTEX 少。

初步想法

  • 拿兩條 thread 舉例,之後數目會往上加
  • 因為在 cut 時已經能分工,是 merge 的部份需要改進
  • source 和 destination 是兩個 sort 好的linked list
  • 我們想把 source 一個個元素安插進 destination,將他們 merge 起來
Created with Raphaël 2.2.0sourcesourcedestinationdestinationthread1insert(CAS)紀錄thread1插入位置thread 2insert(CAS)紀錄thread2插入位置
  • 每條 thread 工作:(以 thread1 為例)
    STEP1 : thread1 insert a element to destination
    STEP2 : insert 成功
    STEP3 : 紀錄 thread1 插入位置
    STEP4 : 下次由 thread1 本身紀錄的位置下去 search

concurrent merge

merge sort 在最後會將兩個已經排序好的資料重新 merge ,合併成一筆排序好的資料,但是如果兩筆資料量很大時,只有一個 thread 去處理會相當得慢。

因此我們想要將這部份以多個 thread 下去 lock-free 的方式處理,在看了論文之後,我們的想法大概是這樣:

  1. 有兩個排序好的 linked list ,我們根據 linked list 的大小,分成 source 與 destination ,目標是將 source 合併至 destination
  2. 根據 source 的大小,決定需要多少個 thread 參與其中
  3. 每個 thread 會從 source 取得一個 node,並試著插入到 destination
  4. 為了避免每次都要從頭 ( destination 的 head ) 尋找,每個 thread 會紀錄上一次的插入位置,等取得下一個 node 時,直接從上一次的插入位置尋找
        這是因為 source 資料已經排序好,已經確定下一個 node 插入的位置必定在上一個 node 後面
  5. 最後一個 thread 把完成的 linked list 再次開啟一個新的 task 再 push 入 task queue 之中

參考了原本的論文實作出類似的架構

  • 對於 source ,我們實作出 list_pop() 從 source 取出一個 node
  • 對於 destination ,實作出 list_insert()list_insert() 會使用 list_search() 協助找尋插入點

以上的操作比起論文還要來的單純,一加一減的 linked list 使得我們不用去考慮 marked 以及 unmarked 的問題

這是 concurrent merge 所需的參數,除了上述提到的 source 以及 dest ,還加入了 task_cnt 作為 counter,最後會遞減至 0 ,代表所有 thread 已經做完

這不代表一定要所有 thread 進入才開始,只要有一個 thread 進入就開始 merge 的動作,但一定會有task_cnt 初始值的 thread 進入,即使資料已經處理完也會先進入爾後離開

typedef struct { llist_t *source; llist_t *dest; int task_cnt; } merge_list_arg_t;

在每個 thread 排序完自身的 linked list 時會丟入 tmp_list ,或是因為 tmp_list 已有 list 存在開啟新的 task ,設定好相關的參數,決定應進入 thread 數量,開啟等量的 task ,並行 merge

void merge_thread_lists(void *data) { llist_t *_list = (llist_t *) data; if (_list->size < (uint32_t) data_count) { pthread_mutex_lock(&(data_context.mutex)); llist_t *_t = tmp_list; if (!_t) { tmp_list = _list; pthread_mutex_unlock(&(data_context.mutex)); } else { tmp_list = NULL; pthread_mutex_unlock(&(data_context.mutex)); merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t)); if (_t->size < _list->size) { arg->source = _t; arg->dest = _list; } else { arg->source = _list; arg->dest = _t; } arg->task_cnt = arg->source->size / local_size * 2; int task_cnt = arg->task_cnt; for (int i = 0; i < task_cnt; i++) tqueue_push(pool->queue, task_new(concurrent_merge, arg)); } } else { the_list = _list; tqueue_push(pool->queue, task_new(NULL, NULL)); } }

每個 thread 的工作就是不斷地
拿取 node >> 插入 node

當再也 pop 不出任何東西時,會去將 task_cnt 減 1 離開,當最後一個 thread 也準備離開時(line 13: if (old_cnt == 1)):

  • 更改 source 與 dest 的大小

    其實要 list_pop list_insert 就即時更新了 :sweat_smile:

  • 釋放 source 的 linked list
  • 將完成的 list 丟入task queue 繼續做後續處理
void concurrent_merge(void *data) { merge_list_arg_t *arg = (merge_list_arg_t *)data; llist_t *source = arg->source; llist_t *dest = arg->dest; node_t *cur = NULL, *node = NULL; int old_cnt = 0; cur = dest->head; while (1) { node = list_pop(source); if (!node) { old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1); if (old_cnt == 1) { dest->size += source->size; source->size = 0; list_free_nodes(source); tqueue_push(pool->queue, task_new(merge_thread_lists, dest)); } break; } list_insert(dest, &cur, node); } }

list_poplist_insertlist_push

node_t *list_pop(llist_t *list) { node_t *head = list->head; node_t *tail = list->tail; node_t *node, *node_next; do { node = head->next; if (node == tail) return NULL; node_next = node->next; if (atomic_compare_exchange_weak(&(head->next), &node, node_next)) break; } while (1); return node; } node_t *list_search(node_t *cur, node_t *tail, val_t data, node_t** left_node) { node_t *right_node; node_t *t = cur; node_t *t_next = cur->next; do { *(left_node) = t; t = t_next; if (t == tail) break; t_next = t->next; } while (strcmp((char *)t->data, (char *)data) <= 0); right_node = t; return right_node; } void list_insert(llist_t *list, node_t **cur, node_t * node) { node_t *left_node, *right_node; do { right_node = list_search(*cur, list->tail, node->data, &left_node); node->next = right_node; if (atomic_compare_exchange_weak(&(left_node->next), &right_node, node)) { *cur = node; return; } else *cur = left_node; } while (1); }

修改資料結構

上面沒有提到的問題點是關於 linked list 的資料結構,因為論文的資料結構與原本的不一樣, 所以需要一些改變

原本 list 的資料結構,且初始化 head 指向 NULL

typedef struct {
    node_t *head;   /**< The head of the linked list */
    uint32_t size;  /**< The size of the linked list */
} llist_t;
llist_t *list_new()
{
    /* allocate list */
    llist_t *list = malloc(sizeof(llist_t));
    list->head = NULL;
    list->size = 0;
    return list;
}

但現在新增了 tailheadtail 會各指向一個不會使用到、配置空間的 node (這個 node 稱為 sentinel node) 來使演算法正確運作

typedef struct {
    node_t *head;   /**< The head of the linked list */
    node_t *tail;   /**< The tail of the linked list */
    uint32_t size;  /**< The size of the linked list */
} llist_t;
llist_t *list_new()
{
    /* allocate list */
    llist_t *list = malloc(sizeof(llist_t));
    list->tail = node_new((val_t)NULL, NULL);
    list->head = node_new((val_t)NULL, list->tail);
    list->size = 0;
    return list;
}

這樣的改動連帶牽扯了其他相關的 function 都要修正
包含 mergesort.c 中的 split_n_merge()sort_n_merge()
main.c 中的 build_list_from_file()cut_local_list()

有些 function 需要頻繁的使用 list_get 來取得最後一個 node ,也就是說整個 list 都遍歷了

結果這樣使得效率整個降低

1 個 thread,純修改資料結構的結果

#Total_tasks_consumed: 3
#Elapsed_time: 378.183 ms
#Throughput: 7 (per sec)
#Total_tasks_consumed: 3
#Elapsed_time: 475.714 ms
#Throughput: 6 (per sec)

光是如此已經在時間上拉出差距

重新調整資料結構

因為調整資料結構對於 merge sort 帶來的衝擊很大,最後是不改變原始的資料結構,只有當有需要的時候再改變

為此當要並行 merge 時,我們需要 sentinel nodes,我們在執行前加入了 sentinel head

void list_add_dummy_head(llist_t *list)
{
    node_t *new_head = node_new((val_t)NULL, list->head);
    list->head = new_head;
}

void list_remove_dummy_head(llist_t *list)
{
    node_t *new_head = list->head->next;
    free(list->head);
    list->head = new_head;
}

至於 sentinal tail 是不一定要的:在我們的程式碼中所有的 list->tail 都可以用 NULL 取代

merge_thread_list() 第 13、14 行針對 src 與 dest 做這樣的處理,再丟入 task 中執行 concurrent_merge()

void merge_thread_lists(void *data) { llist_t *_list = (llist_t *) data; if (_list->size < (uint32_t) data_count) { pthread_mutex_lock(&(data_context.mutex)); llist_t *_t = tmp_list; if (!_t) { tmp_list = _list; pthread_mutex_unlock(&(data_context.mutex)); } else { tmp_list = NULL; pthread_mutex_unlock(&(data_context.mutex)); list_add_dummy_head(_t); list_add_dummy_head(_list); merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t)); if (_t->size < _list->size) { arg->source = _t; arg->dest = _list; } else { arg->source = _list; arg->dest = _t; } arg->task_cnt = arg->source->size / local_size * 2; int task_cnt = arg->task_cnt; for (int i = 0; i < task_cnt; i++) tqueue_push(pool->queue, task_new(concurrent_merge, arg)); } } else { the_list = _list; tqueue_push(pool->queue, task_new(NULL, NULL)); } }

concurrent_merge() 的最後一個 thread 再把 dest 的 sentinel head 移除 (第 17 行)

void concurrent_merge(void *data) { merge_list_arg_t *arg = (merge_list_arg_t *)data; llist_t *source = arg->source; llist_t *dest = arg->dest; node_t *cur = NULL, *node = NULL; int old_cnt = 0; cur = dest->head; while (1) { node = list_pop(source); if (!node) { old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1); if (old_cnt == 1) { dest->size += source->size; source->size = 0; list_free_nodes(source); list_remove_dummy_head(dest); tqueue_push(pool->queue, task_new(merge_thread_lists, dest)); } break; } list_insert(dest, &cur, node); } }

實驗結果

因為 lock-free 很容易爆炸,這邊以維持 mutex 做測試

時間確實有下降一點!不過當 thread 數目變多,時間會再往上增長,主要原因是因為

arg->task_cnt = arg->source->size / local_size * 2;

這是決定 thread 參與數量的方式, local_size 代表 thread 當初分割的local linked list 資料量 ,也就是使用與處理這些資料同等數量的 thread 做 merge ,但是每個 thread 的參與需要遍歷整個 linked list , thread 多於核心數等於過度浪費時間在 list_search() 上,反而降低效益

限制 merge 參與的 thread 數

最後將能夠參與的數量限制在核心數,這裡使用到了 get_nprocs()

執行環境在雙核且支援 hyper-threading 的 intel 處理器,get_nprocs() 回傳了 4

文中有特別指出這個 function 成本昂貴,因為要去開啟 /sys 底下的檔案解析,因此只使用一次存下來

int main()
{
    /* ... */
    num_of_procs = get_nprocs();
    /* ... */
}
arg->task_cnt = MIN(num_of_procs, arg->source->size / local_size * 2);

討論改進效果不顯著的原因

我們的目標是要改善程式碼的 scalability,但可以由上述的比較圖看出,程式在 thread 數為 4 的時候表現最佳,thread 數繼續變多的時候,無法更快甚至有變慢的情形。所以我們決定驗證這個現象的原因。
我們歸納出的原因有兩個,第一個是程式本身不夠好,concurrent merge 的機制不夠好,因為愈多個 thread 要一起 merge ,
每個 thread 都要拜訪 list 的所有 node ,或者是 thread pool 的機制也尚待改進;第二個是 thread 之間的 context switch 所花的時間,若 context switch 所花的時間遠小於我們程式碼所花的時間,就代表 scalability 還有增加的空間。

以下數據以 64 threads 的 case 來比較

  • 程式碼花的時間(平均): 436 ms
  • context switch 的次數
 Performance counter stats for './sort 64 test_data/input.txt':

             1,519      cs                                                          

       1.180769370 seconds time elapsed
./cpubench.sh
model name : Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz
1 physical CPUs, 2 cores/CPU, 2 hardware threads/core = 4 hw threads total
-- No CPU affinity --
10000000 system calls in 454871760ns (45.5ns/syscall)
2000000 process context switches in 1997589690ns (998.8ns/ctxsw)
2000000  thread context switches in 2240424384ns (1120.2ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000  thread context switches in 61020406ns (30.5ns/ctxsw)
-- With CPU affinity --
10000000 system calls in 426586426ns (42.7ns/syscall)
2000000 process context switches in 1962945555ns (981.5ns/ctxsw)
2000000  thread context switches in 1661305925ns (830.7ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000  thread context switches in 271151922ns (135.6ns/ctxsw)
-- With CPU affinity to CPU 0 --
10000000 system calls in 424558129ns (42.5ns/syscall)
2000000 process context switches in 2258118701ns (1129.1ns/ctxsw)
2000000  thread context switches in 2034381771ns (1017.2ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000  thread context switches in 58380978ns (29.2ns/ctxsw)

436 ms >> 1519 * 830.7 ns ~ 1 ms

上課筆記

關於原版 context switch 不可行的原因

附上原版 github

static void* thread(void*ctx) { (void)ctx; for (int i = 0; i < iterations; i++) sched_yield(); return NULL; }

資料來源 sched_yield() , CFS

  • DESCRIPTION
sched_yield() causes the calling thread to relinquish the CPU.The thread is moved 
to the end of the queue for its static priority and a new thread gets to run.
  • RETURN VALUE
On success, sched_yield() returns 0.  On error, -1 is returned,and errno is 
set appropriately.
  • 本身電腦版本Linux version 4.4.0
tina@tina-X550VB:~$ cat /proc/version Linux version 4.4.0-78-generic (buildd@lgw01-11) (gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4) ) #99-Ubuntu SMP Thu Apr 27 15:29:09 UTC 2017
  • CFS was merged into the 2.6.23
The Completely Fair Scheduler (CFS) is a process scheduler which was merged into the 2.6.23 (October 2007) release of the 
Linux kernel and is the default scheduler.
  • CFS 相較於 2.6 版本以前的 kernel

    • In contrast to the previous O(1) scheduler used in older Linux 2.6 kernels, the CFS scheduler implementation is not based on run queues. Instead, a red–black tree implements a "timeline" of future task execution.
    • CFS uses a concept called "sleeper fairness", which considers sleeping or waiting tasks equivalent to those on the runqueue.
  • 參考文件中提到

( another detail: due to nanosec accounting and timeline sorting,sched_yield() 
support is very simple under CFS, and in fact under CFS sched_yield() behaves 
much better than under any other scheduler i have tested so far.

jserv's lmbench-next

lmbench

一種 system benchmark

benchmp

重新建立連續記憶體

先從 sort_n_merge 著手

連續的記憶體結構可以使 cache-miss 有機會降低

在排序 local linked list 的過程中 ,嘗試在 merge 的時候先分配一塊準備要塞排序好的資料空間,當作是一個陣列的資料結構來 merge,但內部還是存在 linked list 的結構,所以 next 單純指向下一個陣列元素,方便在之後可以 concurrent merge 時有幫助

llist_t *sort_n_merge(llist_t *a, llist_t *b) { llist_t *_list = list_new(); _list->size += a->size + b->size; node_t* node_list = malloc(sizeof(node_t) * (a->size + b->size)); int idx = 0; while (a->size && b->size) { /* Choose the linked list whose data of first node is small. */ llist_t *small; int cmp = strcmp((char*)(a->head->data), (char*)(b->head->data)); small = (llist_t *)((intptr_t) a * (cmp <= 0) + (intptr_t) b * (cmp > 0)); /* Extract first node of selected list and put to the new list. */ node_list[idx].data = small->head->data; node_list[idx].next = node_list + idx + 1; idx++; small->head = small->head->next; --small->size; } /* Append the remaining nodes */ llist_t *remaining = (llist_t *) ((intptr_t) a * (a->size > 0) + (intptr_t) b * (b->size > 0)); while(remaining->head) { node_list[idx].data = remaining->head->data; remaining->head = remaining->head->next; node_list[idx].next = node_list + idx + 1; idx++; } node_list[idx - 1].next = NULL; free(a->head); free(b->head); free(a); free(b); _list->head = node_list; return _list; }

實驗結果

目前時間還沒有顯著差異

從連續的記憶體改善 concurrent_merge

先前 concurrent_merge 會使得每個 merge 的 thread 幾乎遍歷過整個 linked list 來找尋適合插入的點並插入 node

但是現在建立在連續記憶體的結構下,我們可以

  • 利用 binary search 找尋接近的點,下去 insert node
    • 而且每插入一個 node ,就縮小 binary search 的邊界 (因為插入點之前的 node 不會再由同個 thread 插入)
  • 分配好 source linked list 的切割 ,不用每個 thread 去爭奪

重新調整資料結構

待補

to be continued

參考資料

pthread_cond_signal
pthread_cond_wait