# 2021q1 Homework4 (quiz4)
contributed by < `secminhr` >
# 題目
## FFF = ?
```c
/**
* Return the result when it becomes available.
* If @seconds is non-zero and the result does not arrive within specified time,
* NULL is returned. Each tpool_future_get() resets the timeout status on
* @future.
*/
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
FFF;
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
`tpool_future_get` 的職責是當 `future` 有結果時將其回傳,或者 timeout 的情況回傳 NULL 。
在 FFF 所在的位置是 `seconds` 為零,也就是沒有限制時間的情況。所以要做的事是等待結果,也就是
`pthread_cond_wait(&future->cond_finished, &future->mutex)`
答案為 `(a)`
## GGG = ?
```c
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
GGG;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
...
}
```
`jobqueue_fetch` 是作為 thread 的內容。
而靠近 GGG 這一段的工作是把 `task` 從 `jobqueue` 裡取出來。
因為下面就要對 `jobqueue` 做操作的關係,因此這裡 GGG 要確保 `jobqueue` 非空,反過來說就是如果為空則等待:
`while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)`
答案為 `(c)`
## HHH = ?
```c
/**
* Schedules the specific function to be executed.
* If successful, a future object representing the execution of
* the task is returned. Otherwise, it returns NULL.
*/
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
HHH;
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
`tpool_apply` 的用處是把 `func` 包進 `thread_task_t *new_head` 並將 `new_head` 排進 `jobqueue` 裡面。
HHH 所在的地方 `jobqueue->head` 為 NULL ,代表此時 `jobqueue` 為空。
因此在 `new_head` 排進去之後:
`jobqueue->head = jobqueue->tail = new_head;`
要發消息讓在等待 `jobqueue->cond_nonempty` 的地方 (GGG) 可以繼續,因此這裡使用:
`pthread_cond_broadcast(&jobqueue->cond_nonempty)`
答案為 `(b)`
## KKK = ?, LLL = ?
```c
static void *jobqueue_fetch(void *queue)
{
...
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= KKK;
task->future->result = ret_value;
LLL;
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
```
KKK 所在的情況是 `func` 的回傳值已經取得而且 `task->future` 不是在 `__FUTURE_DESTROYED` 的狀態,代表這個 `task->future` 已經完成,所以 KKK 應為 `__FUTURE_FINISHED`
答案為 `(b)`
LLL 所在位置是 KKK 下面第二行。這時已經將 `future->flag` 標成 FINISHED ,也已經得到結果。所以這裡應該發消息讓在等待 `cond_finished` 的地方 (`tpool_future_get`) 可以繼續,因此:
`pthread_cond_broadcast(&task->future->cond_finished)`
答案為 `(a)`
# 運作原理
## 型態與結構
### __future_flags
#### __FUTURE_RUNNING-00001
**設定時機:** `jobqueue_fetch` , `func` 執行前
#### __FUTURE_FINISHED-00010
**設定時機:** `jobqueue_fetch` , `func` 執行後
**使用處:**
`tpool_future_destroy`, 若 FINISHED 則釋放該 future
`tpool_future_get`, 若還沒 FINISHED 則等待
#### __FUTURE_TIMEOUT-00100
**設定時機:** `tpool_future_get`, 超時後
**取消時機:** `tpool_future_get`, 計時前
#### __FUTURE_CANCELLED-01010 (8 | __FUTURE_FINISHED)
**設定時機:** `jobqueue_destroy`, 當沒有設定 DESTROYED
**使用處:**
`tpool_future_destroy`, 若 CANCELLED 則釋放該 future
`jobqueue_fetch`, 若 CANCELLED 則不執行 `func` 並釋放 `task` 。
`tpool_future_get`, 若還沒 FINISHED 則等待愛
#### __FUTURE_DESTROYED-10100 (16 | __FUTURE_TIMEOUT)
**設定時機:** `tpool_future_destroy`, 當 future 不是 FINISHED 也不是 CANCELLED 。
**使用處:**
`jobqueue_destroy`, 若 DESTROYED 則釋放該該future
`jobqueue_fetch`, 若 DESTROYED 則釋放該 future
#### Freeing path (驗證中)
```graphviz
digraph {
future[shape=box, label="future\n00000"]
future->jobqueue_fetch
future->tpool_future_destroy
tpool_future_destroy->future2
future2[shape=box label="future\n10100"]
future1[shape=box label="future\n00011"]
jobqueue_fetch->future1
future1->tpool_future_destroy1
future2->jobqueue_destroy
jobqueue_destroy[label="jobqueue_destroy (freed)"]
future2->jobqueue_fetch1
jobqueue_fetch1[label="jobqueue_fetch (freed)"]
tpool_future_destroy1[label="tpool_future_destroy (freed)"]
}
```
### __threadtask (threadtask_t)
放在 jobqueue 裡面,每個 worker 每次執行一個 task 。
```c
typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
```
- `func`: 這個 task 要執行的函式
- `arg`: `func` 的參數
- `future`: 與這個 task 關聯的 future 物件
- `next`: 在 jobqueue 中下一個 task
`next` 方向由新到舊
```graphviz
digraph {
rankdir=LR
newer_task -> older_task [label="next"]
}
```
在程式中有一種特殊的 task ,其 `func` 為 NULL ,用來提示 worker 該停止了。
下面遇到這種 `func` 為 NULL 的 task 時,將直接以 **終止 task** 稱之。
### __jobqueue (jobqueue_t)
放著 `threadtask_t` 的 queue ,裡面的 task 將會被 worker 執行。
```c
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
```
- `head`, `tail`: 管理這個 queue 的 head ( 最新 task ) 與 tail ( 最舊 task )
- `cond_nonempty`: 對應「這個 jobqueue 非空」的條件變數
- `rwlock`: 對 jobqueue 操作 ( 加入/取出 task ) 時的鎖
### __tpool_future
對應一個 task 的執行結果
```c
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
```
- `flag`: 五種 __future_flags ,標示目前此 future 的狀況
- `result`: task 被執行完畢後的結果
- `mutex`: 對這個 future 做修改 (`flag` 或 `result`) 時的鎖
- `cond_finished`: 對應「這個 future 已被完成」的條件變數
### __threadpool
```c
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
```
- `count`: workers 數量
- `workers`: `pthread_t` 的陣列,這些 worker 將執行 `jobqueue` 中的 task
- `jobqueue`: 存放著 task 的 queue
## 運作
## Part A: initialization and task dispatch
### main: 1~9
```c
int main()
{
int bpp_args[PRECISION + 1];
double bpp_sum = 0;
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
...
}
```
`bpp_args` 是每個 `bpp` 函式的參數。
`bpp_sum` 是每個 `bpp` 函式回傳結果的相加。
`t_pool_create(4)` 會建立一個 `tpool_t` 並在裡面裡建立4個 worker 。
`futures` 放著每個 future 物件。索引跟 `bpp_args` 對應。
`for` 迴圈設定 `bpp_args` 跟 `futures` , `futures[i]` 由 `tpool_apply` 建立。
`tpool_apply` 會將傳進來的函式跟參數包裝成 `threadtask_t` 、將其放進 `pool` 的 `jobqueue` 裡並回傳相關的 future 。
### 使用函式
#### tpool_create
```c
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
前段部分(第一個空行前面)是建立 `jobqueue` 跟 `pool` ,如果失敗了就會清除然後回傳 NULL 。
中段部份是在使用 `pthread_create` 建立 `workers` 。每個 worker 都是一個 `pthread_t` 。 worker 要執行的是 `jobqueue_fetch` ,而 `jobqueue` 將被作為 `jobqueue_fetch` 的參數。
一旦有 worker 建立失敗(進入 `if` ),就會取消所有 worker ,並且清理完後回傳 NULL 。
後段部分(第二個空行之後)是 `pool->workers` 的 `malloc` 失敗的情況,會清理後回傳 NULL 。
`tpool_create` 中最重要的部分在 `pthread_create` ,是 **Part B 的啟動處**。
#### jobqueue_create
```c
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
```
這個函式比較單純,就是 `malloc` 出 `jobqueue` 的空間並設定好 `jobqueue` 內部的成員。
成功就回傳 `jobqueue` ,失敗則是 NULL 。
#### jobqueue_destroy
```c
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
threadtask_t *tmp = jobqueue->head;
while (tmp) {
jobqueue->head = jobqueue->head->next;
pthread_mutex_lock(&tmp->future->mutex);
if (tmp->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&tmp->future->mutex);
pthread_mutex_destroy(&tmp->future->mutex);
pthread_cond_destroy(&tmp->future->cond_finished);
free(tmp->future);
} else {
tmp->future->flag |= __FUTURE_CANCELLED;
pthread_mutex_unlock(&tmp->future->mutex);
}
free(tmp);
tmp = jobqueue->head;
}
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
```
`jobqueue_destroy` 的最後三行就是資源的釋放。
`while` 迴圈意圖將還在 `jobqueue` 裡面的 `threadtask_t` 清空,但是 while 這段程式不會執行到。
原因是 `jobqueue_destroy` 只在這些情況呼叫:
1. `tpool_create` 在 `malloc` 失敗或 `pthread_create` 失敗的情況
這時候還沒有任何的 `threadtask_t` 在 `jobqueue` 裡面。
2. `tpool_join`
`tpool_join` 是在最後結束的時候被呼叫,用來釋放資源。
```c
int tpool_join(struct __threadpool *pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
因為 `jobqueue_dentroy` 在 `pthread_join` 之後呼叫的關係,而 `pool` 裡面的 `workers` 只會在收到 `func` 為 NULL 的 `threadtask_t` 後停止,所以執行到 `jobqueue_destroy` 時 `jobqueue` 裡面就已經沒有 `threadtask_t` 了。
#### tpool_apply
```c
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
這個函式將 `func` 和 `arg` 包進 `new_head` 裡,將 `new_head` 放到 `jobqueue` 的頭部並回傳和 `new_head` 相關的 `future` 。
在 `new_head` 和 `future` 都建立成功的情況,會將 `new_head` 加在 `jobqueue` 的頭部,如果 `jobqueue` 原本是空的還會發訊息 `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 讓在等的 worker 可以繼續。
如果 `new_head` 或者 `future` 失敗的話則會釋放資源,並回傳 NULL 。
中間 `if (jobqueue->head)` 跟他的 else 是 Part A 的重要目標:**將 task 放進 `jobqueue`** 。
#### tpool_future_create
```c
static struct __tpool_future *tpool_future_create(void)
{
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
```
比較單純的函式, `malloc` 出 `future` 並設定其成員。
成功就回傳 `future` ,失敗則是 NULL 。
#### tpool_future_destroy
```c
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
}
}
return 0;
}
```
作用是釋放 `future` ,但當 `future` 沒有 FINISHED 或者 CANCELLED 時(也就是沒有被 worker 處理過的話),只將 `flag` 標示成 `__FUTURE_DESTROYED` 。釋放將由之後的 `jobqueue_fetch` 或者 `jobqueue_destroy` 進行。
這次的程式中, else 的情況只發生在 `tpool_apply` 中 `malloc` 失敗時。
## Part B: fetch and execute tasks
Part A 將 task 放進 jobqueue 後,由 Part B 執行這些 task 。
### jobqueue_fetch
```c
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
中間 `while(1)` 是主要獲取和執行 task 的部分。
`while (!jobqueue->tail)` 適當 `jobqueue` 是空著的情況,會等待 `jobqueue->cond_nonempty` 。
接下來的 `if(jobqueue->head == jobqueue->tail){ ... } else { ... }` 取出最尾部 (最舊)的 task ,並維護 jobqueue 的成員 (head/tail) 。
再來 `if (task->func) { ... } else { .... }` 用來判斷是否為[終止 task](https://hackmd.io/dJpJXy59QAasqrRMmB7orQ?both#__threadtask-threadtask_t) 。
如果不是 ( if 判斷為真 ) 則會執行這個 task ,如果是終止 task 則在釋放這個 task 之後跳出迴圈結束 `jobqueue_fetch` 。
#### task 執行
因為裡面還有邏輯,所以再挑出來
```c
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
```
如果 future 已經被標 CANCELLED ,則直接不執行 `func`。
如果標 DESTROYED ,則釋放 future 。
剩下的狀況就是要執行且要結果。所以執行完後標 FINISHED 、將結果放進 future 中並 broadcast `cond_finished` 。
這裡的 broadcast 銜接 Part C 。
## Part C: getting results and exit
這是最後一部分:拿到 Part B 中的執行結果以及退出程式。
### main: 11~20
```c
int main() {
...
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_join(pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
return 0;
}
```
使用 `tpool_future_get` 和設定秒數 0 來等待結果 `result` 。
`bpp_sum += *result` 是公式的一部分。
每次計算完之後將 `futures[i]` 和 `result` 釋放。
最後使用 `tpool_join` 來釋放 `pool` 跟他的成員,並印出計算結果。
### 使用函式
#### tpool_future_get
```c
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
pthread_cond_wait(&future->cond_finished, &future->mutex);
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
若 `second` 為 0 則等待結果。
反之在時間內 `future` 沒有完成則標示 TIMEOUT 並回傳 NULL 。
#### tpool_join
```c
int tpool_join(struct __threadpool *pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
第一個 for 迴圈發送終止 task 。
第二個 for 迴圈等待所有 worker 終止(此時 jobqueue 也應清空,因為終止 task 在 jobqueue 最頭部)。
`jobqueue_destroy` 釋放 jobqueue,最後釋放 `pool` 。