# 2021q1 Homework4 (quiz4) contributed by < `secminhr` > # 題目 ## FFF = ? ```c /** * Return the result when it becomes available. * If @seconds is non-zero and the result does not arrive within specified time, * NULL is returned. Each tpool_future_get() resets the timeout status on * @future. */ void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else FFF; } pthread_mutex_unlock(&future->mutex); return future->result; } ``` `tpool_future_get` 的職責是當 `future` 有結果時將其回傳,或者 timeout 的情況回傳 NULL 。 在 FFF 所在的位置是 `seconds` 為零,也就是沒有限制時間的情況。所以要做的事是等待結果,也就是 `pthread_cond_wait(&future->cond_finished, &future->mutex)` 答案為 `(a)` ## GGG = ? ```c static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); GGG; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); ... } ``` `jobqueue_fetch` 是作為 thread 的內容。 而靠近 GGG 這一段的工作是把 `task` 從 `jobqueue` 裡取出來。 因為下面就要對 `jobqueue` 做操作的關係,因此這裡 GGG 要確保 `jobqueue` 非空,反過來說就是如果為空則等待: `while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)` 答案為 `(c)` ## HHH = ? ```c /** * Schedules the specific function to be executed. * If successful, a future object representing the execution of * the task is returned. Otherwise, it returns NULL. */ struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { jobqueue->head = jobqueue->tail = new_head; HHH; } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { free(new_head); return NULL; } else if (future) { tpool_future_destroy(future); return NULL; } return future; } ``` `tpool_apply` 的用處是把 `func` 包進 `thread_task_t *new_head` 並將 `new_head` 排進 `jobqueue` 裡面。 HHH 所在的地方 `jobqueue->head` 為 NULL ,代表此時 `jobqueue` 為空。 因此在 `new_head` 排進去之後: `jobqueue->head = jobqueue->tail = new_head;` 要發消息讓在等待 `jobqueue->cond_nonempty` 的地方 (GGG) 可以繼續,因此這裡使用: `pthread_cond_broadcast(&jobqueue->cond_nonempty)` 答案為 `(b)` ## KKK = ?, LLL = ? ```c static void *jobqueue_fetch(void *queue) { ... if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= KKK; task->future->result = ret_value; LLL; pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } ``` KKK 所在的情況是 `func` 的回傳值已經取得而且 `task->future` 不是在 `__FUTURE_DESTROYED` 的狀態,代表這個 `task->future` 已經完成,所以 KKK 應為 `__FUTURE_FINISHED` 答案為 `(b)` LLL 所在位置是 KKK 下面第二行。這時已經將 `future->flag` 標成 FINISHED ,也已經得到結果。所以這裡應該發消息讓在等待 `cond_finished` 的地方 (`tpool_future_get`) 可以繼續,因此: `pthread_cond_broadcast(&task->future->cond_finished)` 答案為 `(a)` # 運作原理 ## 型態與結構 ### __future_flags #### __FUTURE_RUNNING-00001 **設定時機:** `jobqueue_fetch` , `func` 執行前 #### __FUTURE_FINISHED-00010 **設定時機:** `jobqueue_fetch` , `func` 執行後 **使用處:** `tpool_future_destroy`, 若 FINISHED 則釋放該 future `tpool_future_get`, 若還沒 FINISHED 則等待 #### __FUTURE_TIMEOUT-00100 **設定時機:** `tpool_future_get`, 超時後 **取消時機:** `tpool_future_get`, 計時前 #### __FUTURE_CANCELLED-01010 (8 | __FUTURE_FINISHED) **設定時機:** `jobqueue_destroy`, 當沒有設定 DESTROYED **使用處:** `tpool_future_destroy`, 若 CANCELLED 則釋放該 future `jobqueue_fetch`, 若 CANCELLED 則不執行 `func` 並釋放 `task` 。 `tpool_future_get`, 若還沒 FINISHED 則等待愛 #### __FUTURE_DESTROYED-10100 (16 | __FUTURE_TIMEOUT) **設定時機:** `tpool_future_destroy`, 當 future 不是 FINISHED 也不是 CANCELLED 。 **使用處:** `jobqueue_destroy`, 若 DESTROYED 則釋放該該future `jobqueue_fetch`, 若 DESTROYED 則釋放該 future #### Freeing path (驗證中) ```graphviz digraph { future[shape=box, label="future\n00000"] future->jobqueue_fetch future->tpool_future_destroy tpool_future_destroy->future2 future2[shape=box label="future\n10100"] future1[shape=box label="future\n00011"] jobqueue_fetch->future1 future1->tpool_future_destroy1 future2->jobqueue_destroy jobqueue_destroy[label="jobqueue_destroy (freed)"] future2->jobqueue_fetch1 jobqueue_fetch1[label="jobqueue_fetch (freed)"] tpool_future_destroy1[label="tpool_future_destroy (freed)"] } ``` ### __threadtask (threadtask_t) 放在 jobqueue 裡面,每個 worker 每次執行一個 task 。 ```c typedef struct __threadtask { void *(*func)(void *); void *arg; struct __tpool_future *future; struct __threadtask *next; } threadtask_t; ``` - `func`: 這個 task 要執行的函式 - `arg`: `func` 的參數 - `future`: 與這個 task 關聯的 future 物件 - `next`: 在 jobqueue 中下一個 task `next` 方向由新到舊 ```graphviz digraph { rankdir=LR newer_task -> older_task [label="next"] } ``` 在程式中有一種特殊的 task ,其 `func` 為 NULL ,用來提示 worker 該停止了。 下面遇到這種 `func` 為 NULL 的 task 時,將直接以 **終止 task** 稱之。 ### __jobqueue (jobqueue_t) 放著 `threadtask_t` 的 queue ,裡面的 task 將會被 worker 執行。 ```c typedef struct __jobqueue { threadtask_t *head, *tail; pthread_cond_t cond_nonempty; pthread_mutex_t rwlock; } jobqueue_t; ``` - `head`, `tail`: 管理這個 queue 的 head ( 最新 task ) 與 tail ( 最舊 task ) - `cond_nonempty`: 對應「這個 jobqueue 非空」的條件變數 - `rwlock`: 對 jobqueue 操作 ( 加入/取出 task ) 時的鎖 ### __tpool_future 對應一個 task 的執行結果 ```c struct __tpool_future { int flag; void *result; pthread_mutex_t mutex; pthread_cond_t cond_finished; }; ``` - `flag`: 五種 __future_flags ,標示目前此 future 的狀況 - `result`: task 被執行完畢後的結果 - `mutex`: 對這個 future 做修改 (`flag` 或 `result`) 時的鎖 - `cond_finished`: 對應「這個 future 已被完成」的條件變數 ### __threadpool ```c struct __threadpool { size_t count; pthread_t *workers; jobqueue_t *jobqueue; }; ``` - `count`: workers 數量 - `workers`: `pthread_t` 的陣列,這些 worker 將執行 `jobqueue` 中的 task - `jobqueue`: 存放著 task 的 queue ## 運作 ## Part A: initialization and task dispatch ### main: 1~9 ```c int main() { int bpp_args[PRECISION + 1]; double bpp_sum = 0; tpool_t pool = tpool_create(4); tpool_future_t futures[PRECISION + 1]; for (int i = 0; i <= PRECISION; i++) { bpp_args[i] = i; futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]); } ... } ``` `bpp_args` 是每個 `bpp` 函式的參數。 `bpp_sum` 是每個 `bpp` 函式回傳結果的相加。 `t_pool_create(4)` 會建立一個 `tpool_t` 並在裡面裡建立4個 worker 。 `futures` 放著每個 future 物件。索引跟 `bpp_args` 對應。 `for` 迴圈設定 `bpp_args` 跟 `futures` , `futures[i]` 由 `tpool_apply` 建立。 `tpool_apply` 會將傳進來的函式跟參數包裝成 `threadtask_t` 、將其放進 `pool` 的 `jobqueue` 裡並回傳相關的 future 。 ### 使用函式 #### tpool_create ```c struct __threadpool *tpool_create(size_t count) { jobqueue_t *jobqueue = jobqueue_create(); struct __threadpool *pool = malloc(sizeof(struct __threadpool)); if (!jobqueue || !pool) { if (jobqueue) jobqueue_destroy(jobqueue); free(pool); return NULL; } pool->count = count, pool->jobqueue = jobqueue; if ((pool->workers = malloc(count * sizeof(pthread_t)))) { for (int i = 0; i < count; i++) { if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)) { for (int j = 0; j < i; j++) pthread_cancel(pool->workers[j]); for (int j = 0; j < i; j++) pthread_join(pool->workers[j], NULL); free(pool->workers); jobqueue_destroy(jobqueue); free(pool); return NULL; } } return pool; } jobqueue_destroy(jobqueue); free(pool); return NULL; } ``` 前段部分(第一個空行前面)是建立 `jobqueue` 跟 `pool` ,如果失敗了就會清除然後回傳 NULL 。 中段部份是在使用 `pthread_create` 建立 `workers` 。每個 worker 都是一個 `pthread_t` 。 worker 要執行的是 `jobqueue_fetch` ,而 `jobqueue` 將被作為 `jobqueue_fetch` 的參數。 一旦有 worker 建立失敗(進入 `if` ),就會取消所有 worker ,並且清理完後回傳 NULL 。 後段部分(第二個空行之後)是 `pool->workers` 的 `malloc` 失敗的情況,會清理後回傳 NULL 。 `tpool_create` 中最重要的部分在 `pthread_create` ,是 **Part B 的啟動處**。 #### jobqueue_create ```c static jobqueue_t *jobqueue_create(void) { jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t)); if (jobqueue) { jobqueue->head = jobqueue->tail = NULL; pthread_cond_init(&jobqueue->cond_nonempty, NULL); pthread_mutex_init(&jobqueue->rwlock, NULL); } return jobqueue; } ``` 這個函式比較單純,就是 `malloc` 出 `jobqueue` 的空間並設定好 `jobqueue` 內部的成員。 成功就回傳 `jobqueue` ,失敗則是 NULL 。 #### jobqueue_destroy ```c static void jobqueue_destroy(jobqueue_t *jobqueue) { threadtask_t *tmp = jobqueue->head; while (tmp) { jobqueue->head = jobqueue->head->next; pthread_mutex_lock(&tmp->future->mutex); if (tmp->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&tmp->future->mutex); pthread_mutex_destroy(&tmp->future->mutex); pthread_cond_destroy(&tmp->future->cond_finished); free(tmp->future); } else { tmp->future->flag |= __FUTURE_CANCELLED; pthread_mutex_unlock(&tmp->future->mutex); } free(tmp); tmp = jobqueue->head; } pthread_mutex_destroy(&jobqueue->rwlock); pthread_cond_destroy(&jobqueue->cond_nonempty); free(jobqueue); } ``` `jobqueue_destroy` 的最後三行就是資源的釋放。 `while` 迴圈意圖將還在 `jobqueue` 裡面的 `threadtask_t` 清空,但是 while 這段程式不會執行到。 原因是 `jobqueue_destroy` 只在這些情況呼叫: 1. `tpool_create` 在 `malloc` 失敗或 `pthread_create` 失敗的情況 這時候還沒有任何的 `threadtask_t` 在 `jobqueue` 裡面。 2. `tpool_join` `tpool_join` 是在最後結束的時候被呼叫,用來釋放資源。 ```c int tpool_join(struct __threadpool *pool) { size_t num_threads = pool->count; for (int i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); for (int i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; } ``` 因為 `jobqueue_dentroy` 在 `pthread_join` 之後呼叫的關係,而 `pool` 裡面的 `workers` 只會在收到 `func` 為 NULL 的 `threadtask_t` 後停止,所以執行到 `jobqueue_destroy` 時 `jobqueue` 裡面就已經沒有 `threadtask_t` 了。 #### tpool_apply ```c struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty); } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { free(new_head); return NULL; } else if (future) { tpool_future_destroy(future); return NULL; } return future; } ``` 這個函式將 `func` 和 `arg` 包進 `new_head` 裡,將 `new_head` 放到 `jobqueue` 的頭部並回傳和 `new_head` 相關的 `future` 。 在 `new_head` 和 `future` 都建立成功的情況,會將 `new_head` 加在 `jobqueue` 的頭部,如果 `jobqueue` 原本是空的還會發訊息 `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 讓在等的 worker 可以繼續。 如果 `new_head` 或者 `future` 失敗的話則會釋放資源,並回傳 NULL 。 中間 `if (jobqueue->head)` 跟他的 else 是 Part A 的重要目標:**將 task 放進 `jobqueue`** 。 #### tpool_future_create ```c static struct __tpool_future *tpool_future_create(void) { struct __tpool_future *future = malloc(sizeof(struct __tpool_future)); if (future) { future->flag = 0; future->result = NULL; pthread_mutex_init(&future->mutex, NULL); pthread_condattr_t attr; pthread_condattr_init(&attr); pthread_cond_init(&future->cond_finished, &attr); pthread_condattr_destroy(&attr); } return future; } ``` 比較單純的函式, `malloc` 出 `future` 並設定其成員。 成功就回傳 `future` ,失敗則是 NULL 。 #### tpool_future_destroy ```c int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); } } return 0; } ``` 作用是釋放 `future` ,但當 `future` 沒有 FINISHED 或者 CANCELLED 時(也就是沒有被 worker 處理過的話),只將 `flag` 標示成 `__FUTURE_DESTROYED` 。釋放將由之後的 `jobqueue_fetch` 或者 `jobqueue_destroy` 進行。 這次的程式中, else 的情況只發生在 `tpool_apply` 中 `malloc` 失敗時。 ## Part B: fetch and execute tasks Part A 將 task 放進 jobqueue 後,由 Part B 執行這些 task 。 ### jobqueue_fetch ```c static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); pthread_exit(NULL); } ``` 中間 `while(1)` 是主要獲取和執行 task 的部分。 `while (!jobqueue->tail)` 適當 `jobqueue` 是空著的情況,會等待 `jobqueue->cond_nonempty` 。 接下來的 `if(jobqueue->head == jobqueue->tail){ ... } else { ... }` 取出最尾部 (最舊)的 task ,並維護 jobqueue 的成員 (head/tail) 。 再來 `if (task->func) { ... } else { .... }` 用來判斷是否為[終止 task](https://hackmd.io/dJpJXy59QAasqrRMmB7orQ?both#__threadtask-threadtask_t) 。 如果不是 ( if 判斷為真 ) 則會執行這個 task ,如果是終止 task 則在釋放這個 task 之後跳出迴圈結束 `jobqueue_fetch` 。 #### task 執行 因為裡面還有邏輯,所以再挑出來 ```c pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } free(task); ``` 如果 future 已經被標 CANCELLED ,則直接不執行 `func`。 如果標 DESTROYED ,則釋放 future 。 剩下的狀況就是要執行且要結果。所以執行完後標 FINISHED 、將結果放進 future 中並 broadcast `cond_finished` 。 這裡的 broadcast 銜接 Part C 。 ## Part C: getting results and exit 這是最後一部分:拿到 Part B 中的執行結果以及退出程式。 ### main: 11~20 ```c int main() { ... for (int i = 0; i <= PRECISION; i++) { double *result = tpool_future_get(futures[i], 0 /* blocking wait */); bpp_sum += *result; tpool_future_destroy(futures[i]); free(result); } tpool_join(pool); printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum); return 0; } ``` 使用 `tpool_future_get` 和設定秒數 0 來等待結果 `result` 。 `bpp_sum += *result` 是公式的一部分。 每次計算完之後將 `futures[i]` 和 `result` 釋放。 最後使用 `tpool_join` 來釋放 `pool` 跟他的成員,並印出計算結果。 ### 使用函式 #### tpool_future_get ```c void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else pthread_cond_wait(&future->cond_finished, &future->mutex); } pthread_mutex_unlock(&future->mutex); return future->result; } ``` 若 `second` 為 0 則等待結果。 反之在時間內 `future` 沒有完成則標示 TIMEOUT 並回傳 NULL 。 #### tpool_join ```c int tpool_join(struct __threadpool *pool) { size_t num_threads = pool->count; for (int i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); for (int i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; } ``` 第一個 for 迴圈發送終止 task 。 第二個 for 迴圈等待所有 worker 終止(此時 jobqueue 也應清空,因為終止 task 在 jobqueue 最頭部)。 `jobqueue_destroy` 釋放 jobqueue,最後釋放 `pool` 。