2023 Homework 3

contributed by < Julian-Chu >

挑戰 1

研讀第 2 次測驗題的測驗 1,研讀程式碼註解提及的論文〈Cilk: An Efficient Multithreaded Runtime System〉,解釋程式碼運作原理

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

上圖可以表示 main function 的主要任務是

  • 建立 queue
  • 建立 work_t 並將其放入 queue
  • 建立 thread
// 在 print_task 內呼叫, 
// - 將完成的 work_t 在 done_work->join_count(代表尚未執行的 print_task work_t) 減 1
// - 當最後一個 print_task 執行完畢, 將 done_work 釋放掉 
// - 設計上的巧思把 done_task 實作成 task_t, 可以如 print_task 一樣被 thread 視為 work_t 處理
work_t *join_work(work_t *work)
{
    int old_join_count = atomic_fetch_sub(&work->join_count, 1);
    if (old_join_count == 1)    // return done_work when last print_task work is done
        return work;
    return NULL; // still some print_task remain, return null to pull work from queue
}

// 全局的變量, 代表所有 work_t 是否完成
atomic_bool done;

// 將 done_work 釋放
// 並將 done 設為 true, 代表所有 work_t 已完成
work_t *done_task(work_t *w)
{
    free(w);
    atomic_store(&done, true);
    return NULL;
}

int main(int argc, char **argv)
{
    /* Check that top and bottom are 64-bit so they never overflow */
    static_assert(sizeof(atomic_size_t) == 8,
                  "Assume atomic_size_t is 8 byte wide");

    pthread_t threads[N_THREADS];
    int tids[N_THREADS];
    thread_queues = malloc(N_THREADS * sizeof(deque_t));
    int nprints = 10;

    atomic_store(&done, false);
    // done_work
    // - 計算剩餘 work_t
    // - 釋放自身
    // 在 print_task 裡面使用 cont 來代表 done_work 猜想是對比 Clik 中的 continuation 
    work_t *done_work = malloc(sizeof(work_t));
    done_work->code = &done_task;
    done_work->join_count = N_THREADS * nprints;

    for (int i = 0; i < N_THREADS; ++i) {
        tids[i] = i;
        init(&thread_queues[i], 8);
        for (int j = 0; j < nprints; ++j) {
            work_t *work = malloc(sizeof(work_t) + 2 * sizeof(int *));
            work->code = &print_task;
            // print_task 沒有前置的 work_t
            work->join_count = 0;
            int *payload = malloc(sizeof(int));
            *payload = 1000 * i + j;
            work->args[0] = payload;
            // 類似將 done_work 視為 hook 傳入
            work->args[1] = done_work;
            push(&thread_queues[i], work);
        }
    }

    for (int i = 0; i < N_THREADS; ++i) {
        if (pthread_create(&threads[i], NULL, thread, &tids[i]) != 0) {
            perror("Failed to start the thread");
            exit(EXIT_FAILURE);
        }
    }

    for (int i = 0; i < N_THREADS; ++i) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("Failed to join the thread");
            exit(EXIT_FAILURE);
        }
    }
    printf("Expect %d lines of output (including this one)\n",
           2 * N_THREADS * nprints + N_THREADS + 2);

    return 0;
}

// task_t 做兩個設計使其更 gneric
// - 需要回傳下一個需執行的 work_t , 
// - 直接將 work_t 當成參數傳入, 在 task_t 的內部邏輯在調用
// work_t args
typedef struct work_internal *(*task_t)(struct work_internal *);


typedef struct work_internal {
    task_t code; 
    // 需要執行此 work_t 的前置作業是否完成
    atomic_int join_count;
    // task_t 內部程式碼會用到的參數
    void *args[];
} work_t;

// print_task 與 done_task 就是典型的 task_t
// 回傳下一個有依賴關係的 work_t, 否則回傳 NULL 結束, 再由 thread 決定從 queue 中取得 work_t 或是 work steal
work_t *print_task(work_t *w)
{
    int *payload = (int *) w->args[0];
    int item = *payload;
    printf("Did item %p with payload %d\n", w, item);
    work_t *cont = (work_t *) w->args[1]; 
    free(payload);
    free(w);
    return join_work(cont);  
}


work_t *done_task(work_t *w)
{
    free(w);
    atomic_store(&done, true);
    printf("%s\n", "done");
    return NULL;
}
// - 從 thread id 對應 queue 取得 work_t
// - 否則從 id 的 queue 開始循序 work steal
// - 沒有可以偷取的 work_t, 且 done is true, 結束目前的 thread 
void *thread(void *payload)
{
    int id = *(int *) payload;
    deque_t *my_queue = &thread_queues[id];
    while (true) {
        work_t *work = take(my_queue);
        if (work != EMPTY) {
            do_work(id, work);
        } else {
            work_t *stolen = EMPTY;
            for (int i = 0; i < N_THREADS; ++i) {
                if (i == id)
                    continue;
                stolen = steal(&thread_queues[i]);
                if (stolen == ABORT) {
                    i--;
                    continue; 
                } else if (stolen == EMPTY)
                    continue;

                break;
            }
            if (stolen == EMPTY) {

                if (atomic_load(&done))
                    break;
                continue;
            } else {
                do_work(id, stolen);
            }
        }
    }
    printf("work item %d finished\n", id);
    return NULL;
}

work queue and work steal

todo

指出 work-stealing 程式碼可改進之處並著手進行。(歡迎提交 pull request 到 concurrent-programs)

可以考慮改善 work-stealing 的次數

  • 增加 steal 的隨機性, 目前的程式碼是從 index 小的 queue 開始 steal, 這會導致 index 小的 thread 很快就沒有 work_t 可以從 queue 拿取, 需要進行 work_steal, 但是 index 大的 queue, work_t 依然堆積
  • 增加每次 steal 的數量, 例如每次 steal 都取 queue 的 1/4 到 1/2 , 可大量的減少 steal 的次數

利用 work stealing 改寫第 1 次作業測驗 γ 提及的並行化快速排序程式碼,並確保能充分運用硬體資源

研讀 Linux 核心的 CMWQ,討論其 work stealing 的實作手法

挑戰 2