# 2021q1 Homework4 (quiz4) contributed by < [`bakudr18`](https://github.com/bakudr18)> ###### tags: `linux2021` > [第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz4) ## 解釋程式碼運作原理 ### 功能簡介 ```c /* Linked list to store the jobs in queue */ typedef struct __threadtask { void *(*func)(void *); /* the function that task should execute */ void *arg; /* argument passed to func */ struct __tpool_future *future; /* A structure to store task status and result */ struct __threadtask *next; /* pointer to next task */ } threadtask_t; typedef struct __jobqueue { threadtask_t *head, *tail; /* store head and tail of queue */ pthread_cond_t cond_nonempty; /* condition variable to check if queue is non-empty */ pthread_mutex_t rwlock; /* lock share resources like head and tail */ } jobqueue_t; /* A structure to store job status and return result */ struct __tpool_future { int flag; /* job status */ void *result; /* the result of job which points to this future */ pthread_mutex_t mutex; /* lock share resouces like flag and result */ pthread_cond_t cond_finished; /* condition variable to check if job is finished */ }; struct __threadpool { size_t count; /* count of pthread */ pthread_t *workers; /* store pthread id */ jobqueue_t *jobqueue; /* queue which stores pending jobs */ }; ``` * `tpool_create`: 建立一 thread pool 包含 `count` 個 theads * `tpool_apply`: 新增 task 進 jobqueue 中給 thread 執行,回傳該 task 的 `future` * `tpool_join`: 等待所有 task 執行結束回收所有 thread * `tpool_future_get`: 等待 `seconds` 秒給 task 執行,並回傳 task 的執行結果 * `tpool_future_destroy`: 釋放 `future` 的資源 ### 細部函式探討 `tpool_apply` 在新增的 task 為 queue 的第一筆資料時會透過 `pthread_cond_broadcast` 喚起所有等待中的 thread 來執行 task。根據 [pthread_cond_signal(3)](https://linux.die.net/man/3/pthread_cond_signal) > The thread(s) that are unblocked shall contend for the mutex according to the scheduling policy (if applicable), and as if each had called pthread_mutex_lock(). 換言之,`pthread_cond_broadcast` 和 `pthread_cond_signal` 只負責喚起 thread ,而哪個 thread 能競爭到 mutex 是由 OS scheduler 所決定。 ```c struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { ... if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty); } pthread_mutex_unlock(&jobqueue->rwlock); } ... return future; } ``` `jobqueue_fetch` 為 thread 所執行的函式,在 `while(1)` 的前半部份 (Line 10~29) 是從 jobqueue 中取出任務存到 `task` 中,後半部份 (Line 31~62) 是執行 `task->func`,這裡分段進行探討。 快速掃過整個函式會發現只有當 `task->func == NULL` 時才會跳出迴圈,然而在 Line 11~16 使用了 thread cancellation 的機制,讓 process 可以透過發出 cancellation request 來終止 thread 執行,被終止的 thread 會去執行事先在 `pthread_cleanup_push` 註冊的 callback function 後才 return ,而這裡的 callback function 通常會用來釋放 lock 或釋放其他未釋放的資源。 在 Line 11, 16 使用 [`pthread_setcancelstate`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 分別做 enable 與 disable,表示只有在這個 section 內 thread 是可以接受且執行 cancellation request 的(透過 [`pthread_cancel`](https://man7.org/linux/man-pages/man3/pthread_cancel.3.html) 發出 request),而 thread 所預設的 cancelability type 為 `PTHREAD_CANCEL_DEFERRED` ,代表只有在 thread 遇到 cancellation point 時才會執行 cancellation request 以結束 thread,而 [`pthread_testcancel`](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) 就是一個 cancellation point,另外由 [pthreads(7)](https://man7.org/linux/man-pages/man7/pthreads.7.html) 可知,大部分的 blocking function (例如這裡的 `pthread_cond_wait`) 都是 cancellation point ,因此在 Line 12, 14 都有機會結束執行緒。 :::info * 思考1: 為什麼 Line 14 是 `while (!jobqueue->tail)` 而不是 `if (!jobqueue->tail)` ? 前面提過 `pthread_cond_broadcast` 會喚起所有 blocking thread,即便是 `pthread_cond_signal` 也會喚起**至少**一個 blocking thread,而哪個 thread 能競爭到 lock 是由 OS scheduler 決定,因此沒拿到 lock 的 thread 實際上並不應該繼續執行 critical section 的程式碼,這個行為稱為 [spurious wakeup](https://linux.die.net/man/3/pthread_cond_wait) ,因此有必要再次執行 `pthread_cond_wait` 進入等待。 * 思考2: 既然 Line 15 已經有 `pthread_cond_wait` ,為什麼 Line 12 還要放 `pthread_testcancel` ? 考慮到當 `jobqueue` 一直不為空,永遠有 `task->func` 可以執行,那麼 thread 就永遠不會停止,為了讓 thread 有機會停止,因此 `pthread_testcancel` 就變得有必要了! * 思考3: 為什麼要限制 cancelable 區域在 Line 11~16? 猜測這裡的目的是希望在有 task 可執行且被執行時不會執行到一半被 cancel ,畢竟不能保證`task->func` 內沒有使用到為 cancellation point 的函式。 ::: 接著看到 Line 21~27 ,透過走訪 `tmp` linked list 來取得最後一個 `task` ,既然本來就有 `tail` 就應有機會不需走訪整個 list ,因此看起來就是一個應該改進的地方 :slightly_smiling_face: ```c= static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); pthread_exit(NULL); } ``` 繼續看到後半部份 (Line 31~62) ,除了執行 `task->func` 外,也需要修改到 `future` 的狀態,而 `future->flag` 與 `future->result` 都是共用資源(使用前需取得 lock),會在 `tpool_future_get`, `tpool_future_destroy` 與 `jobqueue_destroy` 都會用到,勢必得一起討論。 * `__FUTURE_CANCELLED`: 表示 task 被要求取消,只有在 `jobqueue_destroy` 會被設置,而此函式只有在當所有 thread 都被 cancel 並合併之後,也就是不再執行 `jobqueue_fetch` 時才會被執行,因此在 `jobqueue_fetch` 應該是不需要判斷此狀態的,這裡是一個可以改進之處。 ```graphviz digraph list { rankdir = "LR" tpool_create tpool_join jobqueue_destroy pthread_join __FUTURE_CANCELLED { rank = "same" tpool_create tpool_join } { tpool_create->pthread_join[label = "fail"] tpool_join->pthread_join pthread_join->jobqueue_destroy jobqueue_destroy->__FUTURE_CANCELLED[label = "set flag"] } } ``` * `__FUTURE_RUNNING`: 表示 task 正在執行中,設置後才可執行 `task->func` ,但目前其他函式並沒有以此 flag 來做任何判斷。 * `__FUTURE_DESTROYED`: 表示 future 應該要被銷毀,此 flag 是由 `tpool_future_destroy` 所設置,而 `tpool_future_destroy` 是釋出給 user 使用的函式,注意到執行此函式後 `future` 並不一定會被馬上釋放資源 ,可能會由 `jobqueue_fetch` 或 `jobqueue_destroy` 來釋放。另外這裡看不出來明確規範需要在 `task->func` 執行完後才可響應 `__FUTURE_DESTROYED` 要求,我認為放在 `task->func` 之前也是符合邏輯的,因此在修改上程式碼前還需要先釐清與 `__FUTURE_FINISHED` 之間的關係。 ```c int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); } } return 0; } ``` * `__FUTURE_FINISHED`: 表示 task 執行完畢,邏輯上來說應該要和 `__FUTURE_RUNNING` 是互斥的存在,因此需改進。`tpool_future_get` 以此 flag 做判斷依據決定是否需回傳`future->result` ,至於 `__FUTURE_DESTROYED` 狀態下是否需回傳 result 會在下方探討。 ```c void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else pthread_cond_wait(&future->cond_finished, &future->mutex); } pthread_mutex_unlock(&future->mutex); return future->result; } ``` * `__FUTURE_TIMEOUT`: 表示 task 超過 `tpool_future_get` 給予的執行時間,應儘快回傳 NULL。`pthread_cond_timedwait` 與 `pthread_cond_wait` 類似,但多了一個時間參數,當超過時間時會回傳 `ETIMEDOUT` error。而這個函式有一些細節需要注意,根據 [pthread_cond_wait(3)](https://linux.die.net/man/3/pthread_cond_wait) > The pthread_cond_timedwait() function shall be equivalent to pthread_cond_wait(), except that an error is returned if the absolute time specified by abstime passes (that is, **system time equals or exceeds abstime**) before the condition cond is signaled or broadcasted, or if the absolute time specified by abstime has already been passed at the time of the call. 首先,`pthread_cond_timedwait` 是有可能超過時間才回傳的,而可能超過多久在 man page 裡並沒有明確定義,這涉及到的議題非常多,可以在 [Stackoverflow](https://stackoverflow.com/a/45784669/11425048) 的回答先有初步了解即可,但絕大部分時間 OS 會儘可能不超時過多,所以若無特殊需求(如 real-time system)並不需要太過擔心。接著繼續看 [pthread_cond_wait(3)](https://linux.die.net/man/3/pthread_cond_wait) > When such timeouts occur, pthread_cond_timedwait() shall nonetheless release and re-acquire the mutex referenced by mutex. 表示當 `ETIMEDOUT` 發生時,此 thread 應重新獲得 mutex,因此後續也需做對應的 unlock 。 :::info 為了驗證 `ETIMEDOUT` 發生後 thread 是否重新獲得 mutex ,我做了以下實驗: 首先設定 mutex type 為 `PTHREAD_MUTEX_ERRORCHECK`。 ```c static struct __tpool_future *tpool_future_create(void) { ... pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK); pthread_mutex_init(&future->mutex, &mutex_attr); pthread_mutexattr_destroy(&mutex_attr); ... } ``` 接著在 `ETIMEDOUT` 發生後嘗試以 `pthread_mutex_trylock` 重新獲得 mutex 。 ```c= void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { ... if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; int rc = pthread_mutex_trylock(&future->mutex); if (rc != 0) { errno = rc; perror("mutex lock"); rc = pthread_mutex_unlock(&future->mutex); if (rc != 0) { errno = rc; perror("mutex unlock"); } } else { printf("%s owns metex\n", __func__); pthread_mutex_unlock(&future->mutex); } return NULL; } ... } ``` 根據 [pthread_mutex_lock(3)](https://linux.die.net/man/3/pthread_mutex_lock) >If the mutex type is PTHREAD_MUTEX_ERRORCHECK, then error checking shall be provided. If a thread attempts to relock a mutex that it has already locked, an error shall be returned. If a thread attempts to unlock a mutex that it has not locked or a mutex which is unlocked, an error shall be returned. EBUSY - The mutex could not be acquired because it was already locked. EDEADLK - The current thread already owns the mutex. 這裡 `pthread_mutex_trylock` 回傳的結果會是 `EBUSY` 而非 `EDEADLK`,邏輯上來說並沒有錯,而我也理解 dead lock 應該是要有兩個以上的鎖才會發生,但這句 `EDEADLK - The current thread already owns the mutex` 真的很容易讓人誤會成在持有鎖時再次 try lock 就會得到 `EDEADLK` 的 error。而從 Line 10 `pthread_mutex_unlock` 沒有回傳錯誤就可以知道此 thread 確實是持有 mutex 的。 ::: :::info * 思考4:`__FUTURE_DESTROYED` 狀態下是否需回傳 result ? 這是取捨問題,只要在文件中明確規定針對同一個 `future` 先執行 `tpool_future_destroy` 後才執行 `tpool_future_get` ,`tpool_future_get` 回傳結果是 undefined 即可。 ::: ## Improvement 原實作中 pop task 的方式是走訪整個 linked list 取出 tail ,其 time complexity 為 $O(1)$ ,造成在 critical section 內駐留過久,這對於 multithread 效能的傷害是很大的,透過 `sudo taskset 0xF0 ./pi` 以 4 core CPU測量 `PRECISION = 100000` 時的執行時間,多執行幾次可發現,雖然大多數執行時間落在 160~200 milliseconds ,但偶爾會有超過 100 seconds 或更久的執行時間,以 [perf record](https://man7.org/linux/man-pages/man1/perf-record.1.html) 紀錄超過 100 seconds 的 process,發現有約 83% 的時間在執行 `for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next);` ,而越是減少可用的 CPU 此情況會更嚴重(因為當 main thread 執行時不斷 push task,若 OS scheduler 沒有頻繁 context switch 給 `jobqueue_fetch` 消化 task,linked list 會增長非常快)。 因此,為了使 pop task 達到 $O(1)$ ,我把原本的 `jobqueue` 改成 doubly list,後來更是直接引入 [linux/list.h](https://github.com/torvalds/linux/blob/master/include/linux/list.h) 以 circular doubly linked list 取代原本的 singly linked list,如此不但使 pop task 達到 $O(1)$ ,也減少了在 `jobqueue->head == jobqueue->tail` 的 if statement ,詳細程式碼可見 [commit f8761](https://github.com/bakudr18/quiz4/commit/f8761d6b4c884f1a96ef80f08d590fd5573aacee) ,而修改後以 4 core 執行的平均時間約為 138 milliseconds,且沒有特別的 outliers 。 ```diff typedef struct __threadtask { void *(*func)(void *); /* the function that task should execute */ void *arg; /* argument passed to func */ struct __tpool_future *future; /* A structure to store task status and result */ - struct __threadtask *next; /* pointer to next task */ + struct list_head list; /* linked list of task structure */ } threadtask_t; typedef struct __jobqueue { - threadtask_t *head, *tail; /* store head and tail of queue */ + struct list_head head; /* list head of task */ pthread_cond_t cond_nonempty; /* condition variable to check if queue is non-empty */ pthread_mutex_t rwlock; /* lock share resources like head and tail */ } jobqueue_t; ```