# 2021q1 Homework4 (quiz4) contributed by < `henrybear327` > ###### tags: `linux2021` > [Quiz 4](https://hackmd.io/@sysprog/linux2021-quiz4) > [Github](https://github.com/henrybear327/2021q1-Homework4-quiz4) # Problem 1 ## Implementation [Code with comments](https://github.com/henrybear327/2021q1-Homework4-quiz4/blob/master/thread_pool.c) The main idea of the thread pool contains 2 parts: thread pool and task. ```c typedef struct __threadpool *tpool_t; struct __threadpool { size_t count; pthread_t *workers; jobqueue_t *jobqueue; }; ``` The `tpool_t` struct holds exactly `n` workers specified by `count`, it also holds a `jobqueue_t`. ```c typedef struct __jobqueue { threadtask_t *head, *tail; pthread_cond_t cond_nonempty; pthread_mutex_t rwlock; } jobqueue_t; ``` The `jobqueue_t` holds `threadtask_t` in a queue, which is implemented using a singly linked list. ```c typedef struct __threadtask { void *(*func)(void *); void *arg; struct __tpool_future *future; struct __threadtask *next; } threadtask_t; typedef struct __tpool_future *tpool_future_t; struct __tpool_future { int flag; void *result; pthread_mutex_t mutex; pthread_cond_t cond_finished; }; ``` The `threadtask_t` encapsulates the actual start_routine and argument pair for the `pthread` to actually run on, also a `tpool_future_t` to represent the execution status and the return value of the thread. ## `pthread_mutex_t` and `pthread_cond_t` There are 2 sets of `pthread_mutex_t` and `pthread_cond_t`, one in `jobqueue_t` and one in `tpool_future_t`. The set in `jobqueue_t` is to make sure that no concurrent access to the queue that it holds, and to signify if an empty queue has a new entry that one of the `pthread` should pick up and execute it. The set in `tpool_future_t` is to block and wait for the future to be completed, if the `tpool_future_t` is not yet completed at the time of the result retrieval by the user. ### Thread pool creation ```c static jobqueue_t *jobqueue_create(void) { jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t)); if (jobqueue) { jobqueue->head = jobqueue->tail = NULL; pthread_cond_init(&jobqueue->cond_nonempty, NULL); pthread_mutex_init(&jobqueue->rwlock, NULL); } return jobqueue; } struct __threadpool *tpool_create(size_t count) { jobqueue_t *jobqueue = jobqueue_create(); struct __threadpool *pool = malloc(sizeof(struct __threadpool)); if (!jobqueue || !pool) { // if the creation failed, deinit and return NULL if (jobqueue) jobqueue_destroy(jobqueue); free(pool); // If ptr is NULL, no operation is performed. return NULL; } pool->count = count, pool->jobqueue = jobqueue; if ((pool->workers = malloc(count * sizeof(pthread_t)))) { for (size_t i = 0; i < count; i++) { if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *)jobqueue)) { // if non-0 == failed for (size_t j = 0; j < i; j++) pthread_cancel(pool->workers[j]); for (size_t j = 0; j < i; j++) pthread_join(pool->workers[j], NULL); free(pool->workers); jobqueue_destroy(jobqueue); free(pool); return NULL; } } return pool; } jobqueue_destroy(jobqueue); free(pool); return NULL; } ``` `tpool_create` will initialize a `tpool_t` with a `jobqueue_t` and an array of workers, which is essentially an array of `pthread`. The `pthread`s are created with the start_routine `jobqueue_fetch` and arg `jobqueue`. If any of the creations failed, `NULL` will be returned. ### Add tasks to thread pool ```c struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { // has head already new_head->next = jobqueue->head; jobqueue->head = new_head; } else { // empty queue jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty); // HHH; } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { // future creation failed free(new_head); return NULL; } else if (future) { // new_head malloc failed tpool_future_destroy(future); return NULL; } return future; } ``` `tpool_apply` basically maintains the jobqueue, which is implemented using a singly linked list holding `threadtask_t`s. Whenever the queue is being modified, the `rwlock` will be acquired, and the task will be added to `head`. When the task is added to an empty queue, the `cond_nonempty` will be broadcasted, causing one of the `pthread`s blocking on `cond_nonempty` (waiting for task) to be unblocked. According to the man page, `If more than one thread is blocked on a condition variable, the scheduling policy shall determine the order in which threads are unblocked.`, so we don't need to worry about picking one `pthread` to execute for us. Notice that changing `pthread_cond_broadcast` to `pthread_cond_signal` will not work, as you will only wake up 1 thread from the blocked threads, which sounds like a performance gain but it's actually letting all the other threads blocked on conditional varible keep on blocking (they might never get a chance to be woken up, as you are waking up 1 thread at a time only when the queue is empty!) Upon a successful task creation, a `tpool_future_t` will be returned to the caller, otherwise `NULL` will be returned. ### Execution ```c static void __jobqueue_fetch_cleanup(void *arg) { pthread_mutex_t *mutex = (pthread_mutex_t *)arg; pthread_mutex_unlock(mutex); } // returns the start_routine static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *)queue; threadtask_t *task; int old_state; // The previous cancelability type of the thread is returned in the buffer pointed to by oldtype. pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *)&jobqueue->rwlock); // defer while (1) { pthread_mutex_lock(&jobqueue->rwlock); // defer guaranteed this will be released pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); // Calling pthread_testcancel() creates a cancellation point within the calling thread, so that a thread that is otherwise executing code that contains no cancellation points will respond to a cancellation request. // should be waiting for a job to show up while (!jobqueue->tail) // check tail as we pop from tail pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); // GGG; // we were waiting, so we allowed the thread to be cancelled // now we are about to execute code, so we don't like the thread to be cancelled anymore pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { // only one task in queue, we empty the queue task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { // we pop the tail threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; // get prev task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { // jobqueue destroyed pthread_mutex_unlock(&task->future->mutex); free(task); continue; // the jobqueue_destroy will only happen all threads are joined. } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); // TODO: should move it into else since in the if clause it is a waste of time to run. pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; // KKK task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); // LLL, so the tpool_get_future can return the value properly! pthread_mutex_unlock(&task->future->mutex); // future is where the return value is held, also by main.c caller // task encapsulated future // so we free task, but not free future, so the caller can get the result! } free(task); } else { // nothing to execute, tpool_join operation will send in no func and arg pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; // let's end the thread! } } pthread_cleanup_pop(0); // pop the defer function WITHOUT executing it: The pthread_cleanup_pop() function removes the routine at the top of the stack of clean-up handlers, and optionally executes it if execute is nonzero. pthread_exit(NULL); } ``` `jobqueue_fetch` is where all the magic happens. There is a `while(1)` which will block until the caller calls `tpool_join`, which will add a task with `NULL` for both the start_routine and arg. When `while(1)` starts to execute, it will first acquire the `rwlock` of the jobqueue, and then check if the queue is empty by looking at if the tail is `NULL` or not. If the queue is not empty, we will continue on and pop one task from the jobqueue. If the queue is indeed empty, the code will block in `while(!jobqueue->tail)` at `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);`, and the `rwlock` will be released by `pthread_cond_wait`. This is the reason why the broadcasting of the `cond_nonempty` in `tpool_apply` when the queue is empty is necessary! Upon the broadcasting of `cond_nonempty`, `pthread`s that were blocking on `pthread_cond_wait` will be unblocked and one of them will be able to move on and pop one task from the jobqueue's tail. When a task is popped from the jobqueue, `rwlock` will be released again. We also need to pay attention to the cancel state handling code. `pthread_setcancelstate` can enable or disable the cancelability of a thread. From the man page, we know that the default cancelability type is `PTHREAD_CANCEL_DEFERRED`, which means we will also need to add a cancellation point, by using `pthread_testcancel()`. The idea of using the cancel state is that when the thread is blocked due to waiting for a new task, we allow the thread to be killed if needed. But when a task is running on the thread, we will disable the cancelling, which means that the task can be executed on the thread without any being killed mid-way. Before the task starts executing, the mutex in the future object will be acquired. If the flag in future contains `__FUTURE_CANCELLED`, that means the jobqueue has been destroyed already, so we will free this task. Otherwise we will add the flag with `__FUTURE_RUNNING`. Notice that `threadtask_t` is for internal use only, and `tpool_future_t` is what the external caller will hold. So we can only free the task object but NOT the future object. After executing the task, we will add `__FUTURE_FINISHED` to the flag, and broadcast the `cond_finished` in the future. The `pthread_cleanup_push` is like the `defer` in golang. When the thread is cancelled, the function registered here will be executed. The `pthread_cleanup_pop(0)` at the end of the function actually pops the registered function without executing it, since we are exiting the thread normally. :::warning Q: Is checking for `__FUTURE_CANCEL` required? ```c if (task->future->flag & __FUTURE_CANCELLED) { // jobqueue destroyed pthread_mutex_unlock(&task->future->mutex); free(task); continue; // the jobqueue_destroy will only happen all threads are joined. } ``` I can't think of a condition that this block of code will be run. - If the future is destroyed before the task being run, it will be marked as `__FUTURE_DESTROYED`. - If the task is finished, it will be marked as `__FUTURE_FINISHED`. - If tasks are not finished but the jobqueue is requested to be destroyed by calling `tpool_join`, tasks will all be run first, and then the `NULL` start_routine and arg will cause the pthread to exit already. Then the `jubqueue_destroy` will be called, which will mark the future objects as `__FUTURE_CANCEL`. So checking for `__FUTURE_CANCEL` doesn't make sense to me so far. Ans: This is the part where we can improve upon $\rightarrow$ cancellation of tasks ::: ### Retrieve the result of tasks using future ```c void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; // because you might want to come back again to check the result after the last timeout! while ((future->flag & __FUTURE_FINISHED) == 0) { // not finished, let's keep waiting if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else { pthread_cond_wait(&future->cond_finished, &future->mutex); // FFF; } } pthread_mutex_unlock(&future->mutex); return future->result; } ``` `tpool_future_get` will check if the future object is `__FUTURE_FINISHED` after acquiring the mutex of the future object. If it's finished, then we can directly return the result. Otherwise, we check if `seconds` is $\geq 0$ If `seconds` is 0, then we block and wait on the `cond_finished`, since when the task is completed it will be broadcasted. If `seconds` is non-zero, we will set an expiration time. `pthread_cond_timedwait` will block until time expires or is broadcasted. If timeout happens first, we set `__FUTURE_TIMEOUT` flag, and return `NULL`, otherwise, we return the return value in the future object. As stated in the man page: The pthread_cond_timedwait() function shall be equivalent to pthread_cond_wait(), except that an error is returned if the absolute time specified by abstime passes (that is, system time equals or exceeds abstime) before the condition cond is signaled or broadcasted, or if the absolute time specified by abstime has already been passed at the time of the call. When such timeouts occur, pthread_cond_timedwait() shall nonetheless release and re-acquire the mutex referenced by mutex, and may consume a condition signal directed concurrently at the condition variable.) Notice that if the timeout happened on the last `tpool_future_get()`, but the caller attempted for another time, we need to clear the `__FUTURE_TIMEOUT` from the flag first, and then run the remaining logic like the first time. By doing this we can maintain the current result from the last run all the time. :::warning Q: Why is `__FUTURE_CANCEL` and `__FUTURE_DESTROYED` doing the bit sharing? (e.g. `__FUTURE_CANCEL` is 8 + `__FUTURE_FINISHED`) A: It's octal, because the flags starts with prefix 0. Need to read more carefully. ::: ### Deconstruct the future ```c int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); // you can't free it, as the task holding this future is still in the queue (not popped when future is destroyed). It's kind of like cancelling the task before it's run. } } return 0; } ``` `tpool_future_destroy` will free the future object if the future is `__FUTURE_FINISHED` or `__FUTURE_CANCEL`. Otherwise, `__FUTURE_DESTROYED` will be added to the flag, but NOT freeing the future object. The reason is that the future is still pending executing, which means the `threadtask_t` object is still holding it! ### Deconstruct the thread pool ```c static void jobqueue_destroy(jobqueue_t *jobqueue) { threadtask_t *tmp = jobqueue->head; while (tmp) { // clean up all pending tasks in the jobqueue jobqueue->head = jobqueue->head->next; pthread_mutex_lock(&tmp->future->mutex); if (tmp->future->flag & __FUTURE_DESTROYED) { // future already got requested to be destroyed by caller pthread_mutex_unlock(&tmp->future->mutex); pthread_mutex_destroy(&tmp->future->mutex); pthread_cond_destroy(&tmp->future->cond_finished); free(tmp->future); } else { // not destroyed, means that the caller is still holding on to the future object! // can't free for the caller tmp->future->flag |= __FUTURE_CANCELLED; pthread_mutex_unlock(&tmp->future->mutex); } free(tmp); tmp = jobqueue->head; } pthread_mutex_destroy(&jobqueue->rwlock); pthread_cond_destroy(&jobqueue->cond_nonempty); free(jobqueue); } int tpool_join(struct __threadpool *pool) { size_t num_threads = pool->count; for (size_t i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); for (size_t i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; } ``` `tpool_join` adds exactly `count` number of tasks with `NULL` for both start_routine and arg to the jobqueue, which will cause the pthread to exit normally when run. After adding the tasks, we wait for the pthread to join, and then we free the workers, and in the end we destroy the jobqueue, which essentially destroyed the thread pool. The `jobqueue_destroy` will loop through all tasks in the jobqueue and delete all tasks one by one. :::warning Q: Are there any remaining tasks in the jobqueue to clean up though? Since exactly `count` number of tasks with `NULL` for both start_routine and arg were added to the queue, and finished executing, before reaching here. Thus, the jobqueue should be empty already. A: Also for cancellation implementation improvement ::: ## Point out potential improvements that can be made and implement it * the queue can be re-written in a lockless queue * the popping of the queue is linear time (since the code scans for the second-to-last position by looping over from head). This can be improved to $O(1)$ * threads can be [tied](https://man7.org/linux/man-pages/man3/pthread_setaffinity_np.3.html) to specific CPUs # 研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法 **TODO** [lfqueue](https://github.com/Taymindis/lfqueue) conditional variable -> needs a lock - jobqueue has lock - future has lock # 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability **TODO**