# 2021q4 Homework4 (quiz4)
contributed by < `Nahemah1022` >
###### tags: `linux2021`
## POSIX Threads
A multithreaded program runs as follows: the master thread distributes tasks to the worker threads, each thread runs its task and exits, and the master thread then joins all of them.
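### Basic pthread usage
- `int pthread_create(pthread_t *restrict thread,
  const pthread_attr_t *restrict attr,
  void *(*start_routine)(void *),
  void *restrict arg);`:
  - Creates and starts a thread; `arg` is passed to `start_routine` as its argument
  - A non-zero return value means the thread could not be created
- `noreturn void pthread_exit(void *retval);`:
  - Terminates the calling thread and makes `retval` available as its return value
- `int pthread_join(pthread_t thread, void **retval);`:
  - Waits for `thread` to terminate and receives its return value through `retval`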
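The three calls above fit together as in the following minimal sketch. The names `square_worker` and `square_in_thread` are hypothetical helpers invented for this illustration; the integer is smuggled through the `void *` argument and return value via `intptr_t`.

```c
#include <pthread.h>
#include <stdint.h>

/* Hypothetical worker: squares the integer passed through `arg`. */
static void *square_worker(void *arg)
{
    intptr_t n = (intptr_t) arg;
    /* pthread_exit() hands the value to whoever joins this thread. */
    pthread_exit((void *) (n * n));
}

/* Hypothetical helper: runs square_worker in a new thread and collects
 * its result. Returns -1 if the thread cannot be created or joined. */
static long square_in_thread(long n)
{
    pthread_t tid;
    void *retval = NULL;

    if (pthread_create(&tid, NULL, square_worker, (void *) (intptr_t) n) != 0)
        return -1;
    if (pthread_join(tid, &retval) != 0)
        return -1;
    return (long) (intptr_t) retval;
}
```

Calling `square_in_thread(7)` creates one thread, waits for it, and returns the value the thread passed to `pthread_exit`.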
### mutex lock
A mechanism for locking a mutex, used to guarantee mutual exclusion between threads.
:::info
"mutex" is short for mutual exclusion
:::
- ```c
  int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                         const pthread_mutexattr_t *restrict attr);
  int pthread_mutex_destroy(pthread_mutex_t *mutex);
  ```
  - Initialize and destroy `mutex`
- ```c
  int pthread_mutex_lock(pthread_mutex_t *mutex);
  int pthread_mutex_unlock(pthread_mutex_t *mutex);
  ```
  - Lock and unlock `mutex`
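As a small illustration of the four mutex calls, the sketch below has several threads increment one shared counter. All names (`counter`, `counter_lock`, `increment_worker`, `run_counter_demo`) and the thread and iteration counts are assumptions made for this example.

```c
#include <pthread.h>

#define N_THREADS 4
#define N_INCREMENTS 100000

/* Shared state for this sketch; the names are illustrative. */
static long counter;
static pthread_mutex_t counter_lock;

static void *increment_worker(void *arg)
{
    (void) arg;
    for (int i = 0; i < N_INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);
        counter++; /* critical section: one thread at a time */
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Runs N_THREADS workers and returns the final counter, or -1 on failure. */
static long run_counter_demo(void)
{
    pthread_t tids[N_THREADS];
    int created = 0;

    counter = 0;
    if (pthread_mutex_init(&counter_lock, NULL) != 0)
        return -1;
    for (; created < N_THREADS; created++)
        if (pthread_create(&tids[created], NULL, increment_worker, NULL) != 0)
            break;
    for (int i = 0; i < created; i++)
        pthread_join(tids[i], NULL);
    pthread_mutex_destroy(&counter_lock);
    return created == N_THREADS ? counter : -1;
}
```

Without the lock and unlock around `counter++`, the increments from different threads could interleave and some updates would be lost.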
### condition variable
Lets a thread that is waiting for some event sit in a wait queue instead of continuously occupying the CPU.
- ```c
  int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
  int pthread_cond_destroy(pthread_cond_t *cond);
  ```
  - Initialize and destroy a condition variable
- ```c
  int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                             pthread_mutex_t *restrict mutex,
                             const struct timespec *restrict abstime);
  int pthread_cond_wait(pthread_cond_t *restrict cond,
                        pthread_mutex_t *restrict mutex);
  ```
  - Atomically unlocks the given `mutex` and puts the calling thread in the wait queue until the condition is signaled
  - When the thread is woken up, `mutex` is locked again before execution continues
- ```c
  int pthread_cond_signal(pthread_cond_t *cond);
  int pthread_cond_broadcast(pthread_cond_t *cond);
  ```
  - Wake up one (signal) or all (broadcast) of the threads waiting on this condition variable
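The usual pattern pairs a condition variable with a predicate that is checked in a loop while holding the mutex. The sketch below assumes a hypothetical `ready` flag and `payload` value; none of these names come from the thread pool code.

```c
#include <pthread.h>
#include <stdbool.h>

/* Illustrative shared state: a flag and a value protected by one mutex. */
static pthread_mutex_t ready_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
static bool ready;
static int payload;

static void *producer(void *arg)
{
    (void) arg;
    pthread_mutex_lock(&ready_lock);
    payload = 42;
    ready = true;                     /* change the predicate under the lock */
    pthread_cond_signal(&ready_cond); /* wake one waiter */
    pthread_mutex_unlock(&ready_lock);
    return NULL;
}

/* Blocks until the producer has published `payload`, then returns it.
 * Returns -1 if the producer thread cannot be created. */
static int wait_for_payload(void)
{
    pthread_t tid;
    int value;

    pthread_mutex_lock(&ready_lock);
    ready = false;
    pthread_mutex_unlock(&ready_lock);

    if (pthread_create(&tid, NULL, producer, NULL) != 0)
        return -1;

    pthread_mutex_lock(&ready_lock);
    while (!ready) /* re-check the predicate: wakeups may be spurious */
        pthread_cond_wait(&ready_cond, &ready_lock);
    value = payload;
    pthread_mutex_unlock(&ready_lock);

    pthread_join(tid, NULL);
    return value;
}
```

The `while` loop matters in both orderings: if the producer runs first, `ready` is already true and the waiter never sleeps; if the waiter runs first, it sleeps until the signal arrives.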
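### pthread cancellation mechanism
When a thread misbehaves, we would like to terminate it directly from the main thread, but we cannot know how far that thread has progressed. If it is terminated before unlocking a mutex it holds, the result is a deadlock.
pthread cancellation lets us declare up front **what a thread must finish doing before it is allowed to terminate when it is cancelled**. The related functions are explained one by one below:
:::info
- The clean-up handler stack is a stack of function pointers. When the thread is cancelled or calls `pthread_exit`, the handlers are popped and executed in the reverse of the order in which they were pushed
- If the thread terminates by returning normally from its start routine, the clean-up handlers are not executed
:::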
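- `pthread_cleanup_push(void (*routine)(void *), void *arg)`:
  - Pushes a routine onto the clean-up routine stack
  - `arg` is passed to the routine when it is executed
- `pthread_cleanup_pop(int execute)`:
  - Removes the routine at the top of the clean-up routine stack
  - If `execute` is non-zero, the popped routine is also executed
- `pthread_setcancelstate(int state, int *oldstate)`:
  - Sets the cancelability of the calling thread (whether it can be cancelled)
  - `PTHREAD_CANCEL_ENABLE` means the thread can be cancelled
  - `PTHREAD_CANCEL_DISABLE` means the thread cannot be cancelled
  - If a cancellation request arrives while cancellation is disabled, the request stays pending until cancellation is enabled again
- `pthread_setcanceltype(int type, int *oldtype)`:
  - Sets how the calling thread reacts to a cancellation request
  - With `PTHREAD_CANCEL_DEFERRED`, the thread does not terminate immediately; the request is acted on at the next cancellation point
  - With `PTHREAD_CANCEL_ASYNCHRONOUS`, the thread may be terminated at any time after the request is received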
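The deadlock scenario described in this section, and how a clean-up handler avoids it, can be sketched as follows. This is only an illustration under assumed names (`demo_lock`, `unlock_on_cancel`, `stuck_worker`, `cancel_demo`); a POSIX semaphore is used solely so that the main thread cancels the worker after the worker has taken the lock.

```c
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static sem_t worker_started;
static int cleanup_ran;

/* Clean-up handler: releases the mutex held by the cancelled thread. */
static void unlock_on_cancel(void *arg)
{
    cleanup_ran = 1;
    pthread_mutex_unlock((pthread_mutex_t *) arg);
}

static void *stuck_worker(void *arg)
{
    (void) arg;
    pthread_mutex_lock(&demo_lock);
    pthread_cleanup_push(unlock_on_cancel, &demo_lock);
    sem_post(&worker_started); /* tell the main thread the lock is held */
    for (;;)
        sleep(1); /* sleep() is a cancellation point */
    pthread_cleanup_pop(0); /* never reached, but push and pop must pair up */
    return NULL;
}

/* Returns 1 if the worker was cancelled and the mutex is usable again. */
static int cancel_demo(void)
{
    pthread_t tid;
    void *retval = NULL;

    cleanup_ran = 0;
    if (sem_init(&worker_started, 0, 0) != 0)
        return 0;
    if (pthread_create(&tid, NULL, stuck_worker, NULL) != 0) {
        sem_destroy(&worker_started);
        return 0;
    }
    sem_wait(&worker_started);
    pthread_cancel(tid); /* deferred: acted on inside sleep() */
    pthread_join(tid, &retval);
    sem_destroy(&worker_started);

    if (retval != PTHREAD_CANCELED || !cleanup_ran)
        return 0;
    /* Had the handler not run, the mutex would still be locked here. */
    if (pthread_mutex_trylock(&demo_lock) != 0)
        return 0;
    pthread_mutex_unlock(&demo_lock);
    return 1;
}
```

A cancelled thread reports `PTHREAD_CANCELED` through `pthread_join`, which is how the sketch confirms that the worker did not exit on its own.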
## Program analysis
### Purpose of the program
Speed up the computation of pi by evaluating multiple terms of the [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) in parallel.
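For reference, a serial evaluation of the series can be sketched as below. The function names `bbp_term` and `bbp_pi` are hypothetical and this is not the homework's own code. Each term depends only on its index `k`, which is what makes it natural to hand every term to the thread pool as a separate task.

```c
/* k-th term of the BBP series:
 *   pi = sum over k >= 0 of
 *        16^(-k) * (4/(8k+1) - 2/(8k+4) - 1/(8k+5) - 1/(8k+6))
 */
static double bbp_term(int k)
{
    double scale = 1.0;
    for (int i = 0; i < k; i++)
        scale /= 16.0; /* dividing by a power of two is exact */

    double k8 = 8.0 * k;
    return scale * (4.0 / (k8 + 1) - 2.0 / (k8 + 4) -
                    1.0 / (k8 + 5) - 1.0 / (k8 + 6));
}

/* Sums the first n_terms terms serially. */
static double bbp_pi(int n_terms)
{
    double sum = 0.0;
    for (int k = 0; k < n_terms; k++)
        sum += bbp_term(k);
    return sum;
}
```

Because every term is scaled by `16^(-k)`, roughly a dozen terms already reach the precision of a `double`.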
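### Thread pool usage flow
1. The program starts by calling `tpool_create` to build the thread pool, passing the number of workers to create
   - This initializes the pool's `jobqueue`
   - One pthread is created for each worker in the pool, and every worker runs `jobqueue_fetch`
2. `tpool_apply` creates a new `threadtask_t` and pushes it onto the jobqueue, a linked list, where it waits to be executed
   - If the jobqueue was empty before the push, it broadcasts on the condition variable `cond_nonempty` so that the workers blocked in `jobqueue_fetch` wake up
   - Once a new task is in the jobqueue, an idle worker takes it in `jobqueue_fetch` and runs it
   - When the task finishes, its result is stored in the task's `tpool_future`
3. `tpool_future_get` retrieves the result of a task
   - It waits with `pthread_cond_wait` (or `pthread_cond_timedwait` when a timeout is given) so that the result is only read after the task has finished
   - The worker calls `pthread_cond_broadcast` on `cond_finished` only after the task has completely finished
4. Finally, `tpool_join` joins all worker pthreads back into the master thread
   - It also destroys the jobqueue and frees the workers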
### Thread pool implementation details
#### `tpool_create(size_t count)`
```c=
/**
 * Create a thread pool containing specified number of threads.
 * If successful, the thread pool is returned. Otherwise, it
 * returns NULL.
 */
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }
    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                // if failing to create pthread, cancel and clean-up
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }
    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```
- Allocates the memory for the thread pool, creates the jobqueue, and assigns it to the pool
- Allocates space for `count` `pthread_t` handles, then creates one pthread per worker with `jobqueue_fetch()` as its start routine
- If any `pthread_create` call fails, the workers created so far are cancelled and joined, all allocations are released, and `NULL` is returned
#### `tpool_apply(tpool_t pool, void *(*func)(void *), void *arg)`
```c=
/**
 * Schedules the specific function to be executed.
 * If successful, a future object representing the execution of
 * the task is returned. Otherwise, it returns NULL.
 */
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```
- Allocates a `threadtask_t` and pushes it at the head of the pool's `jobqueue`, where it waits to be fetched
- Allocates a `__tpool_future` to hold the result of this task
- If the `jobqueue` was empty, calls `pthread_cond_broadcast(&jobqueue->cond_nonempty)` to wake every worker that is waiting to fetch a job
#### `tpool_join(tpool_t pool)`
```c=
/**
 * Wait for all pending tasks to complete before destroying the thread pool.
 */
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```
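- Applies `count` tasks whose function is `NULL`. A worker that fetches such a task breaks out of its loop and exits, and because tasks are pushed at the head and fetched from the tail, the tasks queued earlier are fetched first
- Joins all workers back into the master thread
- Frees the memory used by the jobqueue and by the thread pool itself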
#### `tpool_future_get(tpool_future_t future, unsigned int seconds)`
```c=
/**
 * Return the result when it becomes available.
 * If @seconds is non-zero and the result does not arrive within specified time,
 * NULL is returned. Each tpool_future_get() resets the timeout status on
 * @future.
 */
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }
    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
```
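- If the task whose result is requested has not finished yet, waits for it with `pthread_cond_wait(&future->cond_finished, &future->mutex)`
- If a timeout `seconds` is given, waits with `pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time)` instead
- If the task has still not finished when the timeout expires, sets the `__FUTURE_TIMEOUT` bit in the future's flag and returns `NULL`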
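`pthread_cond_timedwait` takes an absolute deadline, and that deadline is measured against the condition variable's own clock, which defaults to `CLOCK_REALTIME`. The listing above does not show how `cond_finished` is initialized, so the following sketch only illustrates what a `CLOCK_MONOTONIC` deadline requires in general: the clock is selected with `pthread_condattr_setclock` at initialization. The helper `wait_with_timeout` is hypothetical, and it computes the deadline once before the loop so that spurious wakeups do not extend the wait.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical helper: waits up to `seconds` on a condition that nobody
 * signals, and returns the status of the last pthread_cond_timedwait(). */
static int wait_with_timeout(unsigned int seconds)
{
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond;
    pthread_condattr_t attr;
    struct timespec deadline;
    int status;

    /* Make the condition variable measure deadlines on CLOCK_MONOTONIC. */
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    pthread_cond_init(&cond, &attr);
    pthread_condattr_destroy(&attr);

    /* Compute the absolute deadline once, before the wait loop. */
    clock_gettime(CLOCK_MONOTONIC, &deadline);
    deadline.tv_sec += seconds;

    pthread_mutex_lock(&lock);
    do {
        /* A return of 0 here can only be a spurious wakeup: wait again. */
        status = pthread_cond_timedwait(&cond, &lock, &deadline);
    } while (status == 0);
    pthread_mutex_unlock(&lock);

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&lock);
    return status;
}
```

With a one-second timeout and no signaller, the helper is expected to return `ETIMEDOUT`.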
#### `tpool_future_destroy(tpool_future_t future)`
```c=
/**
 * Destroy the future object and free resources once it is no longer used.
 * It is an error to refer to a destroyed future object. Note that destroying
 * a future object does not prevent a pending task from being executed.
 */
int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}
```
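- If the task has already finished or been cancelled, unlocks and destroys the mutex and the condition variable, then frees the future
- Otherwise only sets the `__FUTURE_DESTROYED` bit in the future's flag, so that the worker running the task frees the future once the task completes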
#### `jobqueue_fetch(void *queue)`
```c=
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;
    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();
        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }
            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED;
                task->future->result = ret_value;
                pthread_cond_broadcast(&task->future->cond_finished);
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }
    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```
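- Registers `__jobqueue_fetch_cleanup` as this thread's clean-up function
  - The clean-up function releases the jobqueue's mutex
- Uses `pthread_setcancelstate` to allow this thread to be cancelled from outside while it waits for work, and disables cancellation again before a task is taken
- Uses `pthread_testcancel` to create a cancellation point
  - A cancellation request that is already pending is acted on here; otherwise the thread can be cancelled while it waits in `pthread_cond_wait`, which is also a cancellation point
- Pops a task from the tail of the jobqueue and runs it
  - When the task finishes, its return value is stored in `task->future`
  - `pthread_cond_broadcast` wakes the threads waiting for this task to finish and return its result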
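Pushing at the head (as `tpool_apply` does) and popping from the tail (as `jobqueue_fetch` does) makes the singly linked list behave as a FIFO queue. The single-threaded sketch below isolates just that list manipulation; `struct node`, `struct queue`, `queue_push`, and `queue_pop` are illustrative names rather than the pool's actual definitions, and all locking is left out.

```c
#include <stdlib.h>

/* Illustrative node and queue types; not the pool's actual definitions. */
struct node {
    int value;
    struct node *next;
};

struct queue {
    struct node *head, *tail;
};

/* Push at the head. Returns 0 on success, -1 if allocation fails. */
static int queue_push(struct queue *q, int value)
{
    struct node *n = malloc(sizeof(*n));
    if (!n)
        return -1;
    n->value = value;
    n->next = q->head;
    q->head = n;
    if (!q->tail)
        q->tail = n; /* first element is both head and tail */
    return 0;
}

/* Pop from the tail. Returns 0 on success, -1 if the queue is empty. */
static int queue_pop(struct queue *q, int *out)
{
    struct node *victim = q->tail;
    if (!victim)
        return -1;
    if (q->head == victim) {
        q->head = q->tail = NULL;
    } else {
        struct node *prev = q->head;
        while (prev->next != victim) /* O(n) walk to the node before the tail */
            prev = prev->next;
        prev->next = NULL;
        q->tail = prev;
    }
    *out = victim->value;
    free(victim);
    return 0;
}
```

Pushing 1, 2, 3 and then popping three times yields 1, 2, 3 in that order. The walk from the head makes each pop linear in the queue length, a cost that a doubly linked list or a separate tail-side pointer scheme would avoid.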
---
## Techniques used in [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool)
---