# 2021q1 Homework4 (quiz4)
contributed by < `tiffany6022` >
###### tags: `linux2021`
> [作業說明](https://hackmd.io/@sysprog/linux2021-quiz4)
- [x] 解釋上述程式碼運作原理,包含 timeout 處理機制,指出改進空間並實作
- [ ] 研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法
- [ ] 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability
## 解釋程式運作原理
### 程式結構
```cpp
struct __threadpool { // (tpool_t)
size_t count; // 紀錄 thread 數量
pthread_t *workers; // 指向 thread 的地址
jobqueue_t *jobqueue; // 裝 job 的 queue (呈下)
};
typedef struct __jobqueue {
threadtask_t *head, *tail; // 頭尾的 task 地址 (呈下)
pthread_cond_t cond_nonempty; // 當 jobqueue 是不空的時會發通知 (signal、broadcast)
pthread_mutex_t rwlock; // 關於 jobqueue 的 lock
} jobqueue_t;
typedef struct __threadtask {
void *(*func)(void *); // function pointer 指向當前 thread 要做的 task
void *arg; // 上述 function 帶入的參數
struct __tpool_future *future; // 指向跟此 task 相關的資訊 (呈下)
struct __threadtask *next; // 指向當前 task 在 jobqueue 中的下一個 task
} threadtask_t;
struct __tpool_future { // (tpool_future_t)
int flag; // 存此 task 的狀態
void *result; // 存此 task 的 function 執行完後的結果
pthread_mutex_t mutex; // 關於 __tpool_future 的 lock
pthread_cond_t cond_finished; // 當 task 的 function 執行完後發通知 (signal、broadcast)
};
```
### 程式函式
#### tpool
* create (創建 thread pool,成功 return pool,失敗 return NULL)
```cpp
struct __threadpool *tpool_create(size_t count)
{
/* 創建 jobqueue 並建立 __threadpool 的空間
* 若任一建立失敗,destroy 並釋放資源
*/
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
/* 創建成功後,給定 pool->count 和 pool->jobqueue
* 依據 count 數,建立對應的空間給 pool->workers
* workers 創建成功後,創建 threads 並分配每個任務給 threads
* 任務為 `jobqueue_fetch` 函式,所帶參數為 `(void *) jobqueue`
*/
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
/*
* 若當中有 pthread_create 失敗的 thread,return 值不為 0
* 先將之前創建的 threads 都提交 cancellation request
* pthread_join 可當一個取消點
* 因此可以取消 threads 並將資源釋放掉
*/
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
* apply (將 function 加入 jobqueue 的 task)
```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
/* 建立 threadtask_t 的空間和創建跟存 task 資訊的 __tpool_future
* 若不成功,destroy 並釋放對應資源
* 若成功,將前面建的 threadtask_t (new_head) 給定 func、arg、future
* 並將 new_head 放入 jobqueue 的 head 位置
* 注意操作 jobqueue 的 head tail 需要用 jobqueue->rwlock 保護
*/
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
/*
* 若 jobqueue 本來為空的,將用 CV(jobqueue->cond_nonempty) 發通知
*/
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty); // HHH
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
* join (等 thread pool 中的 threads 都完成 join 回來,將資源釋放掉)
```cpp
int tpool_join(struct __threadpool *pool)
{
/* 對每個 threads 依次將 func 和 arg 為 NULL 的 task 放入 jobqueue 中
* 執行 join 等待每個 threads 結束並釋放資源
*/
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
#### jobqueue
* create (創建 jobqueue)
```cpp
static jobqueue_t *jobqueue_create(void)
{
/* 初始化所有值
*/
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
```
* destroy (將 jobqueue destroy)
```cpp
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
/* 從 jobqueue 的 head 開始依次 destroy
* 當要讀取 task 中的 future 的 flag 時,需要 lock 保護
*/
threadtask_t *tmp = jobqueue->head;
while (tmp) {
jobqueue->head = jobqueue->head->next;
pthread_mutex_lock(&tmp->future->mutex);
/*
* 若 flag 為 __FUTURE_DESTROYED(0001 0100) 或 __FUTURE_TIMEOUT(0000 0100)
* 需要 destroy future 的 mutex 和 CV 並釋放 future 資源
*/
if (tmp->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&tmp->future->mutex);
pthread_mutex_destroy(&tmp->future->mutex);
pthread_cond_destroy(&tmp->future->cond_finished);
free(tmp->future);
} else {
/*
* 反之,將 flag 設為 __FUTURE_CANCELLED (0000 1010)
*/
tmp->future->flag |= __FUTURE_CANCELLED;
pthread_mutex_unlock(&tmp->future->mutex);
}
free(tmp);
tmp = jobqueue->head;
}
/* 若 jobqueue 為空的,只須分別把 lock 和 condition variable 毀滅
* 再釋放 jobqueue 的資源
*/
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
```
* fetch (每個 threads 執行的 function,執行 jobqueue 中的每個 tasks)
```cpp
static void __jobqueue_fetch_cleanup(void *arg)
{
/* pthread_cleanup_push 中執行的 function
* 為了避免結束 thread 時 lock 可能還是鎖的,要解鎖
*/
pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
pthread_mutex_unlock(mutex);
}
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
/* 若 jobqueue 為空的,等待當 jobqueue 不為空的時 cond_nonempty 的通知
* 不為空時,拿出 jobqueue 中的 tail 存給 `task`
*/
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); // GGG
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
/* 當 task->func 不為空時,執行 task->func(task->arg)
* 結果寫入 task->future->result
*/
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
/*
* 若 flag 為 __FUTURE_CANCELLED (0000 1010) 或 __FUTURE_FINISHED (0000 0010)
* 釋放 task 資源並直接 continue 重新找下一個 task
* 注意操作 flag 時需要 future->mutex 保護
*/
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
/*
* 反之,將 flag 設為 __FUTURE_RUNNING (0000 0001)
*/
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
/*
* 若 flag 為 __FUTURE_DESTROYED(0001 0100) 或 __FUTURE_TIMEOUT(0000 0100)
* 需要 destroy future 的 mutex 和 CV 並釋放 future 資源
*/
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
/*
* 反之,將 flag 設為 __FUTURE_FINISHED
* 並將結果存給 future->result
* 用 cond_finished 發通知給 tpool_future_get function
* 通知可以返回結果值
*/
task->future->flag |= __FUTURE_FINISHED; // KKK
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished); // LLL
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
/*
* 若 task->func 為空時
* 先後釋放 future 和 task 資源
* 並結束無窮迴圈
*/
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0); // 不執行之前 push 的清除函式
pthread_exit(NULL); // pthread 終止
}
```
#### tpool_future
* create (創建 tpool_future)
```cpp
static struct __tpool_future *tpool_future_create(void)
{
/* 初始化所有值
*/
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
```
* destroy
```cpp
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
/*
* 若 flag 為 __FUTURE_CANCELLED (0000 1010) 或 __FUTURE_FINISHED (0000 0010)
* 需要 destroy future 的 mutex 和 CV 並釋放 future 資源
* (可以確定 future 是完成或取消的狀態)
*/
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
/*
* 反之,flag 設為 __FUTURE_DESTROYED
* 當執行到 jobqueue_destroy 時,會再釋放 future 資源
*/
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
}
}
return 0;
}
```
* get (取得 result 和實現 timeout 功能)
```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
/*
* 若 seconds 不為 0 (使用者有限定執行時間)
* 利用 timespec 計算時間,將現在時間加上限定的執行時間
* 再透過 pthread_cond_timedwait,若時間到或被通知都會被喚醒
* 但若是時間到被喚醒的,return 值為 ETIMEDOUT
*/
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
/*
* 時間到 task 還沒做完
* 將 flag 設為 __FUTURE_TIMEOUT
* 結果回傳 NULL
*/
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
/*
* 若 seconds 為 0 (使用者未限定執行時間)
* 等 cond_finished 通知可以繼續回傳 future->result
*/
pthread_cond_wait(&future->cond_finished, &future->mutex); // FFF
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
#### 關於 pthread 取消
* **pthread_cancel** (pthread_t thread)
* 提交一個 cancellation request 給 thread
* 會不會取消或取消時機根據 cancellation 的 **state** 和 **type** 有所不同
* 取消成功 return 0
* **pthread_setcancelstate** (int state, int \*oldstate)
* state 分為以下兩種:
* **PTHREAD_CANCEL_ENABLE** $\rightarrow$ 設為取消狀態
* **PTHREAD_CANCEL_DISABLE** $\rightarrow$ 設為不取消狀態
* **pthread_setcanceltype** (int type, int \*oldtype)
* type 分為以下兩種:
* **PTHREAD_CANCEL_DEFERRED** $\rightarrow$ 運行制下一個取消點再執行取消動作
* **PTHREAD_CANCEL_ASYNCHRONOUS** $\rightarrow$ 立即執行取消動作
* 僅當 cancellation 的 state 為 ENABLE 時有效:exclamation:
* **pthread_testcancel** (void)
* 創建一個取消點
* 僅當 cancellation 的 state 為 ENABLE 和 type 為 DEFFERED 時有效 :exclamation:
#### 關於 pthread 終止
不論是正常或是非正常終止都有資源釋放的問題 (終止時 lock 可能還是鎖的)
* 正常 ex: pthread_exit()、return
* 非正常 ex: pthread_cancel()、出錯而退出等等
所以當 thread 終止時 clean-up handler 會自動執行 stack 中的清理函式
* **pthread_cleanup_push** (void (*routine)(void *), void *arg)
* 將清理函式 routine 放入 stack 中
* arg 為清理函式所帶的參數
* **pthread_cleanup_pop** (int execute)
* pop 並執行 stack 最上方的清理函式
* 只有當 execute 為非 0 時有效 :exclamation:
因此要執行清理函式需要滿足以下條件:
1. a thread is canceled
2. a thread terminates by calling pthread_exit()
3. pthread_cleanup_pop() with a nonzero execute argument
也就是終止動作包括 pthread_exit() 和非正常終止,但不包括 return,且 execute 為非 0,才會執行 stack 中的清理函式
#### 關於 future 中的 flag
```cpp
enum __future_flags {
__FUTURE_RUNNING = 01, // 0000 0001
__FUTURE_FINISHED = 02, // 0000 0010
__FUTURE_TIMEOUT = 04, // 0000 0100
__FUTURE_CANCELLED = 010, // 0000 1010
__FUTURE_DESTROYED = 020, // 0001 0100
};
```
利用 flag 來判斷狀態
* `__FUTURE_FINISHED & __FUTURE_CANCELLED` 結果為 1
* `__FUTURE_TIMEOUT & __FUTURE_DESTROYED` 結果為 1