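# 2021q4 Homework4 (quiz4)
contributed by < `Nahemah1022` >
###### tags: `linux2021`

## POSIX Threads

A multithreaded program runs as follows. The master thread distributes tasks to the worker threads. Each thread exits after finishing its share, and the master then joins all of them back.

![](https://i.imgur.com/Xb8S4Fa.png =20%x) => ![](https://i.imgur.com/5SMjQec.png =30%x) => ![](https://i.imgur.com/nG82dzw.png =30%x)

### Basic pthread usage

- `int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *), void *restrict arg);`:
  - Creates and starts a new thread that runs `start_routine`, passing `arg` to it as its argument
  - A non-zero return value (an error number) means the thread could not be created
- `noreturn void pthread_exit(void *retval);`:
  - Terminates the calling thread and makes `retval` available as its exit value
- `int pthread_join(pthread_t thread, void **retval);`:
  - Waits for `thread` to terminate and, if `retval` is not `NULL`, stores the thread's exit value in `*retval`

### mutex lock

A mutex lock is the mechanism of locking a mutex to guarantee mutual exclusion between threads: only the thread currently holding the lock may enter the protected critical section.

:::info
mutex is short for **mut**ual **ex**clusion
:::

- ```c
  int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                         const pthread_mutexattr_t *restrict attr);
  int pthread_mutex_destroy(pthread_mutex_t *mutex);
  ```
  - Initialize and destroy `mutex`
- ```c
  int pthread_mutex_lock(pthread_mutex_t *mutex);
  int pthread_mutex_unlock(pthread_mutex_t *mutex);
  ```
  - Lock and unlock `mutex`; `pthread_mutex_lock` blocks while another thread holds the mutex

### condition variable

A condition variable lets a thread wait in a waiting queue until some event happens, instead of busy-waiting and continuously occupying the CPU.

- ```c
  int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
  int pthread_cond_destroy(pthread_cond_t *cond);
  ```
  - Initialize and destroy a condition variable
- ```c
  int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                             pthread_mutex_t *restrict mutex,
                             const struct timespec *restrict abstime);
  int pthread_cond_wait(pthread_cond_t *restrict cond,
                        pthread_mutex_t *restrict mutex);
  ```
  - Atomically unlock the given `mutex` and put the calling thread in the waiting queue until the condition is signaled
  - Once woken up, the thread re-locks `mutex` before continuing; `pthread_cond_timedwait` also returns `ETIMEDOUT` if the absolute time `abstime` passes first
  - Spurious wakeups are possible, so the condition should be rechecked in a loop
- ```c
  int pthread_cond_signal(pthread_cond_t *cond);
  int pthread_cond_broadcast(pthread_cond_t *cond);
  ```
  - Wake up one (signal) or all (broadcast) of the threads currently waiting on this condition variable

### pthread termination mechanism

When a thread misbehaves, we may want to terminate it directly from the main thread. However, we cannot know how far that thread has progressed: if it is terminated while still holding a mutex it has not unlocked, the result is a deadlock.

Pthread cancellation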
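lets us specify, before the thread starts, **what the thread must finish before it can terminate if it is cancelled**. The related functions are explained one by one below:

:::info
- The clean-up handler stack is a stack of function pointers. When the thread is cancelled or calls `pthread_exit`, the handlers are popped and executed in the reverse order in which they were pushed
- The clean-up handlers are not executed when the thread simply returns normally
:::

- `pthread_cleanup_push(void (*routine)(void *), void *arg)`:
  - Pushes `routine` onto the clean-up handler stack
  - When the routine is executed, it is called with `arg`
  - It must be paired with `pthread_cleanup_pop` in the same lexical scope
- `pthread_cleanup_pop(int execute)`:
  - Removes the routine at the top of the clean-up handler stack
  - If `execute` is non-zero, the popped routine is also executed
- `pthread_setcancelstate(int state, int *oldstate)`:
  - Sets the thread's cancelability state (whether it can be cancelled) and stores the previous state in `*oldstate`
  - `PTHREAD_CANCEL_ENABLE` means the thread can be cancelled (the default)
  - `PTHREAD_CANCEL_DISABLE` means the thread cannot be cancelled
  - If a thread with cancellation disabled receives a cancellation request, the request stays pending until cancellation is enabled again; the thread itself keeps running
- `pthread_setcanceltype(int type, int *oldtype)`:
  - Sets how the thread reacts to a cancellation request
  - With `PTHREAD_CANCEL_DEFERRED` (the default), the thread is not terminated as soon as it receives the request; termination is deferred until it next reaches a cancellation point
  - With `PTHREAD_CANCEL_ASYNCHRONOUS`, the thread can be cancelled at any time; typically this happens right after the request arrives, but the system does not guarantee it

## Code analysis

### Purpose of the program

Speed up the computation of π by evaluating multiple terms of the [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) in parallel.

### Thread pool workflow

1. The program first creates the thread pool with `tpool_create`, passing the number of workers to create
   - This initializes the thread pool's `jobqueue`
   - A pthread is created for every worker in the pool, and every worker runs `jobqueue_fetch`
2. `tpool_apply` creates a new `threadtask_t` and pushes it onto the jobqueue, a linked list, where it waits to be executed
   - If the jobqueue was empty before the push, the condition variable `cond_nonempty` is broadcast so that the workers blocked in `jobqueue_fetch` wake up
   - Whenever a new task is pushed, an idle worker takes it in `jobqueue_fetch` and executes it
   - After execution, the result is stored in the task's `tpool_future` structure
3. `tpool_future_get` retrieves the task's result
   - It uses `pthread_cond_wait` to make sure the task has finished before the result is read
   - The worker calls `pthread_cond_broadcast` to announce completion only after the task has completely finished
4. Finally, `tpool_join` joins all worker pthreads back to the master thread
   - It also destroys the jobqueue and frees the workers array

### Implementation details of each part

#### `tpool_create(size_t count)`

```c=
/**
 * Create a thread pool containing specified number of threads.
 * If successful, the thread pool is returned.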
 * Otherwise, it returns NULL.
 */
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (size_t i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                // if failing to create pthread, cancel and clean-up
                for (size_t j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (size_t j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```

- Allocate the memory for the whole thread pool, create the jobqueue, and assign it to the pool
- Allocate room for `count` `pthread_t` handles in `workers`, then create each pthread with `jobqueue_fetch()` as the function it runs
- If creating any pthread fails, cancel and join every worker created so far, free all resources, and return `NULL`
  - Cancelling works here because an idle worker is blocked in `pthread_cond_wait`, which is a cancellation point

#### `tpool_apply(tpool_t pool, void *(*func)(void *), void *arg)`

```c=
/**
 * Schedules the specific function to be executed.
 * If successful, a future object representing the execution of
 * the task is returned. Otherwise, it returns NULL.
 */
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            /* push at the head; workers pop from the tail (FIFO order) */
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            /* the queue was empty: wake up workers waiting in jobqueue_fetch */
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```

- Allocate a `threadtask_t` and push it onto the thread pool's `jobqueue`, where it waits to be fetched
- Allocate a `struct __tpool_future` to hold this task's result
- If the `jobqueue` was empty, `pthread_cond_broadcast(&jobqueue->cond_nonempty)` wakes up every worker waiting to fetch a job

#### `tpool_join(tpool_t pool)`

```c=
/**
 * Wait for all pending tasks to complete before destroying the thread pool.
 */
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (size_t i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (size_t i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```

- Apply `count` tasks whose function is `NULL`. A worker that fetches such a task frees it and leaves its loop, so `count` of them make every worker exit. Because the queue is FIFO, all previously submitted tasks are fetched before these sentinel tasks
- Join all workers back to the master thread
- Free the memory used by the jobqueue and by the thread pool itself

#### `tpool_future_get(tpool_future_t future, unsigned int seconds)`

```c=
/**
 * Return the result when it becomes available.
 * If @seconds is non-zero and the result does not arrive within specified time,
 * NULL is returned. Each tpool_future_get() resets the timeout status on
 * @future.
 */
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }
    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
```

- If the task has not finished yet, wait with `pthread_cond_wait(&future->cond_finished, &future->mutex)` until it does
- If a timeout `seconds` is given, wait with `pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time)` instead
  - If the time runs out before the task finishes, set the `__FUTURE_TIMEOUT` bit in the future's flag and return `NULL`
  - `expire_time` is an absolute time read from `CLOCK_MONOTONIC`. This is only correct if `cond_finished` was initialized with that clock through `pthread_condattr_setclock`, since `pthread_cond_timedwait` compares `abstime` against `CLOCK_REALTIME` by default; the initialization in `tpool_future_create` is not shown here

#### `tpool_future_destroy(tpool_future_t future)`

```c=
/**
 * Destroy the future object and free resources once it is no longer used.
 * It is an error to refer to a destroyed future object. Note that destroying
 * a future object does not prevent a pending task from being executed.
 */
int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}
```

- If the task has already finished or was cancelled, unlock the mutex, destroy the mutex and the condition variable, and free the future right away
- Otherwise the task is still pending or running, so only set the `__FUTURE_DESTROYED` bit in the future's flag; the worker checks this bit after running the task and frees the future itself

#### `jobqueue_fetch(void *queue)`

```c=
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);

        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED;
                task->future->result = ret_value;
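                pthread_cond_broadcast(&task->future->cond_finished);
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            /* sentinel task pushed by tpool_join: clean up and exit */
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```

- Register `__jobqueue_fetch_cleanup` as this thread's clean-up handler
  - The handler unlocks the jobqueue's mutex `rwlock`, so a worker cancelled while holding it does not leave the queue locked
- `pthread_setcancelstate` enables cancellation, so the thread can be cancelled from outside
- `pthread_testcancel` creates a cancellation point
  - Because the cancel type is deferred by default, an external cancellation request only terminates the thread at a cancellation point, here `pthread_testcancel` or `pthread_cond_wait`. Cancellation is disabled again before a task is dequeued, so a running task is never interrupted
- Pop a task from the tail of the jobqueue and execute it
  - Because the queue is a singly linked list, finding the new tail means walking from `head`, which takes O(n) time
  - After execution, the return value is stored in `task->future`
  - `pthread_cond_broadcast` wakes up the threads waiting for this task's result
- A task whose `func` is `NULL` (pushed by `tpool_join`) makes the worker free it and leave the loop

---
## Techniques used in [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool)
---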