sysprog
      • Sharing URL Link copied
      • /edit
      • View mode
        • Edit mode
        • View mode
        • Book mode
        • Slide mode
        Edit mode View mode Book mode Slide mode
      • Customize slides
      • Note Permission
      • Read
        • Owners
        • Signed-in users
        • Everyone
        Owners Signed-in users Everyone
      • Write
        • Owners
        • Signed-in users
        • Everyone
        Owners Signed-in users Everyone
      • Engagement control Commenting, Suggest edit, Emoji Reply
    • Invite by email
      Invitee

      This note has no invitees

    • Publish Note

      Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

      Your note will be visible on your profile and discoverable by anyone.
      Your note is now live.
      This note is visible on your profile and discoverable online.
      Everyone on the web can find and read all notes of this public team.
      See published notes
      Unpublish note
      Please check the box to agree to the Community Guidelines.
      View profile
    • Commenting
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
      • Everyone
    • Suggest edit
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
    • Emoji Reply
    • Enable
    • Versions and GitHub Sync
    • Note settings
    • Note Insights
    • Engagement control
    • Transfer ownership
    • Delete this note
    • Insert from template
    • Import from
      • Dropbox
      • Google Drive
      • Gist
      • Clipboard
    • Export to
      • Dropbox
      • Google Drive
      • Gist
    • Download
      • Markdown
      • HTML
      • Raw HTML
Menu Note settings Versions and GitHub Sync Note Insights Sharing URL Help
Menu
Options
Engagement control Transfer ownership Delete this note
Import from
Dropbox Google Drive Gist Clipboard
Export to
Dropbox Google Drive Gist
Download
Markdown HTML Raw HTML
Back
Sharing URL Link copied
/edit
View mode
  • Edit mode
  • View mode
  • Book mode
  • Slide mode
Edit mode View mode Book mode Slide mode
Customize slides
Note Permission
Read
Owners
  • Owners
  • Signed-in users
  • Everyone
Owners Signed-in users Everyone
Write
Owners
  • Owners
  • Signed-in users
  • Everyone
Owners Signed-in users Everyone
Engagement control Commenting, Suggest edit, Emoji Reply
  • Invite by email
    Invitee

    This note has no invitees

  • Publish Note

    Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

    Your note will be visible on your profile and discoverable by anyone.
    Your note is now live.
    This note is visible on your profile and discoverable online.
    Everyone on the web can find and read all notes of this public team.
    See published notes
    Unpublish note
    Please check the box to agree to the Community Guidelines.
    View profile
    Engagement control
    Commenting
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    • Everyone
    Suggest edit
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    Emoji Reply
    Enable
    Import from Dropbox Google Drive Gist Clipboard
       owned this note    owned this note      
    Published Linked with GitHub
    Subscribed
    • Any changes
      Be notified of any changes
    • Mention me
      Be notified of mention me
    • Unsubscribe
    Subscribe
    # Linux 核心專題: 並行程式設計 > 執行人: kkkkk1109 > [簡報內容](https://docs.google.com/presentation/d/1MTa8o6FMPKAB01YYOXVmzj_e0gA1vfJe/edit?usp=drive_link&ouid=104070822328263146319&rtpof=true&sd=true) > [講解影片](https://www.youtube.com/watch?v=JDrb_v_Im5g) ### Reviewed by `chloe0919` > EMPTY 代表搶工作失敗,試著在同個佇列中再搶一次 應為 ABORT 才是搶工作失敗,需要在同佇列中再嘗試偷竊一次 > 收到!感謝提醒 ### Reviewed by `fennecJ` * 透過 Work-stealing 從其他 theads 把工作偷過來執行的手法雖然可以提昇總體的 throughput ,卻會因為 task migration 的關係造成額外的成本,若 task migration 頻繁發生,甚至可能導致執行上花費大量時間處理因 task migration 產生的成本反而使總體執行效率降低。為了降低 task migration 造成的衝擊,當今天有 task 滿足可說明影片中被竊取的條件時「 Top <= bottom 」,我們還有哪些可以考慮的因素決定是否要竊取該 task ? > 我認為還可以從剩餘工作較多的佇列偷取,並限制可偷取工作執行緒數量來減少 task migration 的大量成本 * Bloom filter 透過 hash 達成高效查詢的能力,但它卻有無法刪除存在於 table 內的 element 的顯著缺點,針對這個問題,資料科學家提出了改善的資料結構: [Counting Bloom Filter](https://en.wikipedia.org/wiki/Counting_Bloom_filter) 以及 [Quotient filter](https://en.wikipedia.org/wiki/Quotient_filter) 可否請你就上述不同資料結構進行比較,針對實做成本、支援操作、時間複雜度進行探討,並歸納出各自適合的應用場景。 > 可以! 感謝補充,我會實作看看 ### Reviewed by `Wufangni` > 判斷目前佇列的剩餘工作,由 top 和 bottom 的關係判斷 若能利用 top 和 bottom 的差當作佇列目前的任務多寡來判斷該優先偷取哪個佇列的任務,是否能降低空佇列(佇列內部)發生的情況? > 收到,我認為此想法是可行的,可以實驗看看。 ### Reviewed by `stevendd543` 想請問後面有提到將 channel 資料透過互斥鎖保護 ```c mutex_lock(&ch->data_mtx); *data = *ptr; mutex_unlock(&ch->data_mtx); ``` > 之後改成使用 atomic 操作,沒有使用互斥鎖是因為在 `chan_sen_unbuf` 和 `chan_recv_unbuf` 不會同時進入到 `*data = *ptr` 和 `*ptr = data` 操作中,因此在前後加上互斥鎖無法解決 data race 的問題。實際上是外部 `reader` 存取 `msg` 時造成 data race,且已經使用了 `send_mtx` 和 `recv_mtx`,想要避免使用多個鎖而造成 dead lock 的情況。 就我了解 msg 也是指向 chnnel 為什麼前面將其保護,後面 main 在存取的時候還需要再使用 atomic 操作? ` atomic_fetch_add_explicit(&msg_count[count], 1, memory_order_relaxed);` > 雖然已經使用了 atomic 操作在進行 `atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed)` ,但這個 `msg` 需要再進行讀取一次,並且是沒有進行保護的,很容易就被其他執行緒同時更改這個值,而導致讀取失敗和寫入失敗,因此才使用 `count` 進行 atomic 操作來避免上述事件發生。 ### Reviewed by `Shiang1212` * 建議一 > 成功後,以 new_head 存取 old_head->next,若 new_head 為 0 ,則代表為最後一個節點,返回 NULL。 這段話看起來是用來描述這行程式碼: ```c new_head = atomic_load(&old_head->next); ``` 這裡使用 "存取" 這個詞不太恰當,函式名稱都有 "load" 字眼出現了,應該使用 "載入" 或 "讀取" 之類的詞,建議可以改成:使用 `atomic_load` 載入 `old_head->next` 的值,並將其賦值給 `new_head`。 > 收到!謝謝指正 * 建議二 > `fph` 和 `fpt` 對應到 `free_pool` 的頭和尾 >`insert_pool` 將 `node` 放入 `free_pool` 的節尾 >`free_pool` 為釋放 `free_pool` 中的節點 `free_pool` 應是 [lfq.c](https://gist.github.com/jserv/128b735e8e73846d38bf1e98ba553607) 中的一個函式,如上面的例子,若你想表達是 retire list,你應該使用其他詞來表示,否則容易造成誤會。 > 收到!謝謝指正 ## TODO: 紀錄閱讀 [並行程式設計](https://hackmd.io/@sysprog/concurrency) 教材中遇到的問題 > 重現實驗並嘗試對內文做出貢獻 > [閱讀筆記](https://hackmd.io/@kkkkk1109/ry9tV2hzA/edit) #### 排程器 CPU 會為各個工作進行排程,來決定下一個工作,可以是 static 排程,也可以是 dynamic 的排程。 排程有多種演算法,依照時間分割(round-robin scheduling 或 time slicing) 用於處理相同重要性的工作,而任務優先順序的 (priority scheduling)則處理那些有不同時效性的任務,也就是 hard real-time 的工作。 :::info 排程器是在哪裡運行的 是會有一個CPU來完成這些事情嗎 排程這件事算一個task嗎 ::: #### 搶佔式與非強取式核心 兩者的核心差別為,工作交出CPU的使用權是強制性的或是非強制的。 非搶佔式(non-preemptive)不會依照工作的優先順序交出CPU的使用權,而是不定期的交出使用權。為了達到並行,因此頻率要夠快,否則讓使用者感受到等待時間,有下列好處 1. 實作單純 2. 工作中可使用**非再進入程式碼(non-reentrant code)**,換言之,每個工作不需擔心在程式未執行完畢時又重新進入。因此該工作本身所用的記憶區不會有被污染 (corruption) 的可能; 3. 對系統共用記憶區的保護動作可減至最少,因為每一工作在未使用完記憶區時不會放棄 CPU ,無須擔心會被其他工作在半途中修改; :::info 2、3項<s>不太懂</s> > 不懂就說不懂,不要說「不太懂」。教材有講解錄影,搭配課程相關教材。對照閱讀後,紀錄詳盡的問題。 ::: ## TODO: [第 9 週測驗題之 3](https://hackmd.io/@sysprog/linux2024-quiz9) > 解釋上述程式碼運作原理,包含延伸問題 此題是嘗試以 C11 Atomics 撰寫一 [work stealing](https://en.wikipedia.org/wiki/Work_stealing) 的程式碼 ![image](https://hackmd.io/_uploads/ByWOf_uUA.png) 可以把 work stealing 想成,兩執行緒各自有分配的任務,但若其中一方較早完成,便可以**偷走**另一方的任務,來避免空等的情況。 為了避免發生搶到另一方正在執行的任務,通常這種任務佇列會使用雙向佇列(double-ended queue),不同執行緒分別從佇列的頭和尾去完成任務。 接著介紹定義 * `work_t` ```c typedef struct work_internal { task_t code; atomic_int join_count; void *args[]; } work_t; ``` `task_t` 會回傳一 `work_t` 的指標,其定義如下,主要是回傳此工作的指標 ```c typedef struct work_internal *(*task_t)(struct work_internal *); ``` 而 `join_count` 為計算此任務還需要多少參數,`args` 即為此任務所需要的參數 ```c typedef struct { atomic_size_t size; _Atomic work_t *buffer[]; } array_t; ``` ```c typedef struct { /* Assume that they never overflow */ atomic_size_t top, bottom; _Atomic(array_t *) array; } deque_t; ``` 接著是工作佇列的定義,使用 `top` 和 `bottom` 指向佇列的頭和尾 接著來解釋各個操作 * `init` 使用 `atomic_init` 來初始 `deque_t` 的各個參數 * 將 `top` 、 `bottom` 設為 0 * `size_hint` 為 可放入幾項任務,並使用 malloc 配置記憶體空間 * 將 `size` 以 `size_hint` 初始化 * 將 `array` 指向 `a` 的指標 ```c void init(deque_t *q, int size_hint) { atomic_init(&q->top, 0); atomic_init(&q->bottom, 0); array_t *a = malloc(sizeof(array_t) + sizeof(work_t *) * size_hint); atomic_init(&a->size, size_hint); atomic_init(&q->array, a); } ``` 而主要操作有 `push` 、 `take` 和 `steal`,下列是一個 deque 的範例 ```graphviz digraph G { rankdir=LR; node[shape=record]; map [label="A[0]|<a1>A[1]|A[2]|A[3]|... |<an>A[n-2]|A[n-1] "]; node[shape=plaintext] top [fontcolor="red"] bottom [fontcolor="red"] top->map:a1 bottom->map:an } ``` * `push` 的操作為將新的任務放入 `bottom` 的位置,並使 `bottom += 1 ` ```c void push(deque_t *q, work_t *w) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); size_t t = atomic_load_explicit(&q->top, memory_order_acquire); array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); if (b - t > a->size - 1) { /* Full queue */ resize(q); a = atomic_load_explicit(&q->array, memory_order_relaxed); } atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed); atomic_thread_fence(memory_order_release); atomic_store_explicit(&q->bottom, DDDD, memory_order_relaxed); } ``` 這邊要注意,由於 `push` 可能會使佇列的空間改變,因此當佇列滿時會調用到 `resize` 來改變佇列大小 `atomic_thread_fence(memory_order_release)` 用來確保 fence 後的 `store` 必定排在 fence 前的操作 :::success `DDDD` 應為 `b+1` ::: * `take` 的操作為從自己的佇列中,將 `bottom - 1` 的工作取出執行,並將 `bottom -= 1` ,但要考慮到佇列內剩餘工作的數量,考量是否會發生和 `steal` 同時發生競爭的情況,主要分為兩個部分 1. 預設取得任務會成功,將 `bottom -1` 並存回 `bottom`,取得 `top` 和 `bottom` 的數值 ```c work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); work_t *x; ... } ``` 2. 判斷目前佇列的剩餘工作,由 `top` 和 `bottom` 的關係判斷 * `top` < `bottom` 代表還有工作,可直接取出 * `top` = `bottom` 代表只剩一個工作,可能造成和 `steal` 競爭最後一個工作,以 `atomic_compare_exchange_strong_explicit` 判斷是否失敗,失敗代表被 `steal` 搶走工作,復原 `bottom` ;反之則代表取得工作成功,將 `top += 1`, * `top` > `bottom` 代表沒有工作,需復原 `bottom` ```c ... if (t <= b) { /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ x = EMPTY; atomic_store_explicit(&q->bottom,b + 1, memory_order_relaxed); } } else { /* Empty queue */ x = EMPTY; atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } return x; ``` <s> `AAAA` 應為 `t+1` ,`BBBB` 應為 `b+1` ,`CCCC` 應為 `b+1` </s> :::danger 專注在程式碼本身,不用抄題目,更正上方程式碼的內容。 ::: * `steal` 從 `top` 的方向取得工作,取得 `top` 和 `bottom` 後,一樣使用 `atomic_compare_exchange_strong_explicit` 判斷是否會和 `take` 競爭 ```c work_t *steal(deque_t *q) { size_t t = atomic_load_explicit(&q->top, memory_order_acquire); atomic_thread_fence(memory_order_seq_cst); size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); work_t *x = EMPTY; if (t < b) { /* Non-empty queue */ array_t *a = atomic_load_explicit(&q->array, memory_order_consume); x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ return ABORT; } return x; } ``` <s> `EEEE` 應為 `t+1` </s> :::danger 專注在程式碼本身,不用抄寫題目。 不要急著解說程式碼,應揣摩程式開發者的本意,說明「如果是我設計這段程式碼,我會怎麼做?」 ::: * `resize` 的動作為將佇列的空間加大,會直接開兩倍的空間給新的佇列。 ```c void resize(deque_t *q) { array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); size_t old_size = a->size; size_t new_size = old_size * 2; array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size); atomic_init(&new->size, new_size); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); for (size_t i = t; i < b; i++) new->buffer[i % new_size] = a->buffer[i % old_size]; atomic_store_explicit(&q->array, new, memory_order_relaxed); ``` 接著是實際操作 `thread` 為每個 thread 各自處理自己的工作佇列,使用 `take` 取得任務 ```c void *thread(void *payload) { int id = *(int *) payload; deque_t *my_queue = &thread_queues[id]; while (true) { work_t *work = take(my_queue); if (work != EMPTY) { do_work(id, work); }else { ... ``` 當沒有工作後,則可以開始 `steal` 別個 thread 的工作 `ABORT` 代表搶工作失敗,試著在同個佇列中再搶一次;`EMPTY` 代表此佇列已空。只要偷到工作便會跳離迴圈 ```c for (int i = 0; i < N_THREADS; ++i) { if (i == id) continue; stolen = steal(&thread_queues[i]); if (stolen == ABORT) { i--; continue; /* Try again at the same i */ } else if (stolen == EMPTY) continue; /* Found some work to do */ break; } ``` 如果全部跑完後,都沒有偷到任務的話,則檢查 `done` ,代表所有任務都已做完,沒有的話再重新跑一次迴圈。 ```c if (stolen == EMPTY) { if (atomic_load(&done)) break; continue; } else { do_work(id, stolen); } printf("work item %d finished\n", id); return NULL; ``` --- ## TODO: [第 10 週測驗題之 1, 2, 3](https://hackmd.io/@sysprog/linux2024-quiz10) > 解釋上述程式碼運作原理,包含延伸問題 ### 測驗 `1` 此題延伸 [第 6 周測驗題](https://hackmd.io/@sysprog/linux2024-quiz6)中的 [bloom filter](https://en.wikipedia.org/wiki/Bloom_filter),發展在並行環境中的程式碼,觀察理論與實際效能。 Bloom Filter 利用雜湊函數,在不用走訪全部元素的前提,預測特定字串是否在資料結構中。 其架構如下 * n 個位元構成的 table * k 個雜湊函數:$h_1$、$h_2$、...、$h_k$ * 當有新的字串 s ,將 s 透過 k 個雜湊函數,會得到 k 個 `index` ,並將 `table[index]` 設為 `1` 當要檢查字串 s 是否存在,只需要透過這 k 個雜湊函數觀察 table 上的 index 是否為 1 即可。 ![image](https://hackmd.io/_uploads/rJ3g6OYUR.png) 不過此作法存在著錯誤,有可能字串 s1 和 s2 經過 k 個雜湊函數後的 `table` 一模一樣,實際上我們只有放入字串 s2 。因此我們只能確保說,此字串一定不在資料結構中,但不能確保一定存在。 此外,無法將字串刪除,因為刪除可能會影響到其他字串的 index ,因此 bloom filter **只能新增,不能移除**。 此題的 table 為 $2^{28}$ 個位元,並使用 2 個雜湊函數 * `bloom_s` 為 bloom filter 的 table,而 table size 為 `2^28` 個位元 ```c #define BLOOMFILTER_SIZE 268435456 (2^28) #define BLOOMFILTER_SIZE_BYTE BLOOMFILTER_SIZE / sizeof(volatile char) struct bloom_s { volatile char data[BLOOMFILTER_SIZE_BYTE]; }; ``` 並將 `bloom_s` 定義成 `bloom_t` ```c typedef struct bloom_s bloom_t; ``` * `get` 為獲得 table 中的 key 的 bit ```c static inline int get(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint8_t bit = 1u << (key % sizeof(char)); return (filter->data[index] & bit) != 0; } ``` * `set` 將 table 中的 key 的 bit 設為 1 ```c static inline int set(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint64_t bit = 1u << (key % sizeof(char)); return (atomic_fetch_or(&filter->data[index], bit) & bit) == 0; } ``` <s> `AAAA` 應為 `atomic_fetch_or` ,和 `bit` 做 `or` 運算 </s> :::danger 不用抄寫題目,專注在程式碼本身。 ::: * `bloom_new` 為開啟一個新的 bloom filter,使用 `memset` 將所有 bit 設為 0 ```c bloom_t *bloom_new(bloom_allocator allocator) { bloom_t *filter = allocator(sizeof(bloom_t)); memset(filter, 0, sizeof(bloom_t)); return filter; } ``` * `bloom_add` 將 key 透過 hash 產生 `hbase`,再將 `hbase` 分成 `h1` 和 `h2` 放入 table 中。 ```c void bloom_add(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; set(filter, h1); set(filter, h2); } ``` * `bloom_test` 測試 `key` 是否存在於資料結構中 ```c int bloom_test(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; return get(filter,h1) & get(filter,h2); } ``` * `bloom_destroy` 會釋放 bloom filter 中的記憶體 * `bloom_clear` 將 bloom filter 所有 bit 設為 0 接著看測試檔案 * `globals_t` ```c typedef struct { int parent_fd; bloom_t *filter; uint32_t op_counter; } globals_t; globals_t globals; ``` :::info `parent_fd` 可以讓 child process 和 parent process 透過 pipe 傳輸資料 `op_counter` 做了幾筆操作 ::: * `worker_loop` `key` 為放入的字串,`key_len` 為字串長度 ```c void worker_loop() { const u_int8_t key[] = "wiki.csie.ncku.edu.tw"; u_int64_t key_len = strlen((const char *) key); ... } ``` [`getpid`](https://man7.org/linux/man-pages/man2/getpid.2.html) 可以得到目前使用的 process id,[`srand`](https://man7.org/linux/man-pages/man3/rand.3p.html)可以產生亂數種子,使用 `rand` 基於產生的亂數種子產生亂數 ```c ... int *k = (int *) key; srand(getpid()); *k = rand(); ... ``` 根據 `k` 的數值,決定是要將 key 放入資料結構中或是測試,並將 `*k++` 重複迴圈 ```c while (1) { int n = (*k) % 100; if (n < CONTAINS_P) { bloom_test(globals.filter, key, key_len); } else { bloom_add(globals.filter, key, key_len); } (*k)++; globals.op_counter++; } ``` :::info 使用 85% 的機率進行測試,15%的機率放入 bloom filter ::: * `create_worker` 首先,使用 [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) 創造可以在 process 間傳送訊息的通道,而可以透過 `fd` 讀寫。使用 [`fork()`](https://man7.org/linux/man-pages/man2/fork.2.html) 創造 child process , 並運行 `worker_loop`。 `fork()` 會回傳 pid, `pid = 0` 代表為 child process, `pid = -1` 則為 fork 失敗。 ```c int create_worker(worker *wrk) { int fd[2]; int ret = pipe(fd); if (ret == -1) return -1; pid_t pid = fork(); if (!pid) { /* Worker */ close(fd[0]); globals.parent_fd = fd[1]; worker_loop(); exit(0); } if (pid < 0) { printf("ERROR[%d]:%s", errno, strerror(errno)); close(fd[0]); close(fd[1]); return -1; } close(fd[1]); wrk->pid = pid; wrk->fd = fd[0]; return 0; } ``` 在 `main` 中,會創建 `N_WORKERS` 個 `create_worker` ,最後再透過 [kill](https://man7.org/linux/man-pages/man2/kill.2.html)去中止 process ```c for (int i = 0; i < N_WORKERS; i++) { uint32_t worker_out = 0; if (kill(workers[i].pid, SIGKILL)) { bloom_destroy(&globals.filter, bloom_free); printf("ERROR[%d]:%s", errno, strerror(errno)); exit(1); } (void) read(workers[i].fd, &worker_out, sizeof(uint32_t)); globals.op_counter += worker_out; } ``` ### 使用 [Counting Bloom filter](https://en.wikipedia.org/wiki/Counting_Bloom_filter) 來刪除字串 原先的 Bloom Filter 中,若隨意刪除字串,可能會影響其他字串的在 table 中的 index,Counting Bloom filter 使用了 counter 來計算每個 index 被使用了幾次,當要進行刪除時,只需要將其對應的 counter 減一即可。 在 filter 結構中,加入 counter ```c struct bloom_s { volatile char data[BLOOMFILTER_SIZE_BYTE]; uint8_t counter[BLOOMFILTER_SIZE]; }; ``` 在 `set` 中,將其對應的 counter 加一 ```c static inline int set(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint64_t bit = 1u << (key % sizeof(char)); filter->counter[key]++; return (atomic_fetch_or(&filter->data[index], bit) & bit) == 0; } ``` 增加了 `bloom_delete`,首先獲得字串對應到的 index `h1`、`h2`,並確保此字串在 filter 中,再將其刪除。 ```c int bloom_delete(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; // make sure the delete string is in the table if(get(filter,h1) & get(filter,h2)) { filter->counter[h1]--; filter->counter[h2]--; if(filter->counter[h1] == 0 ) set_zero(filter,h1); if(filter->counter[h2] == 0 ) set_zero(filter,h2); return 1; } return 0; } ``` 使用 `set_zero` 刪除 table 中的 index ```c static inline int set_zero(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint64_t bit = 1u << (key % sizeof(char)); uint64_t mask = 0xffffffffffffffff ^ bit; filter->data[index] &= mask; return 0; } ``` ### 測驗 `2` `lfq` 嘗試實作精簡的並行佇列 (concurrent queue),運用 [hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 來釋放並行處理過程中的記憶體。 首先,先介紹 Hazard pointer,於〈[並行程式設計: Hazard pointer](https://hackmd.io/@sysprog/concurrency-hazard-pointer#%E4%B8%A6%E8%A1%8C%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88-Hazard-pointer)〉中提到 > 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。 > 使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。 因此使用了 Hazard pointer 的設計,其架構如下 ![image](https://hackmd.io/_uploads/HkjJTvcL0.png) 每個 thread 都有自己的 hazard pointer 和 retire list。 設想以下情境,若同個物件, thread 1 正在讀取,但 thread 2 要釋放此記憶體,若是 thread 2 先進行釋放的動作,那麼 thread 1 便會出錯。因此要進行釋放前,要先確保無人讀取或已經讀取完畢,才能進行釋放。 * Hazard pointer : 此 thread 正在讀取的物件指標 * Retire list : 此 thread 即將釋放的物件指標 因此在要釋放前,會加入 retire list,並查看每個 thread 的 hazard pointer 是否指向要釋放的物件,若有,則等到其讀取完成,最後再進行釋放。 而 hazard 涉及為單一 thread 寫入,多個 thread 讀取,才能符合以上情境。 接著來看 `lfq` 中的各種定義 * `lfq_node` `data` 指向物件,`next ` 指向下個節點, `can_free` 表示這個物件是否可以被釋放 :::success `free_next` 指向 `free pool` 中的下個節點,應該就是 `retire list` ::: ```c struct lfq_node { void *data; union { struct lfq_node *next; struct lfq_node *free_next; }; bool can_free; }; ``` * `lfq_ctx` * `head` 和 `tail` 分別指向佇列的頭和尾,使用 `alignas(64)` 避免造成 false sharing * `HP` 即為 hazard pointer,`MAX_HP_SIZE` 為最多可以有幾個 `HP` ? * `fph` 和 `fpt` 對應到 `free pool` 的頭和尾 * `bool is_freeing` 確認是否正在釋放 ```c struct lfq_ctx { alignas(64) struct lfq_node *head; int count; struct lfq_node **HP; /* hazard pointers */ int *tid_map; bool is_freeing; struct lfq_node *fph, *fpt; /* free pool head/tail */ /* FIXME: get rid of struct. Make it configurable */ int MAX_HP_SIZE; /* avoid cacheline contention */ alignas(64) struct lfq_node *tail; }; ``` :::success `tid_map` 目前不清楚 thread id, ::: 接下來是 `lfq` 的各項操作 * `lfq_init` 會初始化整個 `lfq_ctx` * `lfq_release` 會釋放整個 `lfq_ctx` * `insert_pool` 將 `node` 放入 `free_pool` 的節尾 * 函式 `free_pool` 為釋放 `free pool` 中的節點 :::success `AAAA` 應為檢查目前是否有人正在`free` ,所以應為 ``,操作成功即可進入迴圈,失敗代表已經有人在操作。 ::: 獲得 `free_pool` 的 `head` 後,逐一檢查其中的節點是否可以釋放,若為以下條件 1. `(!atomic_load(&p->can_free)` 代表 `can_free = 0` 無法釋放 2. `(!atomic_load(&p->free_next))` 代表此節點已經是佇列末端,無法釋放 3. `in_hp(ctx, (struct lfq_node *) p)` 此節點被 `HP` 所指向,有 thread 正在讀取 則會跳出迴圈,並說明目前並無釋放記憶體,或者是已經釋放完成。 ```c static void free_pool(struct lfq_ctx *ctx, bool freeall) { bool old = 0; if (!atomic_compare_exchange_strong(&ctx->freeing,0,1)) return; for (int i = 0; i < MAX_FREE || freeall; i++) { struct lfq_node *p = ctx->fph; if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) || in_hp(ctx, (struct lfq_node *) p)) break; ctx->fph = p->free_next; free(p); } atomic_store(&ctx->is_freeing, false); atomic_thread_fence(memory_order_seq_cst); } ``` * `safe_free` 為釋放特定節點 首先,檢查此節點是可以釋放且不在 `HP` 中,接著判斷是否有人在釋放節點中 成功替換,則將此節點釋放,並將 `is_freeing` 改回 `false` 代表釋放完成。 若無法釋放,則加入 `free pool` 即可。 ```c static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node) { if (atomic_load(&node->can_free) && !in_hp(ctx, node)) { /* free is not thread-safe */ bool old = 0; if (atomic_compare_exchange_strong(&ctx->freeing,0,1)) { /* poison the pointer to detect use-after-free */ node->next = (void *) -1; free(node); /* we got the lock; actually free */ atomic_store(&ctx->is_freeing, false); atomic_thread_fence(memory_order_seq_cst); } else /* we did not get the lock; only add to a freelist */ insert_pool(ctx, node); } else insert_pool(ctx, node); free_pool(ctx, false); } ``` * `lfq_enqueue` 在 `lfq_ctx` 中新增節點,使用 `atomic_exchange` 和 `atomic_store` 來替換 `ctx->tail` 和 `old_tail->next`,並確保 `tail->next` 必為 NULL ```c int lfq_enqueue(struct lfq_ctx *ctx, void *data) { struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node)); if (!insert_node) return -errno; insert_node->data = data; struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node); assert(!old_tail->next && "old tail was not NULL"); atomic_store(&old_tail->next, insert_node); return 0; } ``` * `lfq_dequeue` 會移除節點 ```c void *lfq_dequeue(struct lfq_ctx *ctx) { int tid = alloc_tid(ctx); /* many thread race */ if (tid == -1) return (void *) -1; void *ret = lfq_dequeue_tid(ctx, tid); free_tid(ctx, tid); return ret; } ``` 首先呼叫 `alloc_tid`,會檢查目前有哪些 `tid` 正在運行,回傳 `-1` 代表所有的 thread 都正在運行;非 `-1` 回傳剛開始運行的 thread ```c static int alloc_tid(struct lfq_ctx *ctx) { for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { if (ctx->tid_map[i] == 0) { int old = 0; if (atomic_compare_exchange_strong(&ctx->tid_map[i], &old, 1)) return i; } } return -1; } ``` 接著呼叫 `lfq_dequeue_tid`,移除其中一個節點,並記錄是 `tid` 正在操作 存取 `old_head` 、 `new_head` ```c void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid) { struct lfq_node *old_head, *new_head; ... } ``` `old_head` 為目前的 `head` ,並將 `HP[tid]` 指向此,表示正在讀取此節點。接著再 `atomic_load(&ctx->head)` 檢查是否有被其他 thread 移除,被移除再到 `retry` 重新執行。 ```c do { retry: old_head = atomic_load(&ctx->head); atomic_store(&ctx->HP[tid], old_head); atomic_thread_fence(memory_order_seq_cst); if (old_head != atomic_load(&ctx->head)) goto retry; ... } while (!atomic_compare_exchange_strong(CCCC)); ``` 成功後,以 `new_head` 載入 `old_head->next`,若 `new_head` 為 0 ,則代表為最後一個節點,返回 NULL ```c do { ... new_head = atomic_load(&old_head->next); if (new_head == 0) { atomic_store(&ctx->HP[tid], 0); return NULL; } while (!atomic_compare_exchange_strong(&ctx->head,&old_head,new_head)); ``` 最後要再將 `ctx->head` 換成 `new_head` * `lfg_count_free_list` 計算 `free pool` 中有幾個節點 * `lfq_release` 為釋放整個 `lfq`,分成三個部分釋放 1. 釋放佇列 `ctx` 中的節點 檢查佇列內部是否有資料 ```c if (ctx->tail && ctx->head) { /* if we have data in queue */ while ((struct lfq_node *) ctx->head) { /* while still have node */ struct lfq_node *tmp = (struct lfq_node *) ctx->head->next; safe_free(ctx, (struct lfq_node *) ctx->head); ctx->head = tmp; } ctx->tail = 0; } ``` 2. 釋放佇列 `free pool` 中的節點 ```c if (ctx->fph && ctx->fpt) { free_pool(ctx, true); if (ctx->fph != ctx->fpt) return -1; free(ctx->fpt); /* free the empty node */ ctx->fph = ctx->fpt = 0; } ``` 3. 檢查函式 `free_pool` 是否成功,最後再釋放 `HP` 和 `tid_map`。 ```c if (ctx->fph || ctx->fpt) return -1; free(ctx->HP); free(ctx->tid_map); memset(ctx, 0, sizeof(struct lfq_ctx)); return 0; ``` ## TODO: [第 12 週測驗題之 2, 3](https://hackmd.io/@sysprog/linux2024-quiz12) > 解釋上述程式碼運作原理,包含延伸問題 ### 測驗 `1` 此題希望修改以下程式碼,使用 futex 的方式,達到等待 3 秒的效果 ```c #include <linux/futex.h> #include <stdint.h> #include <stdio.h> #include <sys/syscall.h> #include <time.h> #include <unistd.h> int futex_sleep(time_t sec, long ns) { uint32_t futex_word = 0; struct timespec timeout = {sec, ns}; return syscall(SYS_futex, AAAA, BBBB, futex_word, &timeout, NULL, 0); } int main() { time_t secs = 3; printf("Before futex_sleep for %ld seconds\n", secs); futex_sleep(secs, 0); printf("After futex_sleep\n"); return 0; } ``` 首先,我們先看 [`timespec` ](https://man7.org/linux/man-pages/man3/timespec.3type.html) ,它表示了一個時間,並且精度可以達到奈秒。 接著查看 `futex` 的格式和參數說明 ```c long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, /* or: uint32_t val2 */ uint32_t *uaddr2, uint32_t val3); ``` Futex (Fast userspace mutex) 藉由使用一個 32 位元變數`futex word`,來和 Kernal 中的 wait queue 來互動,而需要進行同步的執行緒則會共享此變數。 * `uaddr` 指向著 `futexword` * `futex_op` 則表示著不同的 futex 操作 * `val` 對應著 `futex_op` 中的數值 * `timeout` 、 `uaddr2` 和 `val3` 只有特定的 `futex_op` 會使用到,其餘則可以忽略 FUTEX_WAIT 會查看 `uaddr` 所指向的值,是否和 `val` 相同,此作法是為了避免被其他執行緒改變 `futex_word` 的值,而造成無法進入 sleep。 由於預期是等待 3 秒,因此 `futex_op` 應為 `FUTEX_WAIT` ,而 `*uaddr` 的部份為 `&futex_word` 。 ### 測驗 `2` 此題使用 C11 Atomics 和 Linux 提供的 futex 系統呼叫,來模擬 Go 程式語言的 [goroutine](https://go.dev/tour/concurrency/1) 和 [channel](https://go.dev/tour/concurrency/2) 機制 goroutine 可以想像成一個較輕量的 Thread ,而goroutine共享著同樣的地址,因此同步地訪問共同記憶體也非常重要 ```go func main() { go say("world") say("hello") } ``` ![image](https://hackmd.io/_uploads/rySrlptEA.png) 原先的 goroutine 會呼叫另一個 goroutine ,當原先也就是 main 的 goroutine 結束,其他的 goroutine也會跟著結束。 而 channel 為各個 goroutine 間的通訊,我們可以以此來完成執行緒間的 wait 和同步。 ```go func say(s string, c chan string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } c <- "FINISH" } func main() { ch := make(chan string) go say("world", ch) go say("hello", ch) <-ch <-ch } ``` ![image](https://hackmd.io/_uploads/HJUS-vj40.png) 在 `main` 中,創建了 channel ,表示用來傳送字串。 ```go ch := make(chan string) ``` 接著創建了兩個 goroutine ,其結束後會往 channel 傳送 "Finish" ```go func say(s string, c chan string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } c <- "FINISH" } ``` 直到接收到兩個 "Finish" 後,才會結束程式 ```go func main() { ch := make(chan string) go say("world", ch) go say("hello", ch) <-ch <-ch } ``` 接著是使用 C11 Atomics 和 futex 實作 GO channel 首先是 `mutex_unlock` ,使用 atomic 操作對 mutex 減一並解鎖,確保不被其他執行緒影響,並查看目前是否有執行緒在等待,有的話就喚醒。 ```c void mutex_unlock(struct mutex *mu) { uint32_t orig = atomic_fetch_sub_explicit(&mu->val, 1, memory_order_relaxed); if (orig != LOCKED_NO_WAITER) { mu->val = UNLOCKED; futex_wake(&mu->val, 1); } } ``` 使用 [`CAS` ](https://en.wikipedia.org/wiki/Compare-and-swap)操作,會比較 `ptr` 和 `expect` 的值。 **相同**,則將 `new` 放入 `ptr`; **不同的話**,則將 `ptr` 的值放入 `expect` ,最後回傳 expect 。 ```c static uint32_t cas(_Atomic uint32_t *ptr, uint32_t expect, uint32_t new) { atomic_compare_exchange_strong_explicit( ptr, &expect, new, memory_order_acq_rel, memory_order_acquire); return expect; } ``` `mutex_lock` 用於將 lock 上鎖,先使用 `CAS` 看目前的 val 值是否為 `UNLOCKED` ,是的話則替換成 `LOCKED_NO_WAITER` ;否的話代表目前鎖有人在使用,因此會進入 futex_wait 直到解鎖。 ```c void mutex_lock(struct mutex *mu) { uint32_t val = cas(&mu->val, UNLOCKED, LOCKED_NO_WAITER); if (val != UNLOCKED) { do { if (val == LOCKED || cas(&mu->val, LOCKED_NO_WAITER, LOCKED) != UNLOCKED) futex_wait(&mu->val, LOCKED); } while ((val = cas(&mu->val, UNLOCKED, LOCKED)) != UNLOCKED); } } ``` 接著是 `channel` 的宣告,在此有兩種 channel ,分別是 `unbuffer` 和 `ring buffer` 。 ```c struct chan { _Atomic bool closed; /* Unbuffered channels only: the pointer used for data exchange. */ _Atomic(void **) datap; /* Unbuffered channels only: guarantees that at most one writer and one * reader have the right to access. */ struct mutex send_mtx, recv_mtx; /* For unbuffered channels, these futexes start from 1 (CHAN_NOT_READY). * They are incremented to indicate that a thread is waiting. * They are decremented to indicate that data exchange is done. * * For buffered channels, these futexes represent credits for a reader or * write to retry receiving or sending. */ _Atomic uint32_t send_ftx, recv_ftx; /* Buffered channels only: number of waiting threads on the futexes. */ _Atomic size_t send_waiters, recv_waiters; /* Ring buffer */ size_t cap; _Atomic uint64_t head, tail; struct chan_item ring[0]; }; ``` * `closed` 表示通道的開啟與否 ```c _Atomic bool closed; ``` * `datap` 用以指向需要交換的資料 (unbuffered) ```c _Atomic(void **) datap; ``` * `send_mtx` 、` recv_mtx`: 確保只有一位 writer 和 一位 reader (unbuffered) ```c struct mutex send_mtx, recv_mtx; ``` * `send_ftx` 、` recv_ftx` * unbuffered :會從 1 開始,表示 channel 尚未完成。增加代表有執行緒在等待,完成傳輸則減少。 * ring buffer :表示可供給傳送和接收的數量 ```c _Atomic uint32_t send_ftx, recv_ftx; ``` * `send_waiters` 、` recv_waiters` 目前正在等待傳送和接收的 Thread (buffered) ```c _Atomic size_t send_waiters, recv_waiters; ``` * 最後是定義了 ring buffer 的長度 `cap`,並使用 `lap` 來計數。 ```c size_t cap; _Atomic uint64_t head, tail; struct chan_item ring[0]; ``` ```c struct chan_item { _Atomic uint32_t lap; void *data; }; ``` 接著是 channel 的相關操作 ```c enum { CHAN_READY = 0, CHAN_NOT_READY = 1, CHAN_WAITING = 2, CHAN_CLOSED = 3, }; ``` * `chan_init` 初始化一個 channel , 由 `cap` 決定為 unbuffered 還是 buffered channel。 若為 buffered channel ,則使用 `memset` 將所設定的 buffer 以 0 填滿。 ```c static void chan_init(struct chan *ch, size_t cap) { ch->closed = false; ch->datap = NULL; mutex_init(&ch->send_mtx), mutex_init(&ch->recv_mtx); if (!cap) ch->send_ftx = ch->recv_ftx = CHAN_NOT_READY; else ch->send_ftx = ch->recv_ftx = 0; ch->send_waiters = ch->recv_waiters = 0; ch->cap = cap; ch->head = (uint64_t) 1 << 32; ch->tail = 0; if (ch->cap > 0) memset(ch->ring, 0, cap * sizeof(struct chan_item)); } ``` :::info 目前還不清楚為何 head = 1 << 32 ::: * `chan_make` 創建一個 channel ,並以 `alloc` 分配空間給 channel 。 ```c struct chan *chan_make(size_t cap, chan_alloc_func_t alloc) { struct chan *ch; if (!alloc || !(ch = alloc(sizeof(*ch) + cap * sizeof(struct chan_item)))) return NULL; chan_init(ch, cap); return ch; } ``` :::info 可以使用 malloc 嗎 ::: 以下是 buffered 的通道 * `chan_trysend_buf` 執行以下事項 1. 先檢查了 channel 是否開啟 ```c if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } ``` 2. 存取 tail 的位置,並存取目前即將寫入的位置。檢查 `item` 的 `lap` 和 `tail` 的 `lap` 是否一致,表示可寫入狀態 ,並檢查這次寫入後 tail 是否達到 buffer 的最後位置,達到的話就回到第一個位置。 ```c uint64_t tail, new_tail; struct chan_item *item; // check if the tail is the same do { tail = atomic_load_explicit(&ch->tail, memory_order_acquire); uint32_t pos = tail, lap = tail >> 32; item = ch->ring + pos; if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) { errno = EAGAIN; return -1; } if (pos + 1 == ch->cap) new_tail = (uint64_t)(lap + 2) << 32; else new_tail = tail + 1; } while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail, ``` 6. `!atomic_compare_exchange_weak_explicit` 檢查 `tail` 和 `ch->tail` 是否相同,若失敗則代表有被打斷,重新執行 1 ~ 4 步驟 7. 成功後,將資料放入buffer, ```c item->data = data; atomic_fetch_add_explicit(&item->lap, 1, memory_order_release); ``` * `chan_send_buf` 使用 `chan_trysend_buf` 檢查是否可以傳送,若為 `-1` 則代表目前無法傳送。此時由於交換失敗,因此將 `&ch->send_ftx` 存入 `v` ,因此若 `v` 為零,代表可傳送餘額為 0 進入等待,直到被喚醒。 ```c static int chan_send_buf(struct chan *ch, void *data) { //ready to send while (chan_trysend_buf(ch, data) == -1) { if (errno != EAGAIN) return -1; // uint32_t v = 1; while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1, memory_order_acq_rel, memory_order_acquire)) { // v is zero only when &ch->send_ftx is not equal to &v if (v == 0) { atomic_fetch_add_explicit(&ch->send_waiters, 1, memory_order_acq_rel); futex_wait(&ch->send_ftx, 0); atomic_fetch_sub_explicit(&ch->send_waiters, 1, memory_order_acq_rel); v = 1; } } } ``` 接著, `&ch->recv_ftx` 加一,增加可接收的餘額,並叫醒正在等待接收的 Thread。 ```c // recv_ftx + 1 atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel); // if there are recv_waiters, wake up one thread if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0) futex_wake(&ch->recv_ftx, 1); return 0; } ``` * `chan_tryrecv_buf` 、`chan_recv_buf` 行為和 `chan_trysendbuf` 、`chan_send_buf` 原理相同。 接下來是 unbuffered 的通道。 * `int chan_send_unbuf(struct chan *ch, void *data)` 先檢查通道是否開啟 ```c if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } ``` 使用 mutex 上鎖 `&ch->send_mtx` ```c mutex_lock(&ch->send_mtx); ``` 接著嘗試將資料放入 channel 中,成功的話,代表`ch->data` 和 `ptr` 一樣,此時 `ch->data` 就會是 `data` ; 失敗的話,則 `ptr` 會被替換成 `ch->datap`。 失敗時,由於目前 `ptr` 為 `ch->datap` ,為 channel 的 data 交換區的指標的指標,使用 `*ptr` 將指標內容置換成 `data` ,接著再將 `ch->datap` 指向 NULL。 使用`atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_WAITING` 查看是否接收端在等待接收,有的話則喚醒。 ```c void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data, memory_order_acq_rel, memory_order_acquire)) { *ptr = data; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->recv_ftx, CCCC = 1); } ``` 替換成功,則代表目前 `ch->data` 指向 `data`,使用 atomic 指令將 `&ch->send_ftx` +1,並回傳 `&ch->send_ftx` 初始值是否為 `CHAN_NOT_READY = 1`,是的話就持續等到接收端接收。 `futex_wait(&ch->send_ftx, CHAN_WAITING)` 會持續等待,直到 `&ch->send_ftx` 不等於 `CHAN_WAITING`。 最後再歸還鎖。 ```c else { if (atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acquire) == CHAN_NOT_READY) { do { futex_wait(&ch->send_ftx, CHAN_WAITING); } while (atomic_load_explicit( &ch->send_ftx, memory_order_acquire) == CHAN_WAITING); if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } } } mutex_unlock(&ch->send_mtx); return 0; } ``` * `chan_recv_unbuf` 和`chan_send_unbuf` 概念類似。 * `static int chan_recv_unbuf(struct chan *ch, void **data)` 先檢查要接收的位置 data 是否存在,和檢查 channel 是否開啟 ```c if (!data) { errno = EINVAL; return -1; } if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } ``` 將上鎖 `ch->recv_mtx` 上鎖,避免同時執行緒進行接收。 ```c mutex_lock(&ch->recv_mtx); ``` 檢查 `ch->datap` 是否為 NULL,是的話代表還**沒有傳送端傳送資料**;否的話代表已有**接收端傳送資料**。 已有傳送資料的情況下,會將要資料接收的位置 `data` 替換成 `ptr` ,而目前 `ptr` = `ch->datap` ,為傳送端傳送的資料。再將 `ch->datap` 指向 NULL,代表資料接收已完成,將傳送端的 futex 減 1 並喚醒先前等待的傳送端執行緒。 ```c void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data, memory_order_acq_rel, memory_order_acquire)) { *data = *ptr; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->send_ftx, 1); } ``` 因為目前傳送端尚未傳送資料,接下來會將 `ch->recv_ftx` 加一表示等待,直到傳送端喚醒此執行緒。 ```c else { if (atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_NOT_READY) { do { futex_wait(&ch->recv_ftx, CHAN_WAITING); } while (atomic_load_explicit( &ch->recv_ftx, memory_order_acquire) == CHAN_WAITING); if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } } } ``` 最後歸還鎖 ```c mutex_unlock(&ch->recv_mtx); ``` 我們可以歸納出,如果傳送端先傳送資料, `ch->datap` 不為 NULL ,而接收端會進入 CAS 失敗的情況,喚醒傳送端的執行緒;反之,接收端先等待資料, `ch->datap` 不為 NULL,傳送端會進入 CAS 失敗,喚醒等待的接收端執行緒。 * `chan_close` 根據 `ch->cap` 決定要關閉的 channel 種類 ```c void chan_close(struct chan *ch) { ch->closed = true; if (!ch->cap) { atomic_store(&ch->recv_ftx, CHAN_CLOSED); atomic_store(&ch->send_ftx, CHAN_CLOSED); } futex_wake(&ch->recv_ftx, INT_MAX); futex_wake(&ch->send_ftx, INT_MAX); } ``` ### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案 ThreadSanitizer 是一個可以檢查各個執行續間有沒有發生 Data race,並且會在編譯時進行偵測程式碼,並在程式運行時紀錄每個執行緒的行為,如造訪的記憶體位置、讀寫行為等,最後再進行報告。 我們可以寫一個會造成 Data race 的程式 `data_race.c` ```c #include <stdio.h> #include <pthread.h> pthread_mutex_t lock; int shared_var = 0; int times = 1000000; void* increment(void* arg) { for (int i = 0; i < times; ++i) { ++shared_var; } return NULL; } int main() { pthread_t t1, t2; pthread_create(&t1, NULL, increment, NULL); pthread_create(&t2, NULL, increment, NULL); pthread_join(t1, NULL); pthread_join(t2, NULL); printf("Final value: %d\n", shared_var); return 0; } ``` 並使用 ThreadSanitizer 進行編譯 ``` gcc -g -fsanitize=thread -o data_race data_race.c ``` 執行 `data_race.c` ``` FATAL: ThreadSanitizer: unexpected memory mapping 0x63faef1d9000-0x63faef1da000 ``` 反而出現的不是 Data race 的問題,而是 memory mapping 的問題,在 [Thread Sanitizer FATAL error on kernel version](https://github.com/google/sanitizers/issues/1716) 說明到可能是 ASLR(Address space layout randomization) 的問題。 ASLR 通過隨機地分配程序的記憶體位置,防止惡意程式或攻擊者利用已知的記憶體位置來進行攻擊,預設是 32 bit,我們將其設置為 28 bit。 ```shell $ sudo sysctl -w vm.mmap_rnd_bits=28 vm.mmap_rnd_bits = 28 ``` 再執行一次 `data_race.c` ```c ================== WARNING: ThreadSanitizer: data race (pid=43074) Read of size 4 at 0x55939c8a7018 by thread T2: #0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x129d) Previous write of size 4 at 0x55939c8a7018 by thread T1: #0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x12b5) Location is global '<null>' at 0x000000000000 (data_race+0x000000004018) Thread T2 (tid=43077, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:26 (data_race+0x134e) Thread T1 (tid=43076, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:25 (data_race+0x1331) SUMMARY: ThreadSanitizer: data race /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 in increment ================== Final value: 2000000 ThreadSanitizer: reported 1 warnings ``` 說明發生了 data race。 接著在程式中加入 mutex 進行保護 ::: spoiler `data_race_protect.c` ```diff #include <stdio.h> #include <pthread.h> ++pthread_mutex_t lock; int shared_var = 0; int times = 1000000; void* increment(void* arg) { for (int i = 0; i < times; ++i) { ++ pthread_mutex_lock(&lock); shared_var; ++ pthread_mutex_unlock(&lock); } return NULL; } int main() { pthread_t t1, t2; ++ pthread_mutex_init(&lock, NULL); pthread_create(&t1, NULL, increment, NULL); pthread_create(&t2, NULL, increment, NULL); pthread_join(t1, NULL); pthread_join(t2, NULL); ++ pthread_mutex_destroy(&lock); printf("Final value: %d\n", shared_var); return 0; } ``` ::: ThreadSanitizer 就沒有報錯了 ``` $ ./data_race_protect Final value: 2000000 ``` 接著在測驗 `2` 加入 ThreadSanitizer 進行編譯 ```c CFLAGS = -std=c11 -Wall -Wextra -pthread -fsanitize=thread ``` 也出現了 data race 的問題,發現是 unbuffered 發生了問題 ```c WARNING: ThreadSanitizer: data race (pid=77546) Read of size 8 at 0x7f96901be1c8 by thread T1: #0 reader <null> (exam2+0x16a0) Previous write of size 8 at 0x7f96901be1c8 by thread T81: #0 chan_send_unbuf <null> (exam2+0x2e58) #1 chan_send <null> (exam2+0x3477) #2 writer <null> (exam2+0x15b2) Location is stack of thread T1. Thread T1 (tid=77548, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 create_threads <null> (exam2+0x18b0) #2 test_chan <null> (exam2+0x1b81) #3 main <null> (exam2+0x1d3b) Thread T81 (tid=77628, finished) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 create_threads <null> (exam2+0x18b0) #2 test_chan <null> (exam2+0x1baf) #3 main <null> (exam2+0x1d3b) ``` 執行緒 `T1` 在 `reader` 和執行緒 `T81` 在 `chan_send_unbuf` 同時存取同個記憶體位置 `0x7f96901be1c8` ,不過只靠此訊息無法得知實際是哪個變數被同時存取,因此我使用了 [Helgrind](https://valgrind.org/docs/manual/hg-manual.html) 來輔助進行 data race 的除錯。 Helgind 是 Valgrind 的其中一個工具,可以檢測使用以下事項 * POSIX thread API 的使用錯誤 * deadlock * data race 使用 helgrind 來檢查 data race ``` $ valgrind --tool=helgrind ./exam2 ``` :::spoiler ```c Possible data race during write of size 8 at 0x4AA1048 by thread #12 ==7850== Locks held: none ==7850== at 0x10A22E: chan_send_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x10A5F6: chan_send (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x1092A5: writer (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==7850== by 0x4909AC2: start_thread (pthread_create.c:442) ==7850== by 0x499AA03: clone (clone.S:100) ==7850== ==7850== This conflicts with a previous read of size 8 by thread #2 ==7850== Locks held: none ==7850== at 0x10A3AF: chan_recv_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x10A641: chan_recv (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x109329: reader (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==7850== by 0x4909AC2: start_thread (pthread_create.c:442) ==7850== by 0x499AA03: clone (clone.S:100) ==7850== Address 0x4aa1048 is 8 bytes inside a block of size 80 alloc'd ==7850== at 0x484A919: malloc (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==7850== by 0x109BCE: chan_make (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x1095B3: test_chan (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x1097DA: main (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== Block was alloc'd by thread #1 ``` ::: 說明是 `chan_send_unbuf` 和 `chan_recv_unbuf` 中同時寫入和讀取了一個 `size = 8` 的變數,而此變數是在 72byte 的結構中宣告的,推測是在 channel 中的變數,而透過觀察`chan_send_unbuf` 和 `chan_recv_unbuf` ```c static int chan_send_unbuf(struct chan *ch, void *data) { ... void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data, memory_order_acq_rel, memory_order_acquire)) { *ptr = data; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->recv_ftx, CCCC); } ... } ``` ```c static int chan_recv_unbuf(struct chan *ch, void **data) { ... void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data, memory_order_acq_rel, memory_order_acquire)) { *data = *ptr; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->send_ftx, 1); } ... } ``` 推測可能在 `*ptr = data` 和 `*data = *ptr` 沒有保護,可能造成同時寫入和讀取的問題 嘗試加入一個新的 mutex `data_mtx` ,在要修改前先使用鎖保護 ```diff static int chan_recv_unbuf(struct chan *ch, void **data) { ... ++ mutex_lock(&ch->data_mtx) *data = *ptr; ++ mutex_unlock(&ch->data_mtx); ... } static int chan_send_unbuf(struct chan *ch, void *data) { ... ++ mutex_lock(&ch->data_mtx) *ptr = data; ++ mutex_unlock(&ch->data_mtx); ... } ``` 仍然沒有解決 data race 的問題。 ::: info 但後來思考,不應該兩者同時進入此區塊,因為在 `chan_send_unbuf` 和 `chan_recv_unbuf` 中,比較式為 atomic 操作,若一方替換成功,則另一方必定去到替換失敗區,可能問題不出在這? ::: 透過讓程式 print 出 目前所在的區塊,發現確實不會有兩者同時在 cas 成功或 cas 失敗的區塊 ``` ... recv in cas success send in cas failed send in cas success recv in cas failed ... ``` 改成使用 `atomic` 指令進行 `*ptr = data` 和 `*data = *ptr`。 ```diff static int chan_send_unbuf(struct chan *ch, void *data) { ... - //*ptr = data; + atomic_store_explicit(ptr, data, memory_order_release); ... } static int chan_recv_unbuf(struct chan *ch, void *data) { ... - //*data= *ptr; + atomic_store_explicit(data, *ptr, memory_order_release); ... } ``` 還有其他可能發生 data race 的兩個地方,在 `reader` 和 `chan_send_unbuf` :::spoiler ```c ==12438== Possible data race during read of size 8 at 0x56A0DB8 by thread #2 ==12438== Locks held: none ==12438== at 0x10932F: reader (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==12438== by 0x4909AC2: start_thread (pthread_create.c:442) ==12438== by 0x499AA03: clone (clone.S:100) ==12438== ==12438== This conflicts with a previous write of size 8 by thread #12 ==12438== Locks held: none ==12438== at 0x10A1FC: chan_send_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x10A5F6: chan_send (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x1092A5: writer (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==12438== by 0x4909AC2: start_thread (pthread_create.c:442) ==12438== by 0x499AA03: clone (clone.S:100) ==12438== Address 0x56a0db8 is on thread #2's stack ==12438== in frame #0, created by reader (???:) ``` ::: 以下是 `reader` 程式碼 ```c void *reader(void *arg) { struct thread_arg *a = arg; size_t msg = 0, received = 0, expect = a->to - a->from; printf("address of msg is %lx\n",&msg); while (received < expect) { if (chan_recv(a->ch, (void **) &msg) == -1) break; atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed); ++received; } return 0; } ``` 推測是 `reader` 創造的 `msg` 發生了 data race,將 `msg` 的記憶體位置 `print` 出 ```c address of msg is 7fecce6be1c8 ================== WARNING: ThreadSanitizer: data race (pid=14787) Read of size 8 at 0x7fecce6be1c8 by thread T1: #0 reader <null> (exam2+0x16cf) Previous atomic write of size 8 at 0x7fecce6be1c8 by thread T11: ``` 確實是 msg 的問題 :::info 懷疑是 `reader` 中的 `msg_count[msg]` 的 `msg`,需要先經過 `atomic` 的處理 ::: 將 `msg` 的讀取改成以變數 `count` 進行 atomic 操作 ```diff static void *reader(void *arg) { ... + count = atomic_load(&msg); + atomic_fetch_add_explicit(&msg_count[count], 1, memory_order_relaxed); - atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed); ++received; } return 0; } ``` 便成功沒有報錯! > [commit](https://github.com/kkkkk1109/2024q1week12/commit/188319d5598c10b58f3c2c0280d1e0aa88e402e8) ### 測驗 `3` 此題為實作一個 lock-free 的 single-producer/single-consumer,使用 ring buffer 且避免造成 false sharing。 定義了 `counter_t` 用為計數,注意到這邊使用 `union` ,並用 `w` 和 `r` 區分寫入和讀取。 ```c typedef union { volatile uint32_t w; volatile const uint32_t r; } counter_t; ``` 接著看 `spsc_queue` ```c typedef struct spsc_queue { counter_t head; /* Mostly accessed by producer */ volatile uint32_t batch_head; counter_t tail __ALIGN; /* Mostly accessed by consumer */ volatile uint32_t batch_tail; unsigned long batch_history; /* For testing purpose */ uint64_t start_c __ALIGN; uint64_t stop_c; element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */ } __ALIGN spsc_queue_t; ``` 可以看到使用了 `head` 、 `tail` 作為 ring buffer 的開頭和結尾。而這邊使用了 ` __ALIGN`,其定義為 ```c #define __ALIGN __attribute__((aligned(64))) ``` 由於題目有說明要避免 false sharing,因此將常使用到的物件以 64byte 為一個單位,如此一來就不會被存在同個 cacheline,而不會造成頻繁的 cache coherence。 :::info `batch_head` 、`batch_tail` 和 `batch_history` 為批次處理,是一次讀取 `batch_size` 的量嗎 ::: `element_t data[SPSC_QUEUE_SIZE] __ALIGN` 宣告了此 ring buffer 的大小。 接著是關於 queue 的操作 `queue_init` 使用 [`memset`](https://man7.org/linux/man-pages/man3/memset.3.html) 將 queue 中的各個數值以 0 填滿。 ```c static void queue_init(spsc_queue_t *self) { memset(self, 0, sizeof(spsc_queue_t)); self->batch_history = SPSC_BATCH_SIZE; } ``` * `SPSC_QUEUE_SIZE = (1024 * 8)` 為 ring buffer 的大小 * `SPSC_BATCH_SIZE` 為 queue_size 除以 16 * `SPSC_BATCH_INCREAMENT = (SPSC_BATCH_SIZE / 2)` 為 batch 一次增加的量 * `SPSC_CONGESTION_PENALTY (1000)` 作為等待的時間 接著看 `dequeue` 首先,先取得上一次放入的 batch 大小, `*val_ptr` 為將取出資料的放入的空間。 :::info 若是第一次執行的話,batch_history = SPSC_BATCH_SIZE ::: ```c static int dequeue(spsc_queue_t *self, element_t *val_ptr) { unsigned long batch_size = self->batch_history; *val_ptr = SPSC_QUEUE_ELEMENT_ZERO; ... } ``` :::success 第一次執行的話, tail.r = batch_tail = 0 ::: 會判斷目前 `tail` 是否達到 `batch_tail` ,達到的話就要增加 `bathc_tail` 的量,先以 `tmp_tail` 去判斷要增加的量。 檢查 `tmp_tail` 是否達到 ring buffer 結尾,接著若 `batch_history` 小於 `SPSC_BATCH_SIZE` 的話,則要增加 `batch_history` 的大小,有兩個選項,比較 `SPSC_BATCH_SIZE` 和 `batch_history + SPSC_BATCH_INCREAMENT` ,選擇較小的一個做為新的 `batch_history` ```c /* Try to zero in on the next batch tail */ if (self->tail.r == self->batch_tail) { uint32_t tmp_tail = self->tail.r + SPSC_BATCH_SIZE; // check if the batch_tail meet the end of ring buffer if (tmp_tail >= SPSC_QUEUE_SIZE) { tmp_tail = 0; // determine the batch_history if (self->batch_history < SPSC_BATCH_SIZE) { self->batch_history = (SPSC_BATCH_SIZE < (self->batch_history + SPSC_BATCH_INCREAMENT)) ? SPSC_BATCH_SIZE : (self->batch_history + SPSC_BATCH_INCREAMENT); } } ... } ``` 目前已經決定 `tmp_tail` ,不過並非直接指定這樣大小的 batch_size,會先等待後續的空間已經有資料可以讀取,因此以 `while (!(self->data[tmp_tail]))` 判斷是否有資料,沒有的話便等待 producer 產生資料,並將 `batch_size` 縮小,重複步驟直到有 producer 產生的資料。 ```c if (self->tail.r == self->batch_tail) { ... batch_size = self->batch_history; while (!(self->data[tmp_tail])) { wait_ticks(SPSC_CONGESTION_PENALTY); batch_size >>= 1; if (batch_size == 0) return SPSC_Q_EMPTY; tmp_tail = self->tail.r + batch_size; if (tmp_tail >= SPSC_QUEUE_SIZE) tmp_tail = 0; } self->batch_history = batch_size; if (tmp_tail == self->tail.r) tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1; self->batch_tail = tmp_tail; } ``` :::info 最後 `if (tmp_tail == self->tail.r)` 猜測為判斷是否回到 tail.r 因為 ring buffer 為環形結構。 ::: 接著是實際取出資料,使用 `val_ptr` 獲得資料 `self->data[self->tail.r]` ```c *val_ptr = self->data[self->tail.r]; self->data[self->tail.r] = SPSC_QUEUE_ELEMENT_ZERO; self->tail.w++; if (self->tail.r >= SPSC_QUEUE_SIZE) self->tail.w = 0; return SPSC_OK; ``` 接下來看如何使用 `enqueue` 放入資料,若 `head` 達到 `batch_head`,則使用 tmp_head 去決定下次 batch_head 的位置。 這邊和 consumer 不同的是,每次都是以 `SPSC_BATCH_SIZE` 來增加。 若 `tmp_head` 的部分有資料,則代表已經 ring buffer 已經滿了,無法再放入資料。 ```c static int enqueue(spsc_queue_t *self, element_t value) { /* Try to zero in on the next batch head. */ if (self->head.r == self->batch_head) { uint32_t tmp_head = self->head.r + SPSC_BATCH_SIZE; if (tmp_head >= SPSC_QUEUE_SIZE) tmp_head = 0; // wait if (self->data[tmp_head]) { /* run spin cycle penality */ wait_ticks(SPSC_CONGESTION_PENALTY); return SPSC_Q_FULL; } self->batch_head = tmp_head; } // put value in ring buffer self->data[self->head.r] = value; self->head.w++; if (self->head.r >= SPSC_QUEUE_SIZE) self->head.w = 0; return SPSC_OK; } ``` 接著看對應的 consumer 和 producer 操作 定義了 `struct init_info_t` ```c typedef struct { uint32_t cpu_id; pthread_barrier_t *barrier; } init_info_t; ``` `cpu_id` 對應到不同的 cpu ,`barrier` 作為一個同步的手段,會進入等待其他執行緒完成特定事項。 * consumer ```c void *consumer(void *arg) { element_t value = 0, old_value = 0; init_info_t *init = (init_info_t *) arg; uint32_t cpu_id = init->cpu_id; pthread_barrier_t *barrier = init->barrier; /* user needs tune this according to their machine configurations. */ cpu_set_t cur_mask; CPU_ZERO(&cur_mask); CPU_SET(cpu_id * 2, &cur_mask); printf("consumer %d: ---%d----\n", cpu_id, 2 * cpu_id); if (sched_setaffinity(0, sizeof(cur_mask), &cur_mask) < 0) { printf("Error: sched_setaffinity\n"); return NULL; } ... } ``` 首先,[`cpu_set_t`](https://man7.org/linux/man-pages/man3/CPU_SET.3.html) 定義為多個 cpu 的集合,而使用 `CPU_ZERO` 將 `cur_mask` 設為沒有 CPU,再使用 `CPU_SET` 加入 `cpu_id * 2` 的 CPU。 接著,使用 [`sched_setaffinity`](https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html) 將 thread 運行在特定的 CPU ```c int sched_setaffinity(pid_t pid, size_t cpusetsize, const cpu_set_t *mask); ``` 如圖 ![image](https://hackmd.io/_uploads/H1QPqSTI0.png) ``` consumer 1: ---2---- consumer 2: ---4---- Consumer created... consumer 3: ---6---- producer 0: ---1---- Consumer created... consumer 5: ---10---- Consumer created... Consumer created... consumer 4: ---8---- Consumer created... consumer: 27 cycles/op consumer: 27 cycles/op consumer: 27 cycles/op consumer: 27 cycles/op consumer: 27 cycles/op producer 5 cycles/op Done! ``` 若 pid 為零,則會選擇呼叫此函式的 thread ```c ... printf("Consumer created...\n"); pthread_barrier_wait(barrier); queues[cpu_id].start_c = read_tsc(); ... ``` 接著,使用 `barrier` 等到所有 `consumer` 完成 `sched_setaffinity` `read_tsc()` 對應如下,主要為獲取時間 ```c static inline uint64_t read_tsc() { uint64_t time; uint32_t msw, lsw; __asm__ __volatile__( "rdtsc\n\t" "movl %%edx, %0\n\t" "movl %%eax, %1\n\t" : "=r"(msw), "=r"(lsw) : : "%edx", "%eax"); time = ((uint64_t) msw << 32) | lsw; return time; } ``` ```c for (uint64_t i = 1; i <= TEST_SIZE; i++) { while (dequeue(&queues[cpu_id], &value) != 0) ; assert((old_value + 1) == value); old_value = value; } queues[cpu_id].stop_c = read_tsc(); printf( "consumer: %ld cycles/op\n", ((queues[cpu_id].stop_c - queues[cpu_id].start_c) / (TEST_SIZE + 1))); pthread_barrier_wait(barrier); return NULL; ``` 接著,執行 `dequeue` ,並記錄完成時間,使用 `barrier` 等到所有 consumer 完成。 * producer ```c void producer(void *arg, uint32_t num) { ... for (uint64_t i = 1; i <= TEST_SIZE + SPSC_BATCH_SIZE; i++) { for (int32_t j = 1; j < num; j++) { element_t value = i; while (enqueue(&queues[j], value) != 0) ; } } ... } ``` 內容和 `consumer` 僅有從 `dequeue` 換成 `enqueue`。 ### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案 在編譯時加入 ```shell gcc -Wall -O2 -I. -o main main.c -lpthread -fsanitize=thread ``` 產生錯誤訊息 ```c ================== WARNING: ThreadSanitizer: thread leak (pid=6908) Thread T1 (tid=6910, finished) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main <null> (exam3+0x1293) And 1 more similar thread leaks. SUMMARY: ThreadSanitizer: thread leak (/home/kkkkk1109/2024q1week12/exam3/exam3+0x1293) in main ``` 說明發生 thread leak ,可能有 thread 沒有適當的釋放,猜測是 `consumer_thread` 沒有被安全釋放,加上 `pthread_detach` 來安全釋放 thread ```c for (int i = 1; i < max_threads; i++) { INIT_CPU_ID(i) = i; INIT_BARRIER(i) = &barrier; error = pthread_create(&consumer_thread, &consumer_attr, consumer, INIT_PTR(i)); pthread_detach(consumer_thread); } ``` 便沒有報錯訊息了 > [commit](https://github.com/kkkkk1109/2024q1week12/commit/4672babc762546b51f7cba571619225979a5db9b)

    Import from clipboard

    Paste your markdown or webpage here...

    Advanced permission required

    Your current role can only read. Ask the system administrator to acquire write and comment permission.

    This team is disabled

    Sorry, this team is disabled. You can't edit this note.

    This note is locked

    Sorry, only owner can edit this note.

    Reach the limit

    Sorry, you've reached the max length this note can be.
    Please reduce the content or divide it to more notes, thank you!

    Import from Gist

    Import from Snippet

    or

    Export to Snippet

    Are you sure?

    Do you really want to delete this note?
    All users will lose their connection.

    Create a note from template

    Create a note from template

    Oops...
    This template has been removed or transferred.
    Upgrade
    All
    • All
    • Team
    No template.

    Create a template

    Upgrade

    Delete template

    Do you really want to delete this template?
    Turn this template into a regular note and keep its content, versions, and comments.

    This page need refresh

    You have an incompatible client version.
    Refresh to update.
    New version available!
    See releases notes here
    Refresh to enjoy new features.
    Your user state has changed.
    Refresh to load new user state.

    Sign in

    Forgot password

    or

    By clicking below, you agree to our terms of service.

    Sign in via Facebook Sign in via Twitter Sign in via GitHub Sign in via Dropbox Sign in with Wallet
    Wallet ( )
    Connect another wallet

    New to HackMD? Sign up

    Help

    • English
    • 中文
    • Français
    • Deutsch
    • 日本語
    • Español
    • Català
    • Ελληνικά
    • Português
    • italiano
    • Türkçe
    • Русский
    • Nederlands
    • hrvatski jezik
    • język polski
    • Українська
    • हिन्दी
    • svenska
    • Esperanto
    • dansk

    Documents

    Help & Tutorial

    How to use Book mode

    Slide Example

    API Docs

    Edit in VSCode

    Install browser extension

    Contacts

    Feedback

    Discord

    Send us email

    Resources

    Releases

    Pricing

    Blog

    Policy

    Terms

    Privacy

    Cheatsheet

    Syntax Example Reference
    # Header Header 基本排版
    - Unordered List
    • Unordered List
    1. Ordered List
    1. Ordered List
    - [ ] Todo List
    • Todo List
    > Blockquote
    Blockquote
    **Bold font** Bold font
    *Italics font* Italics font
    ~~Strikethrough~~ Strikethrough
    19^th^ 19th
    H~2~O H2O
    ++Inserted text++ Inserted text
    ==Marked text== Marked text
    [link text](https:// "title") Link
    ![image alt](https:// "title") Image
    `Code` Code 在筆記中貼入程式碼
    ```javascript
    var i = 0;
    ```
    var i = 0;
    :smile: :smile: Emoji list
    {%youtube youtube_id %} Externals
    $L^aT_eX$ LaTeX
    :::info
    This is a alert area.
    :::

    This is a alert area.

    Versions and GitHub Sync
    Get Full History Access

    • Edit version name
    • Delete

    revision author avatar     named on  

    More Less

    Note content is identical to the latest version.
    Compare
      Choose a version
      No search result
      Version not found
    Sign in to link this note to GitHub
    Learn more
    This note is not linked with GitHub
     

    Feedback

    Submission failed, please try again

    Thanks for your support.

    On a scale of 0-10, how likely is it that you would recommend HackMD to your friends, family or business associates?

    Please give us some advice and help us improve HackMD.

     

    Thanks for your feedback

    Remove version name

    Do you want to remove this version name and description?

    Transfer ownership

    Transfer to
      Warning: is a public team. If you transfer note to this team, everyone on the web can find and read this note.

        Link with GitHub

        Please authorize HackMD on GitHub
        • Please sign in to GitHub and install the HackMD app on your GitHub repo.
        • HackMD links with GitHub through a GitHub App. You can choose which repo to install our App.
        Learn more  Sign in to GitHub

        Push the note to GitHub Push to GitHub Pull a file from GitHub

          Authorize again
         

        Choose which file to push to

        Select repo
        Refresh Authorize more repos
        Select branch
        Select file
        Select branch
        Choose version(s) to push
        • Save a new version and push
        • Choose from existing versions
        Include title and tags
        Available push count

        Pull from GitHub

         
        File from GitHub
        File from HackMD

        GitHub Link Settings

        File linked

        Linked by
        File path
        Last synced branch
        Available push count

        Danger Zone

        Unlink
        You will no longer receive notification when GitHub file changes after unlink.

        Syncing

        Push failed

        Push successfully