# 2021q1 Homework4 (quiz4)
contributed by < `YoLinTsai` >
###### tags: `linux2021-hw`

:::success
Extended questions:
- [x] Explain how the code above works, including the timeout handling mechanism, then point out room for improvement and implement it
- [ ] Study atomic_threadpool, point out how it uses atomic operations, and explain its lock-free technique
- [ ] Try rewriting the code above with C11 Atomics for better scalability
:::

## How it works
### POSIX thread
#### [```pthread_create```](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
```cpp
#include <pthread.h>
int pthread_create(pthread_t *restrict thread,
                   const pthread_attr_t *restrict attr,
                   void *(*start_routine)(void *),
                   void *restrict arg);
```
- On success, the buffer pointed to by ```thread``` holds the new thread ID. ```attr``` carries the thread attributes; passing NULL selects the default attributes.
- The new thread runs ```start_routine()``` and receives ```arg``` as its argument.

#### [```pthread_cancel```](https://man7.org/linux/man-pages/man3/pthread_cancel.3.html)
```cpp
int pthread_cancel(pthread_t thread);
```
#### [```pthread_join```](https://man7.org/linux/man-pages/man3/pthread_join.3.html)
```cpp
int pthread_join(pthread_t thread, void **retval);
```
- In `tpool_create()`, as soon as one `pthread_create` call fails, every thread created so far is cancelled. The code first calls `pthread_cancel` and then `pthread_join`; the reason is given in the Linux man page:

:::success
After a canceled thread has terminated, a join with that thread using pthread_join(3) obtains PTHREAD_CANCELED as the thread's exit status.
**(Joining with a thread is the only way to know that cancellation has completed.)**
:::

#### [```pthread_cond_wait```](https://man7.org/linux/man-pages/man3/pthread_cond_wait.3p.html)
```cpp
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                           pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
                      pthread_mutex_t *restrict mutex);
```
- When working with threads, we sometimes want a thread to continue only after some condition holds. `pthread_cond_wait` does exactly that: the calling thread blocks at that point until it is woken up.
- There are two ways to wake a waiter, `pthread_cond_signal()` and `pthread_cond_broadcast()`. As the names suggest, the former wakes at least one waiting thread, while the latter wakes every thread waiting on that condition variable.
- The condition being checked must be protected by a mutex, as follows:

Thread A
```cpp
pthread_mutex_lock(&done_lock);
is_done = true;
pthread_cond_signal(&done_cond);
pthread_mutex_unlock(&done_lock);
```
Thread B
```cpp
pthread_mutex_lock(&done_lock);
/* Re-check the predicate after every wakeup */
while (!is_done) {
    pthread_cond_wait(&done_cond, &done_lock);
}
pthread_mutex_unlock(&done_lock);
```
- Why must it be paired with a lock? My understanding is that the lock prevents `is_done` from changing halfway through the check. `pthread_cond_wait` atomically releases the lock while it blocks, so a signal sent between the check and the wait cannot be lost, and it reacquires the lock before returning once the thread is woken up.

#### [```pthread_cleanup_push```](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
#### [```pthread_cleanup_pop```](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
```cpp
void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);
```
- These two functions always appear as a pair. The purpose of `pthread_cleanup_push` is to make sure resources, locks in particular, are released properly when a thread terminates.

#### [```pthread_setcancelstate```](https://www.man7.org/linux/man-pages/man3/pthread_setcanceltype.3.html)
```cpp
int pthread_setcancelstate(int state, int *oldstate);
```
As the names suggest, `pthread_setcancelstate` and `pthread_setcanceltype` (documented on the same man page) control how a thread responds to cancellation. `pthread_setcancelstate` offers two states:
1. `PTHREAD_CANCEL_ENABLE`
2. `PTHREAD_CANCEL_DISABLE`

With `DISABLE`, a thread that receives a cancellation request is not cancelled right away; the request stays pending until the state is changed back to `ENABLE`.
Cancellation is designed this way because being cancelled does **not mean terminating immediately**: there is still work to do before the thread actually ends. The cancellation steps are, in order:
1. Pop and execute each cancellation clean-up handler. (See pthread_cleanup_push())
2. Call the thread-specific data destructors. (See pthread_key_create())
3. Terminate the thread. (See pthread_exit())

## Program structure
- The thread pool in quiz4 is built on a linked list. The complete architecture diagram is shown below:
![](https://i.imgur.com/jm5NpJs.jpg)

### tpool_create()
- `tpool_create` creates and initializes `__threadpool` and `__jobqueue`, and checks that both were initialized properly.
- If `pthread_create` fails, its return value is non-zero; the code then calls `pthread_cancel` followed by `pthread_join` on every thread created so far to shut them all down.
```cpp
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                /* Roll back: cancel, then join, the workers already started */
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```
### jobqueue_create()
- Initializes the jobqueue. Note that the jobqueue is implemented as a linked list, so both tail and head start out as NULL. The jobqueue members `cond_nonempty` and `rwlock` are initialized as well (through `pthread_cond_init` and `pthread_mutex_init`).
```cpp
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}
```
- Next, look at a key part of `tpool_create`:
```cpp
pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)
```
- Based on the material above, this call makes `pool->workers[i]` run `jobqueue_fetch(void *jobqueue)`, so let us analyze `jobqueue_fetch()` next.
- First, an overview of `jobqueue_fetch()`: once the threadpool and the jobqueue have been created, several threads are started, all running jobqueue_fetch(). As soon as a threadtask is pushed into the jobqueue's linked list, a worker tries to execute the function stored in that threadtask. Cancellation is enabled only while the worker waits for work and is disabled again before it dequeues a task from the tail.
```cpp
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        //GGG
        while (!jobqueue->tail) {
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
        }
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);

        /* Dequeue from the tail: the oldest task, since new ones go to head */
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED;  //KKK
                task->future->result = ret_value;
                //LLL
                pthread_cond_broadcast(&task->future->cond_finished);
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            /* A task without a function tells the worker to exit */
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```
### tpool_apply()
- `tpool_apply` creates a `threadtask_t` and adds it to pool->jobqueue. Note that the `HHH` part mirrors the design of `jobqueue_fetch()`: when the linked list goes from holding no task to holding its first task, it calls `pthread_cond_broadcast(&jobqueue->cond_nonempty);` to wake the idle threads so that they fetch the task and run it.
```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            //HHH
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```
### tpool_future_get()
- Two modes are provided. When `seconds` is zero, the call is a blocking wait: it does not return until the task has finished running.
- When `seconds` is non-zero, `pthread_cond_timedwait` is used to wait for the signal on `cond_finished`. If the wait exceeds `seconds`, `__FUTURE_TIMEOUT` is set in `future->flag` and NULL is returned.
- [CLOCK_MONOTONIC](https://man7.org/linux/man-pages/man2/clock_getres.2.html): according to the man page, a monotonic clock is guaranteed never to go backwards, which makes it suitable as the timer for a timeout. Note that `pthread_cond_timedwait` compares `abstime` against the clock configured on the condition variable (`CLOCK_REALTIME` by default), so this code relies on `cond_finished` having been initialized with `pthread_condattr_setclock()` set to `CLOCK_MONOTONIC`; `tpool_future_create()` is not shown in this note.

:::info
All CLOCK_MONOTONIC variants guarantee that the time returned by consecutive calls will not go backwards, but successive calls may—depending on the architecture—return identical (not-increased) time values.
:::
```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else  //FFF
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }
    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
```
- What is the difference between pairing `pthread_cond_wait` with `if` and with `while`?

:::success
When `pthread_cond_wait` blocks a thread, more than one thread may be waiting on the same `cond`, and a waiter can also wake up spuriously. By the time a woken thread reacquires the lock, the condition may no longer hold, so re-checking it in a `while` loop is the more reasonable approach.
:::

## Room for improvement and implementation
To be honest with myself, I have not found anything to improve so far; my eye for code optimization is still not sharp enough.

## atomic_threadpool
TODO
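
As a possible starting point for this TODO and for the C11 Atomics item in the checklist at the top, the sketch below shows how the state bits of a future could be updated with compare-and-swap instead of a mutex. It is not taken from atomic_threadpool or from the quiz4 code: `struct future_sketch`, the three function names, and the bit values are all hypothetical, chosen only to mirror the `__FUTURE_*` checks in `jobqueue_fetch()`.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical flag bits; the real __FUTURE_* values are not shown here. */
enum {
    FUTURE_RUNNING = 1 << 0,
    FUTURE_FINISHED = 1 << 1,
    FUTURE_CANCELLED = 1 << 2,
};

/* Hypothetical stand-in for the flag field of struct __tpool_future. */
struct future_sketch {
    atomic_int flag;
};

/* Worker side: mark the task as running unless it was cancelled first.
 * Returns true when the caller may go on to execute the task. */
static bool future_try_start(struct future_sketch *f)
{
    int old = atomic_load(&f->flag);
    do {
        if (old & FUTURE_CANCELLED)
            return false;
        /* On failure the CAS reloads `old`, so the check above is redone */
    } while (!atomic_compare_exchange_weak(&f->flag, &old,
                                           old | FUTURE_RUNNING));
    return true;
}

/* Canceller side: succeeds only if the task has not started running. */
static bool future_try_cancel(struct future_sketch *f)
{
    int old = atomic_load(&f->flag);
    do {
        if (old & (FUTURE_RUNNING | FUTURE_FINISHED))
            return false;
    } while (!atomic_compare_exchange_weak(&f->flag, &old,
                                           old | FUTURE_CANCELLED));
    return true;
}

/* Worker side: publish completion with a single atomic OR. */
static void future_mark_finished(struct future_sketch *f)
{
    atomic_fetch_or(&f->flag, FUTURE_FINISHED);
}
```

This only replaces the mutex around the flag updates. A caller that blocks until the task finishes, as `tpool_future_get()` does, would still need a condition variable or some other waiting mechanism, and the job queue itself is left untouched.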