contributed by <WayneLin1992>
linux2021
/* Bit flags stored in struct __tpool_future.flag.
 *
 * The values are written in hex: the previous octal literals (010, 020)
 * were easy to misread, e.g. 010 is 8 (0000 1000), not 0000 1010.
 */
enum __future_flags {
    __FUTURE_RUNNING = 0x01,   /* 0000 0001: a worker has started the task */
    __FUTURE_FINISHED = 0x02,  /* 0000 0010: task done, result published */
    __FUTURE_TIMEOUT = 0x04,   /* 0000 0100: last timed tpool_future_get expired */
    __FUTURE_CANCELLED = 0x08, /* 0000 1000: task dropped by jobqueue_destroy */
    __FUTURE_DESTROYED = 0x10, /* 0001 0000: owner destroyed it before it finished */
};
__tpool_future
/* Handle returned by tpool_apply(); the submitter uses it to wait for and
 * read the result of one task. */
struct __tpool_future {
/* Bitwise OR of __future_flags values; accessed with `mutex` held. */
int flag;
/* Value returned by the task's function; written by the worker (under
 * `mutex`) together with __FUTURE_FINISHED. */
void *result;
/* Guards flag and the publication of result. */
pthread_mutex_t mutex;
/* Broadcast by the worker when the task finishes; tpool_future_get() waits on it. */
pthread_cond_t cond_finished;
};
/* Pointer alias for the future handle. */
typedef struct __tpool_future *tpool_future_t;
threadtask_t
/* One queued unit of work. */
typedef struct __threadtask {
/* Function run by a worker; NULL marks the shutdown sentinel queued by tpool_join(). */
void *(*func)(void *);
/* Argument passed to func. */
void *arg;
/* Future through which the task's result is delivered. */
struct __tpool_future *future;
/* Next (older) task, i.e. one step toward the queue's tail. */
struct __threadtask *next;
} threadtask_t;
jobqueue_t
/* Singly linked task queue shared by all workers of a pool.
 * tpool_apply() pushes new tasks at head; jobqueue_fetch() removes from tail,
 * so tail is the oldest task. */
typedef struct __jobqueue {
threadtask_t *head, *tail;
/* Broadcast when the queue goes from empty to non-empty. */
pthread_cond_t cond_nonempty;
/* A plain mutex (despite the name) guarding head and tail. */
pthread_mutex_t rwlock;
} jobqueue_t;
tpool_t
/* A thread pool: `count` worker threads consuming a single job queue. */
struct __threadpool {
/* Number of worker threads, i.e. number of entries in workers. */
size_t count;
/* Thread IDs of the workers, allocated in tpool_create(). */
pthread_t *workers;
/* Queue the workers take tasks from. */
jobqueue_t *jobqueue;
};
/* Pointer alias for the pool handle. */
typedef struct __threadpool *tpool_t;
tpool_future_create
/* Allocate a future in the "not started" state (flag == 0, result == NULL).
 *
 * cond_finished is set up to measure timeouts on CLOCK_MONOTONIC, because
 * tpool_future_get() builds its absolute deadline with
 * clock_gettime(CLOCK_MONOTONIC, ...). With default attributes the condition
 * variable would interpret that deadline on CLOCK_REALTIME, where a
 * "time since boot" value is a moment long past, so every timed wait would
 * report ETIMEDOUT immediately.
 *
 * Returns NULL if allocation or initialisation of the mutex/condition
 * variable fails; nothing is leaked in that case.
 */
static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof *future);
    if (!future)
        return NULL;

    future->flag = 0;
    future->result = NULL;
    if (pthread_mutex_init(&future->mutex, NULL) != 0) {
        free(future);
        return NULL;
    }

    pthread_condattr_t attr;
    int rc = pthread_condattr_init(&attr);
    if (rc == 0) {
#ifdef CLOCK_MONOTONIC
        /* Must match the clock tpool_future_get() reads; the guard only keeps
         * the file compiling where the headers do not expose it. */
        pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
#endif
        rc = pthread_cond_init(&future->cond_finished, &attr);
        /* pthread_cond_init() keeps its own copy of the settings, so the
         * attribute object can be released right away. */
        pthread_condattr_destroy(&attr);
    }
    if (rc != 0) {
        pthread_mutex_destroy(&future->mutex);
        free(future);
        return NULL;
    }
    return future;
}
建立 tpool_future
物件,一併初始化 mutex, condition variable, condition variable attributes object。
pthread_mutex_init 使用 mutex 必須要初始化,要不然會出現 undefined behavior (UB)
pthread_condattr_init 建立 condition attributes object 時需要初始化,沒有初始化一樣會 undefined behavior (UB),也需要對稱地呼叫 destroy;attr 預設為 PTHREAD_PROCESS_PRIVATE。
The pthread_condattr_init() function initializes the attributes in the condition variable attribute object attr to default values. Pass attr to pthread_cond_init() to define the attributes of the condition variable.
由 pthread_condattr_init
初始化的 attr 將會輸入 pthread_cond_init
所以可以知道 pthread_condattr_init
pthread_cond_init
為一組,不能刪去, return 0 代表初始化成功,失敗會回傳錯誤編號。
int pthread_condattr_init(pthread_condattr_t *attr) in ibm
由上面例子改寫
pthread_cond_t cond;
int main(){
    /* Walk through the lifetime of a condition-attribute object and of the
     * condition variable built from it, printing every return code
     * (0 means success). */
    pthread_condattr_t attr;
    int status;

    printf("Create a default condition attribute\n");
    status = pthread_condattr_init(&attr);
    printf("pthread_condattr_init : %d\n", status);

    printf("Create the condition using the condition attributes object\n");
    status = pthread_cond_init(&cond, &attr);
    printf("pthread_cond_init : %d\n", status);

    /* The attribute object may be destroyed as soon as the condition
     * variable has been initialised from it. */
    printf("Destroy cond attribute\n");
    status = pthread_condattr_destroy(&attr);
    printf("pthread_condattr_destroy : %d\n", status);

    printf("Destroy condition\n");
    status = pthread_cond_destroy(&cond);
    printf("pthread_cond_destroy : %d\n", status);

    return 0;
}
Create a default condition attribute
pthread_condattr_init : 0
Create the condition using the condition attributes object
pthread_cond_init : 0
Destroy cond attribute
pthread_condattr_destroy : 0
Destroy condition
pthread_cond_destroy : 0
可以看出來 attr 將會輸入 pthread_cond_init
並且產生對應的 condition variable。
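所以這裡 pthread_condattr_init 只是把 attr 設成預設值;若只需要預設屬性,也可以把 pthread_condattr_init、pthread_condattr_destroy 刪去,直接傳 NULL 給 pthread_cond_init,程式一樣可以執行。
但要注意:本題原始程式在兩者之間還有 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);,讓 cond_finished 以 CLOCK_MONOTONIC 計算逾時,與 tpool_future_get 中的 clock_gettime(CLOCK_MONOTONIC, ...) 一致。少了這行,pthread_cond_timedwait 會以預設的 CLOCK_REALTIME 解讀到期時間,而 monotonic 時間(開機後經過的秒數)當成 realtime 是很久以前的時刻,因此 seconds > 0 時會立刻回傳 ETIMEDOUT。所以在本題中 attr 不能刪去。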
pthread_condattr_destroy
這裡建了 condattr 又馬上 destroy,其實是標準用法而不是需要改進的地方:pthread_cond_init 會依 attr 的設定初始化 condition variable,之後 attr 就可以 destroy,不影響已建立的 condition variable;pthread_condattr_destroy return 0 代表成功。
pthread_cond_init 初始化 condition variable ,一樣在結束時必須對應著 destroy , return 0 ,代表成功。
int main(){
    /* Initialise a condition variable twice, first with default attributes
     * (NULL) and then from an explicit attribute object, printing each
     * return code. Fixes two sources of undefined behavior in the earlier
     * version: re-initialising a live condition variable, and passing an
     * uninitialised pthread_condattr_t. */
    pthread_cond_t cv;
    pthread_condattr_t cattr;
    int ret;

    /* initialize a condition variable to its default value */
    ret = pthread_cond_init(&cv, NULL);
    printf("pthread_cond_init : %d\n", ret);
    /* A condition variable must be destroyed before it is initialised again. */
    pthread_cond_destroy(&cv);

    /* initialize a condition variable from an attribute object; the
     * attribute object itself must be initialised first */
    pthread_condattr_init(&cattr);
    ret = pthread_cond_init(&cv, &cattr);
    printf("pthread_cond_init : %d\n", ret);
    pthread_condattr_destroy(&cattr);
    pthread_cond_destroy(&cv);
    return 0;
}
pthread_cond_init : 0
pthread_cond_init : 0
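上例第二次呼叫雖然也回傳 0,但 cattr 沒有經過 pthread_condattr_init。把未初始化的 attribute object 傳入 pthread_cond_init 屬於 undefined behavior,回傳 0 只是碰巧,並不代表會以 default value 代入。另外,對已經初始化、尚未 destroy 的 cv 再呼叫一次 pthread_cond_init 也是 undefined behavior。
若只需要預設屬性,正確做法是直接傳入 NULL,這樣也可以省去存放 cattr 的記憶體空間。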
tpool_future_destroy
/* Release a future obtained from tpool_apply().
 *
 * If the task has finished or was cancelled, the future is torn down and
 * freed immediately. Otherwise the task is still queued or running and a
 * worker (jobqueue_fetch) or jobqueue_destroy still refers to it, so the
 * future is only marked __FUTURE_DESTROYED. Whichever of those later sees
 * the mark frees it. Passing NULL is a no-op. Always returns 0.
 */
int tpool_future_destroy(struct __tpool_future *future)
{
    if (!future)
        return 0;

    pthread_mutex_lock(&future->mutex);
    int settled = future->flag & (__FUTURE_FINISHED | __FUTURE_CANCELLED);
    if (!settled) {
        /* Hand ownership over to the worker / queue teardown. */
        future->flag |= __FUTURE_DESTROYED;
        pthread_mutex_unlock(&future->mutex);
        return 0;
    }
    pthread_mutex_unlock(&future->mutex);
    pthread_mutex_destroy(&future->mutex);
    pthread_cond_destroy(&future->cond_finished);
    free(future);
    return 0;
}
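銷毀 tpool_future_destroy
當 flag 含有 __FUTURE_FINISHED 或 __FUTURE_CANCELLED 時,代表已經沒有 worker 會再使用這個 future,就直接銷毀:對應把 mutex、condition variable destroy 掉,並 free(future)。
否則只把 flag 設為 __FUTURE_DESTROYED。
名字為 destroy 但卻保留 mutex 和 condition variable,看起來有點問題,但這其實是刻意的延後釋放:此時 task 還在 jobqueue 中或正在執行,worker 仍持有這個 future 的指標。等 jobqueue_fetch 執行完 task(或 jobqueue_destroy 清空 queue)看到 __FUTURE_DESTROYED 時,才由它們負責釋放。若在這裡直接 free,worker 之後存取 future->mutex 就會是 use-after-free。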
tpool_future_get
/* Wait for the task behind `future` to finish and return its result.
 *
 * seconds == 0: block until the worker sets __FUTURE_FINISHED.
 * seconds  > 0: wait at most that long. On timeout, __FUTURE_TIMEOUT is set
 *               and NULL is returned. The future stays valid and may be
 *               waited on again, which clears the bit.
 *
 * The deadline is computed once, before waiting. Recomputing it on every
 * loop iteration (as before) pushed it further out after each spurious
 * wakeup. It is taken from CLOCK_MONOTONIC, which presumes cond_finished was
 * created with pthread_condattr_setclock(..., CLOCK_MONOTONIC).
 */
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    struct timespec expire_time = {0};
    if (seconds) {
        clock_gettime(CLOCK_MONOTONIC, &expire_time);
        expire_time.tv_sec += seconds;
    }

    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else {
            pthread_cond_wait(&future->cond_finished, &future->mutex); // FFF
        }
    }
    /* Read the result while still holding the lock under which it was published. */
    void *result = future->result;
    pthread_mutex_unlock(&future->mutex);
    return result;
}
將 future->result
提取出來
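pthread_cond_timedwait 要在持有 mutex 後呼叫,行為與 pthread_cond_wait 類似,差別在多了一個絕對時間 (abstime) 的逾時上限:時間到了還沒被喚醒,就回傳 ETIMEDOUT。這個 abstime 是依 condition variable 的 clock 來解讀,該 clock 由 pthread_condattr_setclock 設定,預設為 CLOCK_REALTIME。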
ETIMEDOUT
為 pthread_cond_timedwait
return 的 error , 正常 return 為 0 , return ETIMEDOUT
代表為超時的意思,對應 flag = __FUTURE_TIMEOUT
並解鎖 mutex 。
由 main 部份可以知道這裡 seconds 將設為 0,所以實際執行只會走 pthread_cond_wait 的分支,持續等待直到 task 完成。
pthread_cond_wait
pthread_cond_wait in ibm
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
增加實作測試
jobqueue_create
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
建立 jobqueue
jobqueue_destroy
/* Free a job queue together with every task still waiting in it.
 *
 * For each pending task's future there are two cases:
 * - The owner already called tpool_future_destroy() (__FUTURE_DESTROYED).
 *   The future is freed here.
 * - Otherwise the future is marked __FUTURE_CANCELLED, so the owner's later
 *   tpool_future_destroy() frees it.
 *
 * No lock is taken on rwlock. The callers in this file invoke it only after
 * the workers have been joined, or before any were started.
 */
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    while (jobqueue->head) {
        threadtask_t *task = jobqueue->head;
        struct __tpool_future *future = task->future;
        jobqueue->head = task->next;

        pthread_mutex_lock(&future->mutex);
        if (!(future->flag & __FUTURE_DESTROYED)) {
            future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&future->mutex);
        } else {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        }
        free(task);
    }
    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
銷毀 jobqueue
依序從 head
開始 free
,直到 tail
__jobqueue_fetch_cleanup
/* Cancellation cleanup handler registered by jobqueue_fetch(); `arg` is the
 * queue's rwlock. jobqueue_fetch() enables cancellation only around
 * pthread_testcancel() and pthread_cond_wait(), and it holds rwlock at both
 * points, so the handler releases the mutex to keep the queue usable by
 * the remaining workers. */
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *) arg);
}
將 arg
轉 type 為 pthread_mutex_t
,由下面例子可以知道 pthread_cleanup_push
將把 arg 代入 function ,而我們的 function 為 pthread_mutex_unlock
所以對應的 arg 為 mutex 所以為 jobqueue->rwlock
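。乍看之下像是沒有先 lock 就解鎖,可能會有問題。但 cleanup handler 只會在 thread 被 cancel 時執行,而 jobqueue_fetch 只在 pthread_testcancel 與 pthread_cond_wait 這段允許 cancel,這兩處都持有 rwlock。pthread_cond_wait 被 cancel 時,會先重新取得 mutex 再執行 cleanup handler。所以這裡 unlock 是正確的。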
jobqueue_fetch
/* Worker thread body: the start routine tpool_create() passes to
 * pthread_create(); `queue` is the pool's jobqueue_t.
 *
 * Each iteration waits until the queue is non-empty, removes the task at
 * the tail (the oldest, since tpool_apply() pushes at the head), runs it,
 * and publishes its result through the task's future. A task whose func is
 * NULL is the shutdown sentinel queued by tpool_join(): the worker frees it
 * and leaves the loop. The thread always ends via pthread_exit(NULL).
 */
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
/* If the thread is cancelled while it holds rwlock, this handler unlocks it. */
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
/* Cancellation is enabled only for the explicit cancellation point and
 * the wait for work below; it is disabled again before the task runs. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
/* tail == NULL means the queue is empty; the loop also absorbs spurious wakeups. */
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);//GGG
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
/* Single element: taking it empties the queue. */
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
/* The list is linked from head (newest) to tail (oldest) only, so the
 * node before tail is found by walking from head: O(n) per fetch. */
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
/* Skip the work; the future is left for its owner to release. */
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
/* The task runs without any queue or future lock held. */
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
/* The owner already called tpool_future_destroy(), so the worker frees
 * the future. NOTE(review): ret_value is not freed and leaks if func
 * returned heap memory (bpp does). */
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
/* Publish the result and wake every tpool_future_get() waiter. */
task->future->flag |= __FUTURE_FINISHED;//KKK
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);//LLL
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
/* Shutdown sentinel: tpool_join() discards the future returned for it,
 * so the worker releases that future itself before exiting. */
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
/* 0: do not run the handler here, rwlock is not held after the loop. */
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
void pthread_cleanup_push(void (*routine)(void *), void *arg);
The pthread_cleanup_push() function pushes routine onto the top
of the stack of clean-up handlers. When routine is later
invoked, it will be given arg as its argument.
pthread_cleanup_push()
和 pthread_cleanup_pop()
為一組必須要成對出現。pthread_cleanup_pop()
前 unlock mutex 表現方式如下 pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_mutex_unlock(&mut);
pthread_cleanup_pop(0);
其中 pthread_cleanup_pop(1);
可以代替 pthread_mutex_unlock(&mut);
及 pthread_cleanup_pop(0);
考量到非同步要修改成如下
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,&oldtype);
pthread_cleanup_push(pthread_mutex_unlock,(void *)&mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_cleanup_pop(1);
pthread_setcanceltype(oldtype,NULL);
pthread_setcanceltype() in ibm
pthread_setcanceltype()
cancel type ,而且 oldtype 將回存取舊的 type , PTHREAD_CANCEL_ASYNCHRONOUS
thread 可以隨時取消。 PTHREAD_CANCEL_DEFERRED
只有在特定的 cancellation point 可以取消。
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
其中第一個參數是要被 push 到 cleanup handler stack 的函式(這裡為 __jobqueue_fetch_cleanup)。第二個參數 arg 則是之後呼叫該函式時傳入的參數(這裡為 &jobqueue->rwlock)。
由整體可以知道 tpool_create 使用到 pthread_create(&pool->workers[i], NULL, jobqueue_fetch,(void *) jobqueue)
建立 thread 並 jobqueue_fetch(jobqueue) 可以知道,將 jobqueue 任務提取出來,task 將會從 jobqueue tail -> head 的方式依序處理,處裡完的 task 將會被 free 。
The pthread_setcancelstate() sets the cancelability state of the
calling thread to the value given in state.
The pthread_setcanceltype() sets the cancelability type of the
calling thread to the value given in type. The previous
cancelability type of the thread is returned in the buffer
pointed to by oldtype.
old_state
將會存放當初進入之前的狀態。
PTHREAD_CANCEL_ENABLE
代表 thread 可以被 cancel;pthread_setcancelstate 成功設定時 return 0
傳入無效的 state 時 return EINVAL
,還有 PTHREAD_CANCEL_DISABLE
代表 thread 不能取消。
Calling pthread_testcancel() creates a cancellation point within
the calling thread, so that a thread that is otherwise executing
code that contains no cancellation points will respond to a
cancellation request.
其中 pthread_testcancel
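會建立一個 cancellation point。在 deferred 模式(PTHREAD_CANCEL_DEFERRED,預設值)下,取消請求只會在 cancellation point 被處理,所以執行到這裡時若有待處理的取消請求,就會 cancel thread。在可能長時間執行、又沒有其他 cancellation point 的地方,也要設立 cancellation point,避免漫長等待。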
tpool_create
/* Create a thread pool with `count` worker threads sharing one job queue.
 *
 * Returns NULL in these cases:
 * - count is 0. A pool without workers never runs a task, so every
 *   blocking tpool_future_get() on it would hang.
 * - Any allocation fails.
 * - A worker thread cannot be started. The workers already started are
 *   cancelled and joined before everything is freed.
 */
struct __threadpool *tpool_create(size_t count)
{
    if (count == 0)
        return NULL;

    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof *pool);
    /* calloc() rejects count * sizeof(pthread_t) overflow. */
    pthread_t *workers = calloc(count, sizeof *workers);
    if (!jobqueue || !pool || !workers) {
        free(workers);
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }
    pool->count = count;
    pool->jobqueue = jobqueue;
    pool->workers = workers;

    for (size_t i = 0; i < count; i++) {
        if (pthread_create(&workers[i], NULL, jobqueue_fetch,
                           (void *) jobqueue) == 0)
            continue;
        /* Roll back: workers[0..i-1] are running. No task has been queued
         * yet, so they are waiting for work, and cancellation reaches them
         * at pthread_testcancel()/pthread_cond_wait(). workers[i] was never
         * created. */
        for (size_t j = 0; j < i; j++)
            pthread_cancel(workers[j]);
        for (size_t j = 0; j < i; j++)
            pthread_join(workers[j], NULL);
        free(workers);
        jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }
    return pool;
}
建立一個 tpool
要對應建立一個 jobqueue
count
代表總共有幾個 thread
也就是幾個 workers
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
其中假如第 i 個 pthread_create 失敗(回傳非 0),就會先以 pthread_cancel 取消、再以 pthread_join 等待已經建立的 worker[0] ~ worker[i-1]。
worker[i] 本身並沒有被建立成功,所以不需要處理。最後再把 workers、jobqueue、pool free 掉。
這種「建立後又 cancel 掉」的做法可能有改進空間。
由下面可以知道 thread create 成功將會 return 0,所以全部建立成功時迴圈跑完,就會到 return pool 的部分。
pthread_create
int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *),void *restrict arg);
The pthread_create() function starts a new thread in the calling process. The new thread starts execution by invoking start_routine(); arg is passed as the sole argument of start_routine().
其中 pthread_t *restrict thread
為建立的 thread , const pthread_attr_t *restrict attr
為 thread attributes object,可以設定為 NULL(使用預設屬性)
, void *(*start_routine)(void *)
為 function void *restrict arg
為輸入 function 的 argument 。
pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)
對應到上, worker 代表 thread , jobqueue_fetch
為 function , jobqueue
為輸入的 argument , 查看 jobqueue_fetch
static void *jobqueue_fetch(void *queue)
就是如此。建立成功 return 0,失敗則回傳錯誤編號,例如資源不足時的 EAGAIN、attr 設定無效時的 EINVAL、權限不足時的 EPERM。
參考資料:
pthread_create()
pthread_create() in ibm
tpool_apply
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
將新的 task 加入 pool->jobqueue 的 head,並對應產生 threadtask 與 future。
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
pthread_mutex_unlock(&jobqueue->rwlock);
jobqueue 採用 add to head 的方式,但其中 mutex 為 coarse-grained lock,可以改得更細緻一點。
你應該提出縮小 critical section 的手段
思考此地方有可能 critical section 情況
將這部分 mutex 全數移除:jobqueue
的順序有可能不正確,但也可以得到正確的結果。
PI calculated with 101 terms: 3.141592653589793
有可能兩個 thread 都想建立 jobqueue->head
連結,所以因此產生順序錯誤,再有鎖的情況下,只許一個 thread 進行 add head 的動作,但經過 gdb 執行狀況,發現到 jobqueue->head
一直保持在 NULL
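一開始以為是 jobqueue->head 的 lifetime 不足(也沒有 return jobqueue),但其實並不是 lifetime 不足導致的。
真正的原因是 task 很快就被 thread 處理完並 free(task),jobqueue 幾乎一直是空的,tpool_apply 因此大多只執行 else 的部分,才會有這樣的結果。所以在這次測試中,將 mutex lock 移除看起來對整體沒有影響。
但這只代表這次執行沒有觀察到錯誤。tpool_apply 與 jobqueue_fetch 仍會同時讀寫 jobqueue->head / jobqueue->tail,這是 data race,在 C11 中屬於 undefined behavior,用 -fsanitize=thread 編譯即可偵測到,因此 mutex 不能移除。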
tpool_join
/* Shut the pool down and free it.
 *
 * One sentinel task (func == NULL) is queued per worker; a worker that takes
 * a sentinel exits. Tasks are fetched oldest first, so every task queued
 * before tpool_join() is fetched before any sentinel. pthread_join() then
 * waits for each worker to finish what it holds. Afterwards the queue and
 * the pool are freed. The futures returned for the sentinels are
 * deliberately discarded: the worker that consumes a sentinel frees its
 * future. Always returns 0.
 *
 * NOTE(review): if tpool_apply() fails to queue a sentinel, one worker never
 * exits and pthread_join() blocks forever.
 */
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    /* size_t index: matches pool->count and avoids a signed/unsigned compare. */
    for (size_t i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (size_t i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
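結束 tpool:先對每個 worker 加入一個 func 為 NULL 的 task 作為結束訊號,worker 取到它就會離開迴圈。
再以 pthread_join 等待所有 worker 結束,最後釋放 workers、jobqueue 與 pool。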
/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
int k = *(int *) arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (product)
*product = 1 / pow(16, k) * sum;
return (void *) product;
}
bpp 將上式表示為 function 。
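用 LaTeX 語法改寫 BBP 公式:
$$
\pi = \sum_{k=0}^{\infty} \frac{1}{16^k}\left(\frac{4}{8k+1} - \frac{2}{8k+4} - \frac{1}{8k+5} - \frac{1}{8k+6}\right)
$$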
main
#define PRECISION 100 /* upper bound in BPP sum */
int main()
{
int bpp_args[PRECISION + 1];
double bpp_sum = 0;
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_join(pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
return 0;
}
PI calculated with 101 terms: 3.141592653589793
tpool_t pool = tpool_create(4)
count
= 4 。
第一個 for
迴圈: 建立 function
= bbp
並與 jobqueue
在與 pool
連結進行管理。
第二個 for 迴圈:使用 tpool_future_get 將 result 提取出來,以 bpp_sum 將全部 result 合併,並以 tpool_future_destroy 銷毀 future。
之後由 tpool_join 結束所有 thread 並銷毀 pool,最後印出結果。
其中 PRECISION + 1
是因為 0 算第一項,所以 k=100
時就是第 101 項。
pthread_join
The pthread_join() function waits for the thread specified by thread to terminate. If that thread has already terminated, then pthread_join() returns immediately. The thread specified by thread must be joinable.
成功結束,將 return 0 錯誤 return 如下:
EDEADLK A deadlock was detected (e.g., two threads tried to join with each other); or thread specifies the calling thread.
EINVAL thread is not a joinable thread.
EINVAL Another thread is already waiting to join with this thread.
ESRCH No thread with the ID thread could be found.
執行時發現到 pool
和 futures
是分開的,而其中 futures[0]
到 futures[100]
而且 jobqueue->head
及 jobqueue->tail
= NULL
表示沒作用到。 futures[0]->flag
= 3
在 tpool_future_destroy(futures[i])
時 &2
會有值所以會成功 destroy 掉並 free(future)
。
tpool_future_destroy
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
+ pthread_mutex_destroy(&future->mutex);
+ pthread_cond_destroy(&future->cond_finished);
+ free(future);
}
}
return 0;
}
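當 else 時一樣把 futures destroy 掉,並釋放 future 空間。
注意:這個修改並不安全。進到 else 代表 task 還在 jobqueue 中或正在執行,worker 之後仍會 lock future->mutex 並寫入 flag/result(見 jobqueue_fetch)。此時 future 已被 free,就會造成 use-after-free。
本題的 main 是先以 tpool_future_get 等到 __FUTURE_FINISHED 才 destroy,所以不會走到 else,才沒有出錯。原本只設 __FUTURE_DESTROYED、交由 worker 釋放的設計才是正確的。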
tpool_apply
-pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
+ pthread_mutex_lock(&jobqueue->rwlock);
new_head->next = jobqueue->head;
jobqueue->head = new_head;
+ pthread_mutex_unlock(&jobqueue->rwlock);
} else {
+ pthread_mutex_lock(&jobqueue->rwlock);
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
+ pthread_mutex_unlock(&jobqueue->rwlock);
}
- pthread_mutex_unlock(&jobqueue->rwlock);
gcc -pthread -o bbp bbp.c -lm
與 Sanitizer 連結 gcc -o bpp bpp.c -lm -pthread -Wall -Wextra -Wshadow -O0 -g -fsanitize=thread
WARNING: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (pid=3893)
#0 pthread_mutex_unlock <null> (libtsan.so.0+0x3aafc)
#1 __jobqueue_fetch_cleanup /home/wayne/linux2021/linux2021_week4/pi.c:139 (bpp+0x6c86)
#2 jobqueue_fetch /home/wayne/linux2021/linux2021_week4/pi.c:210 (bpp+0x855a)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Location is heap block of size 104 at 0x7b1c00000000 allocated by main thread:
#0 malloc <null> (libtsan.so.0+0x30343)
#1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:100 (bpp+0x6047)
#2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
#3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)
Mutex M9 (0x7b1c00000040) created at:
#0 pthread_mutex_init <null> (libtsan.so.0+0x4a636)
#1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:104 (bpp+0x6226)
#2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
#3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)
SUMMARY: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (/lib/x86_64-linux-gnu/libtsan.so.0+0x3aafc) in pthread_mutex_unlock
==================
PI calculated with 101 terms: 3.141592653589793
ThreadSanitizer: reported 1 warnings
// pthread_mutex_unlock(&jobqueue->rwlock);
- pthread_cleanup_pop(1);
+ pthread_cleanup_pop(0);
pthread_exit(NULL);
}
lock free
所謂的 lock free 不能只單純看有沒有 lock。lock free 更重要的是一種在多執行緒下撰寫程式的思維方式,只要滿足下列條件,就可以說是 lock free。由下圖可以知道,關鍵不在於是否使用 lock,而在於是否有可能因為 lock 而永久卡住,例如持有 lock 的 thread 被搶佔或停住,導致其他 thread 全都無法前進。
文章還提到使用 CAS(Compare-And-Swap) 來達到 lock free 但是要注意 ABA 問題,所謂的 ABA 問題
Process P1 reads value A from shared memory,
P1 is preempted, allowing process P2 to run,
P2 modifies the shared memory value A to value B and back to A before preemption,
P1 begins execution again, sees that the shared memory value has not changed and continues.
Although P1 can continue executing, it is possible that the behavior will not be correct due to the "hidden" modification in shared memory.
此時
atomic_threadpool 一開始由 at_thpool_create
來建立足夠的 thread 數量,對應到題目就 tpool_create
的功用, at_thpool_newtask
為建立 task 並 enqueue 至 lfqueue 中,當處理完時,也將其中的 task free 掉,對應到題目就 tpool_apply
的功用,在 lfqueue 操作中用了不少 CAS(Compare-And-Swap) 指令, enqueue 是透過 __LFQ_BOOL_COMPARE_AND_SWAP
從尾部加入,__LFQ_BOOL_COMPARE_AND_SWAP
由 GCC atomic function 來完成相關實作,之後再由 at_thpool_gracefully_shutdown
將 pool destroy 掉,對應到題目 tpool_join
。
GCC atomic function
這一類操作,主要是對應到 intel 內部硬體所支援的操作才能達到 atomic operation,所以並不適用於所有硬體,不過沒對應到 GCC 也會相應給出警告。
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr.
The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.
當 compare 和 swap 完成後才會算成功。
lfqueue 的整個實作中皆沒有使用到 mutex,都是以 atomic builtin function 來完成。
__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)
node 為新的的節點,本來 tail->next
為 NULL 並 __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node)
更新 lfqueue tail 的節點。__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)
來維持 head 並將 head __lfq_recycle_free
掉。嘗試使用 __sync_bool_compare_and_swap
/* Demonstrate __sync_bool_compare_and_swap on ptr[0] (an int).
 * First call: ptr[0] == x (10), so the swap succeeds (k == 1) and stores y
 * converted to the pointee type int, i.e. 25; ptr[1] is untouched.
 * Second call: ptr[0] is now 25, not 10, so nothing is written (k == 0).
 * x and y themselves are never modified. */
int x = 10;
float y = 25.00;
int ptr[]={10,20};
printf("ptr %d:%d\n",ptr[0],ptr[1]);
printf("x:y %d:%f\n",x,y);
bool k = __sync_bool_compare_and_swap(ptr, x, y);
printf("k:%d\n",k);
printf("ptr %d:%d\n",ptr[0],ptr[1]);
printf("x:y %d:%f\n",x,y);
k = __sync_bool_compare_and_swap(ptr, x, y);
printf("k:%d\n",k);
printf("ptr %d:%d\n",ptr[0],ptr[1]);
printf("x:y %d:%f\n",x,y);
ptr 10:20
x:y 10:25.000000
k:1
ptr 25:20
x:y 10:25.000000
k:0
ptr 25:20
x:y 10:25.000000
由此可以知道會先比較 type *ptr, type oldval
是否相等,才會將 write newval into *ptr
當中
將 __sync_bool_compare_and_swap
取代
tpool_apply
+ new_head->next = NULL;
-pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
+ __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head);
+ __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head);
- new_head->next = jobqueue->head;
- jobqueue->head = new_head;
} else {
+ __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head);
- jobqueue->head = jobqueue->tail = new_head;
+ __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head);
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
-pthread_mutex_unlock(&jobqueue->rwlock);
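因為全改成 builtin atomic operations 的形式,也可以對應將 mutex 刪去。
不過要注意,上面的寫法並不是正確的 lock-free push。讀取 jobqueue->head 與 CAS 是分開的兩步:__sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head) 的 oldval 是當下重新讀到的值,幾乎必定成功,無法偵測其他 thread 已在中間修改了 head;兩個 CAS 之間也不是 atomic 的。
正確做法是在迴圈中先讀取 old = head、設定 new_head->next = old,再 CAS(&head, old, new_head),失敗就重試。此外 jobqueue_fetch 仍在 mutex 下讀寫 head/tail,兩邊混用依然有 data race。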
實驗 scalability 在本來的情況與部份修改成 builtin atomic operations 的結果
雖然只改寫一點,卻可以看得出 scalability 有部分的改善。
執行時間也有 30% 的縮短
tpool_apply
+#include <stdatomic.h>
if (jobqueue->head) {
- __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head);
- __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head);
+ atomic_compare_exchange_strong(&new_head->next, &new_head->next, jobqueue->head);
+ atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head);
} else {
- __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head);
- __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head);
+ atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head);
+ atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, jobqueue->head);
pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
}
jobqueue_fetch
if (jobqueue->head == jobqueue->tail) {
- task = jobqueue->tail;
- jobqueue->head = jobqueue->tail = NULL;
+ task = atomic_load(&jobqueue->tail);
+ atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, NULL);
+ atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, NULL);
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
- task = tmp->next;
- tmp->next = NULL;
- jobqueue->tail = tmp;
+ task = atmoic_load(&tmp->next);
+ atomic_compare_exchange_strong(&tmp->next, &tmp->next, NULL);
+ atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, tmp);
}
參考資料:
atomic_load
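因此可以看到,queue 的維護成本還是相當巨大,尤其在 for (tmp = jobqueue->head; tmp->next != jobqueue->tail;tmp = tmp->next)。
在目前「單向鏈結、由 head 加入、由 tail 取出」的結構下,維護 tail 無法避免這個 O(n) 走訪。若改成由 tail 加入、由 head 取出,或加上 prev 指標,就可以降為 O(1)。這也是之後朝向 cmwq 的原因。
另外,上面 atomic_compare_exchange_strong(&obj, &obj, desired) 把 expected 指向物件本身,比較幾乎必定相等,等同於一般的讀後寫,失去了 CAS 的意義。而且 C11 要求 obj 必須是 _Atomic 型別,一般的 threadtask_t * 指標並不符合。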