Try   HackMD

2021q1 Homework4 (quiz4)

contributed by < YOYOPIG >

tags: linux2021

2021q1 第 4 週測驗題

程式碼的理解與運作原理

關於 Future

在第一次研究程式碼時,future 相關程式碼造成理解的障礙,由於習慣程式一行一行執行 (就算是 multithread 也該一行一行執行完結束吧?),突然要理解 future 這個既存在又其實還沒做完的概念,花上不少時間。因此,在完整讀完程式前,我決定先研究一下使用 future 在程式設計中扮演的角色。這裡參考 Philipp Oppermann 的文章中的說明,整理出 multitasking 的用法與目的大致可以分為兩種:

  • Preemptive Multitasking
    將程式分成許多 task,常透過開新 thread 的方式實作,各自處理各自的任務,最後再透過主要程式呼叫 join 等待每個 thread 完成。這種方法中每個 thread 的任務不一定有強烈的相關,在 context switching 上也主要交給 OS 來完成。這樣的特性讓我們不用太過擔心每個 thread 各自的執行狀況,透過 timeout 等 preemption 機制可以確保每個 thread 都能順利執行。這也是目前我比較熟悉的機制 (甚至曾經有認為 multitasking 就是單指這種開完 thread 放著等他跑的錯誤觀念)
  • Cooperative Multitasking
    相較於被動的讓 OS 決定 context switch 的時機,這種方式採取由 task 主動讓出 CPU 的方式進行,在即將進行較長的等待時 (例如需要等待一個相當大的 I/O),透過 yield 類的動作讓出運算資源。這種方法常在 Language level 就透過 coroutine 或 async/await 等方式實作。由於這種方法可以人為找到程式中適合 context switch 的時間點,因此如果使用得當會比上面的方法來的好。當然,缺點就是如果程式的設計不良,那麼可能就會有某些 task 明明處在等待階段,卻 block 住其他 task 的執行,造成程式效率的低落。

Future 的概念,就常常用在 Cooperative Multitasking 中。當預期進入長時間任務時,與其等到完全執行完再回傳結果,不如先傳回一個暫時的結果 (當然,要設計能辨別結果完成與否的機制),讓其他 task 可以繼續執行,等到真的需要用到回傳值時,再等待該任務完成。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

如上圖所示,下面的方法顯然較省時,在等待 file system I/O 時,先回傳 future file,讓 main 得以繼續執行,假設 foo() 暫時還不會用到 file,那就可以省下等待 file 的時間。這一類的 async/await 操作在一些較新的程式語言裡都有內建好了,如老師之前上課提到的 Rust,以及之前自己練習寫手機 app 時用的 Dart 等。回想當初明明有用過,卻不知道其背後原理,還以為那是 Dart 特有的酷東西,不免對自己的孤陋寡聞感到難過

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

實作的方式

這份 threadpool 實作融合了上述兩種概念。由於 C 沒有內建 async 操作,這裡透過自定義的 struct 實現:

struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

遇到需要長時間執行的 task 時,不做等待直接回傳 tpool_future 指標,同時把實際的任務交給 thread pool 中的 thread 做計算。而真正的結果,則會透過結構中的 void * 型態指標 (適度轉型後方可 dereference) 來儲存。為了確定 task 目前的狀態,另外加入 flag 作為判斷依據,並透過 mutex lock 及 condition variable 保護 critical section。同時,為了避免上面提到 Cooperative Multitasking 可能會有因設計不良導致單一任務執行過久的問題,這裡透過 preemption 機制解決,詳見 timeout 處理

完成了 future 的實作後,回來看到在 task 定義中的應用方式。

typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;

typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;

這裡一個 __threadtask 除了定義基本的 funcarg 作為應該執行什麼工作的依據外,也直接包含一個指向 __tpool_future 的指標 *future,以便隨時回傳一個暫時的結果,之後可以再使用它的 flag 及 result 得到正確的結果。最後,作為 job queue 中的一個節點,需要 *next 指向 job queue 中的下一個 task,如此一來之後的 __jobqueue 結構只需要如往常一樣儲存 queue 的頭尾即可。由於 thread pool 中的各個 thread 會分別來尋找下一個任務,因此這裡另外加上 condition variable 監控 queue 是否為空以及 mutex lock 保護讀寫的 critical section。

最後,定義 threadpool 主體:

struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};

timeout 處理

為了避免單一任務執行過久或是因程式撰寫不良,導致非預期的 blocking,這裡不使用一般常用的 pthread_cond_wait() 而選擇了 pthread_cond_timedwait(),除了成功等待到 condition variable 外,也會因為設定的時間而結束等待,詳見 pthread_cond_wait(3)

void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex)
    }

    pthread_mutex_unlock(&future->mutex);
    return future->result;
}

tpool_future_get 函式中,有著對 timeout 的處理機制。在工作尚未完成時,透過計時的方式決定每個 thread 的執行時間,直到 task 完成讓 pthread_cond_timedwait 成功等待到 cond_finished,或是時間到被 preempt,把 flag 設成 __FUTURE_TIMEOUT。同時,如果使用者未限制執行時間,則正常等待至 task 完成。因此,原先 FFF 處應填 pthread_cond_wait(&future->cond_finished, &future->mutex)

空格填入

接著看有三處填空的 jobqueue_fetch

static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        GGG;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= KKK;
                task->future->result = ret_value;
                LLL;
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}

可以看出,這裡主要透過 while loop 持續執行,從 job queue 中找出下一個任務轉交給 thread pool 中的 thread。因此,在 loop 開頭處應檢查 queue 是否還有元素,若無則應該停下等待,GGG 應選 while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)。如果 queue 不為空,則找出下一個 task 開始執行。確認該 task 沒有因故被取消後,就可以把 flag 設為 __FUTURE_RUNNING,靜待其在多執行緒環境中自動執行。在執行完畢後,該將 flag 設為已完成,即 __FUTURE_FINISHED,交由後續的處理機制將其釋放。

因此,KKK 應選 (b) __FUTURE_FINISHED。同時,除了將最後結果存下來,也該透過 broadcast 機制告知該 task 已完成。因此,LLL(a) pthread_cond_broadcast(&task->future->cond_finished)

最後一個空格在 tpool_apply() 中:

struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            HHH;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}

由於先前的 jobqueue_fetch 會等待 jobqueue 非空的訊號,因此在 HHH 的地方需在新增 jobqueue 中第一個元素後,透過 broadcast 告知,應選 pthread_cond_broadcast(&jobqueue->cond_nonempty)


研讀 atomic_threadpool

原始程式碼: Taymindis/atomic_threadpool

和 quiz4 版本的比較

Taymindis/atomic_threadpool 的作者使用了 atomic 操作,實現了 lock free 版本的 thread pool。回顧這次小考的程式碼,會發現 mutex lock 主要出現在兩個地方:

  • job queue
    由於各個 thread 會各自試著從 queue 中 fetch 下一個任務,因此若其中一個 thread 讀取到一半,尚未將任務從 queue 中取出就發生 context switch,就可能會有兩個 thread 讀取同一個 job 的可能,因此這個操作屬於 critical section,應被保護。
  • tpool future
    為了實現 future,這裡透過 flag 來掌握執行的狀態。因此,當狀態改變後,需要立即更新 flag 而不能被中斷,在可能改變狀態的操作直到 flag 更新完畢屬於 critical section,應被保護。
typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;

struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

比較 lock free 版本的實作:

typedef struct {
    void (*do_work)(void *);
    void *args;
} at_thtask_t;

struct at_thpool_s {
    pthread_t *threads;
    lfqueue_t taskqueue;
    size_t nrunning;
    int is_running;
};

不難看出,兩種方法其實相當相似,沒有上述兩大 critical section 的主要原因為:

  • Lock free queue 的使用
    這裡引入了作者另外實作的 lock free queue,對這個 queue 的存取皆為 atomic,不需要透過 mutex lock 也能確保不會發生 race condition。
  • 沒有額外紀錄執行狀態
    在這份實作中,每個 task at_thtask_t 預設不會回傳數值,因此並沒有實作紀錄各 thread 狀態及結果的變數,自然也沒有 critical section。

lock free queue 的實作

為了確保 queue 的存取不會有 race condition,這裡使用了 gcc 支援的 built-in atomic operation,可以在一開始 macro definition 的地方找到:

#define __LFQ_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap
#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap
#define __LFQ_FETCH_AND_ADD __sync_fetch_and_add
#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch
#define __LFQ_YIELD_THREAD sched_yield
#define __LFQ_SYNC_MEMORY __sync_synchronize

__sync_ 開頭的內建函式已標註為 legacy,應該用 Memory Model Aware Atomic Operations

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

接著,在插入與取出資料時,使用這些定義好的 atomic operation 取代原本的操作:

下方程式碼縮排改用 4 個空白,讓共筆的視覺效果更緊湊

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

  • dequeue
static void *
_dequeue(lfqueue_t *lfqueue) {
    lfqueue_cas_node_t *head, *next;
    void *val;

    for (;;) {
        head = lfqueue->head;
        if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
            next = head->next;
            if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
    	        if (next == NULL) {
    	            val = NULL;
    	            goto _done;
                }
            }
            else {
    	        if (next) {
    	            val = next->value;
    	            if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) {
    		        break;
    	            }
    	        } else {
    	            val = NULL;
    	            goto _done;
                }
            }
        }
    }

  __lfq_recycle_free(lfqueue, head);
_done:
    // __asm volatile("" ::: "memory");
    __LFQ_SYNC_MEMORY();
    __lfq_check_free(lfqueue);
    return val;
}

這裡使用 __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head) 來檢查 queue 中是否為空,__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next) 來實際進行取出第一個元素並把 head 更新至下一個元素的動作。最後,透過 __LFQ_SYNC_MEMORY (實際上就是 __sync_synchronize) 作為 memory barrier,確保前後的指令不會在編譯器最佳化或處理器指令順序重組過程中被打亂。

  • enqueue
static int
_enqueue(lfqueue_t *lfqueue, void* value) {
    lfqueue_cas_node_t *tail, *node;
    node = (lfqueue_cas_node_t*) malloc(sizeof(lfqueue_cas_node_t));
    if (node == NULL) {
        perror("malloc");
        return errno;
    }
    node->value = value;
    node->next = NULL;
    node->nextfree = NULL;
    for (;;) {
        __LFQ_SYNC_MEMORY();
        tail = lfqueue->tail;
        if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
            // compulsory swap as tail->next is no NULL anymore, it has fenced on other thread
            __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
            __lfq_check_free(lfqueue);
            return 0;
        }
    }

    /*It never be here*/
    return -1;
}

這裡一樣是透過 __LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node) 來插入新元素至尾端,再更新 tail 位置。

有了這樣的 atomic operation,就可以確保每個 thread 在讀寫 job queue 時,不會發生未預期的執行順序進而產生錯誤的結果。基於這樣的 lock free queue,就可以實現一樣是 lock free 的 thread pool 了。

gcc 支援的 built-in atomic operation,手冊提到:

The following builtins are intended to be compatible with those described in the Intel Itanium Processor-specific Application Binary Interface, section 7.4. As such, they depart from the normal GCC practice of using the "__builtin_"" prefix, and further that they are overloaded such that they work on multiple types.

Not all operations are supported by all target processors.

這樣一來,這些 atomic operation 的使用似乎會和硬體限制有關? 不確定在 AMD 或其他處理器的平台是否能有同樣的結果,之後再設計實驗確認


嘗試使用 C11 Atomics 改寫上述程式碼

TODO

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →