# 2021q1 Homework4 (quiz4)
contributed by < `YoLinTsai` >
###### tags: `linux2021-hw`
:::success
延伸問題:
- [x] 解釋上述程式碼運作原理,包含 timeout 處理機制,指出改進空間並實作
- [ ] 研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法
- [ ] 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability
:::
## 運作原理
### POSIX thread
#### [```pthread_create``` ](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
```cpp
#include <pthread.h>
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void *),
void *restrict arg);
```
- 如果成功 create, ```thread``` 所指向的 buffer 會紀錄 thread ID,```attr``` 是開 thread 的參數,用 NULL 就是 default attributes。
- 執行緒將會運行 ```start_routine()``` ,並將 ```arg``` 餵給 ```start_routine()```。
#### [```pthread_cancel```](https://man7.org/linux/man-pages/man3/pthread_cancel.3.html)
```cpp
int pthread_cancel(pthread_t thread);
```
#### [```pthread_join```](https://man7.org/linux/man-pages/man3/pthread_join.3.html)
```cpp
int pthread_join(pthread_t thread, void **retval);
```
- 在 `tpool_create()` 中可以發現,一發現 `ptread_create` 失敗,立刻會把所有 thread 取消掉,他的手法是先做 `pthread_cancle` 接著 `pthread_join`,原因在 linux man page 中的描述:
:::success
After a canceled thread has terminated, a join with that thread using pthread_join(3) obtains PTHREAD_CANCELED as the thread's exit status.
**(Joining with a thread is the only way to know that cancellation has completed.)**
:::
#### [```pthread_cond_wait```](https://man7.org/linux/man-pages/man3/pthread_cond_wait.3p.html)
```cpp
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
```
- 我們在進行 thread 時,有時後會希望某些條件達成後,在繼續進行,而 `pthread_cond_wait` 便能達成這個情形,呼叫了之後 thread 會停在該處,直到被喚醒。
- 喚醒的方式有兩個 `pthread_cond_signal()` 和 `pthread_cond_broadcast()`,差別如同函式名稱一樣,前者一次只會喚醒其中一個,後者全部在等待該 condition variable 的通通都會喚醒。
- 而判斷的條件需要用 mutex 去控制,如下:
Thread A
```cpp
mutex_lock(&done_lock);
is_done = true;
pthread_cond_signal(&done_cond);
mutex_unlock(&done_lock);
```
Thread B
```cpp
mutex_lock(&done_lock);
if(!is_done){
pthread_cond_wait(&done_cond, &done_lock);
}
mutex_unlock(&done_lock);
```
- 為何要搭配 lock 使用呢? 我的理解避免判斷到一半的時候 `is_done` 又被更動,直到進入 `pthread_cond_wait` 才把 lock 放掉,一旦被喚醒後又重新把 lock 拿回來。
#### [```pthread_cleanup_push```](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
#### [```pthread_cleanup_pop```](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
```cpp
void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);
```
- 這兩個 function 是成雙成對出現的,`pthread_cleanup_up`的目的在於避免 thread 結束時沒有正常收回資源,特別是 lock。
#### [```pthread_setcancelstate```](https://www.man7.org/linux/man-pages/man3/pthread_setcanceltype.3.html)
```cpp
int pthread_setcancelstate(int state, int *oldstate);
```
顧名思義這兩個 function 是用來控制 thread 取消的行為,在 `pthread_setcancelstate` 中,一共有兩種 state 可以選擇:
1. `PTHREAD_CANCEL_ENABLE`
2. `PTHREAD_CANCEL_DISABLE`
其中 `DISABLE` 在 thread 接收到 cancel 的 signal 時,不會立即取消,而是會等到 state 被改回 `ENABLE` 時才會取消。
不過這裡的 `cancel` 之所以這樣設計,就是因為他的取消並**不意味馬上結束**,而是在 thread 正式結束前還有待處理的事項,取消動作依序包含:
1. pop 每個 cancellation clean-up handlers 並執行。(見 pthread_cleanup_push())
2. 呼叫 thread-specific data destructors (見 pthread_key_create())
3. thread 結束 (見 pthread_exit())
## 程式架構
- quiz4 中的 thread pool 是用 linked list 的資料結構去實現,完整的架構圖如下:

### tpool_create()
- `tpool_create` 創建並初始化 `__threadpool` 和 `__jobqueue` ,同時確認是否有正常初始化。
- `pthread_create` 如果 create 失敗,返回值不為0,接著連續使用 `pthread_cancel` 和 `pthread_join` 去結束所有 thread。
```cpp
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
### jobqueue_create()
- 初始化 jobqueue,注意 jobqueue 在這邊是用 linked list 去實作,因此把 tail 和 head 都先設定成 NULL ,同時把 jobqueue 的 attribute `cond_nonempty` 和 `rwlock` 初始化。(透過 `pthread_cond_init` 和 `pthread_mutex_init`)
```cpp
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
```
- 接著我們看 `tpool_create` 中一個關鍵的部分:
```cpp
pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)
```
- 這段的意思根據前面的資料,我們將 `jobqueue_fetch(void *jobqueue)` 指定給 `pool->workers[i]` 運行,接著來解析 `jobqueue_fetch()`
- 首先來概述 `jobqueue_fetch()` 的運作邏輯,當我們創建完成 threadpool 和 jobqueue 後,開了複數個 thread 去運行 jobqueue_fetch(),一旦等到 jobqueue 有 threadtask 被塞到 linked list 中,便嘗試執行 threadtask 中的 function
```cpp
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
//GGG
while (!jobqueue->tail){
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
}
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED; //KKK
task->future->result = ret_value;
//LLL
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
### tpool_apply()
- `tpool_apply` 會創建 `treadtask_t` 並新增到 pool->jobqueue 當中,特別注意 `HHH` 的部分呼應了 `jobqueue_fetch()` 的設計,當 linked_list 中從沒有 task 到出現第一個 task 時,會呼叫 `pthread_cond_broadcast(&jobqueue->cond_nonempty);` 喚醒待命中的 thread 去 fetch task 來執行。
```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
//HHH
pthread_cond_broadcast(&jobqueue->cond_nonempty);
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
### tpool_future_get()
- 這裡提供兩種模式,當 `seconds` 值為零時,便是 blocking wait,等到該 thread 執行完成才會繼續執行
- 若 `seconds` 有值,便會用 `pthread_cond_timeout` 來等待 `cond` 的 signal ,如果等待時間超過 seconds,便將 `future->flag` 設成 `__FUTURE_TIMEOUT` 並回傳 NULL
- [CLOCK_MONOTONIC](https://man7.org/linux/man-pages/man2/clock_getres.2.html) 根據 man page 的解釋, monotonic 保證計時器只曾不減,適合拿來當作 timeout 的計時器
:::info
All CLOCK_MONOTONIC variants guarantee that the time returned by consecutive calls will not go backwards, but successive calls may—depending on the architecture—return identical (not-increased) time values.
:::
```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
//FFF
pthread_cond_wait(&future->cond_finished, &future->mutex);
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
- if / while 和 pthread_cond_wait 搭配的差別?
:::success
當我們用 `pthread_cond_wait` 阻塞 thread 時,可能有不只一個 thread 在等待該 `cond` 通過,用 while 來反覆監控會是一個比較合理的做法
:::
## 指出改進空間並實作
誠實的面對自己,目前自己沒有發現可以改進的空間,對程式碼優化的敏感度還不足
## atomic_threadpool
TODO