Try   HackMD

2021q1 Homework4 (quiz4)

contributed by < hankluo6 >

第 4 週測驗題

測驗 1

參考解答

FFF = ?

  • (a) pthread_cond_wait(&future->cond_finished, &future->mutex)

GGG = ?

  • (c) while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)

HHH = ?

  • (b) pthread_cond_broadcast(&jobqueue->cond_nonempty)

KKK = ?

  • (b) __FUTURE_FINISHED

LLL = ?

  • (a) pthread_cond_broadcast(&task->future->cond_finished)

運作原理

POSIX Threads

  • ​​int pthread_create(pthread_t *restrict thread,
    ​​                   const pthread_attr_t *restrict attr,
    ​​                   void *(*start_routine)(void *),
    ​​                   void *restrict arg);
    

    建立新的執行緒,運行 start_routine(),並將 arg 傳入給 start_routine() 中。

  • ​​noreturn void pthread_exit(void *retval);
    

    結束正在執行的執行緒,並將要迴傳給 process 的值放置在 retval

  • ​​int pthread_join(pthread_t thread, void **retval);
    

    等在 thread 的終止,並將 retval 設置為呼叫 pthread_exit 時給予的 retval

  • ​​int pthread_cancel(pthread_t thread);
    ​​
    ​​int pthread_setcancelstate(int state, int *oldstate);
    ​​int pthread_setcanceltype(int type, int *oldtype);
    

    送出 cancellation 訊號給 thread 執行緒,而 thread 會根據 state 以及 type 來決定如何回應。透過 pthread_setcancelstate 以及 pthread_setcanceltype 設置 statetype

    • state
      • PTHREAD_CANCEL_ENABLE 預設行為,表示此執行緒為可被取消的 (cancelable)。
      • PTHREAD_CANCEL_DISABLE 表示此執行緒無法被取消,當呼叫 pthread_cancel 時,process 會被 block 住,直到此執行緒自行中止或改變狀態。
    • type
      • PTHREAD_CANCEL_DEFERRED 預測行為,當接收到 cancellation 訊號時,執行緒並不會馬上中止,而是會等待執行緒執行到 cancellation point (可見下方 cancellation point 欄位) 時才會停止。
      • PTHREAD_CANCEL_ASYNCHRONOUS 當收到 cancellation 訊號時會立刻中止。
  • ​​void pthread_cleanup_push(void (*routine)(void *), void *arg);
    ​​void pthread_cleanup_pop(int execute);
    

    設定 function handler,在執行緒結束或某些情況下 (pthread_cancel or pthread_exit or pthread_cleanup_pop(no zeror)),使用 arg 參數呼叫 routine function。

  • ​​int pthread_mutex_init(pthread_mutex_t *restrict mutex;
    ​​int pthread_mutex_destroy(pthread_mutex_t *mutex);
    ​​
    ​​int pthread_mutex_lock(pthread_mutex_t *mutex);
    ​​int pthread_mutex_unlock(pthread_mutex_t *mutex);
    

    pthread_mutex_init 以及 pthread_mutex_destroy 用來初始化與刪除互斥鎖 (mutex),pthread_mutex_lock 以及 pthread_mutex_unlock 分別對 mutex 上鎖與解鎖。

  • ​​int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
    ​​int pthread_cond_destroy(pthread_cond_t *cond);
    ​​
    ​​int pthread_condattr_init(pthread_condattr_t *attr);
    ​​int pthread_condattr_destroy(pthread_condattr_t *attr);
    

    pthread_cond_init 以及 pthread_cond_destroy 用來初始化與刪除條件變數 (condition variable),而 pthread_condattr_initpthread_condattr_destroy 則用來設置 condition variable 的屬性 attr

  • ​​int pthread_cond_timedwait(pthread_cond_t *restrict cond, 
    ​​                           pthread_mutex_t *restrict mutex,
    ​​                           const struct timespec *restrict abstime);
    ​​int pthread_cond_wait(pthread_cond_t *restrict cond,
    ​​                      pthread_mutex_t *restrict mutex);
    ​​
    ​​int pthread_cond_broadcast(pthread_cond_t *cond);
    

    pthread_cond_timedwaitpthread_cond_wait 將互斥鎖釋放,並阻塞住該執行緒,等待其它 process 送出信號觸發此 condition variable,在收到信號時返回並重新獲得互斥鎖。其中前者 timedwait 中的參數 abstime 表示可接受阻塞的最長時間。

    pthread_cond_broadcast 則送出信號喚醒所有持有條件變數 cond 的執行緒。

  • restrict 為 C99 關鍵字用來幫助編譯器最佳化,表示此 pointer 不會有 alias,只會有此變數可以操控。
  • noreturn 為 C11 增加的關鍵字,註明此函數不會回傳給 caller,而是透過 exit() 等方式跳出,如有回傳值則為 undefined behavior。

程式流程

透過 tpool_create 建立 thread pool,並將每個執行緒放入 __threadpool *pool 中。而 jobqueue_t *jobqueue 將每個待執行的 task,以 linked list 的方式連接,當有任務要執行時,從 jobqueue 中 pop 一個任務出來運行,每個任務在程式內是以 threadtask_t 結構封裝。

tpool_create 建立出來的執行緒,會運行 jobqueue_fetch 函式,在 jobqueue 為空時,每個 thread 會在 pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); 內被 block 住,等待有新的 task 加入。透過 tpool_apply 將新的 task 加入到 jobqueue 中,如果此時 jobqueue 為空,則透過 pthread_cond_broadcast(&jobqueue->cond_nonempty); 送出信號給 cond_nonempty 條件變數,此時 jobqueue_fetch 內的其中一個執行緒中的 pthread_cond_wait 將會通過,並在下方取得 jobqueuetail 任務。

執行完給定的 function 後,會將相關結果存放在 struct __tpool_future 中,__tpool_future 用來當作使用者可見的執行緒,透過 tpool_future_get 取得 thread 執行完的結果。如果 tpool_future_get 時任務尚未完成,將會被條件變數 future->cond_finished 給 block 住,等待任務執行完畢。

timeout 機制與改進

現有程式使用 pthread_cond_timedwait 處理 thread 執行過久的問題,並將 future->flag 設置為 __FUTURE_TIMEOUT 表示該任務超時,但在 tpool_join 沒有相對應的處理。當有執行緒無法完成時,將會產生問題,如以下程式碼:

static void *loop(void *arg)
{
    while (1) ;
}

int main()
{
    tpool_future_t future = tpool_apply(pool, loop, (void *)NULL);
    void *result = tpool_future_get(future, 1 /* cannot set to zero */);
    tpool_future_destroy(future);
    free(result);
    
    tpool_join(pool);
}

所以我們需要有方法在無法退出 function 時正確離開 thread。故我在 jobqueue_fetch 內添加以下程式:

static void *jobqueue_fetch(void *queue)
{
    ...
+   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
+   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old_state);
    void *ret_value = task->func(task->arg);
+   pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
    ...
}

將 thread 的 cancel type 改成 asynchronous,此時便不需要經過 cancellation point 也能直接退出執行緒。

在移除 future 時也要正確處理 timeout 時的情形:

int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
-           future->flag & __FUTURE_CANCELLED) { 
+           future->flag & __FUTURE_CANCELLED ||
+           future->flag & __FUTURE_TIMEOUT) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        }
        ...
    }
    return 0;
}

最後在 join 的時候,將所有未完成的 task 所在的執行緒皆終止:

int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
+   for (int i = 0; i < num_threads; i++)
+       pthread_cancel(pool->workers[i]);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}

我們在執行過程中不能知道哪些執行緒會無法退出,所以當有執行緒進入無窮迴圈時,該執行緒會一直被佔用,無法被重新利用,直到執行 tpool_join 後才能正確被釋放。

atomic_threadpool

atomic_threadppol 使用 lock-free queue 來取代此程式內的 job queue。thread pool 的實現方式與此程式一樣,透過 at_thpool_create 建立足夠的 thread 使用,每個 thread 內部執行 at_thpool_worker 並等待 queue 中有任務產生 (相當於 jobqueue_fetch),利用 at_thpool_newtask 建立任務 (相當於 tpool_apply) 並使用 lfqueue_enq 將任務放入 queue 當中。at_thpool_worker 利用 lfqueue_deq 將待處理任務讀出,並執行對應的函式。處理完任務的節點 (lfqueue_cas_node_t) 會透過 __lfq_recycle_free 以及 __lfq_check_free 釋放其空間。唯一不同的是此實作的 thread pool 無法回傳值給使用者。

只要是跟 queue 有關的操作,都必須為 thread safe,防止多個執行緒同時對 queue 操作而造成錯誤,在 atomic_threadpool 中使用 gcc 的 builtin atomic functions 實現:

  • 在 enqueue 的時候,有兩個操作需要保護
    • 增加 queue 的 size 時,使用 __sync_add_and_fetch() 執行加法,便能保證每次更新的值都保持為最新狀態。
    • 更新 queue 的 tail 時,使用 __sync_bool_compare_and_swaptail->next 對於同一個 tail 只更新一次,而 lfqueue->tail 則保持在最新值。
  • 在 dequeue 的時候,有兩個操作需要保護
    • 更改 queue 的 head 時,使用 __sync_bool_compare_and_swaplfqueue->head 保持在最新值。
    • 當 queue 只有一個元素時,dequeue 也要更新 lfqueue->tail,一樣使用 __sync_bool_compare_and_swap 實現。
  • 釋放空間時,也需要使用 __LFQ_BOOL_COMPARE_AND_SWAP 設置要移除的節點,才不會重複釋放同個記憶體,並使用 __LFQ_BOOL_COMPARE_AND_SWAP 指示現在是否為 in_free_mode,防止多個執行緒同時進行移除。

TODO

tags: linux2021