# 2021q1 Homework4 (quiz4)
contributed by < `ilkclord` >

## Problem `1`
### Struct
* **threadtask_t**
    Stores a task together with the function it needs to execute.

```clike
typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;
```
* **jobqueue_t**
    A job queue protected by a mutex, so that only one thread operates on the queue at a time.

```clike
typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;
```
* **__tpool_future**
    `result` stores the return value of the task
    `flag` shows the task state

```clike
struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};
```
* **__threadpool**
    `count` records the number of workers (threads)
    `workers` are the threads that run the tasks
    `jobqueue` is a linked list of tasks

```clike
struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};
```
### Code flow
4 threads start running in the background

```graphviz
Digraph G{
    node[shape = box];
    tpool_create -> jobfetch
    tpool_create -> jobfetch2
    tpool_create -> jobfetch3
    tpool_create -> jobfetch4
    rankdir = LR
}
```
`tpool_apply` adds a task to the queue
`jobqueue_fetch` (drawn as `jobfetch`) pops a task and runs it

```graphviz
Digraph G{
    node[shape = box]
    jobqueue->jobfetch
    jobqueue->jobfetch2
    jobqueue->jobfetch3
    jobqueue->jobfetch4
    tpool_apply->jobqueue
    rankdir = LR
}
```
In the end, call `tpool_join` to join all the threads and release the memory.
```graphviz
Digraph G{
    node[shape = box]
    jobfetch->tpool_join
    jobfetch2->tpool_join
    jobfetch3->tpool_join
    jobfetch4->tpool_join
    tpool_join->end
    rankdir = LR
}
```
### Function explanation
* **jobqueue_create**
    Initialize the `jobqueue`

```clike
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}
```
* **jobqueue_destroy**
    Release the memory used by `jobqueue`. For every task still in the queue, the future is freed if it is already flagged `__FUTURE_DESTROYED`; otherwise it is flagged `__FUTURE_CANCELLED`.

```clike
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }

    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
```
* **__jobqueue_fetch_cleanup**
    Unlock the mutex passed in through `arg`. It is registered as the cancellation cleanup handler in `jobqueue_fetch`.

```clike
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}
```
* **jobqueue_fetch**
    It can be considered the worker function of this program. `pthread_cleanup_push` registers `__jobqueue_fetch_cleanup`, which makes sure the mutex is unlocked if the thread is cancelled. We then enter the outer while-loop and lock the queue. The inner while-loop keeps running while the jobqueue is empty: `pthread_cond_wait` releases the mutex and sleeps until `cond_nonempty` is broadcast.
    After the broadcast, `pthread_cond_wait` reacquires the mutex before it returns, and the mutex stays locked while the task is popped from the queue. The worker then runs the `func` of the task.
    This function keeps repeating the following cycle:

```graphviz
Digraph G{
    Waiting->"get the task" ->"do the task" ->"Waiting"
    rankdir = LR ;
}
```
```clike
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        /* sleep until a task is available */
        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);

        /* pop the task at the tail of the queue */
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= KKK;
                task->future->result = ret_value;
                LLL;
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            /* a task without func tells the worker to exit */
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```
* **tpool_create**
    Create
    the thread pool and start the worker threads running in parallel.

```clike
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                /* creation failed: stop the workers started so far */
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```
* **tpool_apply**
    It can be considered the producer function, which pushes the task together with its argument onto the queue.

```clike
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            HHH;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```
* **tpool_join**
    The end of the program: it pushes one task with a `NULL` function per worker so that every worker leaves its loop, joins all the threads, and frees the memory used by the pool.
```clike
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```
* **bpp**
    Calculate the `k`-th term of the BBP (Bailey-Borwein-Plouffe) formula for pi; summing the terms gives the approximation of pi.

```clike
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}
```
### Timeout
```
==14488== Memcheck, a memory error detector
==14488== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==14488== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==14488== Command: ./pi_time
==14488==
==14488== Thread 2:
==14488== Invalid read of size 4
==14488==    at 0x486D7B4: __pthread_mutex_unlock_usercnt (pthread_mutex_unlock.c:41)
==14488==    by 0x486D7B4: pthread_mutex_unlock (pthread_mutex_unlock.c:357)
==14488==    by 0x109C48: jobqueue_fetch (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x4869608: start_thread (pthread_create.c:477)
==14488==    by 0x4AF4292: clone (clone.S:95)
==14488==  Address 0x4bc69d0 is 32 bytes inside a block of size 104 free'd
==14488==    at 0x483CA3F: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488==    by 0x1095CD: tpool_future_destroy (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x10A3F2: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==  Block was alloc'd at
==14488==    at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488==    by 0x1094CD: tpool_future_create (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x109F8D: tpool_apply (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x10A34C: main (in
/home/ilkclord/linux2020/quiz4/pi_time)
==14488==
==14488== Invalid read of size 4
==14488==    at 0x486D300: __pthread_mutex_unlock_full (pthread_mutex_unlock.c:103)
==14488==    by 0x109C48: jobqueue_fetch (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x4869608: start_thread (pthread_create.c:477)
==14488==    by 0x4AF4292: clone (clone.S:95)
==14488==  Address 0x4bc69d0 is 32 bytes inside a block of size 104 free'd
==14488==    at 0x483CA3F: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488==    by 0x1095CD: tpool_future_destroy (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x10A3F2: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==  Block was alloc'd at
==14488==    at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==14488==    by 0x1094CD: tpool_future_create (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x109F8D: tpool_apply (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==    by 0x10A34C: main (in /home/ilkclord/linux2020/quiz4/pi_time)
==14488==
```
### Problem
For ideal parallelism, every worker should be kept busy; an idle worker is a waste of resources. I ran a test that checks, each time a worker gets a task, whether every other worker is also doing a job. The result below shows that over 75% of the workers are idle, which indicates that pushing tasks is much slower than executing them. Speeding up the pushing process should therefore improve the efficiency.

[code](https://github.com/ilkclord/quiz4/blob/master/pi_debug.c)
[result](https://github.com/ilkclord/quiz4/blob/master/result.txt)

This problem may occur because pop and push both lock the same queue, which delays the pushing process.
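As an illustration of the kind of check described above, here is a minimal sketch of how a busy snapshot could be taken. It is not the code in `pi_debug.c`; the names `busy`, `worker_begin`, `worker_end` and the constant `NUM_WORKERS` are hypothetical, and the sketch assumes each worker knows its own index.

```c
#include <assert.h>
#include <stdatomic.h>

#define NUM_WORKERS 4 /* assumption: the 4 workers used in this note */

/* Hypothetical per-worker flags: 1 while the worker is running a task. */
static atomic_int busy[NUM_WORKERS];

/* Mark worker `id` as busy and return how many workers are busy right now
 * (including this one). Meant to be called right after a task is popped. */
static int worker_begin(int id)
{
    assert(id >= 0 && id < NUM_WORKERS);
    atomic_store(&busy[id], 1);

    int n = 0;
    for (int i = 0; i < NUM_WORKERS; i++)
        n += atomic_load(&busy[i]);
    return n;
}

/* Mark worker `id` as idle again, after the task function has returned. */
static void worker_end(int id)
{
    assert(id >= 0 && id < NUM_WORKERS);
    atomic_store(&busy[id], 0);
}
```

A worker would call `worker_begin(id)` before running the task and `worker_end(id)` after it; a return value of `NUM_WORKERS` corresponds to the fully loaded state `1 1 1 1`. The snapshot is only approximate, since other workers can change state while the flags are being read.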
#### Improvement
##### **adding more jobqueue**
```graphviz
Digraph G{
    queue1->Worker1
    queue2->Worker2
    queue3->Worker3
    queue4->Worker4
    rankdir = LR
}
```
To fix the delay between pop and push, we give every worker its own queue, so that pushing a task is not blocked by the other workers popping theirs.

[4-queue version](https://github.com/ilkclord/quiz4/blob/master/pi_v2.c)
[result_v2](https://github.com/ilkclord/quiz4/blob/master/result_v2)

The improvement is still incomplete; a few issues need to be fixed:
* Achieve full load, with Busy state 1 1 1 1
* The Timeout process has to be rewritten

##### **Switching**
We can make use of the idle workers and let them do the pushing task.

----still trying to finish this program------
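
The one-queue-per-worker idea from "adding more jobqueue" can be sketched roughly as follows. This is not the linked `pi_v2.c`; the types `subqueue_t` and `multiqueue_t` and the functions `mq_init`, `mq_push` and `mq_pop` are hypothetical names, and round-robin dispatch is an assumption about how tasks get spread across the queues.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

#define NUM_QUEUES 4 /* assumption: one queue per worker */

typedef struct node {
    void *item;
    struct node *next;
} node_t;

typedef struct {
    node_t *head;
    size_t len;
    pthread_mutex_t lock; /* protects only this queue */
} subqueue_t;

typedef struct {
    subqueue_t q[NUM_QUEUES];
    atomic_size_t next; /* round-robin cursor used by the producer */
} multiqueue_t;

static void mq_init(multiqueue_t *mq)
{
    atomic_init(&mq->next, 0);
    for (int i = 0; i < NUM_QUEUES; i++) {
        mq->q[i].head = NULL;
        mq->q[i].len = 0;
        pthread_mutex_init(&mq->q[i].lock, NULL);
    }
}

/* Push `item` onto the next queue in round-robin order. Returns the index
 * of the queue used, or -1 if the allocation failed. */
static int mq_push(multiqueue_t *mq, void *item)
{
    node_t *n = malloc(sizeof(*n));
    if (!n)
        return -1;
    n->item = item;

    size_t idx = atomic_fetch_add(&mq->next, 1) % NUM_QUEUES;
    subqueue_t *sq = &mq->q[idx];
    pthread_mutex_lock(&sq->lock);
    n->next = sq->head;
    sq->head = n;
    sq->len++;
    pthread_mutex_unlock(&sq->lock);
    return (int) idx;
}

/* Pop the most recently pushed item from worker `id`'s own queue.
 * Returns NULL when that queue is empty. */
static void *mq_pop(multiqueue_t *mq, int id)
{
    assert(id >= 0 && id < NUM_QUEUES);
    subqueue_t *sq = &mq->q[id];
    pthread_mutex_lock(&sq->lock);
    node_t *n = sq->head;
    if (n) {
        sq->head = n->next;
        sq->len--;
    }
    pthread_mutex_unlock(&sq->lock);
    if (!n)
        return NULL;

    void *item = n->item;
    free(n);
    return item;
}
```

With this layout a push contends only with the single worker that owns the chosen queue, instead of with all workers. The sketch leaves out the condition variable for sleeping on an empty queue and any rebalancing between queues, both of which a complete version would probably need.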