---
tags: Linux
---
# 2021q1 Homework4 (quiz4)
contributed by < `Chialiang86` >
## 作業要求
- [x] 解釋程式碼運作原理,包含 timeout 處理機制。
- [ ] 指出改進空間並實作
- [ ] 研讀 [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool),指出其 atomic 操作,並說明 lock-free
- [ ] 嘗試使用 C11 Atomics 改寫,使其有更好的 scalability
## 程式碼運作原理
### data structure
- 由上到下看資料型態
#### **1. struct __threadpool**
- 為最上層的資料結構
- `count` 為 pthread 的數量,在建立 thread pool 時提供
- `workers` 為 pthread 的陣列,總共包含 `count` 個 threads
- `jobqueue` 為一個單向 linked list ,為所有 task 存放的容器
```c=
typedef struct __threadpool *tpool_t;
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
```
#### **2. jobqueue_t**
- 為 task 所在的 queue ,採用單向 linked list 來實作
- `head`, `tail` 為 job queue 的首跟尾、方便插入及取出 task 時以 O(1) 的時間複雜度操作
- `cond_nonempty` 為 condition variable 、在 job queue 為空時,藉由呼叫 `pthread_cond_wait` 來等待 task 插入 job queue 中成為非空 ; 反之當 job queue 變為非空,藉由呼叫 `pthread_cond_broadcast` 來提醒所有 thread job queue 中已有待執行的 task
- `rwlock` 為保護 job queue 在多執行序環境下執行 task 的插入或取出動作時,可以藉由 lock, unlock 來解決同步問題,使 job queue 的操作為 atomic
```c=
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
```
#### **3. threadtask_t**
- 包裝所有 task 相關的資訊
- `func` 為待執行 task 的 function pointer ,可見其參數及回傳值都為 `void *`
- `arg` 為 task 存放參數的 pointer ,雖然看似 `arg` 只能存一個 argument ,但實際上若有多個 arguments 要存放可將其包裝成一個 struct ,再將該 struct 的 pointer 轉型成 `void *` 讓 arg 接收
- `future` 主要為存放 task 執行的結果、同時包含 task 執行的狀態、 return value 的同步保護機制和提供 condition variable 幫助其他 thread 去依照不同的狀態進行對應的操作(在 return value 尚未被算出來時等待執行, return value 被算完則提醒其他執行緒可以獲取運算結果)
```c=
typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
```
#### **4. struct __tpool_future**
- `flag` 表示 task 的執行狀態,方便其他執行緒依照狀態做對應動作
```c=
enum __future_flags {
__FUTURE_RUNNING = 01, // 正在運算中
__FUTURE_FINISHED = 02, // 已將結果運算出來
__FUTURE_TIMEOUT = 04, // 超時
__FUTURE_CANCELLED = 010, // thread 被取消
__FUTURE_DESTROYED = 020, // thread 被釋放
};
```
- `result` 存放 task 回傳值(此範例將 result 轉型成為 double 來接收圓周率運算的值)
- `mutex` 控制對此結構的同步存取
- `cond_finished` 提供其他執行緒回傳值目前的狀態、依照此狀態來進行等待或繼續執行
```c=
typedef struct __tpool_future *tpool_future_t
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
```
#### **總結**
- 圖形化整個資料結構的概況
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__threadpool [label="<f0> count|<f1> workers|<f2> jobqueue"];
__threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__jibqueue [label="<f0> head|<f1> tail|<f2> cond_noempty|<f3> rwlock"];
tpool_future_t [label="<f0> flag|<f1> result|<f2> mutex|<f3> cond_finished"]
more1[label="..."]
more2[label="..."]
more3[label="..."]
tpool_t->__threadpool;
__jibqueue:f0->__threadtask
__jibqueue:f1->__threadtask1:f3
subgraph queue{
name [shape=plaintext, fontsize="36" label="jobqueue"]
name -> more3[color=none]
__threadtask:f3->more1
more1->more2
more2->more3
more3->__threadtask1:f2
graph[style=dotted];
}
__threadtask1:f2->tpool_future_t:f0
__threadpool:f2->__jibqueue
}
```
### interface
- 從 main 中觀察使用到的 thread pool 的 interface
- 先呼叫 `tpool_create` 來建立 thread pool 並初始化指定數目的執行緒
- 再進入 `tpool_apply` 來進行 task 的指派、將 task 加入 job queue 中,並同時初始化回傳值 `future` 陣列,包含動態配置 `truct __tpool_future` 記憶體空間以及 condition variable 的初始化
- `pool_future_get` 等待並獲取運算回傳的結果,運算完成則呼叫 `tpool_future_destroy` 來將 `truct __tpool_future` 記憶體空間釋放,最後用 `tpool_join` 來將 thread pool 進行完整的記憶體釋放。
```c=
int main()
{
int bpp_args[PRECISION + 1];
double bpp_sum = 0;
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_join(pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
return 0;
}
```
#### **1. tpool_create**
- 掌管 thread 的創建,共會產生新的 `count` 個 pthread ,`pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)` 為創建 task 的 API ,若創建成功回傳 0 跳出 if 判斷式; 反之則進入 15-24 行 if 區塊將 pthread 取消、並釋放 jobqueue, pthread, thread pool
- 函式回傳被創建好 pthread 的 thread pool
- 由 `pthread_create` 的 argument 可見其將名為 `jobqueue_fetch` 的 function pointer 當作執行目標,並將 `jobqueue` 作為 `jobqueue_fetch` 的 argument ,接著會細看此函數的定義
```c=
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
#### **2. jobqueue_fetch**
- 由函數名稱可見其主要功能就是從 job queue 中獲取新的 task 來執行
- 關於 `pthread_cleanup_push`, `pthread_clean_push` 和 thread cancellation clean-up handlers
:::info
- 當 thread 執行結束 (呼叫 `pthread_exit` 或 `pthread_cancel`) 時,系統會觸發 [thread cancellation clean-up handlers](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
> A clean-up handler is a function that is automatically executed when a thread is canceled
- cleanup handlers 的加入或取出會遵守 stack (last in first out) 的原則,而 `pthread_cleanup_push`, `pthread_clean_push` 為操作 clean-up handler 的介面,兩函數必須成對出現(在同一層級的大括號內)
- 有三種情況會觸發 clean-up handler stack 的 pop 動作
1. pthread 被 cancel
2. 呼叫 pthread_exit
3. 呼叫 pthread_clean_push
:::
- `int pthread_setcancelstate(int state, int *oldstate)` : 主要為設置 thread 取消的狀態 (cancelability type),並回傳舊的 cancelability type 至 oldstate ,兩種 state 為
- `PTHREAD_CANCEL_ENABLE` : 預設值,cancel 的請求會被回應
- `PTHREAD_CANCEL_DISABLE` : 當 cancel 的請求發出會進行 block 的動作,直到 cancelability type 變為 PTHREAD_CANCEL_ENABLE
- `int pthread_setcanceltype(int type, int *oldtype)` : 主要功能為設定 pthread cancel 時的回應速度,並傳舊的設定值給 oldtype ,而 type 也有兩種:
- `PTHREAD_CANCEL_DEFERRED` : 為預設值, cancel 呼叫時會被延遲到下一個 cancellation point 時才會進行 pthread 的 cancel
- `PTHREAD_CANCEL_ASYNCHRONOUS` : 當收到 cancel 的請求時會立馬執行 cancel 的動作
:::info
- 有一系列[函數]()在被呼叫時會被作為 cancellation point 如
- pthread_cond_timedwait()
- pthread_cond_wait()
- pthread_join()
- pthread_testcancel()
- 其他被作為 cancellation point 的函數可見 [pthreads(7) manual page](https://www.man7.org/linux/man-pages/man7/pthreads.7.html) 中的 cancellation points 章節
:::
```c=
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
#### **3. tpool_apply**
- 將 task 及對應的 argument 加入 thread pool 中 job queue 的 head ,其中 `struct __tpool_future *future = tpool_future_create();` 為初始化 return value 的結構
- `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 為當 job queue 從空變為非空時,提醒所有 thread 現在有新的 task 進入 job queue 中待執行
- `pthread_mutex_lock(&jobqueue->rwlock);` 控制從 job queue 插入或取出 task 的同步操作
```c=
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty) /*HHH*/;
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
#### **4. tpool_future_get**
- 重複確認 task 回傳值直到已經被運算完成,並將結果 return
- `pthread_mutex_lock(&future->mutex);` 控制 `struct __tpool_future` 的同步,包含 `cond_finished` 等
- `seconds` 控制執行的時間,以秒為單位,若超過時間還未將結果運算出來則立下 `__FUTURE_TIMEOUT` 的 flag ,表示執行失敗並回傳 `NULL`
- 若 `seconds` 為 0 則表示直到運算完成前會持續等待,實做機制使用 `pthread_cond_wait(&future->cond_finished, &future->mutex)` 控制,釋放 `future->mutex` 互斥鎖並用 `future->cond_finished` condition variable 來等待運算結果被算完
```c=
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
pthread_cond_wait(&future->cond_finished, &future->mutex) /*FFF*/ ;
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
## 改進
-