# 2021q1 Homework4 (quiz4) contribute by <`cyrong`> > [quiz4](https://hackmd.io/@sysprog/linux2021-quiz4) ## 解釋程式運作原理 ### struct #### `threadtask_t` ```c typedef struct __threadtask { void *(*func)(void *); void *arg; struct __tpool_future *future; struct __threadtask *next; } threadtask_t; ``` `threadtask_t` 代表的是要作的任務,其中的成員有 - `func` : 要執行的 function - `arg` : function 所需要的 argument - `future` : 這個 task 也就是 job 的 future 狀態 - `next` : 連接的下一個 job #### `jobqueue_t` ```c typedef struct __jobqueue { threadtask_t *head, *tail; pthread_cond_t cond_nonempty; pthread_mutex_t rwlock; } jobqueue_t; ``` `jobqueue_t` 代表等待要進入 thread pool 的 job queue - `head` `tail` : queue 的 head tail - `cond_nonempty` : `tpool_create` 後要先等待 job 進來,有 job 進入後會發出信號,讓 thread pool 開始運作 - `rwlock` : 保證 jobqueue thread safety 的 mutex #### `__tpool_future` ```c enum __future_flags { __FUTURE_RUNNING = 01, __FUTURE_FINISHED = 02, __FUTURE_TIMEOUT = 04, __FUTURE_CANCELLED = 010, __FUTURE_DESTROYED = 020, }; struct __tpool_future { int flag; void *result; pthread_mutex_t mutex; pthread_cond_t cond_finished; }; ``` `__tpool_future` 代表 job 運作的未來狀態 - `flag` : 代表這個 job 狀態,可能為上面 enum 表中的狀態 - `result` : job function return 值 - `mutex` : job function 的 mutex - `cond_finished` : job finished 的 condtion variable `__threadpool` ```c struct __threadpool { size_t count; pthread_t *workers; jobqueue_t *jobqueue; }; ``` `__threadpool` thread pool 本體 - `count` : 代表有幾個 thread 在裡面 - `workers` : `pthread_t` 所需配置空間的指標 - `jobqueue` : 這個 thread pool 對應的 jobqueue ### function #### `__tpool_future *tpool_future_create` 配置一個 `future` object 的記憶體 #### `tpool_future_destroy` 將 `future->flag` 設定成 `__FUTURE_DESTROYED` 如果 `future->flag` 已經是 `__FUTURE_FINISHED` 或是 `__FUTURE_CANCELLED` 就釋放記憶體 #### `tpool_future_get` ```c void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else pthread_cond_wait(&future->cond_finished, &future->mutex);//FFF; } pthread_mutex_unlock(&future->mutex); return future->result; } ``` 其中有 timeout 檢查機制,透過設定 `seconds` 可以實作 如果 `seconds` 設定為 0 就單純等待 `future->cond_finished` 如果沒有 timeout 就回傳 job `result` #### `jobqueue_create` 配置新 `jobqueue_t` 記憶體 #### `jobqueue_destroy` ```c static void jobqueue_destroy(jobqueue_t *jobqueue) { threadtask_t *tmp = jobqueue->head; while (tmp) { jobqueue->head = jobqueue->head->next; pthread_mutex_lock(&tmp->future->mutex); if (tmp->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&tmp->future->mutex); pthread_mutex_destroy(&tmp->future->mutex); pthread_cond_destroy(&tmp->future->cond_finished); free(tmp->future); } else { tmp->future->flag |= __FUTURE_CANCELLED; pthread_mutex_unlock(&tmp->future->mutex); } free(tmp); tmp = jobqueue->head; } pthread_mutex_destroy(&jobqueue->rwlock); pthread_cond_destroy(&jobqueue->cond_nonempty); free(jobqueue); } ``` 把整個 job queue destroy 如果 job `future->flag` 已經是 `__FUTURE_DESTROYED` 就直接釋放記憶體 否則將 `future->flag` 設定為 `__FUTURE_CAMCELLED` #### `__jobqueue_fetch_cleanup` ```c static void __jobqueue_fetch_cleanup(void *arg) { pthread_mutex_t *mutex = (pthread_mutex_t *) arg; pthread_mutex_unlock(mutex); } ``` 此為之後 `jobqueue_fetch` 中 `pthread_cleanup_push` 所需要用到的 routine #### `jobqueue_fetch` ```c static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);//GGG; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED;//KKK; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished);//LLL; pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); pthread_exit(NULL); } ``` `jobqueue_fetch` 為 thread pool 中的 thread 從 job queue 中提取 job 的 function 是整個 thread pool 運作核心 一開始進入 while 後會先設定 cancel point 對應 wait nonemtpy 在這之後 worker 從 jobqueue 中提取 job 接著判斷是否有 cancel 或是 destroy 的 flag 否則就是執行 job 中的 function 做完後繼續提取下一個 job 直到沒有 job 結束前會呼叫一開始設定的 clean up routine #### `tpool_create` 配置 `__thread_pool` 所需要的記憶體 #### `tpool_apply` 將 job 添加到 thread pool 中的 job queue 中 並回傳這個 job 的 future object #### `tpool_join` 將 thread pool 中的 worker 作 join 並且釋放記憶體 ## timeout 機制