# 2021q1 Homework4 (quiz4) contributed by < `tiffany6022` > ###### tags: `linux2021` > [作業說明](https://hackmd.io/@sysprog/linux2021-quiz4) - [x] 解釋上述程式碼運作原理,包含 timeout 處理機制,指出改進空間並實作 - [ ] 研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法 - [ ] 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability ## 解釋程式運作原理 ### 程式結構 ```cpp struct __threadpool { // (tpool_t) size_t count; // 紀錄 thread 數量 pthread_t *workers; // 指向 thread 的地址 jobqueue_t *jobqueue; // 裝 job 的 queue (呈下) }; typedef struct __jobqueue { threadtask_t *head, *tail; // 頭尾的 task 地址 (呈下) pthread_cond_t cond_nonempty; // 當 jobqueue 是不空的時會發通知 (signal、broadcast) pthread_mutex_t rwlock; // 關於 jobqueue 的 lock } jobqueue_t; typedef struct __threadtask { void *(*func)(void *); // function pointer 指向當前 thread 要做的 task void *arg; // 上述 function 帶入的參數 struct __tpool_future *future; // 指向跟此 task 相關的資訊 (呈下) struct __threadtask *next; // 指向當前 task 在 jobqueue 中的下一個 task } threadtask_t; struct __tpool_future { // (tpool_future_t) int flag; // 存此 task 的狀態 void *result; // 存此 task 的 function 執行完後的結果 pthread_mutex_t mutex; // 關於 __tpool_future 的 lock pthread_cond_t cond_finished; // 當 task 的 function 執行完後發通知 (signal、broadcast) }; ``` ### 程式函式 #### tpool * create (創建 thread pool,成功 return pool,失敗 return NULL) ```cpp struct __threadpool *tpool_create(size_t count) { /* 創建 jobqueue 並建立 __threadpool 的空間 * 若任一建立失敗,destroy 並釋放資源 */ jobqueue_t *jobqueue = jobqueue_create(); struct __threadpool *pool = malloc(sizeof(struct __threadpool)); if (!jobqueue || !pool) { if (jobqueue) jobqueue_destroy(jobqueue); free(pool); return NULL; } /* 創建成功後,給定 pool->count 和 pool->jobqueue * 依據 count 數,建立對應的空間給 pool->workers * workers 創建成功後,創建 threads 並分配每個任務給 threads * 任務為 `jobqueue_fetch` 函式,所帶參數為 `(void *) jobqueue` */ pool->count = count, pool->jobqueue = jobqueue; if ((pool->workers = malloc(count * sizeof(pthread_t)))) { for (int i = 0; i < count; i++) { if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)) { /* * 若當中有 pthread_create 失敗的 thread,return 值不為 0 * 先將之前創建的 threads 都提交 cancellation request * pthread_join 可當一個取消點 * 因此可以取消 threads 並將資源釋放掉 */ for (int j = 0; j < i; j++) pthread_cancel(pool->workers[j]); for (int j = 0; j < i; j++) pthread_join(pool->workers[j], NULL); free(pool->workers); jobqueue_destroy(jobqueue); free(pool); return NULL; } } return pool; } jobqueue_destroy(jobqueue); free(pool); return NULL; } ``` * apply (將 function 加入 jobqueue 的 task) ```cpp struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { /* 建立 threadtask_t 的空間和創建跟存 task 資訊的 __tpool_future * 若不成功,destroy 並釋放對應資源 * 若成功,將前面建的 threadtask_t (new_head) 給定 func、arg、future * 並將 new_head 放入 jobqueue 的 head 位置 * 注意操作 jobqueue 的 head tail 需要用 jobqueue->rwlock 保護 */ jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { /* * 若 jobqueue 本來為空的,將用 CV(jobqueue->cond_nonempty) 發通知 */ jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty); // HHH } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { free(new_head); return NULL; } else if (future) { tpool_future_destroy(future); return NULL; } return future; } ``` * join (等 thread pool 中的 threads 都完成 join 回來,將資源釋放掉) ```cpp int tpool_join(struct __threadpool *pool) { /* 對每個 threads 依次將 func 和 arg 為 NULL 的 task 放入 jobqueue 中 * 執行 join 等待每個 threads 結束並釋放資源 */ size_t num_threads = pool->count; for (int i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); for (int i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; } ``` #### jobqueue * create (創建 jobqueue) ```cpp static jobqueue_t *jobqueue_create(void) { /* 初始化所有值 */ jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t)); if (jobqueue) { jobqueue->head = jobqueue->tail = NULL; pthread_cond_init(&jobqueue->cond_nonempty, NULL); pthread_mutex_init(&jobqueue->rwlock, NULL); } return jobqueue; } ``` * destroy (將 jobqueue destroy) ```cpp static void jobqueue_destroy(jobqueue_t *jobqueue) { /* 從 jobqueue 的 head 開始依次 destroy * 當要讀取 task 中的 future 的 flag 時,需要 lock 保護 */ threadtask_t *tmp = jobqueue->head; while (tmp) { jobqueue->head = jobqueue->head->next; pthread_mutex_lock(&tmp->future->mutex); /* * 若 flag 為 __FUTURE_DESTROYED(0001 0100) 或 __FUTURE_TIMEOUT(0000 0100) * 需要 destroy future 的 mutex 和 CV 並釋放 future 資源 */ if (tmp->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&tmp->future->mutex); pthread_mutex_destroy(&tmp->future->mutex); pthread_cond_destroy(&tmp->future->cond_finished); free(tmp->future); } else { /* * 反之,將 flag 設為 __FUTURE_CANCELLED (0000 1010) */ tmp->future->flag |= __FUTURE_CANCELLED; pthread_mutex_unlock(&tmp->future->mutex); } free(tmp); tmp = jobqueue->head; } /* 若 jobqueue 為空的,只須分別把 lock 和 condition variable 毀滅 * 再釋放 jobqueue 的資源 */ pthread_mutex_destroy(&jobqueue->rwlock); pthread_cond_destroy(&jobqueue->cond_nonempty); free(jobqueue); } ``` * fetch (每個 threads 執行的 function,執行 jobqueue 中的每個 tasks) ```cpp static void __jobqueue_fetch_cleanup(void *arg) { /* pthread_cleanup_push 中執行的 function * 為了避免結束 thread 時 lock 可能還是鎖的,要解鎖 */ pthread_mutex_t *mutex = (pthread_mutex_t *) arg; pthread_mutex_unlock(mutex); } static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); /* 若 jobqueue 為空的,等待當 jobqueue 不為空的時 cond_nonempty 的通知 * 不為空時,拿出 jobqueue 中的 tail 存給 `task` */ while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); // GGG pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); /* 當 task->func 不為空時,執行 task->func(task->arg) * 結果寫入 task->future->result */ if (task->func) { pthread_mutex_lock(&task->future->mutex); /* * 若 flag 為 __FUTURE_CANCELLED (0000 1010) 或 __FUTURE_FINISHED (0000 0010) * 釋放 task 資源並直接 continue 重新找下一個 task * 注意操作 flag 時需要 future->mutex 保護 */ if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { /* * 反之,將 flag 設為 __FUTURE_RUNNING (0000 0001) */ task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); /* * 若 flag 為 __FUTURE_DESTROYED(0001 0100) 或 __FUTURE_TIMEOUT(0000 0100) * 需要 destroy future 的 mutex 和 CV 並釋放 future 資源 */ if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { /* * 反之,將 flag 設為 __FUTURE_FINISHED * 並將結果存給 future->result * 用 cond_finished 發通知給 tpool_future_get function * 通知可以返回結果值 */ task->future->flag |= __FUTURE_FINISHED; // KKK task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); // LLL pthread_mutex_unlock(&task->future->mutex); } free(task); } else { /* * 若 task->func 為空時 * 先後釋放 future 和 task 資源 * 並結束無窮迴圈 */ pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); // 不執行之前 push 的清除函式 pthread_exit(NULL); // pthread 終止 } ``` #### tpool_future * create (創建 tpool_future) ```cpp static struct __tpool_future *tpool_future_create(void) { /* 初始化所有值 */ struct __tpool_future *future = malloc(sizeof(struct __tpool_future)); if (future) { future->flag = 0; future->result = NULL; pthread_mutex_init(&future->mutex, NULL); pthread_condattr_t attr; pthread_condattr_init(&attr); pthread_cond_init(&future->cond_finished, &attr); pthread_condattr_destroy(&attr); } return future; } ``` * destroy ```cpp int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); /* * 若 flag 為 __FUTURE_CANCELLED (0000 1010) 或 __FUTURE_FINISHED (0000 0010) * 需要 destroy future 的 mutex 和 CV 並釋放 future 資源 * (可以確定 future 是完成或取消的狀態) */ if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { /* * 反之,flag 設為 __FUTURE_DESTROYED * 當執行到 jobqueue_destroy 時,會再釋放 future 資源 */ future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); } } return 0; } ``` * get (取得 result 和實現 timeout 功能) ```cpp void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { /* * 若 seconds 不為 0 (使用者有限定執行時間) * 利用 timespec 計算時間,將現在時間加上限定的執行時間 * 再透過 pthread_cond_timedwait,若時間到或被通知都會被喚醒 * 但若是時間到被喚醒的,return 值為 ETIMEDOUT */ struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { /* * 時間到 task 還沒做完 * 將 flag 設為 __FUTURE_TIMEOUT * 結果回傳 NULL */ future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else /* * 若 seconds 為 0 (使用者未限定執行時間) * 等 cond_finished 通知可以繼續回傳 future->result */ pthread_cond_wait(&future->cond_finished, &future->mutex); // FFF } pthread_mutex_unlock(&future->mutex); return future->result; } ``` #### 關於 pthread 取消 * **pthread_cancel** (pthread_t thread) * 提交一個 cancellation request 給 thread * 會不會取消或取消時機根據 cancellation 的 **state** 和 **type** 有所不同 * 取消成功 return 0 * **pthread_setcancelstate** (int state, int \*oldstate) * state 分為以下兩種: * **PTHREAD_CANCEL_ENABLE** $\rightarrow$ 設為取消狀態 * **PTHREAD_CANCEL_DISABLE** $\rightarrow$ 設為不取消狀態 * **pthread_setcanceltype** (int type, int \*oldtype) * type 分為以下兩種: * **PTHREAD_CANCEL_DEFERRED** $\rightarrow$ 運行制下一個取消點再執行取消動作 * **PTHREAD_CANCEL_ASYNCHRONOUS** $\rightarrow$ 立即執行取消動作 * 僅當 cancellation 的 state 為 ENABLE 時有效:exclamation: * **pthread_testcancel** (void) * 創建一個取消點 * 僅當 cancellation 的 state 為 ENABLE 和 type 為 DEFFERED 時有效 :exclamation: #### 關於 pthread 終止 不論是正常或是非正常終止都有資源釋放的問題 (終止時 lock 可能還是鎖的) * 正常 ex: pthread_exit()、return * 非正常 ex: pthread_cancel()、出錯而退出等等 所以當 thread 終止時 clean-up handler 會自動執行 stack 中的清理函式 * **pthread_cleanup_push** (void (*routine)(void *), void *arg) * 將清理函式 routine 放入 stack 中 * arg 為清理函式所帶的參數 * **pthread_cleanup_pop** (int execute) * pop 並執行 stack 最上方的清理函式 * 只有當 execute 為非 0 時有效 :exclamation: 因此要執行清理函式需要滿足以下條件: 1. a thread is canceled 2. a thread terminates by calling pthread_exit() 3. pthread_cleanup_pop() with a nonzero execute argument 也就是終止動作包括 pthread_exit() 和非正常終止,但不包括 return,且 execute 為非 0,才會執行 stack 中的清理函式 #### 關於 future 中的 flag ```cpp enum __future_flags { __FUTURE_RUNNING = 01, // 0000 0001 __FUTURE_FINISHED = 02, // 0000 0010 __FUTURE_TIMEOUT = 04, // 0000 0100 __FUTURE_CANCELLED = 010, // 0000 1010 __FUTURE_DESTROYED = 020, // 0001 0100 }; ``` 利用 flag 來判斷狀態 * `__FUTURE_FINISHED & __FUTURE_CANCELLED` 結果為 1 * `__FUTURE_TIMEOUT & __FUTURE_DESTROYED` 結果為 1