Try   HackMD

2021q1 Homework4 (quiz4)

contributed by < YoLinTsai >

tags: linux2021-hw

延伸問題:

  • 解釋上述程式碼運作原理,包含 timeout 處理機制,指出改進空間並實作
  • 研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法
  • 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability

運作原理

POSIX thread

pthread_create

#include <pthread.h>

int pthread_create(pthread_t *restrict thread,
                  const pthread_attr_t *restrict attr,
                  void *(*start_routine)(void *),
                  void *restrict arg);
  • 如果成功 create, thread 所指向的 buffer 會紀錄 thread ID,attr 是開 thread 的參數,用 NULL 就是 default attributes。
  • 執行緒將會運行 start_routine() ,並將 arg 餵給 start_routine()

pthread_cancel

int pthread_cancel(pthread_t thread);

pthread_join

int pthread_join(pthread_t thread, void **retval);
  • tpool_create() 中可以發現,一發現 ptread_create 失敗,立刻會把所有 thread 取消掉,他的手法是先做 pthread_cancle 接著 pthread_join,原因在 linux man page 中的描述:

After a canceled thread has terminated, a join with that thread using pthread_join(3) obtains PTHREAD_CANCELED as the thread's exit status.
(Joining with a thread is the only way to know that cancellation has completed.)

pthread_cond_wait

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
   pthread_mutex_t *restrict mutex,
   const struct timespec *restrict abstime);

int pthread_cond_wait(pthread_cond_t *restrict cond,
   pthread_mutex_t *restrict mutex);
  • 我們在進行 thread 時,有時後會希望某些條件達成後,在繼續進行,而 pthread_cond_wait 便能達成這個情形,呼叫了之後 thread 會停在該處,直到被喚醒。
  • 喚醒的方式有兩個 pthread_cond_signal()pthread_cond_broadcast(),差別如同函式名稱一樣,前者一次只會喚醒其中一個,後者全部在等待該 condition variable 的通通都會喚醒。
  • 而判斷的條件需要用 mutex 去控制,如下:

Thread A

mutex_lock(&done_lock);
is_done = true;
pthread_cond_signal(&done_cond);
mutex_unlock(&done_lock);

Thread B

mutex_lock(&done_lock);
if(!is_done){
    pthread_cond_wait(&done_cond, &done_lock);
}
mutex_unlock(&done_lock);
  • 為何要搭配 lock 使用呢? 我的理解避免判斷到一半的時候 is_done 又被更動,直到進入 pthread_cond_wait 才把 lock 放掉,一旦被喚醒後又重新把 lock 拿回來。

pthread_cleanup_push

pthread_cleanup_pop

void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);
  • 這兩個 function 是成雙成對出現的,pthread_cleanup_up的目的在於避免 thread 結束時沒有正常收回資源,特別是 lock。

pthread_setcancelstate

int pthread_setcancelstate(int state, int *oldstate);

顧名思義這兩個 function 是用來控制 thread 取消的行為,在 pthread_setcancelstate 中,一共有兩種 state 可以選擇:

  1. PTHREAD_CANCEL_ENABLE
  2. PTHREAD_CANCEL_DISABLE

其中 DISABLE 在 thread 接收到 cancel 的 signal 時,不會立即取消,而是會等到 state 被改回 ENABLE 時才會取消。

不過這裡的 cancel 之所以這樣設計,就是因為他的取消並不意味馬上結束,而是在 thread 正式結束前還有待處理的事項,取消動作依序包含:

  1. pop 每個 cancellation clean-up handlers 並執行。(見 pthread_cleanup_push())
  2. 呼叫 thread-specific data destructors (見 pthread_key_create())
  3. thread 結束 (見 pthread_exit())

程式架構

  • quiz4 中的 thread pool 是用 linked list 的資料結構去實現,完整的架構圖如下:

tpool_create()

  • tpool_create 創建並初始化 __threadpool__jobqueue ,同時確認是否有正常初始化。
  • pthread_create 如果 create 失敗,返回值不為0,接著連續使用 pthread_cancelpthread_join 去結束所有 thread。
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}

jobqueue_create()

  • 初始化 jobqueue,注意 jobqueue 在這邊是用 linked list 去實作,因此把 tail 和 head 都先設定成 NULL ,同時把 jobqueue 的 attribute cond_nonemptyrwlock 初始化。(透過 pthread_cond_initpthread_mutex_init)
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}
  • 接著我們看 tpool_create 中一個關鍵的部分:
pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)
  • 這段的意思根據前面的資料,我們將 jobqueue_fetch(void *jobqueue) 指定給 pool->workers[i] 運行,接著來解析 jobqueue_fetch()
  • 首先來概述 jobqueue_fetch() 的運作邏輯,當我們創建完成 threadpool 和 jobqueue 後,開了複數個 thread 去運行 jobqueue_fetch(),一旦等到 jobqueue 有 threadtask 被塞到 linked list 中,便嘗試執行 threadtask 中的 function
static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        //GGG
        while (!jobqueue->tail){
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
        }
        
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED; //KKK
                task->future->result = ret_value;
                //LLL
                pthread_cond_broadcast(&task->future->cond_finished);
                
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}

tpool_apply()

  • tpool_apply 會創建 treadtask_t 並新增到 pool->jobqueue 當中,特別注意 HHH 的部分呼應了 jobqueue_fetch() 的設計,當 linked_list 中從沒有 task 到出現第一個 task 時,會呼叫 pthread_cond_broadcast(&jobqueue->cond_nonempty); 喚醒待命中的 thread 去 fetch task 來執行。
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            //HHH
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}

tpool_future_get()

  • 這裡提供兩種模式,當 seconds 值為零時,便是 blocking wait,等到該 thread 執行完成才會繼續執行
  • seconds 有值,便會用 pthread_cond_timeout 來等待 cond 的 signal ,如果等待時間超過 seconds,便將 future->flag 設成 __FUTURE_TIMEOUT 並回傳 NULL
  • CLOCK_MONOTONIC 根據 man page 的解釋, monotonic 保證計時器只曾不減,適合拿來當作 timeout 的計時器

All CLOCK_MONOTONIC variants guarantee that the time returned by consecutive calls will not go backwards, but successive calls may—depending on the architecture—return identical (not-increased) time values.

void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            //FFF
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }

    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
  • if / while 和 pthread_cond_wait 搭配的差別?

當我們用 pthread_cond_wait 阻塞 thread 時,可能有不只一個 thread 在等待該 cond 通過,用 while 來反覆監控會是一個比較合理的做法

指出改進空間並實作

誠實的面對自己,目前自己沒有發現可以改進的空間,對程式碼優化的敏感度還不足

atomic_threadpool

TODO