# 2021q1 Homework4 (quiz4)
contributed by < `hankluo6` >
> [第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz4)
## 測驗 `1`
> 參考解答
:::spoiler
#### `FFF = ?`
- [x] `(a) pthread_cond_wait(&future->cond_finished, &future->mutex)`
#### `GGG = ?`
- [x] `(c) while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)`
#### `HHH = ?`
- [x] `(b) pthread_cond_broadcast(&jobqueue->cond_nonempty)`
#### `KKK = ?`
- [x] `(b) __FUTURE_FINISHED`
#### `LLL = ?`
- [x] `(a) pthread_cond_broadcast(&task->future->cond_finished)`
:::
### 運作原理
#### POSIX Threads
* ```c
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void *),
void *restrict arg);
```
建立新的執行緒,運行 `start_routine()`,並將 `arg` 傳入給 `start_routine()` 中。
* ```c
noreturn void pthread_exit(void *retval);
```
結束正在執行的執行緒,並將要迴傳給 process 的值放置在 `retval`。
* ```c
int pthread_join(pthread_t thread, void **retval);
```
等在 `thread` 的終止,並將 `retval` 設置為呼叫 `pthread_exit` 時給予的 `retval`。
* ```c
int pthread_cancel(pthread_t thread);
int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);
```
送出 cancellation 訊號給 `thread` 執行緒,而 `thread` 會根據 `state` 以及 `type` 來決定如何回應。透過 `pthread_setcancelstate` 以及 `pthread_setcanceltype` 設置 `state` 與 `type`:
* `state`
* `PTHREAD_CANCEL_ENABLE` 預設行為,表示此執行緒為可被取消的 (cancelable)。
* `PTHREAD_CANCEL_DISABLE` 表示此執行緒無法被取消,當呼叫 `pthread_cancel` 時,process 會被 block 住,直到此執行緒自行中止或改變狀態。
* `type`
* `PTHREAD_CANCEL_DEFERRED` 預測行為,當接收到 cancellation 訊號時,執行緒並不會馬上中止,而是會等待執行緒執行到 [`cancellation point`](https://man7.org/linux/man-pages/man7/pthreads.7.html) (可見下方 cancellation point 欄位) 時才會停止。
* `PTHREAD_CANCEL_ASYNCHRONOUS` 當收到 cancellation 訊號時會立刻中止。
* ```c
void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);
```
設定 function handler,在執行緒結束或某些情況下 (`pthread_cancel` or `pthread_exit` or `pthread_cleanup_pop(no zeror)`),使用 `arg` 參數呼叫 `routine` function。
* ```c
int pthread_mutex_init(pthread_mutex_t *restrict mutex;
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
```
`pthread_mutex_init` 以及 `pthread_mutex_destroy` 用來初始化與刪除互斥鎖 (mutex),`pthread_mutex_lock` 以及 `pthread_mutex_unlock` 分別對 mutex 上鎖與解鎖。
* ```c
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_condattr_init(pthread_condattr_t *attr);
int pthread_condattr_destroy(pthread_condattr_t *attr);
```
`pthread_cond_init` 以及 `pthread_cond_destroy` 用來初始化與刪除條件變數 (condition variable),而 `pthread_condattr_init` 與 `pthread_condattr_destroy` 則用來設置 condition variable 的屬性 `attr`。
* ```c
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
```
`pthread_cond_timedwait` 與 `pthread_cond_wait` 將互斥鎖釋放,並阻塞住該執行緒,等待其它 process 送出信號觸發此 condition variable,在收到信號時返回並重新獲得互斥鎖。其中前者 `timedwait` 中的參數 `abstime` 表示可接受阻塞的最長時間。
`pthread_cond_broadcast` 則送出信號喚醒所有持有條件變數 `cond` 的執行緒。
:::info
* [restrict](https://en.wikipedia.org/wiki/Restrict) 為 C99 關鍵字用來幫助編譯器最佳化,表示此 pointer 不會有 alias,只會有此變數可以操控。
* [noreturn](https://en.cppreference.com/w/c/language/_Noreturn) 為 C11 增加的關鍵字,註明此函數不會回傳給 caller,而是透過 `exit()` 等方式跳出,如有回傳值則為 undefined behavior。
:::
#### 程式流程
透過 `tpool_create` 建立 thread pool,並將每個執行緒放入 `__threadpool *pool` 中。而 `jobqueue_t *jobqueue` 將每個待執行的 task,以 linked list 的方式連接,當有任務要執行時,從 `jobqueue` 中 pop 一個任務出來運行,每個任務在程式內是以 `threadtask_t` 結構封裝。
從 `tpool_create` 建立出來的執行緒,會運行 `jobqueue_fetch` 函式,在 `jobqueue` 為空時,每個 thread 會在 `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);` 內被 block 住,等待有新的 task 加入。透過 `tpool_apply` 將新的 task 加入到 `jobqueue` 中,如果此時 `jobqueue` 為空,則透過 `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 送出信號給 `cond_nonempty` 條件變數,此時 `jobqueue_fetch` 內的其中一個執行緒中的 `pthread_cond_wait` 將會通過,並在下方取得 `jobqueue` 的 `tail` 任務。
執行完給定的 function 後,會將相關結果存放在 `struct __tpool_future` 中,`__tpool_future` 用來當作使用者可見的執行緒,透過 `tpool_future_get` 取得 thread 執行完的結果。如果 `tpool_future_get` 時任務尚未完成,將會被條件變數 `future->cond_finished` 給 block 住,等待任務執行完畢。
### timeout 機制與改進
現有程式使用 `pthread_cond_timedwait` 處理 thread 執行過久的問題,並將 `future->flag` 設置為 `__FUTURE_TIMEOUT` 表示該任務超時,但在 `tpool_join` 沒有相對應的處理。當有執行緒無法完成時,將會產生問題,如以下程式碼:
```c
static void *loop(void *arg)
{
while (1) ;
}
int main()
{
tpool_future_t future = tpool_apply(pool, loop, (void *)NULL);
void *result = tpool_future_get(future, 1 /* cannot set to zero */);
tpool_future_destroy(future);
free(result);
tpool_join(pool);
}
```
所以我們需要有方法在無法退出 function 時正確離開 thread。故我在 `jobqueue_fetch` 內添加以下程式:
```diff
static void *jobqueue_fetch(void *queue)
{
...
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old_state);
void *ret_value = task->func(task->arg);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
...
}
```
將 thread 的 cancel type 改成 asynchronous,此時便不需要經過 cancellation point 也能直接退出執行緒。
在移除 `future` 時也要正確處理 timeout 時的情形:
```diff
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
- future->flag & __FUTURE_CANCELLED) {
+ future->flag & __FUTURE_CANCELLED ||
+ future->flag & __FUTURE_TIMEOUT) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
}
...
}
return 0;
}
```
最後在 join 的時候,將所有未完成的 task 所在的執行緒皆終止:
```diff
int tpool_join(struct __threadpool *pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
+ for (int i = 0; i < num_threads; i++)
+ pthread_cancel(pool->workers[i]);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
:::danger
我們在執行過程中不能知道哪些執行緒會無法退出,所以當有執行緒進入無窮迴圈時,該執行緒會一直被佔用,無法被重新利用,直到執行 `tpool_join` 後才能正確被釋放。
:::
### [`atomic_threadpool`](https://github.com/Taymindis/atomic_threadpool)
`atomic_threadppol` 使用 lock-free queue 來取代此程式內的 job queue。thread pool 的實現方式與此程式一樣,透過 `at_thpool_create` 建立足夠的 thread 使用,每個 thread 內部執行 `at_thpool_worker` 並等待 queue 中有任務產生 (相當於 `jobqueue_fetch`),利用 `at_thpool_newtask` 建立任務 (相當於 `tpool_apply`) 並使用 `lfqueue_enq` 將任務放入 queue 當中。`at_thpool_worker` 利用 `lfqueue_deq` 將待處理任務讀出,並執行對應的函式。處理完任務的節點 (`lfqueue_cas_node_t`) 會透過 `__lfq_recycle_free` 以及 `__lfq_check_free` 釋放其空間。唯一不同的是此實作的 thread pool 無法回傳值給使用者。
只要是跟 queue 有關的操作,都必須為 thread safe,防止多個執行緒同時對 queue 操作而造成錯誤,在 `atomic_threadpool` 中使用 gcc 的 [builtin atomic functions](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins) 實現:
* 在 enqueue 的時候,有兩個操作需要保護
* 增加 queue 的 `size` 時,使用 `__sync_add_and_fetch()` 執行加法,便能保證每次更新的值都保持為最新狀態。
* 更新 queue 的 `tail` 時,使用 `__sync_bool_compare_and_swap` 讓 `tail->next` 對於同一個 `tail` 只更新一次,而 `lfqueue->tail` 則保持在最新值。
* 在 dequeue 的時候,有兩個操作需要保護
* 更改 queue 的 `head` 時,使用 `__sync_bool_compare_and_swap` 讓 `lfqueue->head` 保持在最新值。
* 當 queue 只有一個元素時,dequeue 也要更新 `lfqueue->tail`,一樣使用 `__sync_bool_compare_and_swap` 實現。
* 釋放空間時,也需要使用 `__LFQ_BOOL_COMPARE_AND_SWAP` 設置要移除的節點,才不會重複釋放同個記憶體,並使用 `__LFQ_BOOL_COMPARE_AND_SWAP` 指示現在是否為 `in_free_mode`,防止多個執行緒同時進行移除。
---
TODO
* ABA problem
* [Lock-free 程式設計議題](https://hackmd.io/@sysprog/lock-free-prog)
* [c11-queues](https://github.com/stv0g/c11-queues)
* [lfqueue](https://github.com/darkautism/lfqueue)
* [Hazard Pointers: Safe Memory Reclamation for Lock-Free Objects](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.395.378&rep=rep1&type=pdf)
* [An Optimistic Approach to Lock-Free FIFO Queues](http://people.csail.mit.edu/shanir/publications/FIFO_Queues.pdf)
###### tags: `linux2021`