contributed by <zmke
、petermouse
、 tina0405
、heathcliffYang
>
在原始的版本中,會將輸入的 list 分割給每個 thread 去做 merge ,而最後 thread 之間 list 的 merge 則是一個不能並行執行的部分,無法隨著 thread 數增加而提高執行效率,是增加 scalability 會遇到的瓶頸。
這讓我們想到一個在計組教過的 Amdahl's law
不能並行的部分則讓核心數增加帶來的加速效果存在極限。
所以我們把其中一個目標放在 thread 之間 list 的 merge 也要讓它可以並行處理,實作方法在下面的部分會詳細介紹。
另外,在與新夥伴溝通時,發現他們參考 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 嘗試過 lock-free threadpool。
task_t *tqueue_pop(tqueue_t * const the_queue)
{
task_t *head = NULL, *tail = NULL, *next = NULL, *ret = NULL;
while(1) {
head = the_queue->head;
tail = the_queue->tail;
next = head->next;
if(head == the_queue->head) {
if(head == tail) {
if(next == NULL) {
return NULL;
}
atomic_compare_exchange_weak(&(the_queue->tail), &tail, next);
} else {
ret = next;
if(atomic_compare_exchange_weak(&(the_queue->head), &head, next)) {
break;
}
}
}
}
atomic_fetch_sub(&(the_queue->size), 1);
atomic_fetch_add(&(the_queue->num_of_consumed), 1);
return ret;
}
int tqueue_push(tqueue_t * const the_queue, task_t *task)
{
task_t *tail = NULL, *tail_next = NULL;
task->next = NULL;
while(1) {
tail = the_queue->tail;
tail_next = tail->next;
if(tail == the_queue->tail) {
if(tail_next == NULL) {
if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) {
}
break;
}
} else {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) {
}
}
}
}
atomic_fetch_add(&(the_queue->size), 1);
return 0;
}
structure pointer_t { ptr: pointer to node_t, count: unsigned integer }
以下 2 、 3 版本都有包含 lock-free threadpool,且都是以 ./sort 16 test_data/input.txt
進行測驗。
Original
原版在最後 thread 之間的 list 做 merge 時,因為工作不能多執行緒一起執行,所以其他 thread 會持續要求工作直到結束。
Condition variable
這個版本是 lock-free thread pool 與 condition variable 一起的實作,而 condition variable 則會在 monitor 的部分做介紹。
因為 condition variable 的關係,當 queue 裡面沒有工作時,空閒的 thread 不會佔用到 CPU 的時間,tqueue_pop
的次數有減少,但花費時間卻增加了。
此外,lock-free thread pool 會導致在 thread 多時,跑不完。
Condition variable + merge concurrent
這個版本是 lock-free thread pool 與 condition variable ,再加上 thread 之間的 list 做 merge 的工作是 concurrent。
tqueue_pop
的次數減少一半,後期 merge 的工作已可以允許多個 thread 一起執行;其中工作可以容納的 thread 數有限制: arg->task_cnt = arg->source->size / local_size * 2
,這部分需要再思考如何訂定最有效益的辦法;這是初版的 concurrent merge ,改變了所有 list 的資料結構,同時也可以看到 list_search 與 list_insert 有相當的成本,也是一個可以考慮改進的方向。
這張圖忘記改 label 了
讓 thread 可以保持 mutual exclution 並等待直到特定的 condition 成立
monitor 包含 mutex 和 conditional variable
pthread_cond_wait(cond, mutex)
pthread_cond_signal(cond)
pthread_cond_broadcast(cond)
當 thread 被喚醒之後,應該要重新評估是否該繼續執行或是再等待一次,所以 condition wait 常被實作在 while loop 內,來確保執行的安全性
原本的程式在 task queue 沒東西的時候,也會一直嘗試 tqueue_pop() ,造成 CPU time 的浪費,尤其是在 thread 數量多的時候影響會更大
我們想要利用 condition variable 讓空閒的 thread 在 task queue 裡面沒有東西的時候 sleep ,等到有 task push 到 task queue 裡面之後,再喚醒他們起來要工作,希望減少 pop 空的 task queue 造成的浪費
一開始就拿 mutex ,之後檢查 task queue 內是否是空的,如果是空的就等待並釋放 mutex ,被喚醒之後會拿到 mutex 但是 pool->queue->size 可能已經被改變,所以要再檢查一次,如果沒問題就釋放 mutex ,否則就是再等待一次
static void *task_run(void *data __attribute__ ((__unused__)))
{
while (1) {
/*Get the mutex lock*/
pthread_mutex_lock(&(pool->queue->mutex));
/*Check wether the number of task in the thread queue is zero*/
/*If the task queue is empty, block the thread and release the mutex lock*/
while(pool->queue->size == 0)
pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex));
/*When the conditional variable is signaled, wake up the thread and get the mutex lock*/
/*Release the mutex lock*/
pthread_mutex_unlock(&(pool->queue->mutex));
task_t *_task = tqueue_pop(pool->queue);
if (_task) {
if (!_task->func) {
tqueue_push(pool->queue, task_new(NULL, NULL));
free(_task);
break;
} else {
_task->func(_task->arg);
free(_task);
}
}
}
pthread_exit(NULL);
}
在 push 的最後加上 pthread_cond_signal() ,藉此通知被 block 的 thread 已經有工作可以拿
int tqueue_push(tqueue_t * const the_queue, task_t *task)
{
task_t *tail = NULL, *tail_next = NULL;
task->next = NULL;
while(1) {
tail = the_queue->tail;
tail_next = tail->next;
if(tail == the_queue->tail) {
if(tail_next == NULL) {
if(atomic_compare_exchange_weak(&(tail->next), &tail_next, task)) {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, task)) {
}
break;
}
} else {
if(atomic_compare_exchange_weak(&(the_queue->tail), &tail, tail_next)) {
}
}
}
}
atomic_fetch_add(&(the_queue->size), 1);
/*Signal the condtional variable*/
pthread_cond_signal(&(the_queue->cond));
return 0;
}
Lock-free linkedlist implementation of Harris' algorithm
原本的數字串如下:
如果想要 insert Node_20 , 我們會這樣做:
如果想要 delete Node_10 , 我們會這樣做:
但如果現在 insert Node_20 和 delete Node_10 同時發生,就有可能出現以下情況:
但我們不樂見,因為我們希望他會走訪 Node_20
解決方法:
步驟1:左邊的圖是先將刪除點 mark 起來
步驟2:右邊的圖是真的刪掉 node_10
logically deleted : 完成步驟1
phisically deleted : 完成步驟2
好處:
一開始本來對,如果要序列: 1,4 中間同時插入 2 跟 3 的話可能會發生,兩個 2 跟 3 同時指到4,此時序列就不是我們要的 1,2,3,4 了 ,但以下程式碼用 CAS 插入做了檢查的動作:
public boolean List::insert (KeyType key) {
//先將 key 建立節點叫 new_node
Node *new_node = new Node(key);
//建立左右節點
Node *right_node, *left_node;
do {
right_node = search (key, &left_node);
//右節點不是尾巴且和我們要插入的值一樣就不插入
//這段我前一篇在改時有拿掉
//因為他讓序列 1,2,2,3 變成 1,2,3
if ((right_node != tail) && (right_node.key == key)) /*T1*/
return false;
//右節點是我們即將插入節點的下一個
new_node.next = right_node;
//如果你的左邊節點的下一個是你的右邊節點,這樣就可以插入
//確保中途沒有其他人改過這段資料
if (CAS (&(left_node.next), right_node, new_node)) /*C2*/
return true;
} while (true); /*B3*/
}
public boolean List::find (KeyType search_key) {
Node *right_node, *left_node;
//把要找的 search_key 放置
right_node = search (search_key, &left_node);
//如果一樣,而且不是 tail 才回傳 true
if ((right_node == tail) || (right_node.key != search_key))
return false;
else
return true;
}
/* 1: Find left_node and right_node */
do {
if (!is_marked_reference(t_next)) {
(*left_node) = t;
left_node_next = t_next;
}
t = get_unmarked_reference(t_next);
if (t == tail) break;
t_next = t.next;
//
} while (is_marked_reference(t_next) || (t.key<search_key)); /*B1*/
right_node = t;
/* 2: Check nodes are adjacent */
if (left_node_next == right_node)
//如果你右邊的 node 的下一個還有做記號,就要再 search 一次
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G1*/
//沒有就可以回傳值了!
else
return right_node; /*R1*/
/* 3: Remove one or more marked nodes */
if (CAS (&(left_node.next), left_node_next, right_node)) /*C1*/
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G2*/
else
return right_node; /*R2*/
} while (true); /*B2*/
}
public boolean List::delete (KeyType search_key) {
Node *right_node, *right_node_next, *left_node;
do {
right_node = search (search_key, &left_node);
//找不到回傳false
if ((right_node == tail) || (right_node.key!= search_key)) /*T1*/
return false;
right_node_next = right_node.next;
//如果未被標記的話,會用 CAS 的標記值蓋過去 , break 出去
if (!is_marked_reference(right_node_next))
if (CAS (&(right_node.next), /*C3*/
right_node_next, get_marked_reference (right_node_next)))
break;
} while (true); /*B4*/
//如果左邊的下一個是右節點的話,表示沒人更動過資料
//可以進行刪除
if (!CAS (&(left_node.next), right_node, right_node_next)) /*C4*/
right_node = search (right_node.key, &left_node);
return true;
}
證明的想法:
We will take a fairly direct approach to outlining the linearizability of the operations by identifying particular instants during their execution at which the complete operation appears to occur atomically
search_key <= right_node.key
left_node.key < search.key
/*程式碼再上面*/
/*R1*/ 如果確認完左右還是相鄰
/*R2*/ 原本的左右不相鄰就用 CAS 的 compare_and_swap 值蓋掉
對於右節點的標記狀態,通過前3個回傳條件都是true後,觀察兩個返回路徑都要確認右結點未被標記,而右結點從未變成 unmarked ,我們推斷起初右節點沒有標記。
CAS 指令 C1 成功後,通過從 list 中 unlinking 標記的節點
這裡是在說明 op(i,m) 是和操作次數 (m) , processor 數 (i) 有關,也就是,在i個 processor 下,執行m次的表現
而 d(i,m) 則是執行時 search 在後置條件被滿足時的 final real-time
我們一直希望利用多執行緒下去做的排序,如果 thread 愈多理論上是愈快
如果能成功算得 d(i,m),就表示我們右節點沒有被 mark 且包含 search_key
如果利用 Find(k) 去搜尋,發現在 search_key 右半部的nodes 的 key 是嚴格遞減,左半部的nodes 的 key 是嚴格遞增,可證其正確性。
merge sort 在最後會將兩個已經排序好的資料重新 merge ,合併成一筆排序好的資料,但是如果兩筆資料量很大時,只有一個 thread 去處理會相當得慢。
因此我們想要將這部份以多個 thread 下去 lock-free 的方式處理,在看了論文之後,我們的想法大概是這樣:
head
) 尋找,每個 thread 會紀錄上一次的插入位置,等取得下一個 node 時,直接從上一次的插入位置尋找參考了原本的論文實作出類似的架構
list_pop()
從 source 取出一個 nodelist_insert()
, list_insert()
會使用 list_search()
協助找尋插入點以上的操作比起論文還要來的單純,一加一減的 linked list 使得我們不用去考慮 marked 以及 unmarked 的問題
這是 concurrent merge 所需的參數,除了上述提到的 source
以及 dest
,還加入了 task_cnt
作為 counter,最後會遞減至 0 ,代表所有 thread 已經做完
這不代表一定要所有 thread 進入才開始,只要有一個 thread 進入就開始 merge 的動作,但一定會有task_cnt
初始值的 thread 進入,即使資料已經處理完也會先進入爾後離開
typedef struct {
llist_t *source;
llist_t *dest;
int task_cnt;
} merge_list_arg_t;
在每個 thread 排序完自身的 linked list 時會丟入 tmp_list
,或是因為 tmp_list
已有 list 存在開啟新的 task ,設定好相關的參數,決定應進入 thread 數量,開啟等量的 task ,並行 merge
void merge_thread_lists(void *data)
{
llist_t *_list = (llist_t *) data;
if (_list->size < (uint32_t) data_count) {
pthread_mutex_lock(&(data_context.mutex));
llist_t *_t = tmp_list;
if (!_t) {
tmp_list = _list;
pthread_mutex_unlock(&(data_context.mutex));
} else {
tmp_list = NULL;
pthread_mutex_unlock(&(data_context.mutex));
merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t));
if (_t->size < _list->size) {
arg->source = _t;
arg->dest = _list;
} else {
arg->source = _list;
arg->dest = _t;
}
arg->task_cnt = arg->source->size / local_size * 2;
int task_cnt = arg->task_cnt;
for (int i = 0; i < task_cnt; i++)
tqueue_push(pool->queue, task_new(concurrent_merge, arg));
}
} else {
the_list = _list;
tqueue_push(pool->queue, task_new(NULL, NULL));
}
}
每個 thread 的工作就是不斷地
拿取 node >> 插入 node
當再也 pop 不出任何東西時,會去將 task_cnt
減 1 離開,當最後一個 thread 也準備離開時(line 13: if (old_cnt == 1)
):
其實要
list_pop
list_insert
就即時更新了 …
void concurrent_merge(void *data)
{
merge_list_arg_t *arg = (merge_list_arg_t *)data;
llist_t *source = arg->source;
llist_t *dest = arg->dest;
node_t *cur = NULL, *node = NULL;
int old_cnt = 0;
cur = dest->head;
while (1) {
node = list_pop(source);
if (!node) {
old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1);
if (old_cnt == 1) {
dest->size += source->size;
source->size = 0;
list_free_nodes(source);
tqueue_push(pool->queue, task_new(merge_thread_lists, dest));
}
break;
}
list_insert(dest, &cur, node);
}
}
list_pop
、 list_insert
、 list_push
node_t *list_pop(llist_t *list)
{
node_t *head = list->head;
node_t *tail = list->tail;
node_t *node, *node_next;
do {
node = head->next;
if (node == tail)
return NULL;
node_next = node->next;
if (atomic_compare_exchange_weak(&(head->next), &node, node_next))
break;
} while (1);
return node;
}
node_t *list_search(node_t *cur, node_t *tail, val_t data, node_t** left_node)
{
node_t *right_node;
node_t *t = cur;
node_t *t_next = cur->next;
do {
*(left_node) = t;
t = t_next;
if (t == tail)
break;
t_next = t->next;
} while (strcmp((char *)t->data, (char *)data) <= 0);
right_node = t;
return right_node;
}
void list_insert(llist_t *list, node_t **cur, node_t * node)
{
node_t *left_node, *right_node;
do {
right_node = list_search(*cur, list->tail, node->data, &left_node);
node->next = right_node;
if (atomic_compare_exchange_weak(&(left_node->next), &right_node, node)) {
*cur = node;
return;
} else
*cur = left_node;
} while (1);
}
上面沒有提到的問題點是關於 linked list 的資料結構,因為論文的資料結構與原本的不一樣, 所以需要一些改變
原本 list 的資料結構,且初始化 head 指向 NULL
typedef struct {
node_t *head; /**< The head of the linked list */
uint32_t size; /**< The size of the linked list */
} llist_t;
llist_t *list_new()
{
/* allocate list */
llist_t *list = malloc(sizeof(llist_t));
list->head = NULL;
list->size = 0;
return list;
}
但現在新增了 tail
且 head
與 tail
會各指向一個不會使用到、配置空間的 node (這個 node 稱為 sentinel node) 來使演算法正確運作
typedef struct {
node_t *head; /**< The head of the linked list */
node_t *tail; /**< The tail of the linked list */
uint32_t size; /**< The size of the linked list */
} llist_t;
llist_t *list_new()
{
/* allocate list */
llist_t *list = malloc(sizeof(llist_t));
list->tail = node_new((val_t)NULL, NULL);
list->head = node_new((val_t)NULL, list->tail);
list->size = 0;
return list;
}
這樣的改動連帶牽扯了其他相關的 function 都要修正
包含 mergesort.c
中的 split_n_merge()
與 sort_n_merge()
main.c
中的 build_list_from_file()
與 cut_local_list()
有些 function 需要頻繁的使用 list_get
來取得最後一個 node ,也就是說整個 list 都遍歷了
…結果這樣使得效率整個降低
1 個 thread,純修改資料結構的結果
#Total_tasks_consumed: 3
#Elapsed_time: 378.183 ms
#Throughput: 7 (per sec)
#Total_tasks_consumed: 3
#Elapsed_time: 475.714 ms
#Throughput: 6 (per sec)
光是如此已經在時間上拉出差距
因為調整資料結構對於 merge sort 帶來的衝擊很大,最後是不改變原始的資料結構,只有當有需要的時候再改變
為此當要並行 merge 時,我們需要 sentinel nodes,我們在執行前加入了 sentinel head
void list_add_dummy_head(llist_t *list)
{
node_t *new_head = node_new((val_t)NULL, list->head);
list->head = new_head;
}
void list_remove_dummy_head(llist_t *list)
{
node_t *new_head = list->head->next;
free(list->head);
list->head = new_head;
}
至於 sentinal tail 是不一定要的:在我們的程式碼中所有的 list->tail
都可以用 NULL
取代
merge_thread_list()
第 13、14 行針對 src 與 dest 做這樣的處理,再丟入 task 中執行 concurrent_merge()
void merge_thread_lists(void *data)
{
llist_t *_list = (llist_t *) data;
if (_list->size < (uint32_t) data_count) {
pthread_mutex_lock(&(data_context.mutex));
llist_t *_t = tmp_list;
if (!_t) {
tmp_list = _list;
pthread_mutex_unlock(&(data_context.mutex));
} else {
tmp_list = NULL;
pthread_mutex_unlock(&(data_context.mutex));
list_add_dummy_head(_t);
list_add_dummy_head(_list);
merge_list_arg_t *arg = malloc(sizeof(merge_list_arg_t));
if (_t->size < _list->size) {
arg->source = _t;
arg->dest = _list;
} else {
arg->source = _list;
arg->dest = _t;
}
arg->task_cnt = arg->source->size / local_size * 2;
int task_cnt = arg->task_cnt;
for (int i = 0; i < task_cnt; i++)
tqueue_push(pool->queue, task_new(concurrent_merge, arg));
}
} else {
the_list = _list;
tqueue_push(pool->queue, task_new(NULL, NULL));
}
}
concurrent_merge()
的最後一個 thread 再把 dest 的 sentinel head 移除 (第 17 行)
void concurrent_merge(void *data)
{
merge_list_arg_t *arg = (merge_list_arg_t *)data;
llist_t *source = arg->source;
llist_t *dest = arg->dest;
node_t *cur = NULL, *node = NULL;
int old_cnt = 0;
cur = dest->head;
while (1) {
node = list_pop(source);
if (!node) {
old_cnt = atomic_fetch_sub(&(arg->task_cnt), 1);
if (old_cnt == 1) {
dest->size += source->size;
source->size = 0;
list_free_nodes(source);
list_remove_dummy_head(dest);
tqueue_push(pool->queue, task_new(merge_thread_lists, dest));
}
break;
}
list_insert(dest, &cur, node);
}
}
因為 lock-free 很容易爆炸,這邊以維持 mutex 做測試
時間確實有下降一點!不過當 thread 數目變多,時間會再往上增長,主要原因是因為
arg->task_cnt = arg->source->size / local_size * 2;
這是決定 thread 參與數量的方式, local_size
代表 thread 當初分割的local linked list 資料量 ,也就是使用與處理這些資料同等數量的 thread 做 merge ,但是每個 thread 的參與需要遍歷整個 linked list , thread 多於核心數等於過度浪費時間在 list_search()
上,反而降低效益
最後將能夠參與的數量限制在核心數,這裡使用到了 get_nprocs()
執行環境在雙核且支援 hyper-threading 的 intel 處理器,get_nprocs()
回傳了 4
文中有特別指出這個 function 成本昂貴,因為要去開啟 /sys 底下的檔案解析,因此只使用一次存下來
int main()
{
/* ... */
num_of_procs = get_nprocs();
/* ... */
}
arg->task_cnt = MIN(num_of_procs, arg->source->size / local_size * 2);
我們的目標是要改善程式碼的 scalability,但可以由上述的比較圖看出,程式在 thread 數為 4 的時候表現最佳,thread 數繼續變多的時候,無法更快甚至有變慢的情形。所以我們決定驗證這個現象的原因。
我們歸納出的原因有兩個,第一個是程式本身不夠好,concurrent merge 的機制不夠好,因為愈多個 thread 要一起 merge ,
每個 thread 都要拜訪 list 的所有 node ,或者是 thread pool 的機制也尚待改進;第二個是 thread 之間的 context switch 所花的時間,若 context switch 所花的時間遠小於我們程式碼所花的時間,就代表 scalability 還有增加的空間。
以下數據以 64 threads 的 case 來比較
Performance counter stats for './sort 64 test_data/input.txt':
1,519 cs
1.180769370 seconds time elapsed
./cpubench.sh
model name : Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz
1 physical CPUs, 2 cores/CPU, 2 hardware threads/core = 4 hw threads total
-- No CPU affinity --
10000000 system calls in 454871760ns (45.5ns/syscall)
2000000 process context switches in 1997589690ns (998.8ns/ctxsw)
2000000 thread context switches in 2240424384ns (1120.2ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000 thread context switches in 61020406ns (30.5ns/ctxsw)
-- With CPU affinity --
10000000 system calls in 426586426ns (42.7ns/syscall)
2000000 process context switches in 1962945555ns (981.5ns/ctxsw)
2000000 thread context switches in 1661305925ns (830.7ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000 thread context switches in 271151922ns (135.6ns/ctxsw)
-- With CPU affinity to CPU 0 --
10000000 system calls in 424558129ns (42.5ns/syscall)
2000000 process context switches in 2258118701ns (1129.1ns/ctxsw)
2000000 thread context switches in 2034381771ns (1017.2ns/ctxsw)
sched_setscheduler(): Operation not permitted
2000000 thread context switches in 58380978ns (29.2ns/ctxsw)
436 ms >> 1519 * 830.7 ns ~ 1 ms
老師提到可以在 merge 的過程中重新建立記憶體連續的資料 (array)
context switch 時間 : https://github.com/jserv/lmbench-next
Quantifying The Cost of Context Switch∗
附上原版 github
static void* thread(void*ctx) {
(void)ctx;
for (int i = 0; i < iterations; i++)
sched_yield();
return NULL;
}
sched_yield() causes the calling thread to relinquish the CPU.The thread is moved
to the end of the queue for its static priority and a new thread gets to run.
On success, sched_yield() returns 0. On error, -1 is returned,and errno is
set appropriately.
tina@tina-X550VB:~$ cat /proc/version
Linux version 4.4.0-78-generic (buildd@lgw01-11) (gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4) ) #99-Ubuntu SMP Thu Apr 27 15:29:09 UTC 2017
The Completely Fair Scheduler (CFS) is a process scheduler which was merged into the 2.6.23 (October 2007) release of the
Linux kernel and is the default scheduler.
CFS 相較於 2.6 版本以前的 kernel
參考文件中提到
( another detail: due to nanosec accounting and timeline sorting,sched_yield()
support is very simple under CFS, and in fact under CFS sched_yield() behaves
much better than under any other scheduler i have tested so far.
一種 system benchmark
sort_n_merge
著手連續的記憶體結構可以使 cache-miss 有機會降低
在排序 local linked list 的過程中 ,嘗試在 merge 的時候先分配一塊準備要塞排序好的資料空間,當作是一個陣列的資料結構來 merge,但內部還是存在 linked list 的結構,所以 next 單純指向下一個陣列元素,方便在之後可以 concurrent merge 時有幫助
llist_t *sort_n_merge(llist_t *a, llist_t *b)
{
llist_t *_list = list_new();
_list->size += a->size + b->size;
node_t* node_list = malloc(sizeof(node_t) * (a->size + b->size));
int idx = 0;
while (a->size && b->size) {
/* Choose the linked list whose data of first node is small. */
llist_t *small;
int cmp = strcmp((char*)(a->head->data), (char*)(b->head->data));
small = (llist_t *)((intptr_t) a * (cmp <= 0) + (intptr_t) b * (cmp > 0));
/* Extract first node of selected list and put to the new list. */
node_list[idx].data = small->head->data;
node_list[idx].next = node_list + idx + 1;
idx++;
small->head = small->head->next;
--small->size;
}
/* Append the remaining nodes */
llist_t *remaining = (llist_t *) ((intptr_t) a * (a->size > 0) +
(intptr_t) b * (b->size > 0));
while(remaining->head) {
node_list[idx].data = remaining->head->data;
remaining->head = remaining->head->next;
node_list[idx].next = node_list + idx + 1;
idx++;
}
node_list[idx - 1].next = NULL;
free(a->head);
free(b->head);
free(a);
free(b);
_list->head = node_list;
return _list;
}
實驗結果
目前時間還沒有顯著差異
concurrent_merge
先前 concurrent_merge
會使得每個 merge 的 thread 幾乎遍歷過整個 linked list 來找尋適合插入的點並插入 node
但是現在建立在連續記憶體的結構下,我們可以
待補
to be continued…