# 2021q1 Homework4 (quiz4)
contributed by <`cyrong`>
> [quiz4](https://hackmd.io/@sysprog/linux2021-quiz4)
## 解釋程式運作原理
### struct
#### `threadtask_t`
```c
/* One queued job: a node of the singly linked job queue. */
typedef struct __threadtask {
/* Function the worker calls to run the job. jobqueue_fetch treats a task
 * whose func is NULL as the signal to leave its loop. */
void *(*func)(void *);
/* Argument handed to func. */
void *arg;
/* Future object tracking this job's state and result. */
struct __tpool_future *future;
/* Next task in the queue, or NULL for the last one. */
struct __threadtask *next;
} threadtask_t;
```
`threadtask_t` 代表的是要作的任務,其中的成員有
- `func` : 要執行的 function
- `arg` : function 所需要的 argument
- `future` : 這個 task 也就是 job 的 future 狀態
- `next` : 連接的下一個 job
#### `jobqueue_t`
```c
/* Queue of jobs waiting to be picked up by the pool's workers. */
typedef struct __jobqueue {
/* First and last task of the list; jobqueue_fetch removes from tail and
 * treats a NULL tail as "queue empty". */
threadtask_t *head, *tail;
/* Waited on by jobqueue_fetch while the queue is empty. */
pthread_cond_t cond_nonempty;
/* Mutex held by jobqueue_fetch while it inspects or unlinks tasks. */
pthread_mutex_t rwlock;
} jobqueue_t;
```
`jobqueue_t` 代表等待要進入 thread pool 的 job queue
- `head` `tail` : queue 的 head tail
- `cond_nonempty` : `tpool_create` 後要先等待 job 進來,有 job 進入後會發出信號,讓 thread pool 開始運作
- `rwlock` : 保證 jobqueue thread safety 的 mutex
#### `__tpool_future`
```c
/* State bits OR-ed into __tpool_future.flag (values are octal literals). */
enum __future_flags {
/* Set by jobqueue_fetch right before it calls the job function. */
__FUTURE_RUNNING = 01,
/* Set by jobqueue_fetch after the job function returned and its result
 * was stored; tpool_future_get waits for this bit. */
__FUTURE_FINISHED = 02,
/* Set by tpool_future_get when its timed wait expires; cleared again at
 * the start of the next tpool_future_get call. */
__FUTURE_TIMEOUT = 04,
/* Set by jobqueue_destroy on tasks still queued; jobqueue_fetch skips a
 * task whose future carries this bit. */
__FUTURE_CANCELLED = 010,
/* Per the accompanying notes, set by tpool_future_destroy. When
 * jobqueue_fetch or jobqueue_destroy see it they free the future. */
__FUTURE_DESTROYED = 020,
};
/* Handle through which a caller observes the outcome of one job. */
struct __tpool_future {
/* Bitwise OR of enum __future_flags values. */
int flag;
/* Return value of the job function, stored by jobqueue_fetch. */
void *result;
/* Held around every access to flag and result in the visible code. */
pthread_mutex_t mutex;
/* Broadcast by jobqueue_fetch once __FUTURE_FINISHED is set; waited on by
 * tpool_future_get. */
pthread_cond_t cond_finished;
};
```
`__tpool_future` 代表 job 運作的未來狀態
- `flag` : 代表這個 job 狀態,可能為上面 enum 表中的狀態
- `result` : job function return 值
- `mutex` : 保護此 future 的 `flag` 與 `result` 的 mutex
- `cond_finished` : job finished 的 condition variable
#### `__threadpool`
```c
/* The thread pool object itself. */
struct __threadpool {
/* Number of worker threads, per the accompanying notes. */
size_t count;
/* Storage for the workers' pthread_t handles (presumably count entries;
 * the allocation is not shown here). */
pthread_t *workers;
/* Job queue served by this pool. */
jobqueue_t *jobqueue;
};
```
`__threadpool` thread pool 本體
- `count` : 代表有幾個 thread 在裡面
- `workers` : `pthread_t` 所需配置空間的指標
- `jobqueue` : 這個 thread pool 對應的 jobqueue
### function
#### `__tpool_future *tpool_future_create`
配置一個 `future` object 的記憶體
#### `tpool_future_destroy`
將 `future->flag` 設定成 `__FUTURE_DESTROYED`
如果 `future->flag` 已經是 `__FUTURE_FINISHED` 或是 `__FUTURE_CANCELLED` 就釋放記憶體
#### `tpool_future_get`
```c
/**
 * Wait until a job has finished and return its result.
 *
 * @param future  future object of the job to wait for
 * @param seconds upper bound on the wait in seconds; 0 waits without limit
 * @return the value stored in future->result once __FUTURE_FINISHED is set,
 *         or NULL when the wait timed out (__FUTURE_TIMEOUT is then set in
 *         future->flag)
 */
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    struct timespec expire_time;
    void *result;

    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;

    /* Compute the absolute deadline once. Recomputing it inside the loop
     * would push the deadline back by `seconds` on every spurious wakeup,
     * so the call could block far longer than requested.
     *
     * NOTE(review): pthread_cond_timedwait measures against the clock of
     * the condition variable (CLOCK_REALTIME unless changed with
     * pthread_condattr_setclock). Confirm that tpool_future_create selects
     * CLOCK_MONOTONIC for cond_finished. */
    if (seconds) {
        clock_gettime(CLOCK_MONOTONIC, &expire_time);
        expire_time.tv_sec += seconds;
    }

    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (!seconds) {
            pthread_cond_wait(&future->cond_finished, &future->mutex);
            continue;
        }

        int status = pthread_cond_timedwait(&future->cond_finished,
                                            &future->mutex, &expire_time);
        if (status == ETIMEDOUT) {
            future->flag |= __FUTURE_TIMEOUT;
            pthread_mutex_unlock(&future->mutex);
            return NULL;
        }
    }

    /* Read the result while still holding the mutex that guards it. */
    result = future->result;
    pthread_mutex_unlock(&future->mutex);
    return result;
}
```
其中有 timeout 檢查機制,透過設定 `seconds` 可以實作
如果 `seconds` 設定為 0 就單純等待 `future->cond_finished`
如果沒有 timeout 就回傳 job `result`
#### `jobqueue_create`
配置新 `jobqueue_t` 記憶體
#### `jobqueue_destroy`
```c
/**
 * Tear down a job queue together with every task still linked into it.
 *
 * For each pending task: if its future already carries
 * __FUTURE_DESTROYED the future is released here, otherwise the future is
 * only marked __FUTURE_CANCELLED and left alive. The task node itself is
 * always freed. Finally the queue's mutex, condition variable and storage
 * are released.
 *
 * @param jobqueue queue to destroy; must not be used afterwards
 */
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *task;

    while ((task = jobqueue->head) != NULL) {
        struct __tpool_future *fut = task->future;

        /* Unlink the node before touching its future. */
        jobqueue->head = task->next;

        pthread_mutex_lock(&fut->mutex);
        const int abandoned = fut->flag & __FUTURE_DESTROYED;
        if (!abandoned)
            fut->flag |= __FUTURE_CANCELLED;
        pthread_mutex_unlock(&fut->mutex);

        /* The future is released only when it was already marked destroyed. */
        if (abandoned) {
            pthread_mutex_destroy(&fut->mutex);
            pthread_cond_destroy(&fut->cond_finished);
            free(fut);
        }

        free(task);
    }

    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
```
把整個 job queue destroy
如果 job `future->flag` 已經是 `__FUTURE_DESTROYED` 就直接釋放記憶體
否則將 `future->flag` 設定為 `__FUTURE_CANCELLED`
#### `__jobqueue_fetch_cleanup`
```c
/**
 * Cleanup routine registered by jobqueue_fetch via pthread_cleanup_push.
 *
 * @param arg address of the pthread_mutex_t to unlock
 */
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *) arg);
}
```
此為之後 `jobqueue_fetch` 中 `pthread_cleanup_push` 所需要用到的 routine
#### `jobqueue_fetch`
```c
/*
 * Worker loop: repeatedly take one task from the job queue and run it.
 * Per the accompanying notes this is the function the pool's threads use
 * to fetch jobs.
 *
 * queue: the jobqueue_t to serve, passed as void *.
 * Does not return to its caller; the thread ends through pthread_exit(NULL)
 * after a task with a NULL func has been fetched.
 */
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
/* If the thread is cancelled while it holds rwlock, this handler
 * releases the lock. */
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
/* Cancellation is enabled only around the wait for work. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
/* A NULL tail means the queue is empty: sleep until signalled. */
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
/* From here on the thread must not be cancelled mid-task. */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
/* Single element: taking it leaves the queue empty. */
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
/* Walk from head to the node just before tail, detach tail and make
 * that predecessor the new tail (linear in the queue length). */
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
/* Cancelled before it started: drop the task node only; the future
 * is not freed on this path. */
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
/* The job itself runs without any lock held. */
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
/* The future was marked destroyed while the job ran, so it is
 * released here instead of publishing a result. */
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
/* Publish the result and wake every thread waiting on the future. */
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
/* A task without a function ends this worker: release the task and
 * its future, then leave the loop. */
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
/* Argument 0: remove the cleanup handler without executing it. */
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
`jobqueue_fetch` 為 thread pool 中的 thread 從 job queue 中提取 job 的 function
是整個 thread pool 運作核心
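每次進入 while 迴圈後會先取得 `rwlock`,開啟 cancellation 並以 `pthread_testcancel` 設置 cancellation point,接著在 job queue 為空時等待 `cond_nonempty` (nonempty);取得 job 之前再關閉 cancellation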
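在這之後 worker 從 jobqueue 的 tail 提取 job:若該 job 已被 cancel 就略過;否則執行 job 中的 function,完成後依 future 是否已被 destroy 決定釋放 future,或是寫入 `result` 並對 `cond_finished` 發出 broadcast。做完後繼續提取下一個 job,直到取得 `func` 為 `NULL` 的 task 才跳出迴圈
結束前以 `pthread_cleanup_pop(0)` 移除一開始設定的 clean up routine (參數為 0,所以不會執行它);該 routine 只有在 thread 持有 `rwlock` 時於 cancellation point 被取消才會執行,用來釋放 `rwlock`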
#### `tpool_create`
配置 `__threadpool` 所需要的記憶體
#### `tpool_apply`
將 job 添加到 thread pool 中的 job queue 中
並回傳這個 job 的 future object
#### `tpool_join`
將 thread pool 中的 worker 作 join
並且釋放記憶體
## timeout 機制