Try   HackMD

2021q1 Homework4 (quiz4)

contributed by < WayneLin1992 >

tags: linux2021

延伸問題

  • 解釋程式碼運作原理,包含 timeout 處理機制。
  • 指出改進空間並實作
  • 研讀 atomic_threadpool,指出其 atomic 操作,並說明 lock-free
  • 嘗試使用 C11 Atomics 改寫,使其有更好的 scalability

題目理解

理解程式運作原理

quiz4
thread pool

/* Bit flags stored in __tpool_future::flag.  The literals are octal, so each
 * constant occupies exactly one bit and several may be OR-ed together
 * (e.g. RUNNING | FINISHED == 03 once a task has completed). */
enum __future_flags {
    __FUTURE_RUNNING = 01,    /* 0000 0001: a worker has started func */
    __FUTURE_FINISHED = 02,   /* 0000 0010: result has been published */
    __FUTURE_TIMEOUT = 04,    /* 0000 0100: last tpool_future_get() timed out */
    __FUTURE_CANCELLED = 010, /* 0000 1000 (octal 010 == 8): dropped by jobqueue_destroy */
    __FUTURE_DESTROYED = 020, /* 0001 0000 (octal 020 == 16): owner released it early */
};

__tpool_future

/* Handle through which the submitter of a task waits for and collects its
 * result. */
struct __tpool_future {
    int flag;                     /* OR of enum __future_flags bits */
    void *result;                 /* value returned by the task's func */
    pthread_mutex_t mutex;        /* guards flag and result */
    pthread_cond_t cond_finished; /* broadcast when __FUTURE_FINISHED is set */
};
typedef struct __tpool_future *tpool_future_t;






tpool_future_t



__tpool_future

int flag

result

mutex

cond_finished



tpool_future_t

tpool_future_t



tpool_future_t->__tpool_future





threadtask_t

/* One queued unit of work.  A task whose func is NULL is the "exit now"
 * marker that tpool_join enqueues for each worker (see jobqueue_fetch). */
typedef struct __threadtask {
    void *(*func)(void *);         /* work to run; NULL = worker shutdown marker */
    void *arg;                     /* argument passed to func */
    struct __tpool_future *future; /* receives func's return value */
    struct __threadtask *next;     /* next node, toward the tail (older tasks) */
} threadtask_t;






tpool_future_t



__threadtask

func

arg

future

next



__threadtask1

func

arg

future

next



__threadtask:f3->__threadtask1





__tpool_future

int flag

result

mutex

cond_finished



__threadtask:f2->__tpool_future





jobqueue_t

/* Singly linked task queue: tpool_apply pushes new tasks at head, worker
 * threads (jobqueue_fetch) remove them from tail. */
typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty; /* workers wait here while tail == NULL */
    pthread_mutex_t rwlock;       /* plain mutex (not a pthread_rwlock_t) guarding head/tail */
} jobqueue_t;






tpool_future_t



__threadtask

func

arg

future

next



more

...



__threadtask:f3->more





__threadtask1

func

arg

future

next



__jibqueue

head

tail

cond_noempty

rwlock



__jibqueue:f0->__threadtask





__jibqueue:f1->__threadtask1





more->__threadtask1





tpool_t

/* Fixed-size pool of worker threads that all consume one shared job queue. */
struct __threadpool {
    size_t count;         /* number of worker threads */
    pthread_t *workers;   /* array of count thread handles */
    jobqueue_t *jobqueue; /* queue shared by all workers */
};
typedef struct __threadpool *tpool_t;






tpool_future_t



__threadpool

count

workers

jobqueue



__jibqueue

head

tail

cond_noempty

rwlock



__threadpool:f2->__jibqueue





tpool_t

tpool_t



tpool_t->__threadpool





__threadtask

func

arg

future

next



more

...



__threadtask:f3->more





__threadtask1

func

arg

future

next



__jibqueue:f0->__threadtask





__jibqueue:f1->__threadtask1





more->__threadtask1





tpool_future_create

/*
 * Allocate and initialise a future in the "not started" state (flag == 0).
 *
 * The condition variable is bound to CLOCK_MONOTONIC because
 * tpool_future_get() computes its absolute deadline with
 * clock_gettime(CLOCK_MONOTONIC, ...).  pthread_cond_timedwait() interprets
 * that deadline against the clock chosen here, which defaults to the system
 * (realtime) clock.  Without the pthread_condattr_setclock() call the
 * timeout would be measured on the wrong clock.
 *
 * Returns NULL if the allocation or any of the initialisations fails.
 */
static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof *future);
    if (!future)
        return NULL;

    future->flag = 0;
    future->result = NULL;
    if (pthread_mutex_init(&future->mutex, NULL) != 0) {
        free(future);
        return NULL;
    }

    pthread_condattr_t attr;
    int rc = pthread_condattr_init(&attr);
    if (rc == 0) {
        /* Use the same clock tpool_future_get() measures its deadline with. */
        rc = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
        if (rc == 0)
            rc = pthread_cond_init(&future->cond_finished, &attr);
        /* attr is no longer referenced once the condvar exists. */
        pthread_condattr_destroy(&attr);
    }
    if (rc != 0) {
        pthread_mutex_destroy(&future->mutex);
        free(future);
        return NULL;
    }
    return future;
}

建立 tpool_future 物件,一併初始化 mutex, condition variable, condition variable attributes object。
pthread_mutex_init 使用 mutex 必須要初始化,要不然會出現 undefined behavior (UB)

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
以 pthread_mutex_init 建立的 mutex,在結束時必須對應呼叫 pthread_mutex_destroy 將其銷毀。

pthread_condattr_init 用來初始化 condition attributes object,未初始化就使用同樣是 undefined behavior (UB),使用完也需要對稱呼叫 pthread_condattr_destroy。attr 的 process-shared 屬性預設為 PTHREAD_PROCESS_PRIVATE,clock 屬性預設為系統時鐘 (Linux 上為 CLOCK_REALTIME)。

pthread_condattr_init()

The pthread_condattr_init() function initializes the attributes in the condition variable attribute object attr to default values. Pass attr to pthread_cond_init() to define the attributes of the condition variable.

pthread_condattr_init 初始化的 attr 會傳入 pthread_cond_init,所以 pthread_condattr_init 與 pthread_cond_init 為一組,只要有傳入 attr 就不能刪去。return 0 代表初始化成功,失敗會回傳錯誤編號。
int pthread_condattr_init(pthread_condattr_t *attr) in ibm
由上面例子改寫

  pthread_cond_t cond;

/* Walks through the lifecycle of a condition-attributes object: initialise
 * it, create a condition variable from it, then destroy both.  Every step
 * prints its return code (0 means success). */
int main()
{
    pthread_condattr_t cond_attr;

    printf("Create a default condition attribute\n");
    int status = pthread_condattr_init(&cond_attr);
    printf("pthread_condattr_init : %d\n", status);

    printf("Create the condition using the condition attributes object\n");
    status = pthread_cond_init(&cond, &cond_attr);
    printf("pthread_cond_init : %d\n", status);

    /* The attributes object is not needed once cond has been created. */
    printf("Destroy cond attribute\n");
    status = pthread_condattr_destroy(&cond_attr);
    printf("pthread_condattr_destroy : %d\n", status);

    printf("Destroy condition\n");
    status = pthread_cond_destroy(&cond);
    printf("pthread_cond_destroy : %d\n", status);
    return 0;
}

Create a default condition attribute
pthread_condattr_init : 0
Create the condition using the condition attributes object
pthread_cond_init : 0
Destroy cond attribute
pthread_condattr_destroy : 0
Destroy condition
pthread_cond_destroy : 0

可以看出來 attr 將會輸入 pthread_cond_init 並且產生對應的 condition variable。
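這裡 pthread_condattr_init 只是把 attr 設為預設屬性,之後沒有再修改它,所以效果與 pthread_cond_init(&future->cond_finished, NULL) 相同。把 pthread_condattr_init 與 pthread_condattr_destroy 刪去並改傳 NULL 也可以執行。不過 tpool_future_get 是以 clock_gettime(CLOCK_MONOTONIC, ...) 計算期限,可見原意應是在兩者之間呼叫 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)。少了這一步,condition variable 仍使用預設的系統時鐘,pthread_cond_timedwait 的 timeout 會以錯誤的時鐘解讀。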

pthread_condattr_destroy
這裡建立 condattr 後又立刻 destroy 並不是錯誤:由上可以知道 condition variable 建立好之後 attr 就不再被參考,可以先將 attr destroy 掉。return 0 代表成功。
pthread_cond_init 初始化 condition variable ,一樣在結束時必須對應著 destroy , return 0 ,代表成功。

pthread_cond_init() in oracle

int main(){
    pthread_cond_t cv;
    pthread_condattr_t cattr;
    int ret;

    /* initialize a condition variable to its default value */
    ret = pthread_cond_init(&cv, NULL);
    printf( "pthread_cond_init : %d\n", ret);
    /* A condition variable must be destroyed before it is initialised again;
     * re-initialising a live one is undefined behaviour. */
    pthread_cond_destroy(&cv);

    /* initialize a condition variable from an attributes object.  The object
     * must itself be initialised first: passing an uninitialised cattr to
     * pthread_cond_init() is undefined behaviour. */
    pthread_condattr_init(&cattr);
    ret = pthread_cond_init(&cv, &cattr);
    printf( "pthread_cond_init : %d\n", ret);
    pthread_condattr_destroy(&cattr);

    pthread_cond_destroy(&cv);
    return 0;
}

pthread_cond_init : 0
pthread_cond_init : 0

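注意:上例把未經 pthread_condattr_init 的 cattr 傳給 pthread_cond_init 是 undefined behavior;對已初始化的 cv 再次 pthread_cond_init 同樣是 UB。兩次都回傳 0 只是碰巧,並不會「自動以 default value 代入」。若要使用預設屬性,應直接傳入 NULL,這樣也省去存放 cattr 的記憶體空間。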







tpool_future_t



__tpool_future

int flag

result

mutex

cond_finished



0

0



__tpool_future:f0->0





NULL

NULL



__tpool_future:f1->NULL





tpool_future_destory

/*
 * Release a future obtained from tpool_apply().  Always returns 0.
 *
 * If the task has finished or been cancelled, no worker will touch the
 * future again, so it is freed immediately.  Otherwise it is only marked
 * __FUTURE_DESTROYED; the worker that runs the task frees it later
 * (jobqueue_fetch and jobqueue_destroy both check that bit).
 */
int tpool_future_destroy(struct __tpool_future *future)
{
    if (!future)
        return 0;

    pthread_mutex_lock(&future->mutex);
    const int done = future->flag & (__FUTURE_FINISHED | __FUTURE_CANCELLED);
    if (!done) {
        /* Still queued or running: hand ownership to the worker. */
        future->flag |= __FUTURE_DESTROYED;
        pthread_mutex_unlock(&future->mutex);
        return 0;
    }

    pthread_mutex_unlock(&future->mutex);
    pthread_mutex_destroy(&future->mutex);
    pthread_cond_destroy(&future->cond_finished);
    free(future);
    return 0;
}

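tpool_future_destroy:當 flag 含 __FUTURE_FINISHED 或 __FUTURE_CANCELLED 時,表示不會再有 worker 使用這個 future,因此立即銷毀 mutex、condition variable 並 free。否則 (task 仍在 queue 中或正在執行) 只把 flag 設為 __FUTURE_DESTROYED。名字雖然是 destroy,卻保留 mutex 和 condition variable,看似有問題,其實是刻意的延後釋放:worker 在 jobqueue_fetch 執行完 task 後看到 __FUTURE_DESTROYED,才由它負責銷毀並 free;仍在 queue 中的情況則由 jobqueue_destroy 處理。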

tpool_future_get

/*
 * Wait for the task behind `future` to finish and return its result.
 *
 * seconds == 0 blocks until the task finishes.  Otherwise the call waits at
 * most `seconds` seconds; on timeout it sets __FUTURE_TIMEOUT and returns
 * NULL.  That is indistinguishable from a task that returned NULL.  A
 * timeout bit left by an earlier call is cleared first.
 *
 * The deadline is computed once, before waiting, so spurious wake-ups do not
 * push it further out.  It is taken from CLOCK_MONOTONIC, so the future's
 * condition variable must be configured with the same clock
 * (pthread_condattr_setclock) for the timeout to be honoured.
 */
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    struct timespec deadline;
    if (seconds) {
        clock_gettime(CLOCK_MONOTONIC, &deadline);
        deadline.tv_sec += seconds;
    }

    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &deadline);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else {
            pthread_cond_wait(&future->cond_finished, &future->mutex);
        }
    }

    /* Read the result while still holding the mutex it was published under. */
    void *result = future->result;
    pthread_mutex_unlock(&future->mutex);
    return result;
}

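等待 task 完成後,將 future->result 提取出來。
pthread_cond_timedwait 必須在持有 mutex 時呼叫。它與 pthread_cond_wait 類似,差別在於多了一個絕對時間的期限 (expire_time)。
ETIMEDOUT 為 pthread_cond_timedwait 回傳的 error:正常 return 0,return ETIMEDOUT 代表超時,此時設定 flag |= __FUTURE_TIMEOUT,解鎖 mutex 後回傳 NULL。注意 expire_time 是以 CLOCK_MONOTONIC 計算,condition variable 必須以 pthread_condattr_setclock 設為同一個時鐘才會正確;另外 expire_time 在每次迴圈都重新計算,spurious wakeup 會讓期限往後延。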

由 main 部份可以知道這裡 seconds 設為 0,所以實際執行時走的是 pthread_cond_wait 這條路徑,會一直阻塞到 task 完成為止。

pthread_cond_wait
pthread_cond_wait in ibm
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
增加實作測試

jobqueue_create

static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}

建立 jobqueue

jobqueue_destroy

/*
 * Free every task still in the queue, then the queue itself.
 *
 * For each pending task the future is freed if its owner already called
 * tpool_future_destroy (__FUTURE_DESTROYED).  Otherwise it is marked
 * __FUTURE_CANCELLED and left for its owner to destroy.
 */
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    while (jobqueue->head) {
        threadtask_t *victim = jobqueue->head;
        struct __tpool_future *fut = victim->future;
        jobqueue->head = victim->next;

        pthread_mutex_lock(&fut->mutex);
        int orphaned = (fut->flag & __FUTURE_DESTROYED) != 0;
        if (!orphaned)
            fut->flag |= __FUTURE_CANCELLED;
        pthread_mutex_unlock(&fut->mutex);

        if (orphaned) {
            /* Nobody holds this future any more. */
            pthread_mutex_destroy(&fut->mutex);
            pthread_cond_destroy(&fut->cond_finished);
            free(fut);
        }
        free(victim);
    }

    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}

銷毀 jobqueue 依序從 head 開始 free,直到 tail

__jobqueue_fetch_cleanup

/* Cancellation cleanup handler for jobqueue_fetch.  `arg` is the queue's
 * mutex; jobqueue_fetch only reaches a cancellation point while holding it,
 * so the handler releases it. */
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *) arg);
}

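將 arg 轉型為 pthread_mutex_t *。由下面例子可以知道 pthread_cleanup_push 會把 arg 傳入註冊的 function,這裡的 function 會呼叫 pthread_mutex_unlock,所以對應的 arg 為 mutex,也就是 jobqueue->rwlock。原本擔心「還沒上鎖就先解鎖」,但 cleanup handler 只會在 thread 被 cancel、呼叫 pthread_exit 或 pthread_cleanup_pop(非零) 時執行。jobqueue_fetch 最後呼叫的是 pthread_cleanup_pop(0),而 cancel 只可能在 pthread_testcancel 與 pthread_cond_wait 生效,這兩處都持有 rwlock (pthread_cond_wait 被 cancel 時會先重新取得 mutex 再執行 handler)。所以執行 handler 時 mutex 必定是鎖住的。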

jobqueue_fetch

/*
 * Worker thread body, started by tpool_create via pthread_create.
 *
 * Repeatedly removes the task at the tail of the queue.  tpool_apply pushes
 * at the head, so the tail is the oldest task.  The worker runs the task and
 * publishes its return value in the task's future.  A task with
 * func == NULL tells the worker to exit.
 *
 * Cancellation is enabled only between the ENABLE/DISABLE calls below.  It
 * can therefore only take effect in pthread_testcancel() or
 * pthread_cond_wait(), both reached with jobqueue->rwlock held.  The cleanup
 * handler pushed here unlocks that mutex if the thread is cancelled there.
 *
 * Finding the predecessor of the tail walks the list from the head, so each
 * dequeue costs O(queue length) while the lock is held.
 */
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        /* Allow cancellation (e.g. from tpool_create's error path) only
         * while idle and holding the queue lock. */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        /* Sleep until tpool_apply makes the queue non-empty. */
        while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);//GGG
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            /* Single element: taking it empties the queue. */
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            /* Walk to the node before tail and detach tail from it. */
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                /* Skip the work and drop the task node; the future itself
                 * is not freed here. */
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            /* Run the task with no lock held. */
            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                /* The owner already released the future, so the worker frees
                 * it.  NOTE(review): ret_value is dropped on this path; a
                 * func that allocates its result (as bpp does) leaks it. */
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                /* Publish the result and wake every tpool_future_get waiter. */
                task->future->flag |= __FUTURE_FINISHED;//KKK
                task->future->result = ret_value;
                pthread_cond_broadcast(&task->future->cond_finished);//LLL
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            /* Exit marker from tpool_join, which discards the marker's future,
             * so the worker frees it before leaving the loop. */
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    /* 0: remove the handler without running it (the mutex is not held here). */
    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}

pthread_cleanup_push

void pthread_cleanup_push(void (*routine)(void *), void *arg);
The pthread_cleanup_push() function pushes routine onto the top
of the stack of clean-up handlers. When routine is later
invoked, it will be given arg as its argument.

pthread_cleanup_push() ibm

  1. pthread_cleanup_push()pthread_cleanup_pop() 為一組必須要成對出現。
  2. cleanup handlers 會在 thread 呼叫 pthread_exit 結束、被 pthread_cancel 取消,或以非零參數呼叫 pthread_cleanup_pop 時被執行
  3. push 和 pop 將以 stack 方式表現,所以會 last in first out(LIFO) 方式
  4. thread 結束 (pthread_exit 或被 cancel) 時並不會自動釋放它持有的 mutex,mutex 可能仍是上鎖的。因此常見做法是把 unlock 註冊為 cleanup handler,並在 pthread_cleanup_pop() 前 unlock mutex,表現方式如下
     /* NOTE: pthread_mutex_unlock has type int (*)(pthread_mutex_t *), not the
      * void (*)(void *) a cleanup routine must have, and calling a function
      * through an incompatible pointer type is undefined behaviour.  The
      * quiz code instead registers __jobqueue_fetch_cleanup, a
      * void (*)(void *) wrapper around the unlock. */
     pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
     pthread_mutex_lock(&mut);
     /* do some work */
     pthread_mutex_unlock(&mut);
     /* 0: remove the handler without running it; mut was unlocked above. */
     pthread_cleanup_pop(0);

其中 pthread_cleanup_pop(1); 可以代替 pthread_mutex_unlock(&mut);pthread_cleanup_pop(0);

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
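這個寫法只適用於 deferred cancellation (預設的 cancel type)。若 thread 使用 asynchronous cancellation (PTHREAD_CANCEL_ASYNCHRONOUS),它可能在 pthread_cleanup_push 之後、pthread_mutex_lock 完成之前被取消,cleanup handler 便會去解鎖一個尚未鎖上的 mutex。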

因此若 thread 原本是 asynchronous cancellation,要先暫時切換成 deferred,修改成如下

​   pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,&oldtype); 
​   pthread_cleanup_push(pthread_mutex_unlock,(void *)&mut); 
​   pthread_mutex_lock(&mut);/* do some work */ 
​   pthread_cleanup_pop(1; 
​   pthread_setcanceltype(oldtype,NULL;

pthread_setcanceltype() in ibm
pthread_setcanceltype() cancel type ,而且 oldtype 將回存取舊的 type , PTHREAD_CANCEL_ASYNCHRONOUS thread 可以隨時取消。 PTHREAD_CANCEL_DEFERRED 只有在特定的 cancellation point 可以取消。

參考資料:
The GNU C Library: Cleanup Handlers

pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
其中第一個參數為要被 push 進 cleanup handler stack 的 function (cleanup routine)。第二個參數 arg 會在 handler 執行時傳給該 function,這裡就是要解鎖的 jobqueue->rwlock。

由整體可以知道 tpool_create 使用到 pthread_create(&pool->workers[i], NULL, jobqueue_fetch,(void *) jobqueue) 建立 thread 並 jobqueue_fetch(jobqueue) 可以知道,將 jobqueue 任務提取出來,task 將會從 jobqueue tail -> head 的方式依序處理,處裡完的 task 將會被 free 。

pthread_setcancelstate

The pthread_setcancelstate() sets the cancelability state of the
calling thread to the value given in state.
The pthread_setcanceltype() sets the cancelability type of the
calling thread to the value given in type. The previous
cancelability type of the thread is returned in the buffer
pointed to by oldtype.

old_state 將會存放當初進入之前的狀態。
PTHREAD_CANCEL_ENABLE 代表 thread 可以被 cancel。PTHREAD_CANCEL_DISABLE 代表 thread 暫時不能被 cancel,期間收到的 cancel request 會保留到重新 enable 為止。pthread_setcancelstate 設定成功會 return 0,傳入無效的 state 則 return EINVAL。

pthread_testcancel

Calling pthread_testcancel() creates a cancellation point within
the calling thread, so that a thread that is otherwise executing
code that contains no cancellation points will respond to a
cancellation request.

其中 pthread_testcancel 會建立一個 cancellation point,主要用於 deferred cancellation:若此時有 pending 的 cancel request,thread 就會在這裡被 cancel。可能長時間執行、本身又沒有 cancellation point 的程式碼,也應設立 cancellation point,避免 cancel 請求遲遲無法生效。

tpool_create

/*
 * Create a pool of `count` worker threads that share one job queue.
 *
 * Returns NULL if any allocation or thread creation fails.  In that case
 * every worker already started is cancelled and joined, and all memory
 * allocated here is released.
 */
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof *pool);
    if (!jobqueue || !pool)
        goto err;

    pool->count = count;
    pool->jobqueue = jobqueue;
    /* calloc rejects a count * sizeof(pthread_t) overflow instead of wrapping. */
    pool->workers = calloc(count, sizeof *pool->workers);
    if (!pool->workers)
        goto err;

    for (size_t i = 0; i < count; i++) {
        if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                           (void *) jobqueue) != 0) {
            /* Workers 0..i-1 exist and are waiting on the (still empty)
             * queue with cancellation enabled: stop them first. */
            for (size_t j = 0; j < i; j++)
                pthread_cancel(pool->workers[j]);
            for (size_t j = 0; j < i; j++)
                pthread_join(pool->workers[j], NULL);
            free(pool->workers);
            goto err;
        }
    }
    return pool;

err:
    if (jobqueue)
        jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}






tpool_future_t



__threadpool

count

workers

jobqueue



__jibqueue

head

tail

cond_noempty

rwlock



__threadpool:f2->__jibqueue





tpool_t

tpool_t



tpool_t->__threadpool





__threadtask

func

arg

future

next



more

...



__threadtask:f3->more





__threadtask1

func

arg

future

next



__jibqueue:f0->__threadtask





__jibqueue:f1->__threadtask1





more->__threadtask1





建立一個 tpool 要對應建立一個 jobqueue
count 代表總共有幾個 thread 也就是幾個 workers

        /* If starting worker i fails, workers 0..i-1 already exist: cancel
         * and join them, then free everything allocated so far.  Worker i
         * was never created, so there is nothing to cancel for it.
         * pthread_create returns 0 on success, so normally the if-body is
         * skipped for every i. */
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }

其中假如第 i 個 worker 的 pthread_create 失敗,前面已建立的 worker[0] 到 worker[i-1] 會先被 pthread_cancel 再 pthread_join,之後把 workers、jobqueue 與 pool free 掉。worker[i] 本身沒有建立成功,所以不需要 cancel。由下面可以知道 pthread_create 成功時 return 0,所以正常情況下 if 不成立,迴圈結束後跳至 return pool 的部分。

pthread_create

int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *),void *restrict arg);

The pthread_create() function starts a new thread in the calling process. The new thread starts execution by invoking start_routine(); arg is passed as the sole argument of start_routine().
其中各參數的意義如下:
- pthread_t *restrict thread 用來存放新建立 thread 的 ID。
- const pthread_attr_t *restrict attr 為 thread attributes object,設定為 NULL 表示使用預設屬性。
- void *(*start_routine)(void *) 為 thread 要執行的 function。
- void *restrict arg 為傳入該 function 的 argument。
pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)
對應到上式,worker 代表 thread,jobqueue_fetch 為 function,jobqueue 為輸入的 argument;查看 jobqueue_fetch 的宣告 static void *jobqueue_fetch(void *queue) 就是如此。建立成功 return 0,失敗時回傳錯誤編號 (例如資源不足為 EAGAIN,attr 無效為 EINVAL,權限不足為 EPERM)。
參考資料:
pthread_create()
pthread_create() in ibm

tpool_apply

struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}

pool->jobqueue ,並對應產生 threadtask 。

        /* Push the new task at the head of the queue (workers take from the
         * tail).  Workers only block while the queue is empty, so the
         * condition variable is broadcast only on the empty -> non-empty
         * transition.  NOTE(review): in the else branch new_head->next is
         * never assigned, so the sole node keeps an indeterminate next. */
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

jobqueue 採用從 head 加入 (add at head) 的方式,但其中 mutex 為 coarse-grained lock,可以考慮更細緻的鎖。

你應該提出縮小 critical section 的手段

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

思考此地方有可能 critical section 情況
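將這部分 mutex 全數移除:jobqueue 的順序有可能不正確,但在這個測試中仍得到正確的結果。不過 worker 在 jobqueue_fetch 中也會修改 head/tail,移除 lock 會形成 data race (undefined behavior);得到正確結果只是碰巧,不能當作正確性的證明。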

PI calculated with 101 terms: 3.141592653589793

原本推測:可能有兩個 thread 同時想建立 jobqueue->head 連結,因而產生順序錯誤;有鎖的情況下,只允許一個 thread 進行 add head 的動作。但以 gdb 觀察執行狀況,發現 jobqueue->head 一直保持在 NULL,使得 tpool_apply 幾乎只執行 else 的部分。一開始以為是 jobqueue->head 的 lifetime 不足 (也沒有 return jobqueue),其實並不是:worker 已經把 task 取走、處理完並 free(task),queue 很快又變回空的。因此在這個測試中移除 mutex 看似對結果沒有影響,但 queue 同時被 main thread (tpool_apply) 與 worker (jobqueue_fetch) 存取,lock 仍然是必要的。

tpool_join

/*
 * Shut the pool down.  One exit marker (a task with func == NULL) is queued
 * per worker; each worker leaves its loop after taking one.  The function
 * then waits for every worker and frees the workers array, the queue and
 * the pool.  Always returns 0.
 */
int tpool_join(struct __threadpool *pool)
{
    const size_t n = pool->count;

    for (size_t i = 0; i < n; i++)
        tpool_apply(pool, NULL, NULL); /* marker's future is freed by the worker */
    for (size_t i = 0; i < n; i++)
        pthread_join(pool->workers[i], NULL);

    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}

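tpool_join 的流程如下:
1. 對每個 worker 各放入一個 func 為 NULL 的 task (tpool_apply(pool, NULL, NULL)),worker 在 jobqueue_fetch 取到這種 task 時會跳出迴圈並結束。
2. 以 pthread_join 等待所有 worker 結束。
3. free workers、jobqueue 與 pool。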

Bailey–Borwein–Plouffe formula

/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}

$$\pi = \sum_{k=0}^{\infty} \frac{1}{16^k}\left(\frac{4}{8k+1} - \frac{2}{8k+4} - \frac{1}{8k+5} - \frac{1}{8k+6}\right)$$
bpp 將上式表示為 function 。

用 LaTeX 語法改寫 BBP 公式

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

main

#define PRECISION 100 /* upper bound in BPP sum */
/*
 * Approximate pi by summing the BBP terms k = 0..PRECISION.  Each term is
 * computed as a separate task on a 4-worker pool.  The program exits with
 * status 1 if the pool, a task or a term's result cannot be obtained, instead
 * of dereferencing NULL.
 */
int main()
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_t pool = tpool_create(4);
    if (!pool) {
        fprintf(stderr, "tpool_create failed\n");
        return 1;
    }
    tpool_future_t futures[PRECISION + 1];
    int failed = 0;

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
        if (!futures[i])
            failed = 1; /* keep going so submitted tasks are still collected */
    }

    for (int i = 0; i <= PRECISION; i++) {
        if (!futures[i])
            continue;
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        if (result)
            bpp_sum += *result;
        else
            failed = 1; /* bpp could not allocate its result */
        tpool_future_destroy(futures[i]);
        free(result);
    }

    tpool_join(pool);
    if (failed) {
        fprintf(stderr, "out of memory while computing PI\n");
        return 1;
    }
    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
    return 0;
}

PI calculated with 101 terms: 3.141592653589793

tpool_t pool = tpool_create(4) count = 4 。
第一個 for 迴圈:以 tpool_apply 為每一項建立 func = bpp 的 task,放入 pool 的 jobqueue 交由 worker 執行,並取得對應的 future。
第二個 for 迴圈:使用 tpool_future_get 取出每一項的 result 累加到 bpp_sum,並以 tpool_future_destroy 銷毀 future、free 掉 result。
最後以 tpool_join 銷毀 pool,並印出結果。
其中 PRECISION + 1 是因為 0 算第一項,所以 k=100 時就是第 101 項。

pthread_join

The pthread_join() function waits for the thread specified by thread to terminate. If that thread has already terminated, then pthread_join() returns immediately. The thread specified by thread must be joinable.

成功結束,將 return 0 錯誤 return 如下:

EDEADLK A deadlock was detected (e.g., two threads tried to join with each other); or thread specifies the calling thread.
EINVAL thread is not a joinable thread.
EINVAL Another thread is already waiting to join with this thread.
ESRCH No thread with the ID thread could be found.

實際執行結果

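執行時發現 pool 與 futures 是分開的,futures[0] 到 futures[100] 各自指向一個 future。jobqueue->head 與 jobqueue->tail 皆為 NULL,並不是沒作用到,而是 task 都已被 worker 取走,queue 已清空。futures[0]->flag = 3,即 __FUTURE_RUNNING | __FUTURE_FINISHED,所以 tpool_future_destroy(futures[i]) 中 flag & __FUTURE_FINISHED (= 2) 會有值,會直接 destroy 並 free(future)。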

指出改進空間並實作

tpool_future_destroy

int tpool_future_destroy(struct __tpool_future *future) { if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); + pthread_mutex_destroy(&future->mutex); + pthread_cond_destroy(&future->cond_finished); + free(future); } } return 0; }

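else 時一樣把 future destroy 掉,並釋放 future 空間。注意:走到 else 表示 task 仍在 queue 中或正在被 worker 執行,worker 之後還會 lock task->future->mutex 並讀寫 flag (見 jobqueue_fetch),此時 free 會造成 use-after-free。原本只設定 __FUTURE_DESTROYED、交由 worker 釋放的延後銷毀設計才是正確的,因此這個修改並不安全。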

critical section 縮小 (注意:下面的寫法在取得 lock 之前就讀取 jobqueue->head 來決定分支,在讀取與上鎖之間 head 可能已被 worker 改變,屬於 data race,並不正確)

tpool_apply

-pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { + pthread_mutex_lock(&jobqueue->rwlock); new_head->next = jobqueue->head; jobqueue->head = new_head; + pthread_mutex_unlock(&jobqueue->rwlock); } else { + pthread_mutex_lock(&jobqueue->rwlock); jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH + pthread_mutex_unlock(&jobqueue->rwlock); } - pthread_mutex_unlock(&jobqueue->rwlock);

Sanitizer 觀察結果

gcc -pthread -o bbp bbp.c -lm
與 Sanitizer 連結 gcc -o bpp bpp.c -lm -pthread -Wall -Wextra -Wshadow -O0 -g -fsanitize=thread

WARNING: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (pid=3893)
    #0 pthread_mutex_unlock <null> (libtsan.so.0+0x3aafc)
    #1 __jobqueue_fetch_cleanup /home/wayne/linux2021/linux2021_week4/pi.c:139 (bpp+0x6c86)
    #2 jobqueue_fetch /home/wayne/linux2021/linux2021_week4/pi.c:210 (bpp+0x855a)
    #3 <null> <null> (libtsan.so.0+0x2d1af)

  Location is heap block of size 104 at 0x7b1c00000000 allocated by main thread:
    #0 malloc <null> (libtsan.so.0+0x30343)
    #1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:100 (bpp+0x6047)
    #2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
    #3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)

  Mutex M9 (0x7b1c00000040) created at:
    #0 pthread_mutex_init <null> (libtsan.so.0+0x4a636)
    #1 jobqueue_create /home/wayne/linux2021/linux2021_week4/pi.c:104 (bpp+0x6226)
    #2 tpool_create /home/wayne/linux2021/linux2021_week4/pi.c:216 (bpp+0x858d)
    #3 main /home/wayne/linux2021/linux2021_week4/bpp.c:26 (bpp+0x9bf8)

SUMMARY: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (/lib/x86_64-linux-gnu/libtsan.so.0+0x3aafc) in pthread_mutex_unlock
==================
PI calculated with 101 terms: 3.141592653589793
ThreadSanitizer: reported 1 warnings
// pthread_mutex_unlock(&jobqueue->rwlock); - pthread_cleanup_pop(1); + pthread_cleanup_pop(0); pthread_exit(NULL); }

研讀 atomic_threadpool 注意 atomic 操作

lock free
所謂的 lock free 不能只單純看有沒有 lock ,lock free 更重要是一種思維寫程式方式執行在多執行緒下,只要滿足下列條件,就可以說是 lock free ,由下圖可以知道條件不是在是否有 lock 而是有沒有可能 lock 鎖永久才是關鍵

文章還提到使用 CAS(Compare-And-Swap) 來達到 lock free 但是要注意 ABA 問題,所謂的 ABA 問題

Process P1 reads value A from shared memory,
P1 is preempted, allowing process P2 to run,
P2 modifies the shared memory value A to value B and back to A before preemption,
P1 begins execution again, sees that the shared memory value has not changed and continues.
Although P1 can continue executing, it is possible that the behavior will not be correct due to the "hidden" modification in shared memory.

此時 P1 的執行結果就會是錯的。

atomic_threadpool 一開始由 at_thpool_create 來建立足夠的 thread 數量,對應到題目就 tpool_create 的功用, at_thpool_newtask 為建立 task 並 enqueue 至 lfqueue 中,當處理完時,也將其中的 task free 掉,對應到題目就 tpool_apply 的功用,在 lfqueue 操作中用了不少 CAS(Compare-And-Swap) 指令, enqueue 是透過 __LFQ_BOOL_COMPARE_AND_SWAP 從尾部加入,__LFQ_BOOL_COMPARE_AND_SWAP 由 GCC atomic function 來完成相關實作,之後再由 at_thpool_gracefully_shutdown 將 pool destroy 掉,對應到題目 tpool_join

GCC atomic function
這一類 __sync 操作最初是依照 Intel Itanium ABI 定義的,由 GCC 對應到各平台硬體支援的 atomic 指令,並不限於 Intel 硬體。若目標平台無法實作某個操作,GCC 會給出警告,並改為呼叫外部函式。新的程式建議改用 __atomic builtins 或 C11 <stdatomic.h>。

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)

These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr.
The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.

當 compare 和 swap 完成後才會算成功。

lfqueue 整個實作皆沒有使用 mutex,全部以 atomic builtin function 來完成。

  • enqueue: __LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node) node 為新的的節點,本來 tail->next 為 NULL 並 __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node) 更新 lfqueue tail 的節點。
  • dequeue: __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next) 來維持 head 並將 head __lfq_recycle_free 掉。

使用 C11 Atomics 改寫

嘗試使用 __sync_bool_compare_and_swap

/* Demo of __sync_bool_compare_and_swap(ptr, oldval, newval): if *ptr equals
 * oldval, newval is written to *ptr and true is returned; otherwise *ptr is
 * left unchanged and false is returned.  Only ptr[0] (i.e. *ptr) is touched. */
int x = 10;
float y = 25.00;   /* passed as newval and stored as the int 25 */
int ptr[]={10,20};
printf("ptr %d:%d\n",ptr[0],ptr[1]);
printf("x:y %d:%f\n",x,y);
/* *ptr (10) == x: succeeds, ptr[0] becomes 25 */
bool k = __sync_bool_compare_and_swap(ptr, x, y);
printf("k:%d\n",k);
printf("ptr %d:%d\n",ptr[0],ptr[1]);
printf("x:y %d:%f\n",x,y);
/* *ptr is now 25 != x: fails, nothing is written */
k = __sync_bool_compare_and_swap(ptr, x, y);
printf("k:%d\n",k);
printf("ptr %d:%d\n",ptr[0],ptr[1]);
printf("x:y %d:%f\n",x,y);

ptr 10:20
x:y 10:25.000000
k:1
ptr 25:20
x:y 10:25.000000
k:0
ptr 25:20
x:y 10:25.000000

由此可以知道會先比較 type *ptr, type oldval 是否相等,才會將 write newval into *ptr 當中

__sync_bool_compare_and_swap 取代

tpool_apply

+ new_head->next = NULL; -pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { + __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head); + __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head); - new_head->next = jobqueue->head; - jobqueue->head = new_head; } else { + __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head); - jobqueue->head = jobqueue->tail = new_head; + __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head); pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH } -pthread_mutex_unlock(&jobqueue->rwlock);

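因為全改為 builtin atomic operations 的形式,也可以對應將 mutex 刪去。但要注意,上面的寫法並不是正確的 lock-free push,有以下問題:
- 先讀 jobqueue->head 再 CAS,且沒有檢查 CAS 是否失敗並重試,多個 producer 時可能遺失節點。
- jobqueue_fetch 仍在 lock 內修改 head/tail,兩者混用也會有 race。
- pthread_cond_broadcast 沒有在 mutex 保護下配合 worker 的 while (!jobqueue->tail) 檢查,可能發生 lost wakeup。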
實驗 scalability 在本來的情況與部份修改成 builtin atomic operations 的結果

雖然只改寫一點,卻可以看得出 scalability 有部分的改善。
執行時間也有 30% 的縮短

C11 改寫

tpool_apply

+#include <stdatomic.h> if (jobqueue->head) { - __sync_bool_compare_and_swap(&new_head->next, NULL, jobqueue->head); - __sync_bool_compare_and_swap(&jobqueue->head, jobqueue->head, new_head); + atomic_compare_exchange_strong(&new_head->next, &new_head->next, jobqueue->head); + atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head); } else { - __sync_bool_compare_and_swap(&jobqueue->head, NULL, new_head); - __sync_bool_compare_and_swap(&jobqueue->tail, NULL, jobqueue->head); + atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, new_head); + atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, jobqueue->head); pthread_cond_broadcast(&jobqueue->cond_nonempty);//HHH }

jobqueue_fetch

if (jobqueue->head == jobqueue->tail) { - task = jobqueue->tail; - jobqueue->head = jobqueue->tail = NULL; + task = atomic_load(&jobqueue->tail); + atomic_compare_exchange_strong(&jobqueue->head, &jobqueue->head, NULL); + atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, NULL); } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; - task = tmp->next; - tmp->next = NULL; - jobqueue->tail = tmp; + task = atmoic_load(&tmp->next); + atomic_compare_exchange_strong(&tmp->next, &tmp->next, NULL); + atomic_compare_exchange_strong(&jobqueue->tail, &jobqueue->tail, tmp); }

參考資料:
atomic_load

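因此可以看到,queue 的維護成本還是相當巨大,尤其是 for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) 這段對 tail 的維護:每次取出 task 都要從 head 走訪整個 queue 才能找到 tail 的前一個節點,成本為 O(n)。在目前「從 head 加入、從 tail 取出」的單向 linked list 結構下無法避免;若改為從 tail 加入、從 head 取出,或改用雙向 linked list,即可 O(1) 取出。這也是之後朝向 cmwq 的原因。另外要注意上面 C11 改寫的幾個問題:
- atomic_compare_exchange_strong(&x, &x, v) 以物件本身當作 expected,比較幾乎必定成立,等同非 atomic 的讀後寫,並沒有達到 atomic 的保護效果。
- C11 的 atomic 函式要求操作對象為 _Atomic 型別,例如把 head 宣告為 _Atomic(threadtask_t *)。
- 文中的 atmoic_load 應為 atomic_load。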