# 2021q1 Homework4 (quiz4)
contributed by < `ambersun1234` >

## Environment
```shell
$ uname -a
Linux station 4.19.128-microsoft-standard #1 SMP Tue Jun 23 12:58:10 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
```

## How the program works
Because the [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) can compute the $n$-th digit of $\pi$ without knowing the preceding $n-1$ digits, the digits can be computed in parallel:

```cpp
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];

for (int i = 0; i <= PRECISION; i++) {
    bpp_args[i] = i;
    futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
```

Rather than creating and destroying a thread for every task, this implementation manages a thread pool: the number of threads is fixed, and a thread that has finished its computation simply picks up the next job. This greatly reduces the cost of thread creation and teardown.

```cpp
struct threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct threadpool *pool = malloc(sizeof(struct threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                /* roll back: cancel and join the workers created so far */
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```

`tpool_create` builds a thread pool with `count` worker threads (4 in this program), all sharing the same jobqueue. If creating any thread fails, the workers created so far are cancelled and joined and every resource is released, because according to [man pthread_create](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
> On success, pthread_create() returns 0; on error, it returns an error number, and the contents of *thread are undefined.

`tpool_apply` then dispatches each job to the thread pool:

```cpp
struct tpool_future *tpool_apply(struct threadpool *pool,
                                 void *(*func)(void *),
                                 void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        ...
    }
```

It allocates a task node `new_head` together with a `future`, inserts the node at the head of the `threadtask_t` singly linked list, and adds some error handling for allocation failures. Note the call to `pthread_cond_broadcast`; according to [pthread_cond_broadcast(3)](https://linux.die.net/man/3/pthread_cond_broadcast)
> The pthread_cond_broadcast() function is used whenever the shared-variable state has been changed in a way that more than one thread can proceed with its task

Here the 4 worker threads share the jobqueue, and the shared variables `jobqueue->head` and `jobqueue->tail` have just been modified, so all workers need to be notified. Every task runs the same callback, `bpp`, which computes one term of the BBP series; note, however, that the workers never call `bpp` directly. Because tasks sit in the jobqueue, `jobqueue_fetch` is needed to pull them out and run them.
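For reference, the callback that every task runs evaluates one term of the BBP series

$$\pi = \sum_{k=0}^{\infty} \frac{1}{16^k}\left(\frac{4}{8k+1} - \frac{2}{8k+4} - \frac{1}{8k+5} - \frac{1}{8k+6}\right)$$

A minimal sketch of such a callback is shown below; the function name `bbp_term` is hypothetical and the exact quiz4 code may differ, but the result has to be heap-allocated so the future can hand it back later:

```c
#include <math.h>
#include <stdlib.h>

/* Hypothetical per-task callback: computes the k-th term of the BBP series
 * and returns it through a heap-allocated double so tpool_future_get() can
 * retrieve it after the worker thread has finished. */
static void *bbp_term(void *arg)
{
    int k = *(int *) arg;
    double sum = 4.0 / (8 * k + 1) - 2.0 / (8 * k + 4) -
                 1.0 / (8 * k + 5) - 1.0 / (8 * k + 6);
    double *result = malloc(sizeof(double));
    if (result)
        *result = sum / pow(16, k);
    return result; /* summing these terms over all k approximates pi */
}
```

Summing the values returned through `tpool_future_get` for every $k$ up to `PRECISION` then yields the approximation of $\pi$.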
On the worker side, `jobqueue_fetch` keeps polling the tail of the jobqueue: it removes the task at the tail, runs it, stores the result in the corresponding future, and goes back to waiting whenever the jobqueue is empty.

```cpp
    ...
    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        /* sleep until a producer signals that the queue is non-empty */
        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);

        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            /* single element: the queue becomes empty after taking it */
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            /* walk to the node right before the tail and detach the tail */
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
        ...
        // compute the n-th digit of PI
        ...
    }
```

The result of each computation is stored in its own `struct tpool_future` and is later retrieved with `tpool_future_get`:

```cpp
void *tpool_future_get(struct tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~FUTURE_TIMEOUT;
    while ((future->flag & FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }
    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
```

`seconds` specifies the timeout (0 means an unbounded blocking wait). The `while` loop checks whether the computation has finished; inside it, the wait is either a plain blocking wait (`pthread_cond_wait`) or a timed wait (`pthread_cond_timedwait`). According to [pthread_cond_timedwait(3)](https://linux.die.net/man/3/pthread_cond_timedwait)
> The pthread_cond_timedwait() function shall be equivalent to pthread_cond_wait(), except that an error is returned if the absolute time specified by abstime passes (that is, system time equals or exceeds abstime) before the condition cond is signaled or broadcasted, or if the absolute time specified by abstime has already been passed at the time of the call.

Since the deadline is an absolute time, the implementation passes `current time + seconds` as the argument to `pthread_cond_timedwait`.

## Reading the [atomic threadpool](https://github.com/Taymindis/atomic_threadpool/blob/master/at_thpool.c) implementation

The implementation is quite compact and runs on Windows, Linux and macOS. The first thing that catches the eye is

```c=
#define AT_THPOOL_INC(v) __sync_fetch_and_add(v, 1)
#define AT_THPOOL_DEC(v) __sync_fetch_and_add(v, -1)
#define AT_THPOOL_SHEDYIELD sched_yield
```

[\_\_sync_fetch_and_add](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html) is one of gcc's atomic extensions. You might wonder why a simple +1 or -1 needs an atomic operation. In fact such an update is rarely a single instruction; it typically involves
+ reading the value
+ adding 1 to it
+ writing the value back

If two threads interleave these steps, one update can overwrite the other and the data is very likely to end up wrong (a small demonstration follows the excerpt below). Fortunately, the gcc builtins perform the whole read-modify-write as a single atomic step and also act as a full memory barrier, so memory operations are not reordered across them:
> In most cases, these builtins are considered a full barrier. That is, no memory operand will be moved across the operation, either forward or backward. Further, instructions will be issued as necessary to prevent the processor from speculating loads across the operation and from queuing stores after the operation.
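To see the lost-update problem concretely, here is a small standalone sketch (not part of either thread pool; `plain_counter`, `atomic_counter` and `worker` are made-up names): two threads increment a shared counter `LOOPS` times each, once with a plain `++` and once with `__sync_fetch_and_add`. The plain counter usually falls short of the expected total, while the atomic one is always exact.

```c
#include <pthread.h>
#include <stdio.h>

#define LOOPS 1000000

static long plain_counter;  /* updated with a non-atomic ++ (racy on purpose) */
static long atomic_counter; /* updated with __sync_fetch_and_add              */

static void *worker(void *arg)
{
    (void) arg;
    for (int i = 0; i < LOOPS; i++) {
        plain_counter++;                          /* load, add, store: a data race */
        __sync_fetch_and_add(&atomic_counter, 1); /* one atomic read-modify-write  */
    }
    return NULL;
}

int main(void)
{
    pthread_t t[2];
    for (int i = 0; i < 2; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < 2; i++)
        pthread_join(t[i], NULL);

    /* plain_counter is usually less than 2 * LOOPS; atomic_counter is exactly 2 * LOOPS */
    printf("plain  = %ld\natomic = %ld\n", plain_counter, atomic_counter);
    return 0;
}
```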
However, these builtins alone are not enough to make the whole thread pool lock-free. Consider the code each worker thread runs:

```cpp
#if defined _WIN32 || defined _WIN64
unsigned __stdcall at_thpool_worker(void *_tp) {
#else
void* at_thpool_worker(void *_tp) {
#endif
    at_thpool_t *tp = (at_thpool_t*)_tp;
    AT_THPOOL_INC(&tp->nrunning);

    at_thtask_t *task;
    void *_task;
    lfqueue_t *tq = &tp->taskqueue;

TASK_PENDING:
    while (tp->is_running) {
        if ( (_task = lfqueue_deq(tq)) ) {
            goto HANDLE_TASK;
        }
        lfqueue_sleep(1);
    }

    AT_THPOOL_DEC(&tp->nrunning);
#if defined _WIN32 || defined _WIN64
    return 0;
#else
    return NULL;
#endif

HANDLE_TASK:
    task = (at_thtask_t*) _task;
    task->do_work(task->args);
    AT_THPOOL_FREE(task);

    goto TASK_PENDING;
}
```

Essentially it is an infinite loop that keeps asking the job queue (lfqueue) for work; whenever it obtains a task, it executes it.

<hr>

Compared with the [quiz4 implementation](https://github.com/ambersun1234/linux2021q1_quiz4/blob/master/bbp.c), `atomic threadpool` does not protect its queue with a `mutex` (quiz4's lock is even named `rwlock`, although it is a plain mutex rather than a true [read-write lock](https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock)). It can do without one thanks to [lfqueue](https://github.com/Taymindis/lfqueue/tree/21497d65a579ff441be9530a6b8784ad6b940e2d) being a lock-free implementation: operations such as `lfqueue_deq` and `lfqueue_enq` are atomic.

Atomic here means that while the operation runs, no other part of the system can observe or interfere with an intermediate state, so it can be treated as a single indivisible step (see [How does a function becomes atomic?](https://stackoverflow.com/a/20494774) for details). Consider the `lfqueue_enq` implementation:

```c=
    for (;;) {
        __LFQ_SYNC_MEMORY();
        tail = lfqueue->tail;
        if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
            // compulsory swap as tail->next is no NULL anymore, it has fenced on other thread
            __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
            __lfq_check_free(lfqueue);
            return 0;
        }
    }
```

[\_\_sync_synchronize (\_\_LFQ_SYNC_MEMORY)](https://gcc.gnu.org/onlinedocs/gcc-4.6.2/gcc/Atomic-Builtins.html) is a full [memory barrier](https://zh.wikipedia.org/wiki/%E5%86%85%E5%AD%98%E5%B1%8F%E9%9A%9C): it enforces a strict ordering of the memory accesses around it, and every remaining step of the loop is an atomic operation.

[\_\_sync_bool_compare_and_swap (\_\_LFQ_BOOL_COMPARE_AND_SWAP)](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html)
> These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr. The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.

<hr>

Back to the main question: given that `lfqueue_deq` is an atomic operation, does that guarantee the following loop is lock-free?

```c=13
TASK_PENDING:
    while (tp->is_running) {
        if ( (_task = lfqueue_deq(tq)) ) {
            goto HANDLE_TASK;
        }
        lfqueue_sleep(1);
    }
```

Because `lfqueue_deq` is atomic, only one thread at a time can win the race for a given task. If a thread is stuck spinning in this loop even though tasks are queued, it is precisely because at least one other thread has successfully taken a task, so the system as a whole always makes progress :arrow_right: lock free!!
> See also [Lock-free-程式設計議題](https://hackmd.io/@sysprog/lock-free-prog#Lock-free-%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88%E8%AD%B0%E9%A1%8C)

## Rewriting the quiz4 implementation with [C11 atomics](https://en.cppreference.com/w/c/atomic)

From the discussion above, achieving lock-free behaviour requires the following:
+ enqueue and dequeue must be atomic operations

So the first step is to introduce `_Atomic`:

```c=
typedef struct __jobqueue {
    _Atomic threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;
```
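Before looking at the rewritten `tpool_apply`, here is a minimal standalone sketch of the retry-until-CAS-succeeds pattern with C11 atomics that the rewrite is based on. The `node_t`, `head` and `push` names are made up for this sketch and are not quiz4's actual types:

```c
#include <stdatomic.h>
#include <stdlib.h>

/* Hypothetical node type, only for this sketch */
typedef struct node {
    int value;
    struct node *next;
} node_t;

/* An atomic pointer to the head of a singly linked list */
static _Atomic(node_t *) head = NULL;

/* Lock-free push: keep retrying the compare-and-swap until we win the race.
 * On failure, atomic_compare_exchange_weak stores the current head back into
 * n->next, so the next iteration automatically retries with a fresh snapshot. */
static void push(int value)
{
    node_t *n = malloc(sizeof(node_t));
    n->value = value;
    n->next = atomic_load(&head);
    while (!atomic_compare_exchange_weak(&head, &n->next, n))
        ; /* another thread pushed first; try again */
}
```

The rewritten `tpool_apply` below applies the same loop structure, with `atomic_compare_exchange_strong`, to the jobqueue's `tail` pointer.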
`tpool_apply` is then rewritten as follows:

```c=
...
    if (new_head && future) {
        new_head->func = func;
        new_head->arg = arg;
        new_head->future = future;
        new_head->next = NULL;

        // pthread_mutex_lock(&jobqueue->rwlock);
        // if (jobqueue->head) {
        //     new_head->next = jobqueue->head;
        //     jobqueue->head = new_head;
        // } else {
        //     jobqueue->head = jobqueue->tail = new_head;
        //     pthread_cond_broadcast(&jobqueue->cond_nonempty);
        // }
        // pthread_mutex_unlock(&jobqueue->rwlock);

        _Atomic threadtask_t *tmp = malloc(sizeof(threadtask_t));
        _Atomic threadtask_t *tmp2 = atomic_load(&tmp->next);
        atomic_store(&tmp2, NULL); // compare with NULL pointer, use tmp wrap it

        bool flag = false;
        for (;;) {
            _Atomic threadtask_t *tail = atomic_load(&jobqueue->tail);

            // no task in queue
            if (atomic_compare_exchange_strong(&tail, &tmp2, new_head)) {
                atomic_store(&jobqueue->tail, new_head);
                atomic_store(&jobqueue->head, new_head);
                flag = true;
            } else {
                atomic_store(&tmp2, NULL);

                // have task in queue
                if (atomic_compare_exchange_strong(&tail->next, &tmp2, new_head)) {
                    atomic_store(&jobqueue->tail, new_head);
                    pthread_cond_broadcast(&jobqueue->cond_nonempty);
                    flag = true;
                }
            }

            if (flag)
                break;
        }
...
```

In addition to the implementation above, I also added a measure against the [ABA problem](https://en.wikipedia.org/wiki/ABA_problem). The usual remedy is to attach a `version stamp` to the data. Java, for example, provides [AtomicStampedReference](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicStampedReference.html), which pairs a reference with a version field, and the same idea can be applied here:

```c=
typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
    _Atomic int version;
} threadtask_t;
```

```c=
...
        _Atomic threadtask_t *tmp = malloc(sizeof(threadtask_t));
        _Atomic threadtask_t *tmp2 = atomic_load(&tmp->next);
        // compare with NULL pointer, use tmp wrap it

        int oldversion = atomic_load(&new_head->version);
        int newversion = oldversion + 1;

        for (;;) {
            __sync_synchronize();
            atomic_store(&tmp2, NULL);
            _Atomic threadtask_t *tail = atomic_load(&jobqueue->tail);

            // no task in queue
            if (atomic_compare_exchange_strong(&tail, &tmp2, new_head) &&
                atomic_compare_exchange_strong(&new_head->version, &oldversion, newversion)) {
                atomic_store(&jobqueue->tail, new_head);
                atomic_store(&jobqueue->head, new_head);
                break;
            } else {
                atomic_store(&tmp2, NULL);

                // have task in queue
                if (atomic_compare_exchange_strong(&tail->next, &tmp2, new_head) &&
                    atomic_compare_exchange_strong(&new_head->version, &oldversion, newversion)) {
                    atomic_store(&jobqueue->tail, new_head);
                    break;
                }
            }
        }
...
```

###### tags: `linux2021`