--- tags: Linux --- # 2021q1 Homework4 (quiz4) contributed by < `Chialiang86` > ## 作業要求 - [x] 解釋程式碼運作原理,包含 timeout 處理機制。 - [ ] 指出改進空間並實作 - [ ] 研讀 [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool),指出其 atomic 操作,並說明 lock-free - [ ] 嘗試使用 C11 Atomics 改寫,使其有更好的 scalability ## 程式碼運作原理 ### data structure - 由上到下看資料型態 #### **1. struct __threadpool** - 為最上層的資料結構 - `count` 為 pthread 的數量,在建立 thread pool 時提供 - `workers` 為 pthread 的陣列,總共包含 `count` 個 threads - `jobqueue` 為一個單向 linked list ,為所有 task 存放的容器 ```c= typedef struct __threadpool *tpool_t; struct __threadpool { size_t count; pthread_t *workers; jobqueue_t *jobqueue; }; ``` #### **2. jobqueue_t** - 為 task 所在的 queue ,採用單向 linked list 來實作 - `head`, `tail` 為 job queue 的首跟尾、方便插入及取出 task 時以 O(1) 的時間複雜度操作 - `cond_nonempty` 為 condition variable 、在 job queue 為空時,藉由呼叫 `pthread_cond_wait` 來等待 task 插入 job queue 中成為非空 ; 反之當 job queue 變為非空,藉由呼叫 `pthread_cond_broadcast` 來提醒所有 thread job queue 中已有待執行的 task - `rwlock` 為保護 job queue 在多執行序環境下執行 task 的插入或取出動作時,可以藉由 lock, unlock 來解決同步問題,使 job queue 的操作為 atomic ```c= typedef struct __jobqueue { threadtask_t *head, *tail; pthread_cond_t cond_nonempty; pthread_mutex_t rwlock; } jobqueue_t; ``` #### **3. threadtask_t** - 包裝所有 task 相關的資訊 - `func` 為待執行 task 的 function pointer ,可見其參數及回傳值都為 `void *` - `arg` 為 task 存放參數的 pointer ,雖然看似 `arg` 只能存一個 argument ,但實際上若有多個 arguments 要存放可將其包裝成一個 struct ,再將該 struct 的 pointer 轉型成 `void *` 讓 arg 接收 - `future` 主要為存放 task 執行的結果、同時包含 task 執行的狀態、 return value 的同步保護機制和提供 condition variable 幫助其他 thread 去依照不同的狀態進行對應的操作(在 return value 尚未被算出來時等待執行, return value 被算完則提醒其他執行緒可以獲取運算結果) ```c= typedef struct __threadtask { void *(*func)(void *); void *arg; struct __tpool_future *future; struct __threadtask *next; } threadtask_t; ``` #### **4. struct __tpool_future** - `flag` 表示 task 的執行狀態,方便其他執行緒依照狀態做對應動作 ```c= enum __future_flags { __FUTURE_RUNNING = 01, // 正在運算中 __FUTURE_FINISHED = 02, // 已將結果運算出來 __FUTURE_TIMEOUT = 04, // 超時 __FUTURE_CANCELLED = 010, // thread 被取消 __FUTURE_DESTROYED = 020, // thread 被釋放 }; ``` - `result` 存放 task 回傳值(此範例將 result 轉型成為 double 來接收圓周率運算的值) - `mutex` 控制對此結構的同步存取 - `cond_finished` 提供其他執行緒回傳值目前的狀態、依照此狀態來進行等待或繼續執行 ```c= typedef struct __tpool_future *tpool_future_t struct __tpool_future { int flag; void *result; pthread_mutex_t mutex; pthread_cond_t cond_finished; }; ``` #### **總結** - 圖形化整個資料結構的概況 ```graphviz digraph tpool_future_t { node [shape=record]; rankdir=LR; __threadpool [label="<f0> count|<f1> workers|<f2> jobqueue"]; __threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"]; __threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"]; __jibqueue [label="<f0> head|<f1> tail|<f2> cond_noempty|<f3> rwlock"]; tpool_future_t [label="<f0> flag|<f1> result|<f2> mutex|<f3> cond_finished"] more1[label="..."] more2[label="..."] more3[label="..."] tpool_t->__threadpool; __jibqueue:f0->__threadtask __jibqueue:f1->__threadtask1:f3 subgraph queue{ name [shape=plaintext, fontsize="36" label="jobqueue"] name -> more3[color=none] __threadtask:f3->more1 more1->more2 more2->more3 more3->__threadtask1:f2 graph[style=dotted]; } __threadtask1:f2->tpool_future_t:f0 __threadpool:f2->__jibqueue } ``` ### interface - 從 main 中觀察使用到的 thread pool 的 interface - 先呼叫 `tpool_create` 來建立 thread pool 並初始化指定數目的執行緒 - 再進入 `tpool_apply` 來進行 task 的指派、將 task 加入 job queue 中,並同時初始化回傳值 `future` 陣列,包含動態配置 `truct __tpool_future` 記憶體空間以及 condition variable 的初始化 - `pool_future_get` 等待並獲取運算回傳的結果,運算完成則呼叫 `tpool_future_destroy` 來將 `truct __tpool_future` 記憶體空間釋放,最後用 `tpool_join` 來將 thread pool 進行完整的記憶體釋放。 ```c= int main() { int bpp_args[PRECISION + 1]; double bpp_sum = 0; tpool_t pool = tpool_create(4); tpool_future_t futures[PRECISION + 1]; for (int i = 0; i <= PRECISION; i++) { bpp_args[i] = i; futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]); } for (int i = 0; i <= PRECISION; i++) { double *result = tpool_future_get(futures[i], 0 /* blocking wait */); bpp_sum += *result; tpool_future_destroy(futures[i]); free(result); } tpool_join(pool); printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum); return 0; } ``` #### **1. tpool_create** - 掌管 thread 的創建,共會產生新的 `count` 個 pthread ,`pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)` 為創建 task 的 API ,若創建成功回傳 0 跳出 if 判斷式; 反之則進入 15-24 行 if 區塊將 pthread 取消、並釋放 jobqueue, pthread, thread pool - 函式回傳被創建好 pthread 的 thread pool - 由 `pthread_create` 的 argument 可見其將名為 `jobqueue_fetch` 的 function pointer 當作執行目標,並將 `jobqueue` 作為 `jobqueue_fetch` 的 argument ,接著會細看此函數的定義 ```c= struct __threadpool *tpool_create(size_t count) { jobqueue_t *jobqueue = jobqueue_create(); struct __threadpool *pool = malloc(sizeof(struct __threadpool)); if (!jobqueue || !pool) { if (jobqueue) jobqueue_destroy(jobqueue); free(pool); return NULL; } pool->count = count, pool->jobqueue = jobqueue; if ((pool->workers = malloc(count * sizeof(pthread_t)))) { for (int i = 0; i < count; i++) { if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)) { for (int j = 0; j < i; j++) pthread_cancel(pool->workers[j]); for (int j = 0; j < i; j++) pthread_join(pool->workers[j], NULL); free(pool->workers); jobqueue_destroy(jobqueue); free(pool); return NULL; } } return pool; } jobqueue_destroy(jobqueue); free(pool); return NULL; } ``` #### **2. jobqueue_fetch** - 由函數名稱可見其主要功能就是從 job queue 中獲取新的 task 來執行 - 關於 `pthread_cleanup_push`, `pthread_clean_push` 和 thread cancellation clean-up handlers :::info - 當 thread 執行結束 (呼叫 `pthread_exit` 或 `pthread_cancel`) 時,系統會觸發 [thread cancellation clean-up handlers](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html) > A clean-up handler is a function that is automatically executed when a thread is canceled - cleanup handlers 的加入或取出會遵守 stack (last in first out) 的原則,而 `pthread_cleanup_push`, `pthread_clean_push` 為操作 clean-up handler 的介面,兩函數必須成對出現(在同一層級的大括號內) - 有三種情況會觸發 clean-up handler stack 的 pop 動作 1. pthread 被 cancel 2. 呼叫 pthread_exit 3. 呼叫 pthread_clean_push ::: - `int pthread_setcancelstate(int state, int *oldstate)` : 主要為設置 thread 取消的狀態 (cancelability type),並回傳舊的 cancelability type 至 oldstate ,兩種 state 為 - `PTHREAD_CANCEL_ENABLE` : 預設值,cancel 的請求會被回應 - `PTHREAD_CANCEL_DISABLE` : 當 cancel 的請求發出會進行 block 的動作,直到 cancelability type 變為 PTHREAD_CANCEL_ENABLE - `int pthread_setcanceltype(int type, int *oldtype)` : 主要功能為設定 pthread cancel 時的回應速度,並傳舊的設定值給 oldtype ,而 type 也有兩種: - `PTHREAD_CANCEL_DEFERRED` : 為預設值, cancel 呼叫時會被延遲到下一個 cancellation point 時才會進行 pthread 的 cancel - `PTHREAD_CANCEL_ASYNCHRONOUS` : 當收到 cancel 的請求時會立馬執行 cancel 的動作 :::info - 有一系列[函數]()在被呼叫時會被作為 cancellation point 如 - pthread_cond_timedwait() - pthread_cond_wait() - pthread_join() - pthread_testcancel() - 其他被作為 cancellation point 的函數可見 [pthreads(7) manual page](https://www.man7.org/linux/man-pages/man7/pthreads.7.html) 中的 cancellation points 章節 ::: ```c= static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); pthread_exit(NULL); } ``` #### **3. tpool_apply** - 將 task 及對應的 argument 加入 thread pool 中 job queue 的 head ,其中 `struct __tpool_future *future = tpool_future_create();` 為初始化 return value 的結構 - `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 為當 job queue 從空變為非空時,提醒所有 thread 現在有新的 task 進入 job queue 中待執行 - `pthread_mutex_lock(&jobqueue->rwlock);` 控制從 job queue 插入或取出 task 的同步操作 ```c= struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty) /*HHH*/; } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { free(new_head); return NULL; } else if (future) { tpool_future_destroy(future); return NULL; } return future; } ``` #### **4. tpool_future_get** - 重複確認 task 回傳值直到已經被運算完成,並將結果 return - `pthread_mutex_lock(&future->mutex);` 控制 `struct __tpool_future` 的同步,包含 `cond_finished` 等 - `seconds` 控制執行的時間,以秒為單位,若超過時間還未將結果運算出來則立下 `__FUTURE_TIMEOUT` 的 flag ,表示執行失敗並回傳 `NULL` - 若 `seconds` 為 0 則表示直到運算完成前會持續等待,實做機制使用 `pthread_cond_wait(&future->cond_finished, &future->mutex)` 控制,釋放 `future->mutex` 互斥鎖並用 `future->cond_finished` condition variable 來等待運算結果被算完 ```c= void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else pthread_cond_wait(&future->cond_finished, &future->mutex) /*FFF*/ ; } pthread_mutex_unlock(&future->mutex); return future->result; } ``` ## 改進 -