# 2021q1 Homework4 (quiz4)
contributed by < [`WayneLin1992`](https://github.com/WayneLin1992) >
###### tags: `linux2021`
## Extended questions
- [x] Explain how the code works, including the timeout handling mechanism.
- [x] Point out room for improvement and implement it
- [x] Study [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool), point out its atomic operations, and explain lock-free
- [x] Try rewriting with C11 Atomics for better scalability

## Understanding the problem
### How the program works
[quiz4](https://hackmd.io/@sysprog/linux2021-quiz4)
[thread pool](https://en.wikipedia.org/wiki/Thread_pool)
```cpp
enum __future_flags {
    __FUTURE_RUNNING = 01,    // 0000 0001
    __FUTURE_FINISHED = 02,   // 0000 0010
    __FUTURE_TIMEOUT = 04,    // 0000 0100
    __FUTURE_CANCELLED = 010, // 0000 1000
    __FUTURE_DESTROYED = 020, // 0001 0000
};
```
#### `__tpool_future`
```cpp
struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};
typedef struct __tpool_future *tpool_future_t;
```
```graphviz
digraph tpool_future_t {
    node [shape=record];
    rankdir=LR;
    __tpool_future [label="<f0>int flag|<f1> result|<f2> mutex|<f3> cond_finished"];
    tpool_future_t->__tpool_future
}
```
#### `threadtask_t`
```cpp
typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;
```
```graphviz
digraph threadtask_t {
    node [shape=record];
    rankdir=LR;
    __threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __tpool_future [label="<f0>int flag|<f1> result|<f2> mutex|<f3> cond_finished"];
    __threadtask:f3->__threadtask1;
    __threadtask:f2->__tpool_future;
}
```
#### `jobqueue_t`
```cpp
typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;
```
```graphviz
digraph jobqueue_t {
    node [shape=record];
    rankdir=LR;
    __threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __jobqueue [label="<f0> head|<f1> tail|<f2> cond_nonempty|<f3> rwlock"];
    more [label="..."]
    __jobqueue:f0->__threadtask
    __jobqueue:f1->__threadtask1
    __threadtask:f3->more
    more->__threadtask1
}
```
#### `tpool_t`
```cpp
struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};
typedef struct __threadpool *tpool_t;
```
```graphviz
digraph tpool_t {
    node [shape=record];
    rankdir=LR;
    __threadpool [label="<f0> count|<f1> workers|<f2> jobqueue"];
    tpool_t->__threadpool;
    __threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __jobqueue [label="<f0> head|<f1> tail|<f2> cond_nonempty|<f3> rwlock"];
    more [label="..."]
    __jobqueue:f0->__threadtask
    __jobqueue:f1->__threadtask1
    __threadtask:f3->more
    more->__threadtask1
    __threadpool:f2->__jobqueue
}
```
#### `tpool_future_create`
```cpp
static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
    if (future) {
        future->flag = 0;
        future->result = NULL;
        pthread_mutex_init(&future->mutex, NULL);
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        pthread_cond_init(&future->cond_finished, &attr);
        pthread_condattr_destroy(&attr);
    }
    return future;
}
```
Creates a `tpool_future` object and initializes its mutex, condition variable, and condition variable attributes object.
[pthread_mutex_init](https://linux.die.net/man/3/pthread_mutex_init)
A mutex must be initialized before use, otherwise the behavior is undefined (UB).
:::info
:bell: A mutex initialized with `pthread_mutex_init` must be matched with a corresponding `pthread_mutex_destroy` when it is no longer needed.
:::
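As a quick illustration of the pairing rule above, here is a minimal sketch (not part of the quiz code) that initializes and then destroys a mutex and a condition variable, with every `*_init` matched by a `*_destroy`:
```cpp
#include <pthread.h>
#include <stdio.h>

int main(void)
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;

    /* every *_init must eventually be paired with the matching *_destroy */
    if (pthread_mutex_init(&mutex, NULL) != 0)
        return 1;
    if (pthread_cond_init(&cond, NULL) != 0) {
        pthread_mutex_destroy(&mutex);
        return 1;
    }

    /* ... use the mutex / condition variable here ... */

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    puts("init/destroy pairs matched");
    return 0;
}
```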
[pthread_condattr_init](https://linux.die.net/man/3/pthread_condattr_init)
A condition variable attributes object also has to be initialized; without initialization the behavior is likewise undefined (UB), and it must be matched with a corresponding destroy. The attributes object defaults to `PTHREAD_PROCESS_PRIVATE`.
:::info
[pthread_condattr_init()](http://www.qnx.com/developers/docs/6.5.0/index.jsp?topic=%2Fcom.qnx.doc.neutrino_lib_ref%2Fp%2Fpthread_condattr_init.html)
>The pthread_condattr_init() function initializes the attributes in the condition variable attribute object attr to default values. Pass attr to pthread_cond_init() to define the attributes of the condition variable.

The attr initialized by `pthread_condattr_init` is passed to `pthread_cond_init`, so `pthread_condattr_init` and `pthread_cond_init` work as a pair and cannot simply be dropped. A return value of 0 means initialization succeeded; on failure an error number is returned.
[int pthread_condattr_init(pthread_condattr_t *attr) in ibm](https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/users_71.htm)
Adapted from the example above:
```cpp
pthread_cond_t cond;

int main() {
    int rc = 0;
    pthread_condattr_t attr;

    printf("Create a default condition attribute\n");
    rc = pthread_condattr_init(&attr);
    printf("pthread_condattr_init : %d\n", rc);

    printf("Create the condition using the condition attributes object\n");
    rc = pthread_cond_init(&cond, &attr);
    printf("pthread_cond_init : %d\n", rc);

    printf("Destroy cond attribute\n");
    rc = pthread_condattr_destroy(&attr);
    printf("pthread_condattr_destroy : %d\n", rc);

    printf("Destroy condition\n");
    rc = pthread_cond_destroy(&cond);
    printf("pthread_cond_destroy : %d\n", rc);
    return 0;
}
```
>Create a default condition attribute
pthread_condattr_init : 0
Create the condition using the condition attributes object
pthread_cond_init : 0
Destroy cond attribute
pthread_condattr_destroy : 0
Destroy condition
pthread_cond_destroy : 0

This shows that attr is passed into `pthread_cond_init` to produce the corresponding condition variable. Here `pthread_condattr_init` only sets default attributes, so `pthread_condattr_init` and `pthread_condattr_destroy` could be removed and the code would still run.
:::
[pthread_condattr_destroy](https://linux.die.net/man/3/pthread_condattr_destroy)
The code creates a condattr and then immediately destroys it. ~~**I guessed this was a place that could be improved**~~; as shown above, once the condition variable has been created the attr can be destroyed right away. A return value of 0 means success.
[pthread_cond_init](https://linux.die.net/man/3/pthread_cond_init)
Initializes the condition variable; it likewise has to be matched with a destroy at the end. A return value of 0 means success.
:::info
[pthread_cond_init() in oracle](https://docs.oracle.com/cd/E19455-01/806-5257/6je9h032r/index.html#sync-59145)
```cpp
int main() {
    pthread_cond_t cv;
    pthread_condattr_t cattr;
    int ret;

    /* initialize a condition variable to its default value */
    ret = pthread_cond_init(&cv, NULL);
    printf("pthread_cond_init : %d\n", ret);

    /* initialize a condition variable */
    ret = pthread_cond_init(&cv, &cattr);
    printf("pthread_cond_init : %d\n", ret);
}
```
>pthread_cond_init : 0
pthread_cond_init : 0

Even without `pthread_condattr_init`, the default attributes are used (attr is NULL); NULL can also be passed directly to save the memory used for cattr.
:::
```graphviz
digraph tpool_future_t {
    node [shape=record];
    rankdir=LR;
    __tpool_future [label="<f0>int flag|<f1> result|<f2> mutex|<f3> cond_finished"];
    __tpool_future:f0 -> 0
    __tpool_future:f1 -> NULL
}
```
:::info
Reference:
[Condition Variable Attributes](https://docs.oracle.com/cd/E19455-01/806-5257/6je9h032q/index.html#sync-81445)
:::
#### `tpool_future_destroy`
```cpp
int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}
```
`tpool_future_destroy` tears down a future: when `flag` has `__FUTURE_FINISHED` or `__FUTURE_CANCELLED` set, the future is destroyed, which means its mutex and condition variable are destroyed and the structure is freed. Otherwise only `__FUTURE_DESTROYED` is set in `flag`: the function is called destroy, yet it keeps the mutex and condition variable alive, which looks questionable and is **something to improve later**.
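To make the `flag` tests above concrete, here is a small standalone sketch (flag names taken from the enum at the top, the rest is illustration only) showing how the bits are set, tested and cleared:
```cpp
#include <stdio.h>

enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_TIMEOUT = 04,
    __FUTURE_CANCELLED = 010,
    __FUTURE_DESTROYED = 020,
};

int main(void)
{
    int flag = 0;

    flag |= __FUTURE_RUNNING;   /* a worker picked the task up  */
    flag |= __FUTURE_FINISHED;  /* the worker stored the result */

    /* same test as in tpool_future_destroy() */
    if (flag & __FUTURE_FINISHED || flag & __FUTURE_CANCELLED)
        printf("safe to free the future (flag = %d)\n", flag);

    flag &= ~__FUTURE_TIMEOUT;  /* clear a single bit, as tpool_future_get() does */
    return 0;
}
```
With both `__FUTURE_RUNNING` and `__FUTURE_FINISHED` set the value is 3, which matches the `futures[0]->flag = 3` observed in the gdb run later on.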
#### `tpool_future_get`
```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex); //FFF
    }
    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
```
Extracts `future->result`.
[pthread_cond_timedwait](https://linux.die.net/man/3/pthread_cond_timedwait)
It must be called with the mutex held and behaves like `pthread_cond_wait`, except that the wait is bounded by a timeout. A normal return is 0; `ETIMEDOUT` means the wait timed out, in which case `__FUTURE_TIMEOUT` is set in `flag` and the mutex is unlocked before returning NULL.
From `main` we can see that `seconds` is 0 here, so in practice the function keeps calling `pthread_cond_wait`.
:::info
[pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait)
[pthread_cond_wait in ibm](https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/users_78.htm)
`int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);`
**Add an implementation test**
:::
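Since the timeout above is computed from `CLOCK_MONOTONIC`, `pthread_cond_timedwait` normally needs the condition variable to be created with `pthread_condattr_setclock(..., CLOCK_MONOTONIC)` so that the absolute expiry time is interpreted on the same clock (by default it uses `CLOCK_REALTIME`). A minimal sketch of that combination, purely for illustration and not taken from the quiz code:
```cpp
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <time.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond;
static int ready; /* the predicate the waiter checks */

int main(void)
{
    /* make the condition variable use CLOCK_MONOTONIC for timed waits */
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    pthread_cond_init(&cond, &attr);
    pthread_condattr_destroy(&attr);

    pthread_mutex_lock(&lock);
    struct timespec expire;
    clock_gettime(CLOCK_MONOTONIC, &expire);
    expire.tv_sec += 1; /* wait at most one second */

    int status = 0;
    while (!ready && status != ETIMEDOUT)
        status = pthread_cond_timedwait(&cond, &lock, &expire);
    pthread_mutex_unlock(&lock);

    printf("%s\n", status == ETIMEDOUT ? "timed out" : "signalled");

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&lock);
    return 0;
}
```
Nobody ever signals `cond` here, so the program prints `timed out` after roughly one second.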
#### `jobqueue_create`
```cpp
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}
```
Creates the `jobqueue`.
#### `jobqueue_destroy`
```cpp
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }
    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
```
Destroys the `jobqueue`, freeing the tasks one by one starting from `head` until `tail` is reached.
#### `__jobqueue_fetch_cleanup`
```cpp
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}
```
Casts `arg` to `pthread_mutex_t *`. From the example below we can see that `pthread_cleanup_push` passes `arg` to the cleanup routine; the routine here calls `pthread_mutex_unlock`, so the `arg` is the mutex, i.e. `jobqueue->rwlock`. ~~Unlocking without initializing first **might be a problem**.~~
#### `jobqueue_fetch`
```cpp
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); //GGG
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED; //KKK
                task->future->result = ret_value;
                pthread_cond_broadcast(&task->future->cond_finished); //LLL
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```
[pthread_cleanup_push](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
>void pthread_cleanup_push(void (*routine)(void *), void *arg);
>The pthread_cleanup_push() function pushes routine onto the top
>of the stack of clean-up handlers. When routine is later
>invoked, it will be given arg as its argument.

:::info
[pthread_cleanup_push() ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.4.0/com.ibm.zos.v2r4.bpxbd00/ptcpush.htm)
1. `pthread_cleanup_push()` and `pthread_cleanup_pop()` come as a pair and must appear together.
2. Cleanup handlers are invoked when the thread terminates, i.e. when it calls `pthread_exit` or is cancelled via `pthread_cancel`.
3. Push and pop behave like a stack, so handlers run in last in, first out (LIFO) order.
4. Because `pthread_exit` releases the thread's resources while the mutex may still be locked, the mutex should be unlocked before `pthread_cleanup_pop()`, as shown below:
```cpp
pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_mutex_unlock(&mut);
pthread_cleanup_pop(0);
```
Here `pthread_cleanup_pop(1);` can replace the pair `pthread_mutex_unlock(&mut);` and `pthread_cleanup_pop(0);`.
:bell: This pattern only applies to deferred (synchronous) cancellation; with asynchronous cancellation another thread could still find the mutex locked. Taking the asynchronous case into account, it is adjusted as follows:
```cpp
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype);
pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_cleanup_pop(1);
pthread_setcanceltype(oldtype, NULL);
```
[pthread_setcanceltype() in ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/pthread_setcanceltype.htm#pthread_setcanceltype)
`pthread_setcanceltype()` sets the cancel type, and `oldtype` receives the previous type. `PTHREAD_CANCEL_ASYNCHRONOUS` means the thread can be cancelled at any time; `PTHREAD_CANCEL_DEFERRED` means it can only be cancelled at specific cancellation points.
Reference:
[Library C GNC cleanup handler](https://elias.rhi.hi.is/libc/Cleanup-Handlers.html)
:::
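To see the cleanup-handler mechanism in action, here is a small self-contained sketch (illustrative names, not part of the quiz code): a worker blocks in `pthread_cond_wait`, the main thread cancels it, and the handler releases the mutex that the cancelled thread re-acquired on its way out.
```cpp
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static void unlock_on_cancel(void *arg)
{
    /* a thread cancelled inside pthread_cond_wait re-acquires the mutex
     * before terminating, so the handler has to release it */
    pthread_mutex_unlock((pthread_mutex_t *) arg);
    puts("cleanup handler ran");
}

static void *worker(void *arg)
{
    (void) arg;
    pthread_cleanup_push(unlock_on_cancel, &lock);
    pthread_mutex_lock(&lock);
    while (1)                              /* nobody signals: wait until cancelled */
        pthread_cond_wait(&cond, &lock);   /* cancellation point */
    pthread_cleanup_pop(0);                /* never reached, but must pair with push */
    return NULL;
}

int main(void)
{
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    pthread_cancel(tid);   /* acted on once the worker reaches the wait */
    pthread_join(tid, NULL);
    puts("worker cancelled and joined");
    return 0;
}
```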
In `pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);` the first argument is the cleanup routine pushed onto the handler stack, and the second is the argument that will be passed to that routine.
:::success
Putting it together: `tpool_create` calls `pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)` to create the worker threads, and each worker runs `jobqueue_fetch(jobqueue)`, pulling tasks out of the jobqueue. Tasks are taken from the tail toward the head, and each finished task is freed.
:::
[pthread_setcancelstate](https://www.man7.org/linux/man-pages/man3/pthread_setcanceltype.3.html)
>The pthread_setcancelstate() sets the cancelability state of the calling thread to the value given in state. The pthread_setcanceltype() sets the cancelability type of the calling thread to the value given in type. The previous cancelability type of the thread is returned in the buffer pointed to by oldtype.

`old_state` stores the state the thread had before the call. `PTHREAD_CANCEL_ENABLE` means the thread can be cancelled, while `PTHREAD_CANCEL_DISABLE` means it cannot. The call returns 0 on success and `EINVAL` if the requested state is invalid.
:::info
References:
[pthread_setcancelstate() in ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/pthread_setcancelstate.htm)
[pthread_setcanceltype() in ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/pthread_setcanceltype.htm#pthread_setcanceltype)
:::
[pthread_testcancel](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html)
>Calling pthread_testcancel() creates a cancellation point within the calling thread, so that a thread that is otherwise executing code that contains no cancellation points will respond to a cancellation request.

`pthread_testcancel` inserts an explicit cancellation point so that a pending (deferred) cancellation request can take effect there; places that may block for a long time should also contain cancellation points to avoid waiting forever.
:::info
References:
[pthread_testcancel() in ibm](https://www.ibm.com/support/knowledgecenter/ssw_ibm_i_71/apis/users_46.htm)
[Cancellation Points](https://docs.oracle.com/cd/E19120-01/open.solaris/816-5137/tlib-62067/index.html)
:::
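A minimal sketch (again just an illustration, not the quiz code) of the same pattern `jobqueue_fetch` uses: keep cancellation disabled while the thread is doing work that must not be interrupted, then re-enable it and call `pthread_testcancel` at a safe spot.
```cpp
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static void *worker(void *arg)
{
    (void) arg;
    int old_state;
    for (int i = 0;; i++) {
        /* keep the critical part non-cancellable */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        printf("step %d done\n", i);

        /* safe spot: allow a pending cancel to take effect here */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();
        usleep(1000);
    }
    return NULL;
}

int main(void)
{
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    usleep(10 * 1000);   /* let the worker run for a while */
    pthread_cancel(tid);
    pthread_join(tid, NULL);
    puts("worker left at a cancellation point");
    return 0;
}
```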
#### `tpool_create`
```cpp
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```
```graphviz
digraph tpool_t {
    node [shape=record];
    rankdir=LR;
    __threadpool [label="<f0> count|<f1> workers|<f2> jobqueue"];
    tpool_t->__threadpool;
    __threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
    __jobqueue [label="<f0> head|<f1> tail|<f2> cond_nonempty|<f3> rwlock"];
    more [label="..."]
    __jobqueue:f0->__threadtask
    __jobqueue:f1->__threadtask1
    __threadtask:f3->more
    more->__threadtask1
    __threadpool:f2->__jobqueue
}
```
Creating a `tpool` also creates the matching `jobqueue`; `count` is the number of threads, i.e. the number of `workers`.
```cpp
for (int i = 0; i < count; i++) {
    if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                       (void *) jobqueue)) {
        for (int j = 0; j < i; j++)
            pthread_cancel(pool->workers[j]);
        for (int j = 0; j < i; j++)
            pthread_join(pool->workers[j], NULL);
        free(pool->workers);
        jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }
}
```
If creating worker `i` fails, the workers that were already created (`workers[0]` through `workers[i-1]`) are cancelled and joined, then `workers`, the `jobqueue` and the pool itself are freed and NULL is returned. As shown below, `pthread_create` returns 0 on success, so in the normal case the `if` is skipped and the function eventually reaches the `return pool` part.
:::info
`pthread_create`
```cpp
int pthread_create(pthread_t *restrict thread,
                   const pthread_attr_t *restrict attr,
                   void *(*start_routine)(void *), void *restrict arg);
```
The pthread_create() function starts a new thread in the calling process. The new thread starts execution by invoking start_routine(); arg is passed as the sole argument of start_routine().
Here `pthread_t *restrict thread` is the thread being created, `const pthread_attr_t *restrict attr` is the thread attributes object and may be `NULL`, `void *(*start_routine)(void *)` is the function to run, and `void *restrict arg` is the argument passed to that function.
In `pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)` the worker is the thread, `jobqueue_fetch` is the function, and `jobqueue` is the argument, which matches the signature `static void *jobqueue_fetch(void *queue)`. On success it returns 0; on failure it returns an error number such as `EAGAIN`.
References:
[pthread_create()](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
[pthread_create() in ibm](https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/users_14.htm)
:::
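A minimal standalone sketch of the same create/join pattern, passing a shared structure through `arg` the way `tpool_create` passes the `jobqueue` (the names here are illustrative and not from the quiz code):
```cpp
#include <pthread.h>
#include <stdio.h>

#define NWORKERS 4

struct shared { int id_sum; pthread_mutex_t lock; };

static void *worker(void *arg)
{
    struct shared *s = arg;   /* same role as the jobqueue pointer */
    pthread_mutex_lock(&s->lock);
    s->id_sum++;
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

int main(void)
{
    struct shared s = { .id_sum = 0, .lock = PTHREAD_MUTEX_INITIALIZER };
    pthread_t workers[NWORKERS];

    for (int i = 0; i < NWORKERS; i++) {
        if (pthread_create(&workers[i], NULL, worker, &s)) {
            /* on failure, wait for the threads already started,
             * just as tpool_create does */
            for (int j = 0; j < i; j++)
                pthread_join(workers[j], NULL);
            return 1;
        }
    }
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(workers[i], NULL);

    printf("all %d workers ran (id_sum = %d)\n", NWORKERS, s.id_sum);
    return 0;
}
```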
#### `tpool_apply`
```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty); //HHH
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```
Takes `pool->jobqueue` and creates the corresponding threadtask for the submitted function.
```cpp
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
    new_head->next = jobqueue->head;
    jobqueue->head = new_head;
} else {
    jobqueue->head = jobqueue->tail = new_head;
    pthread_cond_broadcast(&jobqueue->cond_nonempty); //HHH
}
pthread_mutex_unlock(&jobqueue->rwlock);
```
The jobqueue inserts at the head, but the mutex here is a coarse-grained lock and **could be made finer**.
:::warning
You should propose a way to shrink the critical section
:notes: jserv
:::
:::info
Thinking about the possible race here: if this mutex is removed entirely, the ordering inside the `jobqueue` may become incorrect, yet the final result is still correct.
>PI calculated with 101 terms: 3.141592653589793

Two threads could both try to link a new `jobqueue->head`, producing an ordering error; with the lock, only one thread at a time performs the add-at-head. Stepping through with gdb, however, `jobqueue->head` stays `NULL`, ~~mainly because its lifetime is too short and the jobqueue is not returned, so only the `else` branch ever runs, which is why removing the mutex has no visible effect here~~. It is not a lifetime issue: the worker threads have already processed the tasks and called `free(task)`, which is why `head` is observed as `NULL`.
:::
#### `tpool_join`
```cpp
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```
Submits one task with `func == NULL` per worker so that each worker breaks out of its loop, then joins every worker and destroys the pool together with its jobqueue.
#### Bailey–Borwein–Plouffe formula
```cpp
/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}
```
$\pi=\displaystyle\sum_{k=0}^{\infty}\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$
`bpp` expresses the formula above as a function: each call computes the $k$-th term of the series.
:::warning
Rewrite the BBP formula in LaTeX syntax
:notes: jserv
:::
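As a quick sanity check of the formula (a single-threaded sketch, independent of the thread pool), summing the first 101 terms directly gives the same value the pool computes below:
```cpp
/* build with: gcc <file>.c -lm  (pow needs the math library) */
#include <math.h>
#include <stdio.h>

int main(void)
{
    double pi = 0.0;
    for (int k = 0; k <= 100; k++) {
        double term = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                      (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
        pi += term / pow(16, k);
    }
    /* expected: 3.141592653589793, matching the threaded run */
    printf("PI calculated with %d terms: %.15f\n", 101, pi);
    return 0;
}
```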
#### `main`
```cpp
#define PRECISION 100 /* upper bound in BPP sum */

int main()
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_t pool = tpool_create(4);
    tpool_future_t futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }

    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }

    tpool_join(pool);
    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
    return 0;
}
```
>PI calculated with 101 terms: 3.141592653589793

`tpool_t pool = tpool_create(4)` sets `count` = 4.
The first `for` loop submits the function `bbp` for every term, creating the tasks in the `jobqueue` managed by the `pool`.
The second `for` loop uses `tpool_future_get` to fetch each `result`, accumulates them into `bpp_sum`, and destroys each future; the pool is then torn down and the result is printed.
`PRECISION + 1` appears because the term with `k = 0` counts as the first one, so `k = 100` is the 101st term.
:::info
`pthread_join`
>The pthread_join() function waits for the thread specified by thread to terminate. If that thread has already terminated, then pthread_join() returns immediately. The thread specified by thread must be joinable.

On success it returns 0; on error it returns one of the following:
>EDEADLK A deadlock was detected (e.g., two threads tried to join with each other); or thread specifies the calling thread.
EINVAL thread is not a joinable thread.
EINVAL Another thread is already waiting to join with this thread.
ESRCH No thread with the ID thread could be found.
:::
### Actual execution results
Running under gdb shows that `pool` and `futures` are separate objects, with `futures[0]` through `futures[100]`, while `jobqueue->head` and `jobqueue->tail` are `NULL`, meaning the queue has already been drained. `futures[0]->flag` is `3`, so in `tpool_future_destroy(futures[i])` the `& 2` test is non-zero, the destroy succeeds and `free(future)` is reached.
## Pointing out room for improvement and implementing it
### `tpool_future_destroy`
```diff=
 int tpool_future_destroy(struct __tpool_future *future)
 {
     if (future) {
         pthread_mutex_lock(&future->mutex);
         if (future->flag & __FUTURE_FINISHED ||
             future->flag & __FUTURE_CANCELLED) {
             pthread_mutex_unlock(&future->mutex);
             pthread_mutex_destroy(&future->mutex);
             pthread_cond_destroy(&future->cond_finished);
             free(future);
         } else {
             future->flag |= __FUTURE_DESTROYED;
             pthread_mutex_unlock(&future->mutex);
+            pthread_mutex_destroy(&future->mutex);
+            pthread_cond_destroy(&future->cond_finished);
+            free(future);
         }
     }
     return 0;
 }
```
In the `else` branch the future is now destroyed as well, releasing the future's memory.
### Shrinking the critical section
#### `tpool_apply`
```diff=10
-pthread_mutex_lock(&jobqueue->rwlock);
 if (jobqueue->head) {
+    pthread_mutex_lock(&jobqueue->rwlock);
     new_head->next = jobqueue->head;
     jobqueue->head = new_head;
+    pthread_mutex_unlock(&jobqueue->rwlock);
 } else {
+    pthread_mutex_lock(&jobqueue->rwlock);
     jobqueue->head = jobqueue->tail = new_head;
     pthread_cond_broadcast(&jobqueue->cond_nonempty); //HHH
+    pthread_mutex_unlock(&jobqueue->rwlock);
 }
-pthread_mutex_unlock(&jobqueue->rwlock);
```
### Observations with ThreadSanitizer
`gcc -pthread -o bbp bbp.c -lm`
Linking with the Sanitizer:
`gcc -o bpp bpp.c -lm -pthread -Wall -Wextra -Wshadow -O0 -g -fsanitize=thread`
```shell
WARNING: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (pid=3893)
    #0 pthread_mutex_unlock <null> (libtsan.so.0+0x3aafc)
    #1 __jobqueue_fetch_cleanup /home/wayne/linux2021/linux2021_week4/pi.c:139 (bpp+0x6c86)
    #2 jobqueue_fetch /home/wayne/linux2021/linux2021_week4/pi.c:210 (bpp+0x855a)
    #3 <null> <null> (libtsan.so.0+0x2d1af)

  Location is heap block of size 104 at 0x7b1c00000000 allocated by main thread:
    #0 malloc <null> (libtsan.so.0+0x30343)
    #1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:100 (bpp+0x6047)
    #2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
    #3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)

  Mutex M9 (0x7b1c00000040) created at:
    #0 pthread_mutex_init <null> (libtsan.so.0+0x4a636)
    #1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:104 (bpp+0x6226)
    #2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
    #3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)

SUMMARY: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (/lib/x86_64-linux-gnu/libtsan.so.0+0x3aafc) in pthread_mutex_unlock
==================
PI calculated with 101 terms: 3.141592653589793
ThreadSanitizer: reported 1 warnings
```
```diff=209
     // pthread_mutex_unlock(&jobqueue->rwlock);
-    pthread_cleanup_pop(1);
+    pthread_cleanup_pop(0);
     pthread_exit(NULL);
 }
```
The warning comes from the cleanup handler being executed by `pthread_cleanup_pop(1)` at a point where `jobqueue->rwlock` is no longer held, so it unlocks an unlocked mutex; popping the handler without executing it (`pthread_cleanup_pop(0)`) removes the warning.
## Studying [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool)
Paying attention to its atomic operations.
[lock free](https://preshing.com/20120612/an-introduction-to-lock-free-programming/)
Lock-free is not simply about whether a lock appears in the code; it is a way of writing multithreaded programs. As long as the conditions below are satisfied, the program counts as lock-free. As the figure shows, the key question is not whether a lock exists but whether a lock can end up held forever.
![](https://i.imgur.com/YP3urPn.png)
The article also mentions using CAS (Compare-And-Swap) to achieve lock-freedom, while watching out for the [ABA problem](https://en.wikipedia.org/wiki/ABA_problem):
>Process ${\displaystyle P_{1}}$ reads value A from shared memory,
${\displaystyle P_{1}}$ is preempted, allowing process ${\displaystyle P_{2}}$ to run,
${\displaystyle P_{2}}$ modifies the shared memory value A to value B and back to A before preemption,
${\displaystyle P_{1}}$ begins execution again, sees that the shared memory value has not changed and continues.
Although ${\displaystyle P_{1}}$ can continue executing, it is possible that the behavior will not be correct due to the "hidden" modification in shared memory.

In that case the result computed by ${\displaystyle P_{1}}$ is wrong.
[atomic_threadpool](https://github.com/Taymindis/atomic_threadpool) first uses `at_thpool_create` to create the required number of threads, which corresponds to `tpool_create` in the quiz. `at_thpool_newtask` creates a task and enqueues it into an [lfqueue](https://github.com/Taymindis/lfqueue); once processed, the task is freed, so it corresponds to `tpool_apply`. The lfqueue operations use quite a few CAS (Compare-And-Swap) instructions: enqueue appends at the tail through `__LFQ_BOOL_COMPARE_AND_SWAP`, which is implemented with the GCC atomic functions. Finally `at_thpool_gracefully_shutdown` destroys the pool, corresponding to `tpool_join`.
:::info
[GCC atomic function](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html)
These operations map to atomic instructions supported by the underlying hardware (e.g. Intel), so they are not available on every target; when a target lacks support, GCC emits a corresponding warning.
```
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
```
>These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr. The "bool" version returns true if the comparison is successful and newval was written. The "val" version returns the contents of *ptr before the operation.

The operation only counts as successful once both the compare and the swap have completed.
:::
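As an illustration of how a CAS retry loop replaces a lock, here is a minimal Treiber-stack push written with C11 atomics (a sketch for illustration only, not the lfqueue implementation):
```cpp
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct node {
    int value;
    struct node *next;
};

static _Atomic(struct node *) top = NULL;

/* lock-free push: retry the CAS until no other thread raced us */
static void push(int value)
{
    struct node *n = malloc(sizeof(*n));
    n->value = value;
    n->next = atomic_load(&top);
    while (!atomic_compare_exchange_weak(&top, &n->next, n))
        ;   /* on failure, n->next has been reloaded with the current top */
}

int main(void)
{
    for (int i = 0; i < 3; i++)
        push(i);
    for (struct node *n = atomic_load(&top); n; n = n->next)
        printf("%d ", n->value);   /* prints 2 1 0 */
    printf("\n");
    return 0;
}
```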
Throughout lfqueue no mutex is used; everything is expressed with the atomic builtin functions.
* enqueue: `__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)`, where `node` is the new node and `tail->next` was originally NULL, followed by `__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node)` to update the lfqueue's tail node.
* dequeue: `__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)` maintains the head, and the old head is released via `__lfq_recycle_free`.
## Rewriting with C11 Atomics
:::info
Trying out `__sync_bool_compare_and_swap`:
```cpp
int x = 10;
float y = 25.00;
int ptr[] = {10, 20};
printf("ptr %d:%d\n", ptr[0], ptr[1]);
printf("x:y %d:%f\n", x, y);
bool k = __sync_bool_compare_and_swap(ptr, x, y);
printf("k:%d\n", k);
printf("ptr %d:%d\n", ptr[0], ptr[1]);
printf("x:y %d:%f\n", x, y);
k = __sync_bool_compare_and_swap(ptr, x, y);
printf("k:%d\n", k);
printf("ptr %d:%d\n", ptr[0], ptr[1]);
printf("x:y %d:%f\n", x, y);
```
>ptr 10:20
x:y 10:25.000000
k:1
ptr 25:20
x:y 10:25.000000
k:0
ptr 25:20
x:y 10:25.000000

This shows that `type *ptr` and `type oldval` are compared first, and only when they are equal is the new value written into `*ptr`.
:::
Replacing the locked update with `__sync_bool_compare_and_swap`:
#### `tpool_apply`
```diff=10
+ new_head->next = NULL;
-pthread_mutex_lock(&jobqueue->rwlock);
 if (jobqueue->head) {
+    __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head);
+    __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head);
-    new_head->next = jobqueue->head;
-    jobqueue->head = new_head;
 } else {
+    __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head);
-    jobqueue->head = jobqueue->tail = new_head;
+    __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head);
     pthread_cond_broadcast(&jobqueue->cond_nonempty); //HHH
 }
-pthread_mutex_unlock(&jobqueue->rwlock);
```
Since this part is fully rewritten with the builtin atomic operations, the mutex can be removed accordingly.
Comparing scalability of the original version against the version partially rewritten with the builtin atomic operations:
![](https://i.imgur.com/QIdQexO.png)
Even though only a small part was rewritten, a noticeable improvement in scalability can be seen. The execution time is also about 30% shorter:
![](https://i.imgur.com/gv03ucw.png)
### Rewriting with [C11](https://en.cppreference.com/w/c/atomic)
#### `tpool_apply`
```diff=
+#include <stdatomic.h>
 if (jobqueue->head) {
-    __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head);
-    __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head);
+    atomic_compare_exchange_strong(&new_head->next, &new_head->next, jobqueue->head);
+    atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head);
 } else {
-    __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head);
-    __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head);
+    atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head);
+    atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, jobqueue->head);
     pthread_cond_broadcast(&jobqueue->cond_nonempty); //HHH
 }
```
:::info
Reference:
[atomic_compare_exchange_strong](https://www.khronos.org/registry/OpenCL/sdk/2.0/docs/man/xhtml/atomic_compare_exchange.html)
:::
#### `jobqueue_fetch`
```diff=
 if (jobqueue->head == jobqueue->tail) {
-    task = jobqueue->tail;
-    jobqueue->head = jobqueue->tail = NULL;
+    task = atomic_load(&jobqueue->tail);
+    atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, NULL);
+    atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, NULL);
 } else {
     threadtask_t *tmp;
     for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next)
         ;
-    task = tmp->next;
-    tmp->next = NULL;
-    jobqueue->tail = tmp;
+    task = atomic_load(&tmp->next);
+    atomic_compare_exchange_strong(&tmp->next, &tmp->next, NULL);
+    atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, tmp);
 }
```
:::info
Reference:
[atomic_load](https://en.cppreference.com/w/c/atomic/atomic_load)
:::
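For reference, a small standalone sketch of the C11 CAS semantics used above (illustrative only): note that on failure `atomic_compare_exchange_strong` overwrites the `expected` argument with the value actually found.
```cpp
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

int main(void)
{
    atomic_int value = 10;
    int expected = 10;

    /* succeeds: value == expected, so value becomes 25 */
    bool ok = atomic_compare_exchange_strong(&value, &expected, 25);
    printf("ok=%d value=%d expected=%d\n", ok, atomic_load(&value), expected);

    /* fails: value is now 25, not 10; expected is updated to 25 */
    expected = 10;
    ok = atomic_compare_exchange_strong(&value, &expected, 30);
    printf("ok=%d value=%d expected=%d\n", ok, atomic_load(&value), expected);
    return 0;
}
```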
From these rewrites we can see that the cost of maintaining the queue is still considerable: in particular the walk `for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next)` needed to maintain `tail` cannot be improved this way, which is why the next step is to move toward [cmwq](https://www.kernel.org/doc/html/v4.10/core-api/workqueue.html).