Try   HackMD

Linux 核心專題: 並行工作佇列的設計和應用

執行人: chloe0919
專題解說影片

Reviewed by yy214123

考慮到佇列僅剩最後一個元素的情況,我想詢問在 takesteal 操作中, q->topt 這兩個變數的變化與 cmpxchg 的運作機制是如何搭配的?

if (!atomic_compare_exchange_strong_explicit(
          &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
    /* Failed race */
    x = EMPTY;

Chloexywtake 為例,在佇列僅剩一個元素時會利用 cmpxchgq->topt 的位置進行判斷

  1. 如果此時這兩個位置相同,則代表沒有其他執行緒在 steal,所以 cmpxchg 會將 &q->top 更新成 t + 1 並且回傳 true
  2. 這兩個位置不相同,代表有其他執行緒在 steal 所以改變了 &q->top 的值,導致此值和 take 剛開始前索取出保存在 t 的不同,所以此時 cmpxchg 則不會更新 &q->top (已經被提早更新了),並且回傳 false 進入 if 判斷中將 x 設為 EMPTY,代表 take 失敗

Reviewed by youjiaw

steal 的競爭問題,我認為用隨機數減少執行緒之間的競爭是很好的方式,但是我很好奇測試結果中,少數不使用隨機數的執行時間反而比較少的情況,是由什麼原因造成?

Reviewed by kkkkk1109

在模擬好情況後一樣使用 valgrin Valgrind 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏

這裡有錯字

任務簡介

延伸第九週測驗三 (work-steal),搭配閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉,說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構,最終應用於平行化的資料排序實作中。

TODO: 重作第九週測驗三及延伸問題

不用抄寫參考題解,專注於 work-steal 程式碼本身。應進行所有指定的延伸問題。

結構體:

首先建立 work_t 結構體,用來將執行流程拆成多個段落,每個段落都以 work_t 進行描述,根據 〈Cilk: An Efficient Multithreaded Runtime System〉的說明,work_t 對應到的也就是論文中提到的資料結構 "closure"

typedef struct work_internal {
    task_t code;
    atomic_int join_count;
    void *args[];
} work_t
  • code: 指標,用來指向各執行緒要啟動任務時所要執行的函式,如果準備好執行則回傳下一個執行項目,否則回傳 NULL
  • join_count: 定義為 atomic 的整數,用來計算這個 work 還缺少了多少 arguments 才能進行
  • args: 即 work 執行所需要的 arguments

兩結構體用來定義 deque

typedef struct {
    atomic_size_t size;
    _Atomic work_t *buffer[];
} array_t;

typedef struct {
    /* Assume that they never overflow */
    atomic_size_t top, bottom;
    _Atomic(array_t *) array;
} deque_t;
  • top/bottom: 表示佇列中元素的有效範圍
  • array: 為指向函式 array_t 的指標,用來指向整個佇列本體
  • push

操作流程主要是先取出 bottomtop 對應位置,並且先判斷佇列是否為 full,若成立則需要 resize 佇列,避免覆蓋到原本的 work,若不為 full 則直接將給定的 work w 更新到當前 bottom 所指的位置。

另外要確保放入 work w 和更新 bottom 位置為 bottom + 1 這兩個動作是不能被重排的,否則會出現某一執行緒還沒有放入 work,而另一執行緒已處理該位置上工作的問題,因此使用 atomic_thread_fence(memory_order_release) 確保前面的指令不會被重排到該指令之後。

atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
atomic_thread_fence(memory_order_release);
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
  • resize

擴展佇列是為了解決當佇列已滿時,為避免覆蓋到原始 work,所以重新創造一個新佇列,根據程式碼的註解說明,原始論文中是使用 garbage collector,可以自動釋放許久未訪問到的記憶體空間,但在此例中並沒有使用 garbage collector,但也不能直接釋放 *a,會產生其他執行緒正使用舊的佇列到一半被釋放的問題,所以藉由擴展佇列到原本的兩倍,使 memory leakage 是指數成長,可以成功限制 memory leakage 不會無限增長。

/* The question arises as to the appropriate timing for releasing memory
 * associated with the previous array denoted by *a. In the original Chase
 * and Lev paper, this task was undertaken by the garbage collector, which
 * presumably possessed knowledge about ongoing steal operations by other
 * threads that might attempt to access data within the array.
 *
 * In our context, the responsible deallocation of *a cannot occur at this
 * point, as another thread could potentially be in the process of reading
 * from it. Thus, we opt to abstain from freeing *a in this context,
 * resulting in memory leakage. It is worth noting that our expansion
 * strategy for these queues involves consistent doubling of their size;
 * this design choice ensures that any leaked memory remains bounded by the
 * memory actively employed by the functional queues.
 */
  • take

take 的目的是從執行緒自己的 dequeue 中取得下個要實行的 work,分別取出 bottom 和 top 的位置後,因為有 atomic_thread_fence 確保前面程式必須優於後面執行,如果等到確定好 deque 是否為空之後才進行 bottom 的更新,若此時有其他執行緒執行 steal 的動作時,就會發生 takesteal 同時進行的競爭問題,反之先更新 bottom 即使事後之後發現 deque 是空的或是 work 已經被 steal 掉,只要再復原 bottom 就好。

work_t *take(deque_t *q)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);

再來,deque 有三種情境 :

  • t < b : 可以直接取走 b 位置對應的 work,最後要恢復 bottom 的值(更新為 b + 1)

  • t == b : deque 目前只剩一個 entry,此時可能會和 steal 發生競爭問題,可以透過 cmpxchg 來判斷是哪種情形 :

    • 只有 take : 判斷 q->topt 的位置是否相等,若相等則代表目前沒有發生 steal 的競爭問題,所以只需要直接將 q->top 更新成 t + 1 並返回 true 即可
    • 發生 steal : q->topt 的位置不相等,代表目前有其他執行緒正在 steal,返回 false 進入 Failed race 將 x = EMPTY,表示不需要 take

    最後兩種情況都要恢復 bottom

if (t <= b) {
/* Non-empty queue */
x = (work_t *)atomic_load_explicit(&a->buffer[b % a->size],
                                   memory_order_relaxed);
if (t == b) {
  /* Single last element in queue */
  if (!atomic_compare_exchange_strong_explicit(
          &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
    /* Failed race */
    x = EMPTY;
  atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
  • t > b: deque 為空,需要復原 bottom

以下用圖片說明只有 take 的操作流程 :
假設目前僅剩一個 entry







structs



struct1

A[0]

A[1]


A[2]

A[3]

...

A[n-1]

A[n]



top
top



top->struct1:f2





bottom
bottom



bottom->struct1:f3





首先要取出 bottom - 1 的位置並存在 b,之後先更新 bottom







structs



struct1

A[0]

A[1]


A[2]

A[3]

...

A[n-1]

A[n]



top
top



top->struct1:f2





bottom
bottom



bottom->struct1:f2





之後判斷目前沒有發生 steal 的競爭問題,所以需要將 q->top 更新成 t + 1,並且恢復 bottom 的值,最後取走 b 位置上的 work







structs



struct1

A[0]

A[1]


A[2]

A[3]

...

A[n-1]

A[n]



top
top



top->struct1:f3





bottom
bottom



bottom->struct1:f3





k
k



k->struct1:f2





  • steal

注意用語,務必使用本課程教材規範的術語。

atomic_thread_fence 保證需先取 top 再去取得 bottom,防止在這兩個讀取之間有其他內存 記憶體操作的指令被重排在此

size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
atomic_thread_fence(memory_order_seq_cst);
size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);

再來若是 t < b 則代表有 work 可以 steal,不過還是要透過 cmpxchg 去和 take 競爭,這邊的操作和 take 類似,若發生 take 則返回 false 進入 Failed race,代表 steal 失敗

work_t *x = EMPTY;
if (t < b) {
/* Non-empty queue */
array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
x = (work_t *)atomic_load_explicit(&a->buffer[t % a->size],
                                   memory_order_relaxed);

if (!atomic_compare_exchange_strong_explicit(
        &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) {
  /* Failed race */
  return ABORT;
}
  • thread

首先,先嘗試在自己的 deque 中執行 take,確定是否有 work 可以執行,若沒有就會去其他執行緒的 deque 中尋找可以 steal 的 work,其中如果 i == id 就代表是自己 deque,必須跳過到下一個找

stolenABORT 時,代表當時正有其他執行緒在 take,所以 steal 的行為才會失敗,不代表此執行緒的 deque 不能 steal,所以這時將 i--,重複回到這個 deque 再嘗試一次 steal

/* Currently, there is no work present in my own queue */
work_t *stolen = EMPTY;
for (int i = 0; i < N_THREADS; ++i) {
  if (i == id)
      continue;
  stolen = steal(&thread_queues[i]);
  if (stolen == ABORT) {
      i--;
      continue; /* Try again at the same i */
  } else if (stolen == EMPTY)
      continue;

  /* Found some work to do */
  break;
}

如果 stolen 不為空代表有 steal 到,則執行該 work,若為空也就是看了一輪都沒有發現可偷的 work,但仍不代表所有 work 都完成,所以額外建立一個 work done_work,當其他的 work 完成時就會將 done_workjoin_count 減一,直到 join_count 歸零時則會執行 done_task 函式,將 done 設為 true,最後檢查 done 看是否所有 work 皆已完成

work_t *done_task(work_t *w)
{
    free(w);
    atomic_store(&done, true);
    return NULL;
}

改進原始實作

先檢查看看原始 work steal 的策略是否有確實減少執行時間,以下為使用 clock() 對原始 work steal 程式和未使用 work steal 的策略進行時間的測試,其中仿照 thread 的內容定義一個函式,每個 thread 都只完成自己 deque 的工作而不進行 steal

注意書寫規範:

  • 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致

Chloexyw 好的,已修改
不!你沒改,注意看程式碼規範的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。

void *thread_nosteal(void *payload) {
  int id = *(int *)payload;
  deque_t *my_queue = &thread_queues[id];
  while (true) {
    work_t *work = take(my_queue);
    if (work != EMPTY) {
      do_work(id, work);
    } else {
      if (atomic_load(&done))
        break;
    }
  }
  // printf("Thread %d finished\n", id);
  return NULL;
}

可以看到 thread 數量由 24 到 100 個,使用 work steal 策略的表現大致上比沒有使用來的好

compare_100

記憶體釋放

900c826

我們要處理的記憶體釋放問題包括下面兩種:

  1. 所有程式執行完後執行緒的 deque 沒有完全釋放乾淨
  2. 在 resize 的函式中為了避免釋放舊的 deque 而導致目前有其他執行緒正在偷取工作時造成錯誤,所以當 deque 滿了而 resize 並不會釋放任何空間

首先第一種記憶體釋放可以在所有執行緒都結束工作時,再將他們釋放:

注意書寫規範:

  • 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致

Chloexyw 好的,已修改
不!你沒有!筆記列出的程式碼都要嚴格依循規範,注意細節。

for (int i = 0; i < N_THREADS; ++i){
  array_t *array = atomic_load_explicit(&thread_queues[i].array, memory_order_relaxed);
  free(array);
}
free(thread_queues);

以 Valgrind 進行記憶體的檢查,原本在修改前會有 still reachable,根據 第一次作業關於 Valgrind 的解說,still reachable 指的是在程式結束後有未釋放卻還有指標指著的記憶體

==79134== LEAK SUMMARY:
==79134==    definitely lost: 0 bytes in 0 blocks
==79134==    indirectly lost: 0 bytes in 0 blocks
==79134==      possibly lost: 0 bytes in 0 blocks
==79134==    still reachable: 208 bytes in 3 blocks
==79134==         suppressed: 0 bytes in 0 blocks
==79134== Rerun with --leak-check=full to see details of leaked memory

經過上述釋放後,這些記憶體都被順利釋放成功:

==80249== HEAP SUMMARY:
==80249==     in use at exit: 0 bytes in 0 blocks
==80249==   total heap usage: 39 allocs, 39 frees, 2,368 bytes allocated
==80249== 
==80249== All heap blocks were freed -- no leaks are possible
==80249== 
==80249== For lists of detected and suppressed errors, rerun with: -s
==80249== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

改進漢語表達。

再來第二種記憶體釋放問題,我們先觀察原本的 work-stealing 的 main function,可以看到這邊的實作是先將所有的工作都先放進各自執行緒的 deque 之後才執行 pthread_create,而且只有 push 函式會使用到 resize,所以這邊實作的情況是不會發生有執行緒在 resize 時,恰好另一執行緒正在 steal 的問題,所以我在 thread 函式中塞入工作模擬出某些執行緒在 push 時可能會有另一執行緒正在 steal 的情況。

在模擬好情況後一樣使用 valgrin 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏

==81221== LEAK SUMMARY:
==81221==    definitely lost: 80 bytes in 1 blocks
==81221==    indirectly lost: 0 bytes in 0 blocks
==81221==      possibly lost: 0 bytes in 0 blocks
==81221==    still reachable: 0 bytes in 0 blocks
==81221==         suppressed: 0 bytes in 0 blocks
==81221== Rerun with --leak-check=full to see details of leaked memory

為了達到安全釋放 resize 後的舊佇列,在 array_t 的結構體中另外建立一個用來計數的 ref_count,並且在每次 resize 時都去判斷此 ref_count 是否為一(也就是是否只有自己使用中),若為一則代表可以安全的釋放出舊佇列的記憶體

typedef struct {
  atomic_size_t size;
  atomic_size_t ref_count;
  _Atomic work_t *buffer[];
} array_t;

另外,建立 refcountaddrefcountsub 函式來控制 ref_count 的值,假設有 a 執行緒要偷取 b 執行緒的工作時,就將 b 執行緒的 ref_count 的值加一,若當時執行緒 b 也正在執行 resize 就會發現 ref_count 不為一,所以也就無法立即釋放舊佇列

注意書寫規範:

  • 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
  • 注意看程式碼規範的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。
void refcountadd(array_t *a) {
  if (a != NULL) {
    atomic_fetch_add(&a->ref_count, 1);
  }
}

void refcountsub(array_t *a) {
  if (a != NULL) {
    atomic_fetch_sub(&a->ref_count, 1);
  }
}

經過調整後可以成功安全釋放記憶體:

==82789== 
==82789== HEAP SUMMARY:
==82789==     in use at exit: 0 bytes in 0 blocks
==82789==   total heap usage: 44 allocs, 44 frees, 2,584 bytes allocated
==82789== 
==82789== All heap blocks were freed -- no leaks are possible
==82789== 
==82789== For lists of detected and suppressed errors, rerun with: -s
==82789== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

steal 的競爭問題

7b189a3

在原始的 steal 的實作中是使用 for 迴圈從第一個執行緒開始嘗試偷竊工作,這樣會容易導致多個執行緒都同時讀取同一個 deque 所產生的競爭問題,所以這邊可以改進成不同執行緒從隨機數開始依序的偷取,減少執行緒之間競爭同一個 deque 的問題。

接下來測試執行緒數量由 24 到 1024 個,有無使用隨機數之間的速度,可以看到大部分速度都有提昇

random_seq

TODO: 閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉並落實

摘錄論文關鍵論述、闡述給定測驗題的程式碼和論文的落差、說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構

根據 〈Cilk: An Efficient Multithreaded Runtime System〉的描述,含有執行程式碼與所需的參數的稱為 closure,每一個處理器都會有自己的 ready queue,用來保存已經準備好的 closures,每一個 closure 都會有對應的 level,計算的方法是從 spawn tree 的根節點一直到自己本身所經過的 closure 數量。

論文中相關操作

spawn & spawn next

當執行緒執行時,可能會 spawn 子執行緒,若現在有一個位於 level L 的執行緒要 spawn 一個子執行緒 T 則需要執行以下的步驟:

  1. 先為 T 分配且初始化 closure
  2. 將可用的參數複製到 closure,初始化指向缺失的參數的指標和 join counter
  3. 將該 closure 標記為 level (L + 1)
  4. 如果沒有缺少的參數,則將 closure 放到 ready queue 級別為 (L + 1) 的列表的頭部

一樣是產生新的 closure,spawn 產生的新 closure 被視為 child,但 spawn next 則是被視為 successor,所以如果是 spawn next 則是需要將 closure 標記成 level L

46D90929-62BF-4913-B036-CA6CB94A83BD

send argument(k, value)
當執行緒執行該指令時,代表要傳送參數到其他的執行緒,其中 k 是 Cilk language 提供的一種資料型態 (continuation),用來包含指向的 closure 指標和其參數所在的偏移量,而傳送參數的流程為:

  1. 利用 continuation k 找到對應的 closure 和參數位置
  2. value 放到對應的參數位置中並且將 join counter 減一
  3. 檢查 join counter 是否為 0,若成立則代表該 closure 可以等待被執行

work stealing 操作過程 :
首先,處理器會先檢查 ready queue 是否為空

  • ready queue 為空:
    代表目前該佇列已經沒有 closure 正在等待執行了,所以可以進行 work stealing 的操作:
    1. 隨機選擇一個處理器作為 victim
    2. 如果 victim 的 ready queue 為空則回到步驟 1 重新選擇
    3. 從 victim 的 ready queue 中最上層非空 level 中選擇尾部的執行緒並執行
  • ready queue 為非空:
    執行以下步驟:
    1. 移除目前佇列中最深的非空 level 中位於 head 的執行緒
    2. 從 closure 提取該執行緒,並且執行

闡述給定測驗題的程式碼和論文的落差

原論文是設計 C-based runtime system 以執行多執行緒並行程式,因此除 work stealing 外,有額外定義其他特殊的資料結構,在本測驗中並沒有使用,不過在測驗中有嘗試實作出論文中提到的 closure,並且以結構體 wort_t 進行定義,並使用 atomic_int 紀錄參數缺少的數量,若此數為 0 代表該 closure 已經準備好執行。

留意 "Modeling performance" 和 "A theoretical analysis of the Cilk scheduler",描述本實作的性能表現。

TODO: 實作「平行化」的資料排序程式

將上述 work-steal 程式碼應用於「平行化」的資料排序程式的實作,應確保程式碼的正確性和效率。