# 2021q1 Homework4 (quiz4)
contributed by < [`bakudr18`](https://github.com/bakudr18)>
###### tags: `linux2021`
> [第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz4)
## 解釋程式碼運作原理
### 功能簡介
```c
/* Linked list to store the jobs in queue */
typedef struct __threadtask {
void *(*func)(void *); /* the function that task should execute */
void *arg; /* argument passed to func */
struct __tpool_future *future; /* A structure to store task status and result */
struct __threadtask *next; /* pointer to next task */
} threadtask_t;
typedef struct __jobqueue {
threadtask_t *head, *tail; /* store head and tail of queue */
pthread_cond_t cond_nonempty; /* condition variable to check if queue is non-empty */
pthread_mutex_t rwlock; /* lock share resources like head and tail */
} jobqueue_t;
/* A structure to store job status and return result */
struct __tpool_future {
int flag; /* job status */
void *result; /* the result of job which points to this future */
pthread_mutex_t mutex; /* lock share resouces like flag and result */
pthread_cond_t cond_finished; /* condition variable to check if job is finished */
};
struct __threadpool {
size_t count; /* count of pthread */
pthread_t *workers; /* store pthread id */
jobqueue_t *jobqueue; /* queue which stores pending jobs */
};
```
* `tpool_create`: 建立一 thread pool 包含 `count` 個 theads
* `tpool_apply`: 新增 task 進 jobqueue 中給 thread 執行,回傳該 task 的 `future`
* `tpool_join`: 等待所有 task 執行結束回收所有 thread
* `tpool_future_get`: 等待 `seconds` 秒給 task 執行,並回傳 task 的執行結果
* `tpool_future_destroy`: 釋放 `future` 的資源
### 細部函式探討
`tpool_apply` 在新增的 task 為 queue 的第一筆資料時會透過 `pthread_cond_broadcast` 喚起所有等待中的 thread 來執行 task。根據 [pthread_cond_signal(3)](https://linux.die.net/man/3/pthread_cond_signal)
> The thread(s) that are unblocked shall contend for the mutex according to the scheduling policy (if applicable), and as if each had called pthread_mutex_lock().
換言之,`pthread_cond_broadcast` 和 `pthread_cond_signal` 只負責喚起 thread ,而哪個 thread 能競爭到 mutex 是由 OS scheduler 所決定。
```c
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
...
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);
}
pthread_mutex_unlock(&jobqueue->rwlock);
}
...
return future;
}
```
`jobqueue_fetch` 為 thread 所執行的函式,在 `while(1)` 的前半部份 (Line 10~29) 是從 jobqueue 中取出任務存到 `task` 中,後半部份 (Line 31~62) 是執行 `task->func`,這裡分段進行探討。
快速掃過整個函式會發現只有當 `task->func == NULL` 時才會跳出迴圈,然而在 Line 11~16 使用了 thread cancellation 的機制,讓 process 可以透過發出 cancellation request 來終止 thread 執行,被終止的 thread 會去執行事先在 `pthread_cleanup_push` 註冊的 callback function 後才 return ,而這裡的 callback function 通常會用來釋放 lock 或釋放其他未釋放的資源。
在 Line 11, 16 使用 [`pthread_setcancelstate`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 分別做 enable 與 disable,表示只有在這個 section 內 thread 是可以接受且執行 cancellation request 的(透過 [`pthread_cancel`](https://man7.org/linux/man-pages/man3/pthread_cancel.3.html) 發出 request),而 thread 所預設的 cancelability type 為 `PTHREAD_CANCEL_DEFERRED` ,代表只有在 thread 遇到 cancellation point 時才會執行 cancellation request 以結束 thread,而 [`pthread_testcancel`](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) 就是一個 cancellation point,另外由 [pthreads(7)](https://man7.org/linux/man-pages/man7/pthreads.7.html) 可知,大部分的 blocking function (例如這裡的 `pthread_cond_wait`) 都是 cancellation point ,因此在 Line 12, 14 都有機會結束執行緒。
:::info
* 思考1: 為什麼 Line 14 是 `while (!jobqueue->tail)` 而不是 `if (!jobqueue->tail)` ?
前面提過 `pthread_cond_broadcast` 會喚起所有 blocking thread,即便是 `pthread_cond_signal` 也會喚起**至少**一個 blocking thread,而哪個 thread 能競爭到 lock 是由 OS scheduler 決定,因此沒拿到 lock 的 thread 實際上並不應該繼續執行 critical section 的程式碼,這個行為稱為 [spurious wakeup](https://linux.die.net/man/3/pthread_cond_wait) ,因此有必要再次執行 `pthread_cond_wait` 進入等待。
* 思考2: 既然 Line 15 已經有 `pthread_cond_wait` ,為什麼 Line 12 還要放 `pthread_testcancel` ?
考慮到當 `jobqueue` 一直不為空,永遠有 `task->func` 可以執行,那麼 thread 就永遠不會停止,為了讓 thread 有機會停止,因此 `pthread_testcancel` 就變得有必要了!
* 思考3: 為什麼要限制 cancelable 區域在 Line 11~16?
猜測這裡的目的是希望在有 task 可執行且被執行時不會執行到一半被 cancel ,畢竟不能保證`task->func` 內沒有使用到為 cancellation point 的函式。
:::
接著看到 Line 21~27 ,透過走訪 `tmp` linked list 來取得最後一個 `task` ,既然本來就有 `tail` 就應有機會不需走訪整個 list ,因此看起來就是一個應該改進的地方 :slightly_smiling_face:
```c=
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
繼續看到後半部份 (Line 31~62) ,除了執行 `task->func` 外,也需要修改到 `future` 的狀態,而 `future->flag` 與 `future->result` 都是共用資源(使用前需取得 lock),會在 `tpool_future_get`, `tpool_future_destroy` 與 `jobqueue_destroy` 都會用到,勢必得一起討論。
* `__FUTURE_CANCELLED`: 表示 task 被要求取消,只有在 `jobqueue_destroy` 會被設置,而此函式只有在當所有 thread 都被 cancel 並合併之後,也就是不再執行 `jobqueue_fetch` 時才會被執行,因此在 `jobqueue_fetch` 應該是不需要判斷此狀態的,這裡是一個可以改進之處。
```graphviz
digraph list {
rankdir = "LR"
tpool_create
tpool_join
jobqueue_destroy
pthread_join
__FUTURE_CANCELLED
{
rank = "same"
tpool_create
tpool_join
}
{
tpool_create->pthread_join[label = "fail"]
tpool_join->pthread_join
pthread_join->jobqueue_destroy
jobqueue_destroy->__FUTURE_CANCELLED[label = "set flag"]
}
}
```
* `__FUTURE_RUNNING`: 表示 task 正在執行中,設置後才可執行 `task->func` ,但目前其他函式並沒有以此 flag 來做任何判斷。
* `__FUTURE_DESTROYED`: 表示 future 應該要被銷毀,此 flag 是由 `tpool_future_destroy` 所設置,而 `tpool_future_destroy` 是釋出給 user 使用的函式,注意到執行此函式後 `future` 並不一定會被馬上釋放資源 ,可能會由 `jobqueue_fetch` 或 `jobqueue_destroy` 來釋放。另外這裡看不出來明確規範需要在 `task->func` 執行完後才可響應 `__FUTURE_DESTROYED` 要求,我認為放在 `task->func` 之前也是符合邏輯的,因此在修改上程式碼前還需要先釐清與 `__FUTURE_FINISHED` 之間的關係。
```c
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
}
}
return 0;
}
```
* `__FUTURE_FINISHED`: 表示 task 執行完畢,邏輯上來說應該要和 `__FUTURE_RUNNING` 是互斥的存在,因此需改進。`tpool_future_get` 以此 flag 做判斷依據決定是否需回傳`future->result` ,至於 `__FUTURE_DESTROYED` 狀態下是否需回傳 result 會在下方探討。
```c
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
pthread_cond_wait(&future->cond_finished, &future->mutex);
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
* `__FUTURE_TIMEOUT`: 表示 task 超過 `tpool_future_get` 給予的執行時間,應儘快回傳 NULL。`pthread_cond_timedwait` 與 `pthread_cond_wait` 類似,但多了一個時間參數,當超過時間時會回傳 `ETIMEDOUT` error。而這個函式有一些細節需要注意,根據 [pthread_cond_wait(3)](https://linux.die.net/man/3/pthread_cond_wait)
> The pthread_cond_timedwait() function shall be equivalent to pthread_cond_wait(), except that an error is returned if the absolute time specified by abstime passes (that is, **system time equals or exceeds abstime**) before the condition cond is signaled or broadcasted, or if the absolute time specified by abstime has already been passed at the time of the call.
首先,`pthread_cond_timedwait` 是有可能超過時間才回傳的,而可能超過多久在 man page 裡並沒有明確定義,這涉及到的議題非常多,可以在 [Stackoverflow](https://stackoverflow.com/a/45784669/11425048) 的回答先有初步了解即可,但絕大部分時間 OS 會儘可能不超時過多,所以若無特殊需求(如 real-time system)並不需要太過擔心。接著繼續看 [pthread_cond_wait(3)](https://linux.die.net/man/3/pthread_cond_wait)
> When such timeouts occur, pthread_cond_timedwait() shall nonetheless release and re-acquire the mutex referenced by mutex.
表示當 `ETIMEDOUT` 發生時,此 thread 應重新獲得 mutex,因此後續也需做對應的 unlock 。
:::info
為了驗證 `ETIMEDOUT` 發生後 thread 是否重新獲得 mutex ,我做了以下實驗:
首先設定 mutex type 為 `PTHREAD_MUTEX_ERRORCHECK`。
```c
static struct __tpool_future *tpool_future_create(void)
{
...
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
pthread_mutex_init(&future->mutex, &mutex_attr);
pthread_mutexattr_destroy(&mutex_attr);
...
}
```
接著在 `ETIMEDOUT` 發生後嘗試以 `pthread_mutex_trylock` 重新獲得 mutex 。
```c=
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
...
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
int rc = pthread_mutex_trylock(&future->mutex);
if (rc != 0) {
errno = rc;
perror("mutex lock");
rc = pthread_mutex_unlock(&future->mutex);
if (rc != 0) {
errno = rc;
perror("mutex unlock");
}
} else {
printf("%s owns metex\n", __func__);
pthread_mutex_unlock(&future->mutex);
}
return NULL;
}
...
}
```
根據 [pthread_mutex_lock(3)](https://linux.die.net/man/3/pthread_mutex_lock)
>If the mutex type is PTHREAD_MUTEX_ERRORCHECK, then error checking shall be provided. If a thread attempts to relock a mutex that it has already locked, an error shall be returned. If a thread attempts to unlock a mutex that it has not locked or a mutex which is unlocked, an error shall be returned.
EBUSY - The mutex could not be acquired because it was already locked.
EDEADLK - The current thread already owns the mutex.
這裡 `pthread_mutex_trylock` 回傳的結果會是 `EBUSY` 而非 `EDEADLK`,邏輯上來說並沒有錯,而我也理解 dead lock 應該是要有兩個以上的鎖才會發生,但這句 `EDEADLK - The current thread already owns the mutex` 真的很容易讓人誤會成在持有鎖時再次 try lock 就會得到 `EDEADLK` 的 error。而從 Line 10 `pthread_mutex_unlock` 沒有回傳錯誤就可以知道此 thread 確實是持有 mutex 的。
:::
:::info
* 思考4:`__FUTURE_DESTROYED` 狀態下是否需回傳 result ?
這是取捨問題,只要在文件中明確規定針對同一個 `future` 先執行 `tpool_future_destroy` 後才執行 `tpool_future_get` ,`tpool_future_get` 回傳結果是 undefined 即可。
:::
## Improvement
原實作中 pop task 的方式是走訪整個 linked list 取出 tail ,其 time complexity 為 $O(1)$ ,造成在 critical section 內駐留過久,這對於 multithread 效能的傷害是很大的,透過 `sudo taskset 0xF0 ./pi` 以 4 core CPU測量 `PRECISION = 100000` 時的執行時間,多執行幾次可發現,雖然大多數執行時間落在 160~200 milliseconds ,但偶爾會有超過 100 seconds 或更久的執行時間,以 [perf record](https://man7.org/linux/man-pages/man1/perf-record.1.html) 紀錄超過 100 seconds 的 process,發現有約 83% 的時間在執行 `for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next);` ,而越是減少可用的 CPU 此情況會更嚴重(因為當 main thread 執行時不斷 push task,若 OS scheduler 沒有頻繁 context switch 給 `jobqueue_fetch` 消化 task,linked list 會增長非常快)。
因此,為了使 pop task 達到 $O(1)$ ,我把原本的 `jobqueue` 改成 doubly list,後來更是直接引入 [linux/list.h](https://github.com/torvalds/linux/blob/master/include/linux/list.h) 以 circular doubly linked list 取代原本的 singly linked list,如此不但使 pop task 達到 $O(1)$ ,也減少了在 `jobqueue->head == jobqueue->tail` 的 if statement ,詳細程式碼可見 [commit f8761](https://github.com/bakudr18/quiz4/commit/f8761d6b4c884f1a96ef80f08d590fd5573aacee) ,而修改後以 4 core 執行的平均時間約為 138 milliseconds,且沒有特別的 outliers 。
```diff
typedef struct __threadtask {
void *(*func)(void *); /* the function that task should execute */
void *arg; /* argument passed to func */
struct __tpool_future
*future; /* A structure to store task status and result */
- struct __threadtask *next; /* pointer to next task */
+ struct list_head list; /* linked list of task structure */
} threadtask_t;
typedef struct __jobqueue {
- threadtask_t *head, *tail; /* store head and tail of queue */
+ struct list_head head; /* list head of task */
pthread_cond_t
cond_nonempty; /* condition variable to check if queue is non-empty */
pthread_mutex_t rwlock; /* lock share resources like head and tail */
} jobqueue_t;
```