# 2021q1 Homework4 (quiz4) contributed by < `hankluo6` > > [第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz4) ## 測驗 `1` > 參考解答 :::spoiler #### `FFF = ?` - [x] `(a) pthread_cond_wait(&future->cond_finished, &future->mutex)` #### `GGG = ?` - [x] `(c) while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)` #### `HHH = ?` - [x] `(b) pthread_cond_broadcast(&jobqueue->cond_nonempty)` #### `KKK = ?` - [x] `(b) __FUTURE_FINISHED` #### `LLL = ?` - [x] `(a) pthread_cond_broadcast(&task->future->cond_finished)` ::: ### 運作原理 #### POSIX Threads * ```c int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *), void *restrict arg); ``` 建立新的執行緒,運行 `start_routine()`,並將 `arg` 傳入給 `start_routine()` 中。 * ```c noreturn void pthread_exit(void *retval); ``` 結束正在執行的執行緒,並將要迴傳給 process 的值放置在 `retval`。 * ```c int pthread_join(pthread_t thread, void **retval); ``` 等在 `thread` 的終止,並將 `retval` 設置為呼叫 `pthread_exit` 時給予的 `retval`。 * ```c int pthread_cancel(pthread_t thread); int pthread_setcancelstate(int state, int *oldstate); int pthread_setcanceltype(int type, int *oldtype); ``` 送出 cancellation 訊號給 `thread` 執行緒,而 `thread` 會根據 `state` 以及 `type` 來決定如何回應。透過 `pthread_setcancelstate` 以及 `pthread_setcanceltype` 設置 `state` 與 `type`: * `state` * `PTHREAD_CANCEL_ENABLE` 預設行為,表示此執行緒為可被取消的 (cancelable)。 * `PTHREAD_CANCEL_DISABLE` 表示此執行緒無法被取消,當呼叫 `pthread_cancel` 時,process 會被 block 住,直到此執行緒自行中止或改變狀態。 * `type` * `PTHREAD_CANCEL_DEFERRED` 預測行為,當接收到 cancellation 訊號時,執行緒並不會馬上中止,而是會等待執行緒執行到 [`cancellation point`](https://man7.org/linux/man-pages/man7/pthreads.7.html) (可見下方 cancellation point 欄位) 時才會停止。 * `PTHREAD_CANCEL_ASYNCHRONOUS` 當收到 cancellation 訊號時會立刻中止。 * ```c void pthread_cleanup_push(void (*routine)(void *), void *arg); void pthread_cleanup_pop(int execute); ``` 設定 function handler,在執行緒結束或某些情況下 (`pthread_cancel` or `pthread_exit` or `pthread_cleanup_pop(no zeror)`),使用 `arg` 參數呼叫 `routine` function。 * ```c int pthread_mutex_init(pthread_mutex_t *restrict mutex; int pthread_mutex_destroy(pthread_mutex_t *mutex); int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex); ``` `pthread_mutex_init` 以及 `pthread_mutex_destroy` 用來初始化與刪除互斥鎖 (mutex),`pthread_mutex_lock` 以及 `pthread_mutex_unlock` 分別對 mutex 上鎖與解鎖。 * ```c int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); int pthread_cond_destroy(pthread_cond_t *cond); int pthread_condattr_init(pthread_condattr_t *attr); int pthread_condattr_destroy(pthread_condattr_t *attr); ``` `pthread_cond_init` 以及 `pthread_cond_destroy` 用來初始化與刪除條件變數 (condition variable),而 `pthread_condattr_init` 與 `pthread_condattr_destroy` 則用來設置 condition variable 的屬性 `attr`。 * ```c int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime); int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); int pthread_cond_broadcast(pthread_cond_t *cond); ``` `pthread_cond_timedwait` 與 `pthread_cond_wait` 將互斥鎖釋放,並阻塞住該執行緒,等待其它 process 送出信號觸發此 condition variable,在收到信號時返回並重新獲得互斥鎖。其中前者 `timedwait` 中的參數 `abstime` 表示可接受阻塞的最長時間。 `pthread_cond_broadcast` 則送出信號喚醒所有持有條件變數 `cond` 的執行緒。 :::info * [restrict](https://en.wikipedia.org/wiki/Restrict) 為 C99 關鍵字用來幫助編譯器最佳化,表示此 pointer 不會有 alias,只會有此變數可以操控。 * [noreturn](https://en.cppreference.com/w/c/language/_Noreturn) 為 C11 增加的關鍵字,註明此函數不會回傳給 caller,而是透過 `exit()` 等方式跳出,如有回傳值則為 undefined behavior。 ::: #### 程式流程 透過 `tpool_create` 建立 thread pool,並將每個執行緒放入 `__threadpool *pool` 中。而 `jobqueue_t *jobqueue` 將每個待執行的 task,以 linked list 的方式連接,當有任務要執行時,從 `jobqueue` 中 pop 一個任務出來運行,每個任務在程式內是以 `threadtask_t` 結構封裝。 從 `tpool_create` 建立出來的執行緒,會運行 `jobqueue_fetch` 函式,在 `jobqueue` 為空時,每個 thread 會在 `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);` 內被 block 住,等待有新的 task 加入。透過 `tpool_apply` 將新的 task 加入到 `jobqueue` 中,如果此時 `jobqueue` 為空,則透過 `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 送出信號給 `cond_nonempty` 條件變數,此時 `jobqueue_fetch` 內的其中一個執行緒中的 `pthread_cond_wait` 將會通過,並在下方取得 `jobqueue` 的 `tail` 任務。 執行完給定的 function 後,會將相關結果存放在 `struct __tpool_future` 中,`__tpool_future` 用來當作使用者可見的執行緒,透過 `tpool_future_get` 取得 thread 執行完的結果。如果 `tpool_future_get` 時任務尚未完成,將會被條件變數 `future->cond_finished` 給 block 住,等待任務執行完畢。 ### timeout 機制與改進 現有程式使用 `pthread_cond_timedwait` 處理 thread 執行過久的問題,並將 `future->flag` 設置為 `__FUTURE_TIMEOUT` 表示該任務超時,但在 `tpool_join` 沒有相對應的處理。當有執行緒無法完成時,將會產生問題,如以下程式碼: ```c static void *loop(void *arg) { while (1) ; } int main() { tpool_future_t future = tpool_apply(pool, loop, (void *)NULL); void *result = tpool_future_get(future, 1 /* cannot set to zero */); tpool_future_destroy(future); free(result); tpool_join(pool); } ``` 所以我們需要有方法在無法退出 function 時正確離開 thread。故我在 `jobqueue_fetch` 內添加以下程式: ```diff static void *jobqueue_fetch(void *queue) { ... + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old_state); void *ret_value = task->func(task->arg); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); ... } ``` 將 thread 的 cancel type 改成 asynchronous,此時便不需要經過 cancellation point 也能直接退出執行緒。 在移除 `future` 時也要正確處理 timeout 時的情形: ```diff int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || - future->flag & __FUTURE_CANCELLED) { + future->flag & __FUTURE_CANCELLED || + future->flag & __FUTURE_TIMEOUT) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } ... } return 0; } ``` 最後在 join 的時候,將所有未完成的 task 所在的執行緒皆終止: ```diff int tpool_join(struct __threadpool *pool) { size_t num_threads = pool->count; for (int i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); + for (int i = 0; i < num_threads; i++) + pthread_cancel(pool->workers[i]); for (int i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; } ``` :::danger 我們在執行過程中不能知道哪些執行緒會無法退出,所以當有執行緒進入無窮迴圈時,該執行緒會一直被佔用,無法被重新利用,直到執行 `tpool_join` 後才能正確被釋放。 ::: ### [`atomic_threadpool`](https://github.com/Taymindis/atomic_threadpool) `atomic_threadppol` 使用 lock-free queue 來取代此程式內的 job queue。thread pool 的實現方式與此程式一樣,透過 `at_thpool_create` 建立足夠的 thread 使用,每個 thread 內部執行 `at_thpool_worker` 並等待 queue 中有任務產生 (相當於 `jobqueue_fetch`),利用 `at_thpool_newtask` 建立任務 (相當於 `tpool_apply`) 並使用 `lfqueue_enq` 將任務放入 queue 當中。`at_thpool_worker` 利用 `lfqueue_deq` 將待處理任務讀出,並執行對應的函式。處理完任務的節點 (`lfqueue_cas_node_t`) 會透過 `__lfq_recycle_free` 以及 `__lfq_check_free` 釋放其空間。唯一不同的是此實作的 thread pool 無法回傳值給使用者。 只要是跟 queue 有關的操作,都必須為 thread safe,防止多個執行緒同時對 queue 操作而造成錯誤,在 `atomic_threadpool` 中使用 gcc 的 [builtin atomic functions](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins) 實現: * 在 enqueue 的時候,有兩個操作需要保護 * 增加 queue 的 `size` 時,使用 `__sync_add_and_fetch()` 執行加法,便能保證每次更新的值都保持為最新狀態。 * 更新 queue 的 `tail` 時,使用 `__sync_bool_compare_and_swap` 讓 `tail->next` 對於同一個 `tail` 只更新一次,而 `lfqueue->tail` 則保持在最新值。 * 在 dequeue 的時候,有兩個操作需要保護 * 更改 queue 的 `head` 時,使用 `__sync_bool_compare_and_swap` 讓 `lfqueue->head` 保持在最新值。 * 當 queue 只有一個元素時,dequeue 也要更新 `lfqueue->tail`,一樣使用 `__sync_bool_compare_and_swap` 實現。 * 釋放空間時,也需要使用 `__LFQ_BOOL_COMPARE_AND_SWAP` 設置要移除的節點,才不會重複釋放同個記憶體,並使用 `__LFQ_BOOL_COMPARE_AND_SWAP` 指示現在是否為 `in_free_mode`,防止多個執行緒同時進行移除。 --- TODO * ABA problem * [Lock-free 程式設計議題](https://hackmd.io/@sysprog/lock-free-prog) * [c11-queues](https://github.com/stv0g/c11-queues) * [lfqueue](https://github.com/darkautism/lfqueue) * [Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.395.378&rep=rep1&type=pdf) * [An Optimistic Approach to Lock-Free FIFO Queues](http://people.csail.mit.edu/shanir/publications/FIFO_Queues.pdf) ###### tags: `linux2021`