# 2021q1 Homework4 (quiz4)
contributed by < [`WayneLin1992`](https://github.com/WayneLin1992) >
###### tags: `linux2021`
## Extended problems
- [x] Explain how the code works, including the timeout handling mechanism.
- [x] Identify room for improvement and implement it
- [x] Study [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool), point out its atomic operations, and explain lock-free
- [x] Try rewriting with C11 Atomics for better scalability
## Understanding the problem
### How the program works
[quiz4](https://hackmd.io/@sysprog/linux2021-quiz4)
[thread pool](https://en.wikipedia.org/wiki/Thread_pool)
```cpp
enum __future_flags {
__FUTURE_RUNNING = 01, //0000 0001
__FUTURE_FINISHED = 02, //0000 0010
__FUTURE_TIMEOUT = 04, //0000 0100
__FUTURE_CANCELLED = 010,//0000 1000
__FUTURE_DESTROYED = 020,//0001 0000
};
```
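These flags are octal constants, each occupying its own bit, so several states can be combined in a single `flag` field and tested independently. A minimal sketch of my own (the names `future_flags` and `can_free` are hypothetical) of how the combination works:

```c
#include <assert.h>

/* Same octal constants as the quiz: each flag is a distinct bit. */
enum future_flags {
    FLAG_RUNNING   = 01,  /* 0000 0001 */
    FLAG_FINISHED  = 02,  /* 0000 0010 */
    FLAG_TIMEOUT   = 04,  /* 0000 0100 */
    FLAG_CANCELLED = 010, /* 0000 1000 */
    FLAG_DESTROYED = 020, /* 0001 0000 */
};

/* Mirrors the test in tpool_future_destroy below: the future may be
 * freed once it is finished or cancelled. */
int can_free(int flag)
{
    return (flag & FLAG_FINISHED) || (flag & FLAG_CANCELLED);
}
```

Note that `010` and `020` are octal, i.e. 8 and 16, not decimal 10 and 20.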
#### `__tpool_future`
```cpp
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
typedef struct __tpool_future *tpool_future_t;
```
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__tpool_future [label="<f0>int flag|<f1> result|<f2> mutex|<f3> cond_finished"];
tpool_future_t->__tpool_future
}
```
#### `threadtask_t`
```cpp
typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
```
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__tpool_future [label="<f0>int flag|<f1> result|<f2> mutex|<f3> cond_finished"];
__threadtask:f3->__threadtask1;
__threadtask:f2->__tpool_future;
}
```
#### `jobqueue_t`
```cpp
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
```
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__jobqueue [label="<f0> head|<f1> tail|<f2> cond_nonempty|<f3> rwlock"];
more[label="..."]
__jobqueue:f0->__threadtask
__jobqueue:f1->__threadtask1
__threadtask:f3->more
more->__threadtask1
}
```
#### `tpool_t`
```cpp
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
typedef struct __threadpool *tpool_t;
```
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__threadpool [label="<f0> count|<f1> workers|<f2> jobqueue"];
tpool_t->__threadpool;
__threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__jobqueue [label="<f0> head|<f1> tail|<f2> cond_nonempty|<f3> rwlock"];
more[label="..."]
__jobqueue:f0->__threadtask
__jobqueue:f1->__threadtask1
__threadtask:f3->more
more->__threadtask1
__threadpool:f2->__jobqueue
}
```
#### `tpool_future_create`
```cpp
static struct __tpool_future *tpool_future_create(void)
{
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
```
Creates a `tpool_future` object and initializes its mutex, condition variable, and condition-variable attributes object in one go.
[pthread_mutex_init](https://linux.die.net/man/3/pthread_mutex_init): a mutex must be initialized before use, otherwise the behavior is undefined (UB)
:::info
:bell: A mutex initialized with pthread_mutex_init must be matched by a pthread_mutex_destroy at the end to destroy it
:::
[pthread_condattr_init](https://linux.die.net/man/3/pthread_condattr_init): a condition-attributes object must also be initialized before use — otherwise UB as well — and it likewise needs a matching destroy; by default attr is set to PTHREAD_PROCESS_PRIVATE.
:::info
[pthread_condattr_init()](http://www.qnx.com/developers/docs/6.5.0/index.jsp?topic=%2Fcom.qnx.doc.neutrino_lib_ref%2Fp%2Fpthread_condattr_init.html)
>The pthread_condattr_init() function initializes the attributes in the condition variable attribute object attr to default values. Pass attr to pthread_cond_init() to define the attributes of the condition variable.
The attr initialized by `pthread_condattr_init` is then passed to `pthread_cond_init`, so the two come as a pair and cannot be removed independently; a return value of 0 means the initialization succeeded, and on failure an error number is returned.
[int pthread_condattr_init(pthread_condattr_t *attr) in ibm](https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/users_71.htm)
Adapted from the example above:
```cpp
#include <stdio.h>
#include <pthread.h>

pthread_cond_t cond;
int main(){
int rc=0;
pthread_condattr_t attr;
printf("Create a default condition attribute\n");
rc = pthread_condattr_init(&attr);
printf("pthread_condattr_init : %d\n", rc);
printf("Create the condition using the condition attributes object\n");
rc = pthread_cond_init(&cond, &attr);
printf("pthread_cond_init : %d\n", rc);
printf("Destroy cond attribute\n");
rc = pthread_condattr_destroy(&attr);
printf("pthread_condattr_destroy : %d\n", rc);
printf("Destroy condition\n");
rc = pthread_cond_destroy(&cond);
printf("pthread_cond_destroy : %d\n", rc);
return 0;
}
```
>Create a default condition attribute
pthread_condattr_init : 0
Create the condition using the condition attributes object
pthread_cond_init : 0
Destroy cond attribute
pthread_condattr_destroy : 0
Destroy condition
pthread_cond_destroy : 0
This shows attr being passed into `pthread_cond_init` to produce the corresponding condition variable.
Here `pthread_condattr_init` just fills attr with its default values, so `pthread_condattr_init` and `pthread_condattr_destroy` could even be removed and the program would still run.
:::
[pthread_condattr_destroy](https://linux.die.net/man/3/pthread_condattr_destroy)
A condattr is created here and then destroyed right away — ~~**presumably a place that could be improved**~~; as shown above, once the condition variable has been created, attr can be destroyed immediately; a return of 0 means success.
[pthread_cond_init](https://linux.die.net/man/3/pthread_cond_init) initializes a condition variable; likewise a matching destroy is needed at the end; a return of 0 means success.
:::info
[pthread_cond_init() in oracle](https://docs.oracle.com/cd/E19455-01/806-5257/6je9h032r/index.html#sync-59145)
```cpp
#include <stdio.h>
#include <pthread.h>

int main(){
pthread_cond_t cv;
pthread_condattr_t cattr;
int ret;
/* initialize a condition variable to its default value */
ret = pthread_cond_init(&cv, NULL);
printf( "pthread_cond_init : %d\n", ret);
/* initialize a condition variable */
ret = pthread_cond_init(&cv, &cattr);
printf( "pthread_cond_init : %d\n", ret);
    return 0;
}
```
>pthread_cond_init : 0
pthread_cond_init : 0
Even without `pthread_condattr_init`, the call still behaves as if default attribute values were supplied; you can also pass NULL directly and save the memory for cattr.
:::
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__tpool_future [label="<f0>int flag|<f1> result|<f2> mutex|<f3> cond_finished"];
__tpool_future:f0 -> 0
__tpool_future:f1 -> NULL
}
```
:::info
References:
[Condition Variable Attributes](https://docs.oracle.com/cd/E19455-01/806-5257/6je9h032q/index.html#sync-81445)
:::
#### `tpool_future_destroy`
```cpp
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
}
}
return 0;
}
```
`tpool_future_destroy` frees the future when its `flag` has `__FUTURE_FINISHED` or `__FUTURE_CANCELLED` set, which also means destroying the matching mutex and condition variable. Otherwise it only sets `__FUTURE_DESTROYED` in `flag`: despite being named destroy, it keeps the mutex and condition variable around, which is a bit questionable — **perhaps something to improve later**
#### `tpool_future_get`
```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
pthread_cond_wait(&future->cond_finished, &future->mutex);//FFF
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
Extracts `future->result`.
[pthread_cond_timedwait](https://linux.die.net/man/3/pthread_cond_timedwait) must be called with the mutex held; it behaves like cond_wait except the wait is bounded by an absolute deadline.
`ETIMEDOUT` is the error `pthread_cond_timedwait` returns on timeout (a normal return is 0); returning `ETIMEDOUT` means the wait timed out, so the code sets `__FUTURE_TIMEOUT` in `flag` and unlocks the mutex.
From `main` we can see `seconds` is 0 here, so in practice it keeps calling cond_wait.
:::info
[pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait)
[pthread_cond_wait in ibm](https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/users_78.htm)
`int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);`
**Add an implementation test**
:::
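As a quick implementation test (a minimal sketch of my own, not from the quiz code): one thread waits on the condition variable until a flag is set, the other sets the flag under the mutex and signals.

```c
#include <pthread.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;

static void *waiter(void *arg)
{
    pthread_mutex_lock(&mutex);
    while (!ready)                        /* guard against spurious wakeups */
        pthread_cond_wait(&cond, &mutex); /* atomically unlocks, sleeps, relocks */
    pthread_mutex_unlock(&mutex);
    return arg;
}

int run_cond_wait_demo(void)
{
    pthread_t tid;
    ready = 0;
    if (pthread_create(&tid, NULL, waiter, NULL))
        return -1;
    pthread_mutex_lock(&mutex);
    ready = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    pthread_join(tid, NULL);
    return ready;
}
```

The `while (!ready)` loop is the same pattern `tpool_future_get` uses with `__FUTURE_FINISHED`.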
#### `jobqueue_create`
```cpp
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
```
Creates the `jobqueue`
#### `jobqueue_destroy`
```cpp
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
threadtask_t *tmp = jobqueue->head;
while (tmp) {
jobqueue->head = jobqueue->head->next;
pthread_mutex_lock(&tmp->future->mutex);
if (tmp->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&tmp->future->mutex);
pthread_mutex_destroy(&tmp->future->mutex);
pthread_cond_destroy(&tmp->future->cond_finished);
free(tmp->future);
} else {
tmp->future->flag |= __FUTURE_CANCELLED;
pthread_mutex_unlock(&tmp->future->mutex);
}
free(tmp);
tmp = jobqueue->head;
}
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
```
Destroys the `jobqueue`, freeing tasks one by one from `head` until `tail`
#### `__jobqueue_fetch_cleanup`
```cpp
static void __jobqueue_fetch_cleanup(void *arg)
{
pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
pthread_mutex_unlock(mutex);
}
```
Casts `arg` back to `pthread_mutex_t *`. From the example below we can see `pthread_cleanup_push` passes arg into the handler function; our handler calls `pthread_mutex_unlock`, so the corresponding arg is the mutex, i.e. `jobqueue->rwlock`. ~~Unlocking it before it has been locked, however, **could be a problem**.~~
#### `jobqueue_fetch`
```cpp
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);//GGG
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;//KKK
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);//LLL
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
[pthread_cleanup_push](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html)
>void pthread_cleanup_push(void (*routine)(void *), void *arg);
>The pthread_cleanup_push() function pushes routine onto the top
>of the stack of clean-up handlers. When routine is later
>invoked, it will be given arg as its argument.
:::info
[pthread_cleanup_push() ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.4.0/com.ibm.zos.v2r4.bpxbd00/ptcpush.htm)
1. `pthread_cleanup_push()` and `pthread_cleanup_pop()` must appear as a matched pair.
2. Cleanup handlers run when a thread terminates, i.e. when pthread_exit or pthread_cancel is invoked
3. push and pop behave like a stack, so handlers run last in, first out (LIFO)
4. pthread_exit releases the thread's resources at termination, but the mutex may still be locked, so the mutex must be unlocked before `pthread_cleanup_pop()`, expressed as follows
```cpp
pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_mutex_unlock(&mut);
pthread_cleanup_pop(0);
```
Here `pthread_cleanup_pop(1);` can replace the pair `pthread_mutex_unlock(&mut);` plus `pthread_cleanup_pop(0);`
:bell: This pattern only works under deferred cancellation; with asynchronous cancellation, another thread may still be holding the mutex lock.
To account for asynchronous cancellation, modify it as follows
```cpp
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,&oldtype);
pthread_cleanup_push(pthread_mutex_unlock,(void *)&mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_cleanup_pop(1);
pthread_setcanceltype(oldtype,NULL);
```
[pthread_setcanceltype() in ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/pthread_setcanceltype.htm#pthread_setcanceltype)
`pthread_setcanceltype()` sets the cancel type, and oldtype stores the previous type: with `PTHREAD_CANCEL_ASYNCHRONOUS` the thread can be cancelled at any time, while `PTHREAD_CANCEL_DEFERRED` only allows cancellation at specific cancellation points.
References:
[Library C GNC cleanup handler](https://elias.rhi.hi.is/libc/Cleanup-Handlers.html)
:::
`pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);`
Here the first argument is the cleanup handler function to push onto the stack, and arg is what will be passed to that handler when it is invoked
:::success
Putting it together: tpool_create uses `pthread_create(&pool->workers[i], NULL, jobqueue_fetch,(void *) jobqueue)` to create the worker threads, and jobqueue_fetch(jobqueue) pulls tasks out of the jobqueue; tasks are processed in order from the tail toward the head, and each finished task is freed.
:::
[pthread_setcancelstate](https://www.man7.org/linux/man-pages/man3/pthread_setcanceltype.3.html)
>The pthread_setcancelstate() sets the cancelability state of the
calling thread to the value given in state.
The pthread_setcanceltype() sets the cancelability type of the
calling thread to the value given in type. The previous
cancelability type of the thread is returned in the buffer
pointed to by oldtype.
`old_state` stores the cancelability state that was in effect before the call.
`PTHREAD_CANCEL_ENABLE` means the thread can be cancelled and `PTHREAD_CANCEL_DISABLE` means it cannot; on success the call returns 0, on failure it returns `EINVAL`.
:::info
References:
[pthread_setcancelstate() in ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/pthread_setcancelstate.htm)
[pthread_setcanceltype() in ibm](https://www.ibm.com/support/knowledgecenter/SSLTBW_2.3.0/com.ibm.zos.v2r3.bpxbd00/pthread_setcanceltype.htm#pthread_setcanceltype)
:::
[pthread_testcancel](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html)
>Calling pthread_testcancel() creates a cancellation point within
the calling thread, so that a thread that is otherwise executing
code that contains no cancellation points will respond to a
cancellation request.
`pthread_testcancel` introduces a cancellation point, used mainly with deferred cancellation: a pending cancellation request takes effect when the thread reaches this point. A cancellation point should also be placed wherever the thread may block, to avoid waiting indefinitely.
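A minimal sketch of my own (function names are hypothetical) showing that a deferred cancellation request only takes effect at a cancellation point such as `pthread_testcancel()`, using the same enable/disable pattern as `jobqueue_fetch`:

```c
#include <pthread.h>

/* The thread spins with cancellation disabled, then enables it and calls
 * pthread_testcancel(): a pending cancel request takes effect right there. */
static void *spin(void *arg)
{
    int old;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
    for (volatile long i = 0; i < 10000000L; i++)
        ;                                /* cancel request stays pending here */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old);
    for (;;)
        pthread_testcancel();            /* cancellation point */
    return arg;                          /* never reached */
}

int run_cancel_demo(void)
{
    pthread_t tid;
    void *retval;
    if (pthread_create(&tid, NULL, spin, NULL))
        return -1;
    pthread_cancel(tid);
    pthread_join(tid, &retval);
    return retval == PTHREAD_CANCELED;   /* 1 if the thread was cancelled */
}
```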
:::info
References:
[pthread_testcancel() in ibm](https://www.ibm.com/support/knowledgecenter/ssw_ibm_i_71/apis/users_46.htm)
[Cancellation Points](https://docs.oracle.com/cd/E19120-01/open.solaris/816-5137/tlib-62067/index.html)
:::
#### `tpool_create`
```cpp
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
```graphviz
digraph tpool_future_t {
node [shape=record];
rankdir=LR;
__threadpool [label="<f0> count|<f1> workers|<f2> jobqueue"];
tpool_t->__threadpool;
__threadtask [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__threadtask1 [label="<f0> func|<f1> arg|<f2> future|<f3> next"];
__jobqueue [label="<f0> head|<f1> tail|<f2> cond_nonempty|<f3> rwlock"];
more[label="..."]
__jobqueue:f0->__threadtask
__jobqueue:f1->__threadtask1
__threadtask:f3->more
more->__threadtask1
__threadpool:f2->__jobqueue
}
```
Creating a `tpool` also creates a matching `jobqueue`.
`count` is the number of `thread`s, i.e. the number of `workers`.
```cpp
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
```
If `pthread_create` fails while creating `workers[i]`, every worker created so far (`workers[0]` through `workers[i-1]`) is cancelled and then joined, after which `workers`, the `jobqueue`, and `pool` are all freed — ~~**this part could perhaps be improved**~~. As shown below, pthread_create returns 0 on success, so the if body is skipped and execution reaches the `return pool` part.
:::info
`pthread_create`
```cpp
int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *),void *restrict arg);
```
The pthread_create() function starts a new thread in the calling process. The new thread starts execution by invoking start_routine(); arg is passed as the sole argument of start_routine().
Here `pthread_t *restrict thread` is the thread being created, `const pthread_attr_t *restrict attr` is the thread attributes object (which may be `NULL`), `void *(*start_routine)(void *)` is the function to run, and `void *restrict arg` is the argument passed to that function.
`pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)`
Mapping onto the above: worker is the thread, `jobqueue_fetch` is the function, and `jobqueue` is its argument — matching the signature `static void *jobqueue_fetch(void *queue)`. On success pthread_create returns 0; on failure it returns an error number such as EAGAIN
References:
[pthread_create()](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
[pthread_create() in ibm](https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/users_14.htm)
:::
#### `tpool_apply`
```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
Takes `pool->jobqueue` and creates a corresponding threadtask for it.
```cpp
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
pthread_mutex_unlock(&jobqueue->rwlock);
```
New tasks are added at the head of the jobqueue, but the mutex here is a coarse-grained lock — **it could be made more fine-grained**.
:::warning
You should propose a way to shrink the critical section
:notes: jserv
:::
:::info
Think about where a critical section can actually arise here.
Removing all the mutexes in this part: the ordering inside `jobqueue` may become incorrect, yet the result can still come out right.
>PI calculated with 101 terms: 3.141592653589793
Two threads may both try to link a node in at `jobqueue->head`, which corrupts the ordering; with the lock, only one thread at a time performs the add-head. Stepping through with gdb, however, `jobqueue->head` stays `NULL` the whole time. ~~This is mainly because the lifetime of `jobqueue->head` is insufficient and jobqueue is never returned, so only the `else` branch executes, meaning removing the mutex lock here would not affect the whole.~~ It is not a lifetime issue: the worker thread has already processed the task and called `free(task)`, which is what produces this result.
:::
#### `tpool_join`
```cpp
int tpool_join(struct __threadpool *pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
Submits `count` empty tasks (`func == NULL`) so that each worker breaks out of its loop, joins all the worker threads, and then destroys the jobqueue and frees the pool
#### Bailey–Borwein–Plouffe formula
```cpp
/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
int k = *(int *) arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (product)
*product = 1 / pow(16, k) * sum;
return (void *) product;
}
```
$\pi=\displaystyle\sum_{k=0}^{\infty}\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$
bpp expresses the formula above as a function.
:::warning
Rewrite the BBP formula in LaTeX syntax
:notes: jserv
:::
#### `main`
```cpp
#define PRECISION 100 /* upper bound in BPP sum */
int main()
{
int bpp_args[PRECISION + 1];
double bpp_sum = 0;
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_join(pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
return 0;
}
```
>PI calculated with 101 terms: 3.141592653589793
`tpool_t pool = tpool_create(4)` sets `count` = 4.
First `for` loop: creates the tasks with `function` = `bpp` and links them into the `jobqueue` managed by the `pool`.
Second `for` loop: uses `tpool_future_get` to extract each `result`, accumulates them all into `bpp_sum`, and destroys each future
Destroys the `pool` and prints the result.
`PRECISION + 1` is used because k = 0 counts as the first term, so `k=100` is the 101st term.
:::info
`pthread_join`
>The pthread_join() function waits for the thread specified by thread to terminate. If that thread has already terminated, then pthread_join() returns immediately. The thread specified by thread must be joinable.
On success it returns 0; the possible errors are:
>EDEADLK A deadlock was detected (e.g., two threads tried to join with each other); or thread specifies the calling thread.
EINVAL thread is not a joinable thread.
EINVAL Another thread is already waiting to join with this thread.
ESRCH No thread with the ID thread could be found.
:::
### Actual execution observations
While running, `pool` and `futures` turn out to be separate: `futures[0]` through `futures[100]` exist while `jobqueue->head` and `jobqueue->tail` are `NULL`, meaning the queue has already been drained. `futures[0]->flag` is `3`, so in `tpool_future_destroy(futures[i])` the `& 2` test is non-zero, and the future is successfully destroyed and released with `free(future)`.
## Identify room for improvement and implement it
### `tpool_future_destroy`
```diff=
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
+ pthread_mutex_destroy(&future->mutex);
+ pthread_cond_destroy(&future->cond_finished);
+ free(future);
}
}
return 0;
}
```
In the `else` branch, destroy the future in the same way and release its memory.
### Shrinking the critical section
#### `tpool_apply`
```diff=10
-pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
+ pthread_mutex_lock(&jobqueue->rwlock);
new_head->next = jobqueue->head;
jobqueue->head = new_head;
+ pthread_mutex_unlock(&jobqueue->rwlock);
} else {
+ pthread_mutex_lock(&jobqueue->rwlock);
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
+ pthread_mutex_unlock(&jobqueue->rwlock);
}
- pthread_mutex_unlock(&jobqueue->rwlock);
```
### Observations with Sanitizer
`gcc -pthread -o bbp bbp.c -lm`
Linking with the Sanitizer: `gcc -o bpp bpp.c -lm -pthread -Wall -Wextra -Wshadow -O0 -g -fsanitize=thread`
```shell
WARNING: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (pid=3893)
#0 pthread_mutex_unlock <null> (libtsan.so.0+0x3aafc)
#1 __jobqueue_fetch_cleanup /home/wayne/linux2021/linux2021_week4/pi.c:139 (bpp+0x6c86)
#2 jobqueue_fetch /home/wayne/linux2021/linux2021_week4/pi.c:210 (bpp+0x855a)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Location is heap block of size 104 at 0x7b1c00000000 allocated by main thread:
#0 malloc <null> (libtsan.so.0+0x30343)
#1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:100 (bpp+0x6047)
#2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
#3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)
Mutex M9 (0x7b1c00000040) created at:
#0 pthread_mutex_init <null> (libtsan.so.0+0x4a636)
#1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:104 (bpp+0x6226)
#2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
#3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)
SUMMARY: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (/lib/x86_64-linux-gnu/libtsan.so.0+0x3aafc) in pthread_mutex_unlock
==================
PI calculated with 101 terms: 3.141592653589793
ThreadSanitizer: reported 1 warnings
```
```diff=209
// pthread_mutex_unlock(&jobqueue->rwlock);
- pthread_cleanup_pop(1);
+ pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
## Studying [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool) and its atomic operations
[lock free](https://preshing.com/20120612/an-introduction-to-lock-free-programming/)
Being lock free is not simply about having no locks; more importantly it is a way of structuring multi-threaded code. A program qualifies as lock free as long as the conditions below hold — as the figure shows, the key is not whether a lock exists but whether a lock can end up held forever
![](https://i.imgur.com/YP3urPn.png)
The article also mentions using CAS (Compare-And-Swap) to achieve lock freedom, while watching out for the [ABA problem](https://en.wikipedia.org/wiki/ABA_problem):
>Process ${\displaystyle P_{1}}$ reads value A from shared memory,
${\displaystyle P_{1}}$ is preempted, allowing process ${\displaystyle P_{2}}$ to run,
${\displaystyle P_{2}}$ modifies the shared memory value A to value B and back to A before preemption,
${\displaystyle P_{1}}$ begins execution again, sees that the shared memory value has not changed and continues.
Although ${\displaystyle P_{1}}$ can continue executing, it is possible that the behavior will not be correct due to the "hidden" modification in shared memory.
At that point ${\displaystyle P_{1}}$'s result will be wrong.
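A minimal single-threaded sketch of my own showing why a plain CAS cannot detect A→B→A: the compare only looks at the current value, so the CAS still succeeds even though the value was changed away and back in between.

```c
/* shared starts at A (100); "P2" changes it to B (200) and back to A.
 * "P1"'s CAS, which only compares values, cannot tell this happened. */
int aba_demo(void)
{
    int shared = 100;                     /* A */
    int observed = shared;                /* P1 reads A */

    /* P2 runs: A -> B -> A */
    __sync_bool_compare_and_swap(&shared, 100, 200);
    __sync_bool_compare_and_swap(&shared, 200, 100);

    /* P1's CAS still succeeds despite the hidden modification */
    return __sync_bool_compare_and_swap(&shared, observed, 300);
}
```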
[atomic_threadpool](https://github.com/Taymindis/atomic_threadpool) starts with `at_thpool_create` creating enough threads — the counterpart of `tpool_create` in the quiz. `at_thpool_newtask` creates a task and enqueues it into an [lfqueue](https://github.com/Taymindis/lfqueue); once processed, the task is freed — the counterpart of `tpool_apply`. The lfqueue operations use plenty of CAS (Compare-And-Swap) instructions: enqueue appends at the tail via `__LFQ_BOOL_COMPARE_AND_SWAP`, which is implemented on top of GCC atomic functions. Finally `at_thpool_gracefully_shutdown` destroys the pool — the counterpart of `tpool_join`.
:::info
[GCC atomic function](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html)
These operations map onto atomic operations supported by the underlying hardware, so they are not available on every target; when the target does not support one, GCC emits a corresponding warning.
```
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
```
>These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr.
The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.
The operation counts as successful only once both the compare and the swap have completed.
:::
lfqueue uses no mutex anywhere; everything is expressed with atomic builtin functions.
* enqueue: `__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)`, where node is the new node and `tail->next` was `NULL`, followed by `__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node)` to update the lfqueue tail.
* dequeue: `__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)` advances the head, and the old head is released through `__lfq_recycle_free`.
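To see the same CAS retry pattern in miniature, here is a sketch of my own of a lock-free stack push/pop using the same GCC builtin (simplified: it ignores the ABA problem and the memory-reclamation issue that lfqueue's `__lfq_recycle_free` deals with):

```c
#include <stdlib.h>

struct node {
    int value;
    struct node *next;
};

/* Push: retry the CAS on the head pointer until we win the race. */
void stack_push(struct node **head, int value)
{
    struct node *n = malloc(sizeof(*n));
    n->value = value;
    do {
        n->next = *head;
    } while (!__sync_bool_compare_and_swap(head, n->next, n));
}

/* Pop: CAS the head forward; returns 0 and stores the value on success,
 * -1 when the stack is empty. */
int stack_pop(struct node **head, int *out)
{
    struct node *n;
    do {
        n = *head;
        if (!n)
            return -1;
    } while (!__sync_bool_compare_and_swap(head, n, n->next));
    *out = n->value;
    free(n);
    return 0;
}
```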
## Rewriting with C11 Atomics
:::info
Trying out `__sync_bool_compare_and_swap`
```cpp
#include <stdio.h>
#include <stdbool.h>

int main(){
    int x = 10;
    float y = 25.00;
    int ptr[]={10,20};
    printf("ptr %d:%d\n",ptr[0],ptr[1]);
    printf("x:y %d:%f\n",x,y);
    bool k = __sync_bool_compare_and_swap(ptr, x, y);
    printf("k:%d\n",k);
    printf("ptr %d:%d\n",ptr[0],ptr[1]);
    printf("x:y %d:%f\n",x,y);
    k = __sync_bool_compare_and_swap(ptr, x, y);
    printf("k:%d\n",k);
    printf("ptr %d:%d\n",ptr[0],ptr[1]);
    printf("x:y %d:%f\n",x,y);
    return 0;
}
```
>ptr 10:20
x:y 10:25.000000
k:1
ptr 25:20
x:y 10:25.000000
k:0
ptr 25:20
x:y 10:25.000000
This shows it first checks whether `*ptr` equals oldval, and only then writes newval into `*ptr`
:::
Replacing with `__sync_bool_compare_and_swap`:
#### `tpool_apply`
```diff=10
+ new_head->next = NULL;
-pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
+ __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head);
+ __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head);
- new_head->next = jobqueue->head;
- jobqueue->head = new_head;
} else {
+ __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head);
- jobqueue->head = jobqueue->tail = new_head;
+ __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head);
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
-pthread_mutex_unlock(&jobqueue->rwlock);
```
Since this part is rewritten entirely with builtin atomic operations, the corresponding mutex can be removed.
Comparing scalability between the original version and the one partially rewritten with builtin atomic operations:
![](https://i.imgur.com/QIdQexO.png)
Even though only a small part was rewritten, the scalability visibly improves in places.
Execution time also shortens by about 30%
![](https://i.imgur.com/gv03ucw.png)
### Rewriting with [C11](https://en.cppreference.com/w/c/atomic) atomics
#### `tpool_apply`
```diff=
+#include <stdatomic.h>
if (jobqueue->head) {
- __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head);
- __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head);
+ atomic_compare_exchange_strong(&new_head->next, &new_head->next, jobqueue->head);
+ atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head);
} else {
- __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head);
- __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head);
+ atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head);
+ atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, jobqueue->head);
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
```
:::info
References:
[atomic_compare_exchange_strong](https://www.khronos.org/registry/OpenCL/sdk/2.0/docs/man/xhtml/atomic_compare_exchange.html)
:::
#### `jobqueue_fetch`
```diff=
if (jobqueue->head == jobqueue->tail) {
- task = jobqueue->tail;
- jobqueue->head = jobqueue->tail = NULL;
+ task = atomic_load(&jobqueue->tail);
+ atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, NULL);
+ atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, NULL);
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
- task = tmp->next;
- tmp->next = NULL;
- jobqueue->tail = tmp;
+ task = atomic_load(&tmp->next);
+ atomic_compare_exchange_strong(&tmp->next, &tmp->next, NULL);
+ atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, tmp);
}
```
:::info
References:
[atomic_load](https://en.cppreference.com/w/c/atomic/atomic_load)
:::
From this we can see that maintaining the queue is still quite costly — especially the walk `for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next)` needed to maintain `tail`, which this head-insert design cannot avoid. This is one reason to move toward [cmwq](https://www.kernel.org/doc/html/v4.10/core-api/workqueue.html) later.
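For contrast, the O(n) walk only exists because this code inserts at the head and removes at the tail. A sketch of my own of the conventional singly linked FIFO layout — enqueue at `tail`, dequeue at `head` — makes both operations O(1) (locking omitted for brevity; this is not what the quiz code does):

```c
#include <stdlib.h>

struct qnode {
    int value;
    struct qnode *next;
};

struct fifo {
    struct qnode *head, *tail;
};

/* Enqueue at the tail in O(1): no walk from head is ever needed. */
void fifo_push(struct fifo *q, int value)
{
    struct qnode *n = malloc(sizeof(*n));
    n->value = value;
    n->next = NULL;
    if (q->tail)
        q->tail->next = n;
    else
        q->head = n;
    q->tail = n;
}

/* Dequeue at the head in O(1); returns -1 when the queue is empty. */
int fifo_pop(struct fifo *q, int *out)
{
    struct qnode *n = q->head;
    if (!n)
        return -1;
    q->head = n->next;
    if (!q->head)
        q->tail = NULL;
    *out = n->value;
    free(n);
    return 0;
}
```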