
2021q1 Homework4 (quiz4)

contributed by < linD026 >

tags: linux2021

Quiz for week 4 of 2021

0 Progress

  • 1 Explain how the code works (including the timeout handling mechanism) and improvements
  • 2 atomic_threadpool
    • lock-free explanation
  • 3 C11 Atomics improvement, scalability enhancement

1 Explain how the code works (including the timeout handling mechanism) and improvements

Structure definitions and task flow

  • jobqueue_t

    • tpool_apply puts a task into the jobqueue_t
    • the threads of the thread pool call jobqueue_fetch to obtain the next task to run from the jobqueue_t
    • when the queue is empty, a thread waits:
      • pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
    • every time a thread fetches a task it takes rwlock
    ​​​​typedef struct __jobqueue {
    ​​​​    threadtask_t *head, *tail;
    ​​​​    pthread_cond_t cond_nonempty;
    ​​​​    pthread_mutex_t rwlock;
    ​​​​} jobqueue_t;
    
  • threadtask_t

    • the task structure that a thread receives from the jobqueue_t inside the jobqueue_fetch function
    • the thread then executes func
      • void *ret_value = task->func(task->arg);
    ​​​​typedef struct __threadtask {
    ​​​​    void *(*func)(void *);
    ​​​​    void *arg;
    ​​​​    struct __tpool_future *future;
    ​​​​    struct __threadtask *next;
    ​​​​} threadtask_t;
    
  • __tpool_future

    • wrapped inside the threadtask_t structure
    • the structure that records the task's state and stores the value returned by func
      • tpool_future_get checks whether execution exceeded the time limit and retrieves the result
        • this is determined from the value returned by pthread_cond_timedwait
        • if the limit was exceeded, the flag is updated
          • future->flag |= __FUTURE_TIMEOUT;
    • it is also the structure through which the task is managed at the thread level
    ​​​​struct __tpool_future {
    ​​​​    int flag;
    ​​​​    void *result;
    ​​​​    pthread_mutex_t mutex;
    ​​​​    pthread_cond_t cond_finished;
    ​​​​};
    
  • __threadpool

    • count is the total number of threads
    ​​​​struct __threadpool {
    ​​​​    size_t count;
    ​​​​    pthread_t *workers;
    ​​​​    jobqueue_t *jobqueue;
    ​​​​};
    
    • So the overall flow of a task is:
    
    
    
    
    
    
    [Graphviz diagram: main calls tpool_apply, which pushes tasks (task 0 … task N) onto the tail of the queue; worker threads (thread 0, thread 1) in the thread pool run jobqueue_fetch, pop a task from the queue head, and run func; a fetched task ends up destroyed (cancelled), timed out, or finished, while new tasks keep being pushed at the tail.]
    
    
    
    
  • __future_flags

    • the following flags describe the state of a task (a small example of checking them follows the enum below)
    • for example, for a task that timed out and whose future was then destroyed:
      • the __FUTURE_TIMEOUT and __FUTURE_DESTROYED bits are 1
      • while __FUTURE_FINISHED stays 0
    • so tasks that exceeded the time limit can be told apart.
    enum __future_flags {
        __FUTURE_RUNNING = 01,      /*0000 0001*/
        __FUTURE_FINISHED = 02,     /*0000 0010*/
        __FUTURE_TIMEOUT = 04,      /*0000 0100*/
        __FUTURE_CANCELLED = 010,   /*0000 1000*/
        __FUTURE_DESTROYED = 020,   /*0001 0000*/
    };
    

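    As a quick illustration (a hypothetical helper, not part of the original code), these flag bits could be decoded like this:

        /* Hypothetical helper: classify a future by its flag bits.
         * Assumes only the enum above; not part of the quiz code. */
        const char *future_state(int flag)
        {
            if (flag & __FUTURE_FINISHED)
                return "finished";
            if ((flag & __FUTURE_TIMEOUT) && (flag & __FUTURE_DESTROYED))
                return "timed out and destroyed";
            if (flag & __FUTURE_CANCELLED)
                return "cancelled";
            if (flag & __FUTURE_RUNNING)
                return "running";
            return "pending";
        }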
Function walkthrough

  • tpool_apply

    ​​​​struct __tpool_future *tpool_apply(struct __threadpool *pool,
    ​​​​                                   void *(*func)(void *),
    ​​​​                                   void *arg)
    ​​​​{
    ​​​​    jobqueue_t *jobqueue = pool->jobqueue;
    ​​​​    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    ​​​​    struct __tpool_future *future = tpool_future_create();
    ​​​​    if (new_head && future) {
    ​​​​        new_head->func = func, new_head->arg = arg, new_head->future = future;
    ​​​​        pthread_mutex_lock(&jobqueue->rwlock);
    ​​​​        if (jobqueue->head) {
    ​​​​            new_head->next = jobqueue->head;
    ​​​​            jobqueue->head = new_head;
    ​​​​        } else {
    ​​​​            jobqueue->head = jobqueue->tail = new_head;
    ​​​​            // HHH
    ​​​​            pthread_cond_broadcast(&jobqueue->cond_nonempty);
    ​​​​        }
    ​​​​        pthread_mutex_unlock(&jobqueue->rwlock);
    ​​​​    } else if (new_head) {
    ​​​​        free(new_head);
    ​​​​        return NULL;
    ​​​​    } else if (future) {
    ​​​​        tpool_future_destroy(future);
    ​​​​        return NULL;
    ​​​​    }
    ​​​​    return future;
    ​​​​}
    
  • tpool_future_get

    • This function uses pthread_cond_timedwait to check whether the task has finished.
    • If the wait exceeds the limit of seconds from now, the return value is captured in status.
    • If status is ETIMEDOUT, the task's flag is set to __FUTURE_TIMEOUT and NULL is returned without waiting for the task to finish.
    • However, this function has a problem in how it handles time:
      • CLOCK_MONOTONIC is:

        CLOCK_MONOTONIC
        Clock that cannot be set and represents monotonic time since—as described by POSIX—"some unspecified point in the past". On Linux, that point corresponds to the number of seconds that the system has been running since it was booted.

      • pthread_cond_timedwait measures the absolute timeout against the clock the condition variable was configured with, which is CLOCK_REALTIME by default, so to express "seconds from now" the current time should be read with CLOCK_REALTIME (a minimal sketch of the fix follows the code block below).
    ​​​​void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
    ​​​​{
    ​​​​    pthread_mutex_lock(&future->mutex);
    ​​​​    /* turn off the timeout bit set previously */
    ​​​​    future->flag &= ~__FUTURE_TIMEOUT;
    ​​​​    while ((future->flag & __FUTURE_FINISHED) == 0) {
    ​​​​        if (seconds) {
    ​​​​            struct timespec expire_time;
    ​​​​            clock_gettime(CLOCK_MONOTONIC, &expire_time);
    ​​​​            expire_time.tv_sec += seconds;
    ​​​​            int status = pthread_cond_timedwait(&future->cond_finished,
    ​​​​                                                &future->mutex, &expire_time);
    ​​​​            if (status == ETIMEDOUT) {
    ​​​​                future->flag |= __FUTURE_TIMEOUT;
    ​​​​                pthread_mutex_unlock(&future->mutex);
    ​​​​                return NULL;
    ​​​​            }
    ​​​​        } else
    ​​​​            pthread_cond_wait(&future->cond_finished, &future->mutex);
    ​​​​    }
    ​​​​    pthread_mutex_unlock(&future->mutex);
    ​​​​    return future->result;
    ​​​​}
    
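    A minimal sketch of the timeout fix described above, assuming everything else in tpool_future_get stays the same (only the clock changes):

        /* Sketch: build the absolute timeout on CLOCK_REALTIME, the clock that
         * pthread_cond_timedwait compares against by default. */
        struct timespec expire_time;
        clock_gettime(CLOCK_REALTIME, &expire_time);
        expire_time.tv_sec += seconds;
        int status = pthread_cond_timedwait(&future->cond_finished,
                                            &future->mutex, &expire_time);

    An alternative that keeps CLOCK_MONOTONIC would be to create cond_finished with a pthread_condattr_t on which pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) was called, making the wait immune to wall-clock adjustments.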
  • jobqueue_fetch

    • This is the main function each worker thread executes.
    • It takes a task from the queue, waiting if there is none.
    • The fetched task is run as a function call, and its future is used to track its state.
    ​​​​static void *jobqueue_fetch(void *queue)
    ​​​​{
    ​​​​    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    ​​​​    threadtask_t *task;
    ​​​​    int old_state;
    
    ​​​​    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
    
    ​​​​    while (1) {
    
    ​​​​        pthread_mutex_lock(&jobqueue->rwlock);
    ​​​​        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
    ​​​​        pthread_testcancel();
    
    ​​​​        while (!jobqueue->tail)
    ​​​​            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
    
    ​​​​        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
    ​​​​        if (jobqueue->head == jobqueue->tail) {
    ​​​​            task = jobqueue->tail;
    ​​​​            jobqueue->head = jobqueue->tail = NULL;
    ​​​​        } else {
    ​​​​            threadtask_t *tmp;
    ​​​​            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
    ​​​​                 tmp = tmp->next)
    ​​​​                ;
    ​​​​            task = tmp->next;
    ​​​​            tmp->next = NULL;
    ​​​​            jobqueue->tail = tmp;
    ​​​​        }
    ​​​​        pthread_mutex_unlock(&jobqueue->rwlock);
    
    ​​​​        if (task->func) {
    ​​​​            /***********************************************/
    ​​​​            pthread_mutex_lock(&task->future->mutex);
    ​​​​            // task cancelled
    ​​​​            if (task->future->flag & __FUTURE_CANCELLED) {
    ​​​​                pthread_mutex_unlock(&task->future->mutex);
    ​​​​                free(task);
    ​​​​                continue;
    ​​​​            } else {
    ​​​​            // task running
    ​​​​                task->future->flag |= __FUTURE_RUNNING;
    ​​​​                pthread_mutex_unlock(&task->future->mutex);
    ​​​​            }
    ​​​​            /***********************************************/
    ​​​​            // FUNCTION START
    ​​​​            void *ret_value = task->func(task->arg);
    ​​​​            pthread_mutex_lock(&task->future->mutex);
    ​​​​            if (task->future->flag & __FUTURE_DESTROYED) {
    ​​​​                pthread_mutex_unlock(&task->future->mutex);
    ​​​​                pthread_mutex_destroy(&task->future->mutex);
    ​​​​                pthread_cond_destroy(&task->future->cond_finished);
    ​​​​                free(task->future);
    ​​​​            } else {
    ​​​​                task->future->flag |= __FUTURE_FINISHED;
    ​​​​                task->future->result = ret_value;
    ​​​​                pthread_cond_broadcast(&task->future->cond_finished);
    ​​​​                pthread_mutex_unlock(&task->future->mutex);
    ​​​​            }
    ​​​​            free(task);
    ​​​​            /***********************************************/
    
    ​​​​        } else {
    ​​​​            pthread_mutex_destroy(&task->future->mutex);
    ​​​​            pthread_cond_destroy(&task->future->cond_finished);
    ​​​​            free(task->future);
    ​​​​            free(task);
    ​​​​            break;
    ​​​​        }
    ​​​​    }
    ​​​​    pthread_cleanup_pop(0);
    ​​​​    pthread_exit(NULL);
    ​​​​}
    

Improvement 1 - thread cancel

Description

With the current tpool_future_get, a task that exceeds its time limit is still only handled after the task has finished running.
So if a task takes an extremely long time to finish, one thread is wasted for that whole period.
Therefore pthread_cancel is used to stop the thread, which is then restarted.

  • To let tpool_future_get see which thread is currently working on the task, first record the executing thread's id (pid, a pthread_t) in struct __tpool_future:
    ​​​​struct __tpool_future {
    ​​​​    int flag;
    ​​​​    void *result;
    ​​​​    pthread_t pid;
    ​​​​    pthread_mutex_t mutex;
    ​​​​    pthread_cond_t cond_finished;
    ​​​​};
    
  • Then, in jobqueue_fetch, use pthread_setcanceltype so that a cancellation request can cancel the thread immediately (though the system does not guarantee this):

    A thread's cancellation type, determined by pthread_setcanceltype(3), may be either asynchronous or deferred (the default for new threads). Asynchronous cancelability means that the thread can be canceled at any time (usually immediately, but the system does not guarantee this).

    ​​​​int old_type;
    ​​​​void *ret_value = NULL;
    ​​​​pthread_cleanup_push(__task_cleanup, task);
    ​​​​pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old_type);
    ​​​​printf("get in the work\n");
    ​​​​ret_value = task->func(task->arg);
    ​​​​printf("get out the work\n");
    ​​​​pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &old_type);
    ​​​​pthread_cleanup_pop(0);
    

    To allow cancellation while a task runs too long, the calls below that set the thread's cancel state have to be removed.
    Setting PTHREAD_CANCEL_ENABLE explicitly is unnecessary because that is already the initial state of a new thread.

    pthread_mutex_lock(&jobqueue->rwlock);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
    pthread_testcancel();

    while (!jobqueue->tail)
        pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
    if (jobqueue->head == jobqueue->tail) {
        task = jobqueue->tail;
        jobqueue->head = jobqueue->tail = NULL;
    } else {
        threadtask_t *tmp;
        for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
             tmp = tmp->next)
            ;
        task = tmp->next;
        tmp->next = NULL;
        jobqueue->tail = tmp;
    }
    pthread_mutex_unlock(&jobqueue->rwlock);

    pthread_setcancelstate

    The pthread_setcancelstate() sets the cancelability state of the calling thread to the value given in state. The previous cancelability state of the thread is returned in the buffer pointed to by oldstate. The state argument must have one of the following values:
    PTHREAD_CANCEL_ENABLE
    The thread is cancelable. This is the default cancelability state in all new threads, including the initial thread. The thread's cancelability type determines when a cancelable thread will respond to a cancellation request.
    PTHREAD_CANCEL_DISABLE
    The thread is not cancelable. If a cancellation request is received, it is blocked until cancelability is enabled.

    pthread_setcanceltype

    The pthread_setcanceltype() sets the cancelability type of the calling thread to the value given in type. The previous cancelability type of the thread is returned in the buffer pointed to by oldtype. The type argument must have one of the following values:
    PTHREAD_CANCEL_DEFERRED
    A cancellation request is deferred until the thread next calls a function that is a cancellation point (see pthreads(7)). This is the default cancelability type in all new threads, including the initial thread.
    PTHREAD_CANCEL_ASYNCHRONOUS
    The thread can be canceled at any time. (Typically, it will be canceled immediately upon receiving a cancellation request, but the system doesn't guarantee this.)

  • In tpool_future_get, when the wait times out, the pid previously recorded in the future is used with pthread_cancel to send a cancellation request.
    • Once the cancel is confirmed, the pid is used to find the cancelled thread in the pool, which is then created again.
    ​​​​static void *jobqueue_fetch(void *queue);
    ​​​​void *tpool_future_get(struct __threadpool *pool, struct __tpool_future *future, unsigned int seconds)
    ​​​​{
    ​​​​    pthread_mutex_lock(&future->mutex);
    ​​​​    /* turn off the timeout bit set previously */
    ​​​​    future->flag &= ~__FUTURE_TIMEOUT;
    ​​​​    while ((future->flag & __FUTURE_FINISHED) == 0) {
    ​​​​        if (seconds) {
    ​​​​            struct timespec expire_time;
    ​​​​            clock_gettime(CLOCK_REALTIME, &expire_time);
    ​​​​            expire_time.tv_sec += seconds;
    ​​​​            int status = pthread_cond_timedwait(&future->cond_finished,
    ​​​​                                                &future->mutex, &expire_time);
    ​​​​            if (status == ETIMEDOUT) {
    ​​​​                future->flag |= __FUTURE_TIMEOUT;
    ​​​​                pthread_t pid = future->pid;
    ​​​​                pthread_mutex_unlock(&future->mutex);
    ​​​​                int cancel_status = pthread_cancel(future->pid);
    ​​​​                if (cancel_status != ESRCH) {
    ​​​​                    printf("canceled\n");
    ​​​​                    future = NULL;
    ​​​​                    int i;
    ​​​​                    for (i = 0;i < pool->count;i++)
    ​​​​                    if (pool->workers[i] == pid) {
    ​​​​                        printf("find out %d with pid %ld\n", i, pid);
    ​​​​                        break;
    ​​​​                    }
    ​​​​                    pthread_join(pid, NULL);
    ​​​​                    pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
    ​​​​                               (void *) pool->jobqueue);
    ​​​​                }
    ​​​​                else
    ​​​​                    printf("cancel failed\n");
    ​​​​                return NULL;
    ​​​​            }
    ​​​​        } else
    ​​​​            pthread_cond_wait(&future->cond_finished, &future->mutex);
    ​​​​            //FFF
    ​​​​    }
    
    ​​​​    pthread_mutex_unlock(&future->mutex);
    ​​​​    return future->result;
    ​​​​}
    
  • The cleanup handler that disposes of the task when pthread_cancel is invoked:
    ​​​​static void __task_cleanup(void *arg) {
    ​​​​    threadtask_t *task = (threadtask_t *) arg;
    ​​​​    pthread_mutex_unlock(&task->future->mutex);
    ​​​​    pthread_mutex_destroy(&task->future->mutex);
    ​​​​    pthread_cond_destroy(&task->future->cond_finished);
    ​​​​    free(task->future->result);
    ​​​​    free(task->future);
    ​​​​    free(task);
    ​​​​}
    

Tests

  • cancel test
    ​​​​static void *dummy(void *arg) {
    ​​​​    sleep(10);
    ​​​​    double *product = malloc(sizeof(double));
    ​​​​    printf("dummy %d\n", *(int*)arg);
    ​​​​    *product = 1;
    ​​​​    return (void *) product;
    ​​​​}
    
    ​​​​#define task_n 8
    ​​​​#define wait_t 1
    ​​​​int main()
    ​​​​{
    ​​​​    tpool_t pool = tpool_create(4);
    ​​​​    tpool_future_t futures[task_n];
    ​​​​    int temp[task_n] = {0};
    ​​​​    for (int i = 0; i < task_n; i++) {
    ​​​​        temp[i] = i;
    ​​​​        futures[i] = tpool_apply(pool, dummy, (void *) &temp[i]);
    ​​​​    }
    ​​​​    int sum = 0;
    ​​​​    for (int i = 0; i < task_n; i++) {
    ​​​​        double *result = tpool_future_get(pool, futures[i], wait_t);
    ​​​​        if (result != NULL) {
    ​​​​            sum += *result;
    ​​​​            tpool_future_destroy(futures[i]);
    ​​​​            free(result);
    ​​​​        }
    ​​​​    }
    ​​​​    tpool_join(pool);
    ​​​​    printf("sum %d\n", sum);
    ​​​​    return 0;
    ​​​​}
    
    • result
      ​​​​​​​​$ ./cancel
      ​​​​​​​​canceled
      ​​​​​​​​find out 1 with pid 139919644755712
      ​​​​​​​​canceled
      ​​​​​​​​find out 0 with pid 139919653148416
      ​​​​​​​​canceled
      ​​​​​​​​find out 2 with pid 139919636363008
      ​​​​​​​​canceled
      ​​​​​​​​find out 3 with pid 139919627970304
      ​​​​​​​​canceled
      ​​​​​​​​find out 1 with pid 139919644755712
      ​​​​​​​​canceled
      ​​​​​​​​find out 0 with pid 139919653148416
      ​​​​​​​​canceled
      ​​​​​​​​find out 2 with pid 139919636363008
      ​​​​​​​​canceled
      ​​​​​​​​find out 3 with pid 139919627970304
      ​​​​​​​​sum 0
      
  • success test
    ​​​​static void *dummy(void *arg) {
    ​​​​    sleep(1);
    ​​​​}
    ​​​​#define wait_t 10
    
    • result
      ​​​​​​​​$ ./cancel
      ​​​​​​​​dummy 2
      ​​​​​​​​finished
      ​​​​​​​​dummy 0
      ​​​​​​​​finished
      ​​​​​​​​dummy 3
      ​​​​​​​​finished
      ​​​​​​​​dummy 1
      ​​​​​​​​finished
      ​​​​​​​​dummy 4
      ​​​​​​​​finished
      ​​​​​​​​dummy 6
      ​​​​​​​​finished
      ​​​​​​​​dummy 5
      ​​​​​​​​finished
      ​​​​​​​​dummy 7
      ​​​​​​​​finished
      ​​​​​​​​sum 8
      
  • random test
    ​​​​static void *dummy(void *arg) {
    ​​​​    srandom(time(NULL));
    ​​​​    sleep( random() % 10);
    ​​​​}
    ​​​​#define wait_t 2
    
    • result
      ​​​​​​​​$ ./cancel
      ​​​​​​​​canceled
      ​​​​​​​​find out 3 with pid 139754115122944
      ​​​​​​​​canceled
      ​​​​​​​​find out 1 with pid 139754131908352
      ​​​​​​​​dummy 4
      ​​​​​​​​finished
      ​​​​​​​​dummy 2
      ​​​​​​​​finished
      ​​​​​​​​dummy 3
      ​​​​​​​​finished
      ​​​​​​​​dummy 7
      ​​​​​​​​finished
      ​​​​​​​​canceled
      ​​​​​​​​find out 3 with pid 139754115122944
      ​​​​​​​​dummy 6
      ​​​​​​​​finished
      ​​​​​​​​sum 5
      

Analysis

Eight tasks are created, each running the dummy function below, with a time limit of 1 second.
Because dummy calls sleep(10);, every task is guaranteed to be cancelled.

static void *dummy(void *arg) {
    sleep(10);
    double *product = malloc(sizeof(double));
    printf("dummy %d\n", *(int*)arg);
    *product = 1;
    return (void *) product;
}

#define task_n 8
#define wait_t 1

Time

Measuring with the shell's time command shows that a lot of time is indeed saved in this scenario.

$ time ./rigin
dummy 2
dummy 1
dummy 3
dummy 0
dummy 4
dummy 6
dummy 5
dummy 7
sum 0

real	0m20.004s
user	0m0.001s
sys	0m0.005s

--------------------------------------

$ time ./cancel
canceled
find out 3 with pid 140378407266048
canceled
find out 2 with pid 140378415658752
canceled
find out 0 with pid 140378432444160
canceled
find out 1 with pid 140378424051456
canceled
find out 3 with pid 140378407266048
canceled
find out 2 with pid 140378415658752
canceled
find out 0 with pid 140378432444160
canceled
find out 1 with pid 140378424051456
sum 0

real	0m8.007s
user	0m0.002s
sys	0m0.006s

Memory usage

  • original approach
massif peak detail
--------------------------------------------------------------------------------
  n        time(i)         total(B)   useful-heap(B) extra-heap(B)    stacks(B)
--------------------------------------------------------------------------------
 48        219,908            5,368            4,918           450            0
 49        223,880            5,440            4,974           466            0
 50        225,081            5,832            5,358           474            0
 51        287,488            5,856            5,366           490            0
 52        288,336            5,896            5,374           522            0
 53        288,573            5,936            5,382           554            0
 54        289,225            5,936            5,382           554            0
90.67% (5,382B) (heap allocation functions) malloc/new/new[], --alloc-fns, etc.
->20.18% (1,198B) 0x400D273: _dl_new_object (dl-object.c:89)
| ->20.18% (1,198B) 0x4006E96: _dl_map_object_from_fd (dl-load.c:997)
|   ->20.18% (1,198B) 0x400A61A: _dl_map_object (dl-load.c:2236)
|     ->20.18% (1,198B) 0x4015D36: dl_open_worker (dl-open.c:513)
|       ->20.18% (1,198B) 0x49DE8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|         ->20.18% (1,198B) 0x40155F9: _dl_open (dl-open.c:837)
|           ->20.18% (1,198B) 0x49DD860: do_dlopen (dl-libc.c:96)
|             ->20.18% (1,198B) 0x49DE8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|               ->20.18% (1,198B) 0x49DE982: _dl_catch_error (dl-error-skeleton.c:227)
|                 ->20.18% (1,198B) 0x49DD994: dlerror_run (dl-libc.c:46)
|                   ->20.18% (1,198B) 0x49DD994: __libc_dlopen_mode (dl-libc.c:195)
|                     ->20.18% (1,198B) 0x486D99A: pthread_cancel_init (unwind-forcedunwind.c:53)
|                       ->20.18% (1,198B) 0x486DBB3: _Unwind_ForcedUnwind (unwind-forcedunwind.c:127)
|                         ->20.18% (1,198B) 0x486BF05: __pthread_unwind (unwind.c:121)
|                           ->20.18% (1,198B) 0x4862971: __do_cancel (pthreadP.h:309)
|                             ->20.18% (1,198B) 0x4862971: pthread_exit (pthread_exit.c:28)
|                               ->20.18% (1,198B) 0x109CA6: jobqueue_fetch (original.c:210)
|                                 ->20.18% (1,198B) 0x4861608: start_thread (pthread_create.c:477)
|                                   ->20.18% (1,198B) 0x499D292: clone (clone.S:95)
|                                     
->19.27% (1,144B) 0x1094AD: tpool_future_create (original.c:44)
| ->19.27% (1,144B) 0x109E9B: tpool_apply (original.c:253)
|   ->14.02% (832B) 0x10A128: main (origin_main.c:34)
|   | 
|   ->05.26% (312B) 0x109FC1: tpool_join (original.c:280)
|     ->05.26% (312B) 0x10A1DC: main (origin_main.c:48)
|       
->18.33% (1,088B) 0x40149CA: allocate_dtv (dl-tls.c:286)
| ->18.33% (1,088B) 0x40149CA: _dl_allocate_tls (dl-tls.c:532)
|   ->18.33% (1,088B) 0x4862322: allocate_stack (allocatestack.c:622)
|     ->18.33% (1,088B) 0x4862322: pthread_create@@GLIBC_2.2.5 (pthread_create.c:660)
|       ->18.33% (1,088B) 0x109D86: tpool_create (original.c:227)
|         ->18.33% (1,088B) 0x10A0BF: main (origin_main.c:26)
|           
->17.25% (1,024B) 0x48FFE83: _IO_file_doallocate (filedoalloc.c:101)
| ->17.25% (1,024B) 0x491004F: _IO_doallocbuf (genops.c:347)
|   ->17.25% (1,024B) 0x490F0AF: _IO_file_overflow@@GLIBC_2.2.5 (fileops.c:745)
|     ->17.25% (1,024B) 0x490D834: _IO_new_file_xsputn (fileops.c:1244)
|       ->17.25% (1,024B) 0x490D834: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1197)
|         ->17.25% (1,024B) 0x48F4AF1: __vfprintf_internal (vfprintf-internal.c:1373)
|           ->17.25% (1,024B) 0x48DFEBE: printf (printf.c:33)
|             ->17.25% (1,024B) 0x10A081: dummy (origin_main.c:15)
|               ->17.25% (1,024B) 0x109B44: jobqueue_fetch (original.c:185)
|                 ->17.25% (1,024B) 0x4861608: start_thread (pthread_create.c:477)
|                   ->17.25% (1,024B) 0x499D292: clone (clone.S:95)
|                     
->06.47% (384B) 0x401330A: _dl_check_map_versions (dl-version.c:274)
| ->06.47% (384B) 0x40160EC: dl_open_worker (dl-open.c:577)
|   ->06.47% (384B) 0x49DE8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|     ->06.47% (384B) 0x40155F9: _dl_open (dl-open.c:837)
|       ->06.47% (384B) 0x49DD860: do_dlopen (dl-libc.c:96)
|         ->06.47% (384B) 0x49DE8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|           ->06.47% (384B) 0x49DE982: _dl_catch_error (dl-error-skeleton.c:227)
|             ->06.47% (384B) 0x49DD994: dlerror_run (dl-libc.c:46)
|               ->06.47% (384B) 0x49DD994: __libc_dlopen_mode (dl-libc.c:195)
|                 ->06.47% (384B) 0x486D99A: pthread_cancel_init (unwind-forcedunwind.c:53)
|                   ->06.47% (384B) 0x486DBB3: _Unwind_ForcedUnwind (unwind-forcedunwind.c:127)
|                     ->06.47% (384B) 0x486BF05: __pthread_unwind (unwind.c:121)
|                       ->06.47% (384B) 0x4862971: __do_cancel (pthreadP.h:309)
|                         ->06.47% (384B) 0x4862971: pthread_exit (pthread_exit.c:28)
|                           ->06.47% (384B) 0x109CA6: jobqueue_fetch (original.c:210)
|                             ->06.47% (384B) 0x4861608: start_thread (pthread_create.c:477)
|                               ->06.47% (384B) 0x499D292: clone (clone.S:95)
|                                 
->03.23% (192B) 0x109E92: tpool_apply (original.c:252)
| ->01.62% (96B) 0x109FC1: tpool_join (original.c:280)
| | ->01.62% (96B) 0x10A1DC: main (origin_main.c:48)
| |   
| ->01.62% (96B) 0x10A128: main (origin_main.c:34)
|   
->03.10% (184B) in 5 places, all below massif's threshold (1.00%)
| 
->01.75% (104B) 0x109704: jobqueue_create (original.c:103)
| ->01.75% (104B) 0x109CC0: tpool_create (original.c:215)
|   ->01.75% (104B) 0x10A0BF: main (origin_main.c:26)
|     
->01.08% (64B) 0x10A064: dummy (origin_main.c:14)
  ->01.08% (64B) 0x109B44: jobqueue_fetch (original.c:185)
    ->01.08% (64B) 0x4861608: start_thread (pthread_create.c:477)
      ->01.08% (64B) 0x499D292: clone (clone.S:95)
  • cancel
massif peak detail
--------------------------------------------------------------------------------
  n        time(i)         total(B)   useful-heap(B) extra-heap(B)    stacks(B)
--------------------------------------------------------------------------------
 30        316,894            5,416            5,134           282            0
94.79% (5,134B) (heap allocation functions) malloc/new/new[], --alloc-fns, etc.
->22.12% (1,198B) 0x400D273: _dl_new_object (dl-object.c:89)
| ->22.12% (1,198B) 0x4006E96: _dl_map_object_from_fd (dl-load.c:997)
|   ->22.12% (1,198B) 0x400A61A: _dl_map_object (dl-load.c:2236)
|     ->22.12% (1,198B) 0x4015D36: dl_open_worker (dl-open.c:513)
|       ->22.12% (1,198B) 0x4B2D8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|         ->22.12% (1,198B) 0x40155F9: _dl_open (dl-open.c:837)
|           ->22.12% (1,198B) 0x4B2C860: do_dlopen (dl-libc.c:96)
|             ->22.12% (1,198B) 0x4B2D8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|               ->22.12% (1,198B) 0x4B2D982: _dl_catch_error (dl-error-skeleton.c:227)
|                 ->22.12% (1,198B) 0x4B2C994: dlerror_run (dl-libc.c:46)
|                   ->22.12% (1,198B) 0x4B2C994: __libc_dlopen_mode (dl-libc.c:195)
|                     ->22.12% (1,198B) 0x486D99A: pthread_cancel_init (unwind-forcedunwind.c:53)
|                       ->22.12% (1,198B) 0x486A043: pthread_cancel (pthread_cancel.c:38)
|                         ->22.12% (1,198B) 0x1096F5: tpool_future_get (thread_pi.c:100)
|                           ->22.12% (1,198B) 0x10A3E8: main (main.c:54)
|                             
->20.09% (1,088B) 0x40149CA: allocate_dtv (dl-tls.c:286)
| ->20.09% (1,088B) 0x40149CA: _dl_allocate_tls (dl-tls.c:532)
|   ->20.09% (1,088B) 0x4862322: allocate_stack (allocatestack.c:622)
|     ->20.09% (1,088B) 0x4862322: pthread_create@@GLIBC_2.2.5 (pthread_create.c:660)
|       ->20.09% (1,088B) 0x109F11: tpool_create (thread_pi.c:303)
|         ->20.09% (1,088B) 0x10A32A: main (main.c:40)
|           
->18.91% (1,024B) 0x4A4EE83: _IO_file_doallocate (filedoalloc.c:101)
| ->18.91% (1,024B) 0x4A5F04F: _IO_doallocbuf (genops.c:347)
|   ->18.91% (1,024B) 0x4A5E0AF: _IO_file_overflow@@GLIBC_2.2.5 (fileops.c:745)
|     ->18.91% (1,024B) 0x4A5C834: _IO_new_file_xsputn (fileops.c:1244)
|       ->18.91% (1,024B) 0x4A5C834: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1197)
|         ->18.91% (1,024B) 0x4A51677: puts (ioputs.c:40)
|           ->18.91% (1,024B) 0x10970E: tpool_future_get (thread_pi.c:102)
|             ->18.91% (1,024B) 0x10A3E8: main (main.c:54)
|               
->16.54% (896B) 0x1094ED: tpool_future_create (thread_pi.c:47)
| ->16.54% (896B) 0x10A026: tpool_apply (thread_pi.c:329)
|   ->16.54% (896B) 0x10A393: main (main.c:48)
|     
->07.09% (384B) 0x401330A: _dl_check_map_versions (dl-version.c:274)
| ->07.09% (384B) 0x40160EC: dl_open_worker (dl-open.c:577)
|   ->07.09% (384B) 0x4B2D8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|     ->07.09% (384B) 0x40155F9: _dl_open (dl-open.c:837)
|       ->07.09% (384B) 0x4B2C860: do_dlopen (dl-libc.c:96)
|         ->07.09% (384B) 0x4B2D8B7: _dl_catch_exception (dl-error-skeleton.c:208)
|           ->07.09% (384B) 0x4B2D982: _dl_catch_error (dl-error-skeleton.c:227)
|             ->07.09% (384B) 0x4B2C994: dlerror_run (dl-libc.c:46)
|               ->07.09% (384B) 0x4B2C994: __libc_dlopen_mode (dl-libc.c:195)
|                 ->07.09% (384B) 0x486D99A: pthread_cancel_init (unwind-forcedunwind.c:53)
|                   ->07.09% (384B) 0x486A043: pthread_cancel (pthread_cancel.c:38)
|                     ->07.09% (384B) 0x1096F5: tpool_future_get (thread_pi.c:100)
|                       ->07.09% (384B) 0x10A3E8: main (main.c:54)
|                         
->04.73% (256B) 0x10A01D: tpool_apply (thread_pi.c:328)
| ->04.73% (256B) 0x10A393: main (main.c:48)
|   
->02.36% (128B) in 4 places, all below massif's threshold (1.00%)
| 
->01.92% (104B) 0x109847: jobqueue_create (thread_pi.c:132)
| ->01.92% (104B) 0x109E4B: tpool_create (thread_pi.c:291)
|   ->01.92% (104B) 0x10A32A: main (main.c:40)
|     
->01.03% (56B) 0x400FC39: _dl_map_object_deps (dl-deps.c:479)
  ->01.03% (56B) 0x4015D9F: dl_open_worker (dl-open.c:571)
    ->01.03% (56B) 0x4B2D8B7: _dl_catch_exception (dl-error-skeleton.c:208)
      ->01.03% (56B) 0x40155F9: _dl_open (dl-open.c:837)
        ->01.03% (56B) 0x4B2C860: do_dlopen (dl-libc.c:96)
          ->01.03% (56B) 0x4B2D8B7: _dl_catch_exception (dl-error-skeleton.c:208)
            ->01.03% (56B) 0x4B2D982: _dl_catch_error (dl-error-skeleton.c:227)
              ->01.03% (56B) 0x4B2C994: dlerror_run (dl-libc.c:46)
                ->01.03% (56B) 0x4B2C994: __libc_dlopen_mode (dl-libc.c:195)
                  ->01.03% (56B) 0x486D99A: pthread_cancel_init (unwind-forcedunwind.c:53)
                    ->01.03% (56B) 0x486A043: pthread_cancel (pthread_cancel.c:38)
                      ->01.03% (56B) 0x1096F5: tpool_future_get (thread_pi.c:100)
                        ->01.03% (56B) 0x10A3E8: main (main.c:54)

The two versions differ little in memory usage; ./rigin, i.e. the original approach, even uses about 0.3 KB more.

Problems

task resources

  • Timing out, cancelling the thread and restarting it all appear to execute successfully, but memory allocated inside the task itself cannot be released.
    • One way to solve this is to move the allocation of the task's data up front.
    • That is, allocate the memory the task will use before it runs, and have the user also provide a function that frees it.
    • Add that function and data to threadtask_t, then handle them the same way as pthread_cleanup_push(__task_cleanup, task); a sketch follows this list.
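    A minimal sketch of that idea, assuming a hypothetical arg_free callback is added to threadtask_t (neither the field nor this version of the cleanup handler is part of the original code):

        /* Sketch only: arg_free is a hypothetical user-supplied callback that
         * releases whatever memory arg owns. */
        typedef struct __threadtask {
            void *(*func)(void *);
            void *arg;
            void (*arg_free)(void *);
            struct __tpool_future *future;
            struct __threadtask *next;
        } threadtask_t;

        /* Extended version of the __task_cleanup handler shown earlier:
         * if the thread is cancelled mid-task, the task's own data is
         * released as well before the future and the task are freed. */
        static void __task_cleanup(void *arg)
        {
            threadtask_t *task = (threadtask_t *) arg;
            if (task->arg_free)
                task->arg_free(task->arg);
            /* ...then unlock/destroy the future and free the task as before */
        }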

thread sanitizer

  • Building with ThreadSanitizer produces the following data-race warnings:
$ gcc -o cancel thread_pi.c main.c -g -lpthread -lm -fsanitize=thread
$ ./cancel
canceled
find out 1 with pid 139852066576128
==================
WARNING: ThreadSanitizer: data race (pid=6779)
  Write of size 8 at 0x7b1c00000070 by thread T2:
    #0 free <null> (libtsan.so.0+0x35f45)
    #1 jobqueue_fetch /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:248 (cancel+0x3309)
    #2 dummy /home/****/linux2021/W4/thread_pool_BBP/main.c:27 (cancel+0x3c08)
    #3 jobqueue_fetch /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:253 (cancel+0x3376)
    #4 <null> <null> (libtsan.so.0+0x2d1af)

  Previous write of size 4 at 0x7b1c00000070 by main thread (mutexes: write M416835166011528):
    #0 tpool_future_get /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:97 (cancel+0x28b8)
    #1 main /home/****/linux2021/W4/thread_pool_BBP/main.c:54 (cancel+0x3dd7)

  Mutex M416835166011528 is already destroyed.

  Thread T2 (tid=6782, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x5ea99)
    #1 tpool_create /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:305 (cancel+0x3699)
    #2 main /home/****/linux2021/W4/thread_pool_BBP/main.c:40 (cancel+0x3c9f)

SUMMARY: ThreadSanitizer: data race (/lib/x86_64-linux-gnu/libtsan.so.0+0x35f45) in __interceptor_free
==================
==================
WARNING: ThreadSanitizer: data race (pid=6779)
  Write of size 8 at 0x7b1c00000080 by thread T2:
    #0 free <null> (libtsan.so.0+0x35f45)
    #1 jobqueue_fetch /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:248 (cancel+0x3309)
    #2 dummy /home/****/linux2021/W4/thread_pool_BBP/main.c:27 (cancel+0x3c08)
    #3 jobqueue_fetch /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:253 (cancel+0x3376)
    #4 <null> <null> (libtsan.so.0+0x2d1af)

  Previous read of size 8 at 0x7b1c00000080 by main thread:
    #0 tpool_future_get /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:100 (cancel+0x28fa)
    #1 main /home/****/linux2021/W4/thread_pool_BBP/main.c:54 (cancel+0x3dd7)

  Thread T2 (tid=6782, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x5ea99)
    #1 tpool_create /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:305 (cancel+0x3699)
    #2 main /home/****/linux2021/W4/thread_pool_BBP/main.c:40 (cancel+0x3c9f)

SUMMARY: ThreadSanitizer: data race (/lib/x86_64-linux-gnu/libtsan.so.0+0x35f45) in __interceptor_free
==================
==================
WARNING: ThreadSanitizer: data race (pid=6779)
  Write of size 8 at 0x7b1c00000088 by thread T2:
    #0 free <null> (libtsan.so.0+0x35f45)
    #1 jobqueue_fetch /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:248 (cancel+0x3309)
    #2 dummy /home/****/linux2021/W4/thread_pool_BBP/main.c:27 (cancel+0x3c08)
    #3 jobqueue_fetch /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:253 (cancel+0x3376)
    #4 <null> <null> (libtsan.so.0+0x2d1af)

  Previous atomic read of size 1 at 0x7b1c00000088 by main thread:
    #0 pthread_cond_timedwait <null> (libtsan.so.0+0x6227d)
    #1 tpool_future_get /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:93 (cancel+0x2888)
    #2 main /home/****/linux2021/W4/thread_pool_BBP/main.c:54 (cancel+0x3dd7)

  Thread T2 (tid=6782, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x5ea99)
    #1 tpool_create /home/****/linux2021/W4/thread_pool_BBP/thread_pi.c:305 (cancel+0x3699)
    #2 main /home/****/linux2021/W4/thread_pool_BBP/main.c:40 (cancel+0x3c9f)

SUMMARY: ThreadSanitizer: data race (/lib/x86_64-linux-gnu/libtsan.so.0+0x35f45) in __interceptor_free
==================
canceled
find out 0 with pid 139852074968832
canceled
find out 2 with pid 139852056098560
canceled
find out 3 with pid 139852047705856
canceled
find out 1 with pid 139852066576128
canceled
find out 0 with pid 139852074968832
canceled
find out 2 with pid 139852056098560
canceled
find out 3 with pid 139852047705856
sum 0
ThreadSanitizer: reported 3 warnings

Improvement 2 - jobqueue

The task is set back to the original pi-computing function, and the futures are waited for with a blocking wait (no timeout).

  • original
    ​​​​// push
    ​​​​new_head->next = jobqueue->head;
    ​​​​jobqueue->head = new_head;
    
    ​​​​// pop
    ​​​​threadtask_t *tmp;
    ​​​​for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
    ​​​​     tmp = tmp->next)
    ​​​​    ;
    ​​​​task = tmp->next;
    ​​​​tmp->next = NULL;
    ​​​​jobqueue->tail = tmp;
    
 Performance counter stats for './cancel' (100000 runs):

          323,0100      instructions                                                  ( +-  0.05% )

         0.0009871 +- 0.0000108 seconds time elapsed  ( +-  1.09% )
  • modified
    ​​​​// push
    ​​​​jobqueue->tail->next = new_head;
    ​​​​jobqueue->tail = new_head;
    
    ​​​​// pop
    ​​​​task = jobqueue->head;
    ​​​​jobqueue->head = task->next;
    
 Performance counter stats for './cancel' (100000 runs):

          309,2876      instructions                                                  ( +-  0.02% )

         0.0009453 +- 0.0000110 seconds time elapsed  ( +-  1.16% )

The elapsed-time difference is actually small, but there is a clear gap of roughly 140,000 instructions: the original version pushes at the head and pops from the tail, so each pop walks the whole list to find the node before tail (O(n)), while the modified version pushes at the tail and pops from the head in O(1).

Don't just look at the time difference; also observe CPU utilization and the behaviour of specific instructions.

jserv

  • A more detailed analysis was done afterwards. Although the improved jobqueue shows slightly more cache-misses, its instructions, mem-stores and cycles (cpu-cycles) are clearly lower than the original version's.
 Performance counter stats for './origin_queue --per-thread' (100000 runs):

              7429      cache-misses              #    3.491 % of all cache refs      ( +-  0.22% )  (90.55%)
           21,2832      cache-references                                              ( +-  0.02% )  (100.00%)
          219,8701      instructions              #    0.72  insn per cycle           ( +-  0.05% )  (100.00%)
          306,0106      cycles                                                        ( +-  0.04% )
                 0      mem-loads                                                     ( +-  4.56% )
            5,5124      mem-stores                                                    ( +-  0.74% )  (9.45%)

       0.000803264 +- 0.000000511 seconds time elapsed  ( +-  0.06% )

----------------------------------------------------------------------------------------------------
 Performance counter stats for './alter_queue --per-thread' (100000 runs):

              7255      cache-misses              #    3.552 % of all cache refs      ( +-  0.23% )  (91.13%)
           20,4268      cache-references                                              ( +-  0.02% )  (100.00%)
          209,5561      instructions              #    0.72  insn per cycle           ( +-  0.04% )
          289,3691      cycles                                                        ( +-  0.04% )
                 0      mem-loads                                                     ( +- 13.13% )
            5,2020      mem-stores                                                    ( +-  0.75% )  (8.87%)

       0.000781026 +- 0.000000552 seconds time elapsed  ( +-  0.07% )

Regarding queue efficiency, I found a paper that studies exactly this combination of cache friendliness and lock-free design:
A lock-free cache-friendly software queue buffer for decoupled software pipelining

// TODO: read it carefully


2 atomic_threadpool

To test it after downloading from GitHub, lfqueue has to be set up first before it runs properly.

Initial analysis

  • The benchmark target is the code of example_threadpool.c in atomic_threadpool.
    • thread pool - 8 threads
    • 200 tasks (TASK_SIZE)
    ​​​​#define TASK_SIZE 200
    ​​​​void t1(void *arg) {
    ​​​​    printf("t1 is running on thread \n");
    ​​​​}
    ​​​​void t2(void *arg) {
    ​​​​    printf("t2 is running on thread \n");
    ​​​​}
    ​​​​void t3(void *arg) {
    ​​​​    printf("t3 is running on thread \n");
    ​​​​}
    ​​​​void t4(void *arg) {
    ​​​​    printf("t4 is running on thread \n");
    ​​​​}
    
    ​​​​int main(void) {
    ​​​​    int nthreads = 8;//sysconf(_SC_NPROCESSORS_ONLN); // Linux
    ​​​​    printf("Share thread pool with %d threads with at lease totalthroughput * nthreads task size\n", nthreads);
    ​​​​    at_thpool_t *thpool = at_thpool_create(nthreads);
    
    ​​​​    printf("assigned %d tasks between %d threads\n", TASK_SIZE, nthreads);
    ​​​​    int i;
    ​​​​    for (i = 0; i < TASK_SIZE; i++) {
    ​​​​        at_thpool_newtask(thpool, t1, NULL);
    ​​​​        at_thpool_newtask(thpool, t2, NULL);
    ​​​​        at_thpool_newtask(thpool, t3, NULL);
    ​​​​        at_thpool_newtask(thpool, t4, NULL);
    ​​​​    }
    ​​​​    sleep(1);
    ​​​​    puts("shutdown thread pool");
    ​​​​    at_thpool_gracefully_shutdown(thpool);
    ​​​​    return 0;
    ​​​​}
    
    ​​​​void t1(void *arg) {
    ​​​​    printf("t1 is running on thread \n");
    ​​​​}
    ​​​​void t2(void *arg) {
    ​​​​    printf("t2 is running on thread \n");
    ​​​​}
    ​​​​void t3(void *arg) {
    ​​​​    printf("t3 is running on thread \n");
    ​​​​}
    ​​​​void t4(void *arg) {
    ​​​​    printf("t4 is running on thread \n");
    ​​​​}
    
    ​​​​#define task_n 200
    ​​​​int main()
    ​​​​{
    ​​​​    tpool_t pool = tpool_create(8);
    ​​​​    for (int i = 0;i < task_n;i++) {
    ​​​​        tpool_apply(pool, (void* )t1, NULL);
    ​​​​        tpool_apply(pool, (void* )t2, NULL);
    ​​​​        tpool_apply(pool, (void* )t3, NULL);
    ​​​​        tpool_apply(pool, (void* )t4, NULL);
    ​​​​    }
    ​​​​    sleep(1);
    ​​​​    tpool_join(pool);
    ​​​​    return 0;
    ​​​​}
    

Time and CPU usage

  • Both the elapsed time and the CPU usage of tpool are lower than those of atomic_threadpool.
 Performance counter stats for './test' (10 runs):

            5,5095      cache-misses              #    4.697 % of all cache refs      ( +-  2.37% )
          117,2866      cache-references                                              ( +-  2.18% )
         1120,1461      instructions              #    0.64  insn per cycle           ( +-  1.67% )
         1745,6858      cycles                                                        ( +-  1.59% )

          1.004192 +- 0.000217 seconds time elapsed  ( +-  0.02% )

---------------------------------------------------------------------------------------------------
 Performance counter stats for './example_threadpool' (10 runs):

            5,3871      cache-misses              #    0.556 % of all cache refs      ( +-  1.74% )
          969,3766      cache-references                                              ( +-  1.60% )
         5621,3787      instructions              #    0.41  insn per cycle           ( +-  1.94% )
       1,3715,0591      cycles                                                        ( +-  1.08% )

          1.104209 +- 0.000209 seconds time elapsed  ( +-  0.02% )
  • The reason can be seen in at_thpool_worker, the function atomic_threadpool's threads use to fetch tasks.
    • at_thpool_worker waits for new tasks by polling, sleeping 1 ms per iteration.
    • tpool instead uses pthread_cond_wait and pthread_cond_broadcast.
    ​​​​// at_thpool_worker function
    ​​​​while (tp->is_running) {
    ​​​​    if ( (_task = lfqueue_deq(tq)) ) {
    ​​​​        goto HANDLE_TASK;
    ​​​​    }
    ​​​​    lfqueue_sleep(1);
    ​​​​}
    
    ​​​​lfqueue_sleep(unsigned int milisec) {
    ​​​​#if defined __GNUC__ || defined __CYGWIN__ || defined __MINGW32__ || defined __APPLE__
    ​​​​#pragma GCC diagnostic push
    ​​​​#pragma GCC diagnostic ignored "-Wimplicit-function-declaration"
    ​​​​    usleep(milisec * 1000);
    ​​​​#pragma GCC diagnostic pop
    ​​​​#else
    ​​​​    Sleep(milisec);
    ​​​​#endif
    ​​​​}
    

Memory usage

  • In terms of memory usage, however, tpool peaks at 115.8 KB while atomic_threadpool only reaches 40.8 KB.
    • tpool's memory usage is this high because, besides the threadtask_t objects used by the queue and the threads, every submitted task also allocates a __tpool_future object through which the thread returns the task's result.
    • [massif graphs for tpool and atomic_threadpool]
    • After handling the futures returned by tpool properly, the peak memory usage drops to 6.0 KB (a sketch of such handling follows the perf numbers below).
    • CPU usage (cycles) goes up several-fold as a result.
      ​​​​​​​​ Performance counter stats for './improve_future' (10 runs):
      
      ​​​​​​​​            6,7391      cache-misses              #    1.605 % of all cache refs      ( +-  3.93% )
      ​​​​​​​​          419,9732      cache-references                                              ( +-  2.48% )
      ​​​​​​​​         4056,9105      instructions              #    0.61  insn per cycle           ( +-  0.59% )
      ​​​​​​​​         6605,1353      cycles                                                        ( +-  1.19% )
      
      ​​​​​​​​           1.03471 +- 0.00433 seconds time elapsed  ( +-  0.42% )
      
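    A sketch of what "handling the futures properly" can mean for this benchmark, assuming the results of t1–t4 are simply not needed and that tpool_future_destroy behaves as in the original code (it marks an unfinished future __FUTURE_DESTROYED so the worker frees it right after running the task, as seen in jobqueue_fetch):

        /* Sketch (assumed handling, not necessarily the author's exact change):
         * release each future as soon as the task is submitted instead of
         * letting 800 futures accumulate until the end of the run. */
        for (int i = 0; i < task_n; i++) {
            tpool_future_destroy(tpool_apply(pool, (void *) t1, NULL));
            tpool_future_destroy(tpool_apply(pool, (void *) t2, NULL));
            tpool_future_destroy(tpool_apply(pool, (void *) t3, NULL));
            tpool_future_destroy(tpool_apply(pool, (void *) t4, NULL));
        }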

atomic

thread

  • macro
    ​​​​#define AT_THPOOL_INC(v) __sync_fetch_and_add(v, 1)
    ​​​​#define AT_THPOOL_DEC(v) __sync_fetch_and_add(v, -1)
    ​​​​#define AT_THPOOL_SHEDYIELD sched_yield
    
  • at_thpool_worker
    • This is the function each newly created thread executes.
    • It takes tasks from the queue and runs them.
    ​​​​#if defined _WIN32 || defined _WIN64
    ​​​​unsigned __stdcall at_thpool_worker(void *_tp) {
    ​​​​#else
    ​​​​void* at_thpool_worker(void *_tp) {
    ​​​​#endif
    ​​​​    at_thpool_t *tp = (at_thpool_t*)_tp;
    ​​​​    AT_THPOOL_INC(&tp->nrunning);
    
    ​​​​    at_thtask_t *task;
    ​​​​    void *_task;
    ​​​​    lfqueue_t *tq = &tp->taskqueue;
    
    ​​​​TASK_PENDING:
    ​​​​    while (tp->is_running) {
    ​​​​        if ( (_task = lfqueue_deq(tq)) ) {
    ​​​​            goto HANDLE_TASK;
    ​​​​        }
    ​​​​        lfqueue_sleep(1);
    ​​​​    }
    
    ​​​​    AT_THPOOL_DEC(&tp->nrunning);
    ​​​​#if defined _WIN32 || defined _WIN64
    ​​​​    return 0;
    ​​​​#else
    ​​​​    return NULL;
    ​​​​#endif
    ​​​​HANDLE_TASK:
    ​​​​    task = (at_thtask_t*) _task;
    ​​​​    task->do_work(task->args);
    ​​​​    AT_THPOOL_FREE(task);
    ​​​​    goto TASK_PENDING;
    ​​​​}
    

queue

  • lfqueue_deq
    • queue pop
      ​​​​​​​​void*
      ​​​​​​​​lfqueue_deq(lfqueue_t *lfqueue) {
      ​​​​​​​​    void *v;
      ​​​​​​​​    if (//__LFQ_ADD_AND_FETCH(&lfqueue->size, 0) &&
      ​​​​​​​​        (v = _dequeue(lfqueue))
      ​​​​​​​​    ) {
      
      ​​​​​​​​        __LFQ_FETCH_AND_ADD(&lfqueue->size, -1);
      ​​​​​​​​        return v;
      ​​​​​​​​    }
      ​​​​​​​​    return NULL;
      ​​​​​​​​}
      
  • _dequeue
    • Uses the atomic CAS operation instead of a lock to take a task out of the queue.
    1. First, make sure the local head variable is still the queue's current head:
      ​​​​​​​​if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
      ​​​​​​​​    next = head->next;
      ​​​​​​​​    ...
      ​​​​​​​​}
      
    
    
    
    
    
    
    [Graphviz diagram: CAS(&lfqueue->head, head, head) confirms the local head still points to node_1 (the list head); next is then read from head->next, while tail points to node_n.]
    
    
    
    
    
    
    2. Then check whether the queue is empty.
    3. After popping the task, head is swapped to the next node atomically:
      ​​​​​​​​if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
      ​​​​​​​​    if (next == NULL) {
      ​​​​​​​​        val = NULL;
      ​​​​​​​​        goto _done;
      ​​​​​​​​    }
      ​​​​​​​​}
      ​​​​​​​​else {
      ​​​​​​​​    // value is null or not
      ​​​​​​​​    if (next) {
      ​​​​​​​​        val = next->value;
      ​​​​​​​​        if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) {
      ​​​​​​​​            break;
      ​​​​​​​​        }
      ​​​​​​​​    } else {
      ​​​​​​​​        val = NULL;
      ​​​​​​​​        goto _done;
      ​​​​​​​​    }
      ​​​​​​​​}
      
    
    
    
    
    
    
    [Graphviz diagrams: CAS(&lfqueue->tail, head, head) detects the possibly-empty queue — if next == NULL the dequeue returns NULL; otherwise val = next->value and CAS(&lfqueue->head, head, next) advances head from node_1 to the next node.]
    
    
    
    
    
    
    • The complete function:
    ​​​​#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap
    ​​​​_dequeue(lfqueue_t *lfqueue) {
    ​​​​    lfqueue_cas_node_t *head, *next;
    ​​​​    void *val;
    
    ​​​​    for (;;) {
    ​​​​        head = lfqueue->head;
    ​​​​        if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
    ​​​​            next = head->next;
    ​​​​            if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
    ​​​​                if (next == NULL) {
    ​​​​                    val = NULL;
    ​​​​                    goto _done;
    ​​​​                }
    ​​​​            }
    ​​​​            else {
    ​​​​                if (next) {
    ​​​​                    val = next->value;
    ​​​​                    if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) {
    ​​​​                        break;
    ​​​​                    }
    ​​​​                } else {
    ​​​​                    val = NULL;
    ​​​​                    goto _done;
    ​​​​                }
    ​​​​            }
    ​​​​        }
    ​​​​    }
    
    ​​​​    __lfq_recycle_free(lfqueue, head);
    ​​​​_done:
    ​​​​    // __asm volatile("" ::: "memory");
    ​​​​    __LFQ_SYNC_MEMORY();
    ​​​​    __lfq_check_free(lfqueue);
    ​​​​    return val;
    ​​​​}
    
  • The push, create and related operations work the way one would expect, so they are not elaborated here.

Releasing queue resources

  • queue structure
    • When the queue is first created, head and tail point to a single node, base; root_free and move_free also point to a node, free_base.
    • When resources are released, __lfq_recycle_free is called with the node that was just popped to decide what to do.
typedef struct {
	lfqueue_cas_node_t *head, *tail, *root_free, *move_free;
	volatile size_t size;
	volatile lfq_bool_t in_free_mode;
	lfqueue_malloc_fn _malloc;
	lfqueue_free_fn _free;
	void *pl;
} lfqueue_t;

struct lfqueue_cas_node_s {
	void * value;
	struct lfqueue_cas_node_s *next, *nextfree;
	lfq_time_t _deactivate_tm;
};
  • __lfq_recycle_free
    • This function is the preparation step before the node that carried the popped value is freed.
    • Under multiple threads, freeing the node immediately could cause errors in later operations.
    • So the function first places the node to be freed onto the move_free linked list.
      • Initially root_free and move_free point to the same object, so later on both pointers stay on the same linked list.
    • The do/while loop guarantees that the node to free (freenode) is always linked into the list.
    • The final CAS guarantees that move_free ends up pointing at the last node of the list.
    ​​​​static void
    ​​​​__lfq_recycle_free(lfqueue_t *lfqueue, lfqueue_cas_node_t* freenode) {
    ​​​​    lfqueue_cas_node_t *freed;
    ​​​​    do {
    ​​​​        freed = lfqueue->move_free;
    ​​​​    } while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) );
    
    ​​​​    lfq_get_curr_time(&freenode->_deactivate_tm);
    
    ​​​​    __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode);
    ​​​​}
    






[Graphviz diagrams: CAS(&freed->nextfree, NULL, freenode) links the node to be freed (free node 1) after the node move_free points to (initially the free base shared with root_free); CAS(&lfqueue->move_free, freed, freenode) then advances move_free to the newly linked node.]





  • __lfq_check_free
    • This function frees nodes of the list above starting from the one root_free points to, until it reaches the node move_free points to, or until the rtfree pointer used for freeing becomes NULL.
    • Note that this function uses atomic operations (the in_free_mode flag) to reproduce what a lock would do.
static void
__lfq_check_free(lfqueue_t *lfqueue) {
	lfq_time_t curr_time;
	if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) {
		lfq_get_curr_time(&curr_time);
		lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree;
		// rtfree not NULL and move_free
		while ( rtfree && (rtfree != lfqueue->move_free) ) {
			nextfree = rtfree->nextfree;
			if ( lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) {
				//	printf("%p\n", rtfree);
				// FREE
				lfqueue->_free(lfqueue->pl, rtfree);
				// GO NEXT
				rtfree = nextfree;
			} else {
				break;
			}
		}
		lfqueue->root_free = rtfree;
		__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0);
	}
	__LFQ_SYNC_MEMORY();
}






[Graphviz diagram: in free mode, starting from lfqueue->root_free (the free base) and following nextfree, nodes are freed via lfqueue->_free while rtfree != NULL && rtfree != lfqueue->move_free; root_free is then set to the node where the walk stopped.]



Summary

Lock-free code is commonly assumed to be more efficient than lock-based code.
However, replacing locks entirely with atomic operations may force changes to the mechanism that end up making some operations slower.
For example, memory that is no longer needed cannot always be fully released, and loops with many if checks and CAS operations are required to keep the data correct.


3 C11 Atomics improvement, scalability enhancement

GitHub - linux2021_homework_quiz4_threadpool_pi

queue

I took the object handling and resource management of lfqueue in atomic_threadpool as a reference.
The original queue structure is changed so that head points to an already-consumed node that serves as the base;
the initialization of the queue's head and tail therefore changes to point to a pre-allocated dummy task used as the base.
The purpose is to guarantee that, after many atomic push and pop operations under multiple threads, head always stays at the front of the linked list, and likewise tail stays at the end.

typedef struct __jobqueue {
    _Atomic(threadtask_t *) head;
    _Atomic(threadtask_t *) tail;
    _Atomic(threadtask_t *) free_list_head;
    _Atomic(threadtask_t *) free_list_tail;
    atomic_flag free_mode;
    atomic_flag head_stop;
    atomic_bool tail_stop;
    atomic_int free_list_num;
    atomic_int free_list_adding;
} jobqueue_t;

static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        threadtask_t *dummy = malloc(sizeof(threadtask_t));
        dummy->future = NULL;
        dummy->next = NULL;
        jobqueue->head = ATOMIC_VAR_INIT(dummy);
        jobqueue->tail = ATOMIC_VAR_INIT(dummy);

        dummy = malloc(sizeof(threadtask_t));
        dummy->future = NULL;
        dummy->next = NULL;
        jobqueue->free_list_head = ATOMIC_VAR_INIT(dummy);
        jobqueue->free_list_tail = ATOMIC_VAR_INIT(dummy);

        atomic_init(&jobqueue->free_list_num, 0);
        atomic_init(&jobqueue->free_list_adding, 0);

        atomic_flag_clear(&jobqueue->free_mode);
        atomic_flag_clear(&jobqueue->head_stop);
        atomic_init(&jobqueue->tail_stop, false);
    }
    return jobqueue;
}
  • During push and pop, _Atomic(threadtask_t *) constrains how each thread operates on the two ends of the queue.
    • A push appends the task at the tail of the queue (linked list) and then tries to make the tail pointer point to the task it just added.
      • The second step does not always succeed for every thread: when several threads add tasks at the same time, only one of them can end up at the tail of the linked list.
    ​​​​struct __tpool_future *tpool_apply(struct __threadpool *pool,
    ​​​​                                   void *(*func)(void *),
    ​​​​                                   void *arg)
    ​​​​{
    ​​​​    jobqueue_t *jobqueue = pool->jobqueue;
    ​​​​    threadtask_t *new_tail = malloc(sizeof(threadtask_t));
    ​​​​    struct __tpool_future *future = tpool_future_create();
    ​​​​    if (new_tail && future) {
    ​​​​        new_tail->func = func, new_tail->arg = arg, new_tail->future = future;
    ​​​​        new_tail->next = NULL;
    
    ​​​​       /**
    ​​​​        * atomic push
    ​​​​        */
    ​​​​       while (1) {
    ​​​​            threadtask_t *old_tail = atomic_load(&jobqueue->tail);
    ​​​​            if (old_tail != NULL) {
    ​​​​                threadtask_t *dummy = NULL;
    ​​​​                if (atomic_compare_exchange_strong(&old_tail->next, &dummy, new_tail)) {
    ​​​​                    atomic_compare_exchange_strong(&jobqueue->tail, &old_tail, new_tail);
    ​​​​                    break;
    ​​​​                }    
    ​​​​            }
    ​​​​       }
    
    ​​​​    } else if (new_tail) {
    ​​​​        free(new_tail);
    ​​​​        return NULL;
    ​​​​    } else if (future) {
    ​​​​        tpool_future_destroy(future);
    ​​​​        return NULL;
    ​​​​    }
    ​​​​    return future;
    ​​​​}
    
  • For pop, after loading the base, base->next is taken as the task.
    • As the code below shows, the fetched task then becomes the new base, i.e. the queue's head.
      • The previous base is pushed onto the free_list by jobqueue_free_list_through_and_clean.
    ​​​​static void *jobqueue_fetch(void *queue){
    ​​​​    ...
    ​​​​    threadtask_t *base = NULL;
    ​​​​    while (1) {
    ​​​​        base = atomic_load(&jobqueue->head);
    ​​​​        if (atomic_compare_exchange_strong(&jobqueue->head, &base, base)) {
    ​​​​            task = base->next;
    ​​​​            if (task != NULL) {
    ​​​​                if (atomic_compare_exchange_strong(&jobqueue->head, &base, task)) {
    ​​​​                    jobqueue_free_list_through_and_clean(jobqueue, base);
    ​​​​                    break;                  
    ​​​​                }                      
    ​​​​            }
    ​​​​        }
    ​​​​    }
    

trash task

A task placed on the free_list may have been put there by a thread other than the one that owns it, and may therefore still be executing, so it cannot be freed directly.
To handle this I designed two functions: jobqueue_free_list_through_and_clean and jobqueue_free_list_clean.

  • jobqueue_free_list_through_and_clean handles the queue task that has just been swapped out of the base position, appending it to the free_list linked list.
  • jobqueue_free_list_clean is the function that actually releases the resources. Before freeing, it checks the task states to make sure the tasks are finished.
    • When the number of nodes on the free_list exceeds water_mark, the thread enters the free_mode critical section, which guarantees that only one thread performs the release at a time.
    • After entering free_mode, the release does not start immediately: tail_stop first locks the free_list so that no new task interferes with the release.
    • Then free_list_adding is used to tell how many tasks are still unfinished.
    • free_list_adding is incremented when a task (base) is put onto the free_list, and decremented when the task finishes and calls jobqueue_free_list_clean, before the water_mark check and before entering free_mode.
      • Because free_list_adding is incremented on swap-and-insert and decremented on completion, a value of zero guarantees that every task on the free_list has finished.
    ​​​​#define water_mark 50
    ​​​​static void jobqueue_free_list_through_and_clean(jobqueue_t *jobqueue, threadtask_t *new_tail) {
    ​​​​        // blocked from free_mode
    ​​​​        while (atomic_load(&jobqueue->tail_stop));
    ​​​​        atomic_fetch_add(&jobqueue->free_list_adding, 1);
    ​​​​        while (1) {
    ​​​​            threadtask_t *old_tail = atomic_load(&jobqueue->free_list_tail);
    ​​​​            if (old_tail != NULL) {
    ​​​​                threadtask_t *dummy = NULL;
    ​​​​                new_tail->next = NULL;
    ​​​​                if (atomic_compare_exchange_strong(&old_tail->next, &dummy, new_tail)) {
    ​​​​                    atomic_compare_exchange_strong(&jobqueue->free_list_tail, &old_tail, new_tail);
    ​​​​                    return;
    ​​​​                }    
    ​​​​            }
    ​​​​       }
    ​​​​}
    
    ​​​​static void jobqueue_free_list_clean(jobqueue_t *jobqueue) {
    ​​​​    atomic_fetch_add(&jobqueue->free_list_num, 1);
    ​​​​    atomic_fetch_sub(&jobqueue->free_list_adding, 1);
    
    ​​​​    if (atomic_load(&jobqueue->free_list_num) > water_mark) {
    
    ​​​​        while (atomic_flag_test_and_set(&jobqueue->free_mode) == 1);
    
    ​​​​        // already free
    ​​​​        if (atomic_load(&jobqueue->free_list_num) <= water_mark) {
    ​​​​            atomic_flag_clear(&jobqueue->free_mode);
    ​​​​            return;
    ​​​​        }
    
    ​​​​        // block add into free list
    ​​​​        atomic_store(&jobqueue->tail_stop, true);
    
    ​​​​        // waiting all work done
    ​​​​        while (atomic_load(&jobqueue->free_list_adding) > 0);
    
    ​​​​        /* free every node except the last, which remains as the new dummy;
    ​​​​         * read next before free() to avoid a use-after-free */
    ​​​​        threadtask_t *ptr = atomic_load(&jobqueue->free_list_head);
    ​​​​        while (ptr->next) {
    ​​​​            threadtask_t *next = ptr->next;
    ​​​​            free(ptr);
    ​​​​            ptr = next;
    ​​​​        }
    
    ​​​​        atomic_store(&jobqueue->free_list_head, ptr);
    ​​​​        atomic_store(&jobqueue->free_list_tail, ptr);
    
    ​​​​        atomic_store(&jobqueue->free_list_num, 0);
    ​​​​        atomic_store(&jobqueue->tail_stop, false);
    ​​​​        atomic_flag_clear(&jobqueue->free_mode);
    ​​​​    }
    ​​​​}
    
static void *jobqueue_fetch(void *queue) {
    ...
    while (1) {
        base = atomic_load(&jobqueue->head);
        if (atomic_compare_exchange_strong(&jobqueue->head, &base, base)) {
            task = base->next;
            if (task != NULL) {
                if (atomic_compare_exchange_strong(&jobqueue->head, &base, task)) {
                    jobqueue_free_list_through_and_clean(jobqueue, base);
                    break;                  
                }                      
            }
        }
    }
    ...
    // task finish
    pthread_mutex_lock(&task->future->mutex);
    task->future->flag |= __FUTURE_FINISHED; 
    task->future->result = ret_value;
    pthread_cond_broadcast(&task->future->cond_finished);
    pthread_mutex_unlock(&task->future->mutex);

    pthread_cleanup_pop(0);
    /* update the free-list accounting; drain it if it exceeds water_mark */
    jobqueue_free_list_clean(jobqueue);
    ...
}

Analysis

As before, the bpp function introduced at the beginning is used as the benchmark target.

void test1 (void) {
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    // create the pool; each worker thread loops fetching work (waits when the queue is empty)
    tpool_t pool = tpool_create(4);
    tpool_future_t futures[PRECISION + 1];


    // put task in the thread
    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }

    // get result
    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(pool, futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }

    // printf("thread done\n");
    tpool_join(pool);
    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);  
}

Memory usage

  • normal (massif plot)
  • atomic (massif plot)

The peak memory usage of the two versions is close: 15.4 KB and 15.8 KB respectively.
The plots do show, however, that because of how the atomic version releases resources, some no-longer-used memory lingers throughout execution.
How much lingers depends on how long the tasks run and on the value of water_mark.
If water_mark is set too small, pushing onto the free_list becomes less efficient, because the free_mode drain (which blocks new pushes via tail_stop) triggers more often; if it is set too large, too much unused memory is left lying around.
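Since the best threshold is workload-dependent, one small option (not in the original code, which hard-codes the value) is to let it be overridden at build time:

/* keep the original default of 50, but allow e.g. `gcc -Dwater_mark=200 ...`
 * (or a Makefile variable) to override it when experimenting */
#ifndef water_mark
#define water_mark 50
#endif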

CPU usage

 Performance counter stats for './bpp' (10000 runs):

              9,399      cache-misses              #    4.627 % of all cache refs      ( +-  0.57% )
            203,140      cache-references                                              ( +-  0.05% )
          2,077,181      instructions              #    0.71  insn per cycle           ( +-  0.09% )
          2,934,863      cycles                                                        ( +-  0.10% )

        0.00075883 +- 0.00000183 seconds time elapsed  ( +-  0.24% )

---------------------------------------------------------------------------------
 Performance counter stats for './atomic' (10000 runs):

              8,314      cache-misses              #    3.823 % of all cache refs      ( +-  0.55% )
            217,458      cache-references                                              ( +-  0.06% )
          2,060,215      instructions              #    0.64  insn per cycle           ( +-  0.09% )
          3,222,468      cycles                                                        ( +-  0.13% )

         0.0008462 +- 0.0000226 seconds time elapsed  ( +-  2.67% )

After the changes to the jobqueue and to the trash-task handling, the atomic version shows slightly fewer cache-misses and slightly fewer instructions than the original (elapsed time in this perf run is actually a little higher).
This suggests that the atomics really do give each thread more freedom in how it operates on the queue, which improves scalability.
On the other hand, without mutex and cond_wait there is no scheduler-level support for blocking, so the critical sections and the state checks fall back to busy-waiting or similar loops, which drives the cycle count up.
Overall, though, as the thread-count analysis below shows, there is still an efficiency gain in the common 4 to 16 thread range.
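One way to win back some of those cycles, if the busy-wait loops are kept, is to yield the CPU after a bounded number of spins. A minimal sketch follows; the spin_wait helper is an assumption, not part of the original code:

#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical replacement for bare loops such as
 *     while (atomic_load(&jobqueue->tail_stop));
 * Spin for a short while, then call sched_yield() so the waiting thread
 * gives the scheduler a chance to run whichever thread it is waiting on. */
static inline void spin_wait(_Atomic bool *flag)
{
    int spins = 0;
    while (atomic_load(flag)) {
        if (++spins > 64) {
            sched_yield();
            spins = 0;
        }
    }
}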

Bug fixes and thread-count analysis

bug

Regarding the above: while later profiling performance across thread counts, I ran into memory leaks and other memory problems.
There are several causes:

  • base, i.e. a task popped earlier, may already have been freed, so the current task = base->next; dereferences freed memory.
    • For now the only fix is to wrap the pop in a critical section (see the sketch after this list).
  • tpool_apply(pool, NULL, NULL); in int tpool_join(struct __threadpool *pool) is how the threads are told to terminate, yet it still allocates memory, which with the code above leads to a memory leak. There are other related memory bugs that I will not go into here.
    • Solution:
      ​​​​​​​​    ...
      ​​​​​​​​    struct __tpool_future *future = NULL;
      ​​​​​​​​    int det = 1;
      ​​​​​​​​    if (func) {
      ​​​​​​​​        future = tpool_future_create();
      ​​​​​​​​        if (!future) det = 0;
      ​​​​​​​​    }
      
      ​​​​​​​​    if (new_tail && det) {
      ​​​​​​​​    ...
      
    • Result:

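For reference, here is a minimal sketch of the critical-section workaround mentioned in the first bullet above. The pop_lock field is an assumption: it would be an extra atomic_flag in jobqueue_t, cleared in jobqueue_create(); this is not how the original code is written.

/* Hypothetical serialized pop: only one consumer at a time may unlink the
 * head node, so `base` can no longer be freed by the free-list drain while
 * another thread is still dereferencing base->next. */
static threadtask_t *jobqueue_pop_serialized(jobqueue_t *jobqueue)
{
    threadtask_t *task = NULL;

    while (atomic_flag_test_and_set(&jobqueue->pop_lock))
        ;   /* spin until this thread owns the pop lock */

    threadtask_t *base = atomic_load(&jobqueue->head);
    if (base->next) {
        task = base->next;
        atomic_store(&jobqueue->head, task);
        /* the old head becomes a trash task, as before */
        jobqueue_free_list_through_and_clean(jobqueue, base);
    }

    atomic_flag_clear(&jobqueue->pop_lock);
    return task;   /* NULL means the queue was empty; the caller retries */
}

This trades some scalability on the pop path for memory safety, which matches the contention effects discussed in the analysis below.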
Analysis

After testing the bpp function with 1 to 100 threads, it turns out that adding threads does not always improve performance.
The figure below shows that past roughly 5 threads the elapsed time of the atomic version rises sharply, and only levels off at around 20 threads.

It also shows that below 20 threads the atomic version costs less time than the normal version, while above that the two are similar, and above roughly 70 threads the atomic version is even slower.

The main cause of the difference is how many threads compete for the right to pop a task; the free_list release mechanism is tied to that contention as well (before a thread that has entered free_mode can start freeing, it must wait for every thread that has already passed the free_list_adding increment to finish its task).

void benchmark(int thread_pool_size) {
  FILE *ptr = NULL;
  ptr = fopen("bpp_benckmark_normal.txt", "w");
  if (!ptr)
    return;
  struct timespec time_start;
  struct timespec time_end;
  double during;
  int time_i = 0;
  printf("start testing\n");
  for (time_i = 1; time_i < thread_pool_size; time_i++) {
    clock_gettime(CLOCK_MONOTONIC, &time_start);
    __benchmark(time_i);
    clock_gettime(CLOCK_MONOTONIC, &time_end);
    during = time_diff(time_start, time_end);
    fprintf(ptr, "%d %f\n", time_i, during);
    printf("%d finished\n", time_i);
  }
  fclose(ptr);
}

void __benchmark(int thread_pool_size) {
  int bpp_args[PRECISION + 1];
  double bpp_sum = 0;
  // create the pool; each worker thread loops fetching work (waits when the queue is empty)
  tpool_t pool = tpool_create(thread_pool_size);
  tpool_future_t futures[PRECISION + 1];

  // put task in the thread
  for (int i = 0; i <= PRECISION; i++) {
    bpp_args[i] = i;
    futures[i] = tpool_apply(pool, bpp, (void *)&bpp_args[i]);
  }

  // get result
  for (int i = 0; i <= PRECISION; i++) {
    double *result = tpool_future_get(pool, futures[i], 0 /* blocking wait */);
    bpp_sum += *result;
    tpool_future_destroy(futures[i]);
    free(result);
  }

  // printf("thread done\n");
  tpool_join(pool);
  // printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
}

Afterwards, each thread count was also profiled and compared individually (I forgot to update the X-axis label; the X axis is the run index):





#define time_diff(start, end)                                                  \
  (end.tv_nsec - start.tv_nsec < 0                                             \
       ? (1000000000 + end.tv_nsec - start.tv_nsec)                            \
       : (end.tv_nsec - start.tv_nsec))
#define time_check(_FUNC_)                                                     \
  do {                                                                         \
    struct timespec time_start;                                                \
    struct timespec time_end;                                                  \
    double during;                                                             \
    clock_gettime(CLOCK_MONOTONIC, &time_start);                               \
    _FUNC_;                                                                    \
    clock_gettime(CLOCK_MONOTONIC, &time_end);                                 \
    during = time_diff(time_start, time_end);                                  \
    printf("%f  ", during);                                                    \
  } while (0)
  
  
time_check(__benchmark(16));
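Note that time_diff only compares the tv_nsec fields, so it is only meaningful when the measured interval stays under one second. Below is a sketch of a variant that also accounts for tv_sec; this is not the macro that produced the numbers in this document:

#include <time.h>

/* elapsed nanoseconds between two CLOCK_MONOTONIC samples, valid even when
 * the interval crosses one or more second boundaries */
#define time_diff_ns(start, end)                                  \
  ((double) (end.tv_sec - start.tv_sec) * 1000000000.0 +          \
   (double) (end.tv_nsec - start.tv_nsec))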

What is harder to understand is why, in the per-thread-count comparison, the relative performance of the atomic and normal versions at higher thread counts is the reverse of what the whole-range test showed.

Final analysis

It turned out to be caused by the difference in measurement method: measuring the individual values with the modified function below matches the earlier results.

Here it can also be seen that at 100 threads the atomic and normal versions do not differ much.
But in the 4 to 16 thread range, the atomic version has a lower time cost.



void benchmark_split(int thread_pool_size, int times) {
  FILE *ptr = NULL;
  ptr = fopen("bpp_benckmark_atomic.txt", "w");
  if (!ptr)
    return;
  struct timespec time_start;
  struct timespec time_end;
  double during;
  int time_i = 0;
  printf("start testing\n");
  for (time_i = 1; time_i < times + 1; time_i++) {
    clock_gettime(CLOCK_MONOTONIC, &time_start);
    __benchmark(thread_pool_size);
    clock_gettime(CLOCK_MONOTONIC, &time_end);
    during = time_diff(time_start, time_end);
    fprintf(ptr, "%d %f\n", time_i, during);
    printf("%d finished\n", time_i);
  }
  fclose(ptr);
}

Source code: GitHub - linux2021_homework_quiz4_threadpool_pi

Testing:
git clone https://github.com/linD026/linux2021_homework_quiz4_threadpool_pi.git
Then, in the main function of main.c, switch between the following entry points:

  • Per-thread-count test; the output format is terminal: cost_time current_time
    • time_check(__benchmark(thread_num));
  • Whole-range thread-count test, measuring thread pools of size 1 up to thread_num; the output is written to a txt file.
    • benchmark(thread_num);
  • The revised per-thread-count test; its output format is the same as the whole-range test.
    • benchmark_split(int thread_pool_size, int times)

Then run make to build; make test runs the program 1000 times and make one runs it once.
make massif can be used to produce the memory-usage plots, and make perf to collect the CPU-usage statistics.