# 2021q1 Homework4 (quiz4) contributed by <`chiehen` > ###### tags: `linux2021` ## Check List - [ ] 解釋上述程式碼運作原理,包含 timeout 處理機制,指出改進空間並實作 - [ ] 研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法 - [ ] 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability ## 解釋程式碼 有四種結構 * threadtask_t: 執行的任務,以 linked list 的方式連結 * jobqueue: 待執行任務的貯列 * struct __tpool_future: 任務的執行結果 * struct __threadpool: threadpool ```c typedef struct __threadtask { void *(*func)(void *); // thread 要執行的函式 void *arg; // 上述函式的參數 struct __tpool_future *future; // 函式執行結果儲存的結構 struct __threadtask *next; // 下一個任務 } threadtask_t; typedef struct __jobqueue { threadtask_t *head, *tail; // 紀錄第一個和最後一個任務 pthread_cond_t cond_nonempty; // 當佇列有任務時通知 pthread_mutex_t rwlock; // 避免多個執行緒同時更改 jobqueue_t } jobqueue_t; struct __tpool_future { int flag; // 目前運算狀態 void *result; // 函式運算結果 pthread_mutex_t mutex; // 避免多個執行緒同時更改此結構 pthread_cond_t cond_finished; // 當運算結束時通知 }; struct __threadpool { size_t count; // 執行緒數量 pthread_t *workers; // 執行緒 array jobqueue_t *jobqueue; // 待執行任務的貯列 }; ``` jobqueue_t 和 threadtask_t ```graphviz digraph structs { node[shape=record] queue [label="<head>head|<tail>tail|<cond>cond_nonempty|<lock>rwlock"] node[shape=box]; task1 [label= "threadtask_t"]; task2 [label= "threadtask_t"]; task3 [label= "threadtask_t"]; task4 [label= "threadtask_t"]; queue:head->task1 queue:tail->task4 rankdir=LR; task1->task2; task2->task3; task3->task4; } ``` threadtask_t ```graphviz digraph structs { node[shape=record] task1 [label="<func>(*func)(void *)|<arg>arg|<future>future|<next>next"] task2 [label="<func>(*func)(void *)|<arg>arg|<future>future|<next>next"] future1 [label="<flag>flag|<result>result|<lock>mutex|<cond>cond_finished"] rankdir=LR; task1:next->task2 task1:future->future1 } ``` ### main 首先看 main function, 在第 5 行後, thread 開始執行,但此時 jobqueue 為空,直到第 10 行的迴圈 task 開始被放入。 接著在透過第 13 行的迴圈將各自的運算結果相加,第 20 行將 thread pool 摧毀。 其中用到了 5 個函式: - tpool_create: 建立 thread pool, thread 開始執行 - tpool_apply: 建立任務並放進 jobqueue,回傳該任務的 future object - tpool_future_get: 從 future object 取得運算結果 - tpool_future_destroy: 摧毀 future object - tpool_join: 摧毀 thread pool ```c= int main() { int bpp_args[PRECISION + 1]; double bpp_sum = 0; tpool_t pool = tpool_create(4); tpool_future_t futures[PRECISION + 1]; for (int i = 0; i <= PRECISION; i++) { bpp_args[i] = i; futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]); } for (int i = 0; i <= PRECISION; i++) { double *result = tpool_future_get(futures[i], 0 /* blocking wait */); bpp_sum += *result; tpool_future_destroy(futures[i]); free(result); } tpool_join(pool); printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum); return 0; } ``` ### tpool_create 建立 thread pool, thread 開始執行: 在此函式中建立 jobqueue 及所須數量的 thread,其中 jobqueue_fetch 為執行緒要執行的函式,在下部份會解釋。 值得注意的是此過程若有一個階段 malloc 失敗,要釋放先前所要求的記憶體及中止執行緒,及摧毀 jobqueue ```c= struct __threadpool *tpool_create(size_t count) { jobqueue_t *jobqueue = jobqueue_create(); struct __threadpool *pool = malloc(sizeof(struct __threadpool)); if (!jobqueue || !pool) { if (jobqueue) jobqueue_destroy(jobqueue); free(pool); return NULL; } pool->count = count, pool->jobqueue = jobqueue; if ((pool->workers = malloc(count * sizeof(pthread_t)))) { for (int i = 0; i < count; i++) { if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)) { for (int j = 0; j < i; j++) pthread_cancel(pool->workers[j]); for (int j = 0; j < i; j++) pthread_join(pool->workers[j], NULL); free(pool->workers); jobqueue_destroy(jobqueue); free(pool); return NULL; } } return pool; } jobqueue_destroy(jobqueue); free(pool); return NULL; } ``` ### jobqueue_fetch 持續從 jobqueue 拿取任務並執行 - 在 14 行時確認 queue 是否為空,如果為空則使用 pthread_cond_wait 進行 sleep-wait, 直到有執行緒呼叫 broadcast,才會繼續執行拿出 queue 中的最後一個任務,在 14 行到 27 行因為對 jobqueue 進行操作因此使用 mutex lock 避免同時操作 - 在這個 critical section 中 對 jobqueue 做了遍歷 是相對大的 critical section ==應該進行改進== - 在 32 行檢查是否為被cancel 但沒有設定 cancel 的地方,==可改進== - 如果函式運算完成,將在 48 行的區域,將 flag 設為 finished, 並 broadcast finished, 讓在 tpool_future_get 中等待的 thread 繼續運算 - 如果 fucn 為 NULL,將進入第 55 行的區域並跳出迴圈,結束此函式 ```c= static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)// GGG; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHE; // KKK task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); // LLL pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); pthread_exit(NULL); } ``` ### tpool_apply: 建立任務並放進 jobqueue,回傳該任務的 future object: 建立一個任務和一個 future object, 並將任務放入 jobqueue,在對 jobqueue 進行變動時要使用 mutex lock 以避免有其他執行緒同時操作 jobqueue。 當 jobqueue 中原本是空的時,使用pthread_cond_broadcast,通知正在等待的 thread 現在有可執行的任務。 ```c= struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; // 將 threadtask 對應到一個 future pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { // 原本 jobqueue 是空的 jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty); // HHH } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { free(new_head); return NULL; } else if (future) { tpool_future_destroy(future); return NULL; } return future; // 回傳 future } ``` ### tpool_future_get 從 future object 取得運算結果 - 如未設定 timeout(second = 0), 則等待至被通知 future object 完成 - 如設定 timeout,則使用`pthread_cond_timedwait` ,此函式在超過指定時間時將回傳 error,而此時將取得 lock -clock selection? ```c= void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else pthread_cond_wait(&future->cond_finished, &future->mutex);// FFF } ``` ### tpool_future_destroy 摧毀 future object ```c= int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); } } return 0; } ``` ### tpool_join 摧毀 thread pool - 將 NULL 作為參數傳進 tpool_apply, 將在 job_fetch中止 thread - 呼叫 pthread_join 將等待 thread 中止並返回 - 釋放記憶體 ```c= int tpool_join(struct __threadpool *pool) { size_t num_threads = pool->count; for (int i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); for (int i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; } ``` ### 改進 #### timeout 在 second 不為 0 時,執行程式後會得到 segmentation fault ```c=34 double *result = tpool_future_get(futures[i], 5 /* 將時間設為 5 秒 */); ``` 使用 gdb 發現問題出在 ```c=35 bpp_sum += *result; ``` 如果 timeout 發生則 result 會為 null 因此將程式碼改為先確認回傳值 ```c=35 if(result) bpp_sum += *result; ``` 解決 segmentation fault,但再次執行會發現仍會超時造成運算結果錯誤 ``` jane@jane-NB:~/linux2021/linux2021-quiz4$ ./pi # 錯誤 PI calculated with 101 terms: 0.008259320256460 # 正確 PI calculated with 101 terms: 3.141592653589793 ``` 但理論上不應超時,因為總執行時間並不到 1 秒 ``` jane@jane-NB:~/linux2021/linux2021-quiz4$ time ./pi PI calculated with 101 terms: 0.000000196022358 real 0m0.005s user 0m0.000s sys 0m0.008s ``` 查看 manual 中的 pthread_cond_timedwait 有一段: > If the Clock Selection option is supported, the condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument. 發現 condition variable 可以設置 Clock 的類型 透過 gdb 查看, 發現 condition variable 的 clock ID 為 CLOCK_REALTIME 與 tpool_future_get 使用的 CLOCK_MONOTONIC 不同 ``` (gdb) p futures[0]->cond_finished $2 = pthread_cond_t = {Threads known to still execute a wait function = 0, Clock ID = CLOCK_REALTIME, Shared = No} ``` 修改 tpool_future_create ```c=49 pthread_condattr_t attr; pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); pthread_cond_init(&future->cond_finished, &attr); ``` 使用 pi_timeout.c 測試 timeout 並開啟 Adress Sanitizer 執行 ```c static void *bpp(void *arg) { + sleep(5); int k = *(int *) arg; ... } ``` 發現 memory leak: ``` $ gcc pi_timeout.c tpool.c -o pi -g -lm -lpthread -fsanitize=address $ ./pi PI calculated with 11 terms: 0.000000000000000 ================================================================= ==12278==ERROR: LeakSanitizer: detected memory leaks Direct leak of 88 byte(s) in 11 object(s) allocated from: #0 0x7f1808543bc8 in malloc (/usr/lib/x86_64-linux-gnu/libasan.so.5+0x10dbc8) #1 0x557789ac4701 in bpp /home/jane/linux2021/linux2021-quiz4/pi_timeout.c:17 #2 0x557789ac5b4d in jobqueue_fetch /home/jane/linux2021/linux2021-quiz4/tpool.c:181 #3 0x7f18082cd608 in start_thread /build/glibc-eX1tMB/glibc-2.31/nptl/pthread_create.c:477 SUMMARY: AddressSanitizer: 88 byte(s) leaked in 11 allocation(s). ``` 問題出在如果發生 timeout, 程式將進入 if 的區塊,jobqueue_fetch 中的 ret_value 則不會有機會被釋放 ```c void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(ret_value); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } ``` 因此在增加記憶體釋放的程式碼 ```c= @@ -185,6 +185,7 @@ static void *jobqueue_fetch(void *queue) pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); + free(ret_value); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; ``` :::info Todo: 修改 blocking wait, cancel, 大的 critical section ::: ### 計算 pi 的方程式 在 quiz4 中提到兩種計算 pi 的方程式: [Leibniz’s Formula for Pi](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi) 和 [ Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) 原本疑惑為什麼兩者都可以平行化運算,為什麼不使用 Leibniz’s Formula,後來發現這個方法有收斂很慢的問題。 改寫 quiz4 的 compute_pi_leibniz: ```c /* Use Gregory-Leibniz series to approximate PI */ static void *gls(void *arg) { int k = *(int *) arg; double *product = malloc(sizeof(double)); if (product) { double temp = (k & 1) ? (-1) : 1; *product = temp / (2 * k + 1) * 4; } return product; } ``` 發現要達到小數點後第五位的精準度, N 大約為 150000 ``` PI calculated with 150001 terms: 3.141599320211967 real 0m0.299s user 0m0.355s sys 0m0.531s ``` 而使用 Bailey–Borwein–Plouffe formula 時, N 設為 100 時, 精準度能達到小數後 15 位 ``` PI calculated with 101 terms: 3.141592653589793 real 0m0.003s user 0m0.003s sys 0m0.001s ``` 在運算時間上也有很大的差距 ## Atomic_threadpool 為了便於理解,以下程式僅留下適用於 GNU C 的部份 在 at_thpool.h 中 ``` c typedef struct at_thpool_s at_thpool_t; at_thpool_t *at_thpool_create(size_t nthreads); int at_thpool_newtask(at_thpool_t *pool, void (*task_pt)(void *),void *arg); void at_thpool_gracefully_shutdown(at_thpool_t *tp); void at_thpool_immediate_shutdown(at_thpool_t *tp); ``` ### at_thpool_newtask 呼叫 lfqueue_enq 將 task 放入 taskqueue equeue 的作法是透過迴圈不斷: 1. 使用 __sync_bool_compare_and_swap 確認 tail 是不是真的tail (tail 的 next 為 NULL), 如果是的話則將 tail->next 設為新的 node 此操作可以保證只有一個執行緒能成功,因為其他執行緒將看到 tail->next 不為 NULL 3. 使用 __sync_bool_compare_and_swap 確認 queue 的 tail 為先前看到的 tail, 如果是則將 tail 更新為新的 node 此方式能確保一次只有一個 thread 修改 tail * 不論是在 dequeue 或 init 皆保證 tail 不為 NULL , 因此可直接比對 tail->next ``` c #define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap static int _enqueue(lfqueue_t *lfqueue, void* value) { lfqueue_cas_node_t *tail, *node; node = (lfqueue_cas_node_t*) malloc(sizeof(lfqueue_cas_node_t)); if (node == NULL) { perror("malloc"); return errno; } node->value = value; node->next = NULL; node->nextfree = NULL; for (;;) { __LFQ_SYNC_MEMORY(); // memory barrier tail = lfqueue->tail; if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) { // compulsory swap as tail->next is no NULL anymore, it has fenced on other thread __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node); __lfq_check_free(lfqueue); return 0; } } /*It never be here*/ return -1; } int lfqueue_enq(lfqueue_t *lfqueue, void *value) { if (_enqueue(lfqueue, value)) { return -1; } __LFQ_ADD_AND_FETCH(&lfqueue->size, 1); return 0; } ``` ```c int at_thpool_newtask(at_thpool_t *tp, void (*task_pt)(void *), void *arg) { ... task->do_work = task_pt; task->args = arg; if (lfqueue_enq(&tp->taskqueue, task) == -1) { fprintf(stderr, "Task unable to assigned to pool, it might be full\n"); AT_THPOOL_FREE(task); return -1; } return 0; } ``` ### at_thpool_worker Thread pool 中所執行的程式 _dequeue(): 保證 thread-safe: 在迴圈內不斷確認 1. 此時的 head 是正確的 2. queue 中有兩個以上元素 3. 將 assign 回傳值後, 利用 __sync_bool_compare_and_swap 確認此時的 head 是正確的 (代表 next 及 next->value 也是正確的), 則更新 head, 不然再次回到步驟一 dequeue 方法: 當 head 與 tail 不相同(queue 中有兩個以上元素), 回傳第二個元素的 value, 並將 head 更新, 舊的 head push 進 free queue ```c static void * _dequeue(lfqueue_t *lfqueue) { lfqueue_cas_node_t *head, *next; void *val; for (;;) { head = lfqueue->head; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) { next = head->next; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) { if (next == NULL) { val = NULL; goto _done; } } else { if (next) { val = next->value; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) { break; } } else { val = NULL; goto _done; } } } } __lfq_recycle_free(lfqueue, head); _done: // __asm volatile("" ::: "memory"); __LFQ_SYNC_MEMORY(); __lfq_check_free(lfqueue); return val; } ``` ```c void* at_thpool_worker(void *_tp) { at_thpool_t *tp = (at_thpool_t*)_tp; AT_THPOOL_INC(&tp->nrunning); at_thtask_t *task; void *_task; lfqueue_t *tq = &tp->taskqueue; TASK_PENDING: while (tp->is_running) { if ( (_task = lfqueue_deq(tq)) ) { goto HANDLE_TASK; } lfqueue_sleep(1); } AT_THPOOL_DEC(&tp->nrunning); return NULL; HANDLE_TASK: task = (at_thtask_t*) _task; task->do_work(task->args); AT_THPOOL_FREE(task); goto TASK_PENDING; } ``` ## 使用 Atomics 改寫程式碼