# Linux 核心專題: 並行程式設計 > 執行人: kkkkk1109 > [簡報內容](https://docs.google.com/presentation/d/1MTa8o6FMPKAB01YYOXVmzj_e0gA1vfJe/edit?usp=drive_link&ouid=104070822328263146319&rtpof=true&sd=true) > [講解影片](https://www.youtube.com/watch?v=JDrb_v_Im5g) ### Reviewed by `chloe0919` > EMPTY 代表搶工作失敗,試著在同個佇列中再搶一次 應為 ABORT 才是搶工作失敗,需要在同佇列中再嘗試偷竊一次 > 收到!感謝提醒 ### Reviewed by `fennecJ` * 透過 Work-stealing 從其他 theads 把工作偷過來執行的手法雖然可以提昇總體的 throughput ,卻會因為 task migration 的關係造成額外的成本,若 task migration 頻繁發生,甚至可能導致執行上花費大量時間處理因 task migration 產生的成本反而使總體執行效率降低。為了降低 task migration 造成的衝擊,當今天有 task 滿足可說明影片中被竊取的條件時「 Top <= bottom 」,我們還有哪些可以考慮的因素決定是否要竊取該 task ? > 我認為還可以從剩餘工作較多的佇列偷取,並限制可偷取工作執行緒數量來減少 task migration 的大量成本 * Bloom filter 透過 hash 達成高效查詢的能力,但它卻有無法刪除存在於 table 內的 element 的顯著缺點,針對這個問題,資料科學家提出了改善的資料結構: [Counting Bloom Filter](https://en.wikipedia.org/wiki/Counting_Bloom_filter) 以及 [Quotient filter](https://en.wikipedia.org/wiki/Quotient_filter) 可否請你就上述不同資料結構進行比較,針對實做成本、支援操作、時間複雜度進行探討,並歸納出各自適合的應用場景。 > 可以! 感謝補充,我會實作看看 ### Reviewed by `Wufangni` > 判斷目前佇列的剩餘工作,由 top 和 bottom 的關係判斷 若能利用 top 和 bottom 的差當作佇列目前的任務多寡來判斷該優先偷取哪個佇列的任務,是否能降低空佇列(佇列內部)發生的情況? > 收到,我認為此想法是可行的,可以實驗看看。 ### Reviewed by `stevendd543` 想請問後面有提到將 channel 資料透過互斥鎖保護 ```c mutex_lock(&ch->data_mtx); *data = *ptr; mutex_unlock(&ch->data_mtx); ``` > 之後改成使用 atomic 操作,沒有使用互斥鎖是因為在 `chan_sen_unbuf` 和 `chan_recv_unbuf` 不會同時進入到 `*data = *ptr` 和 `*ptr = data` 操作中,因此在前後加上互斥鎖無法解決 data race 的問題。實際上是外部 `reader` 存取 `msg` 時造成 data race,且已經使用了 `send_mtx` 和 `recv_mtx`,想要避免使用多個鎖而造成 dead lock 的情況。 就我了解 msg 也是指向 chnnel 為什麼前面將其保護,後面 main 在存取的時候還需要再使用 atomic 操作? ` atomic_fetch_add_explicit(&msg_count[count], 1, memory_order_relaxed);` > 雖然已經使用了 atomic 操作在進行 `atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed)` ,但這個 `msg` 需要再進行讀取一次,並且是沒有進行保護的,很容易就被其他執行緒同時更改這個值,而導致讀取失敗和寫入失敗,因此才使用 `count` 進行 atomic 操作來避免上述事件發生。 ### Reviewed by `Shiang1212` * 建議一 > 成功後,以 new_head 存取 old_head->next,若 new_head 為 0 ,則代表為最後一個節點,返回 NULL。 這段話看起來是用來描述這行程式碼: ```c new_head = atomic_load(&old_head->next); ``` 這裡使用 "存取" 這個詞不太恰當,函式名稱都有 "load" 字眼出現了,應該使用 "載入" 或 "讀取" 之類的詞,建議可以改成:使用 `atomic_load` 載入 `old_head->next` 的值,並將其賦值給 `new_head`。 > 收到!謝謝指正 * 建議二 > `fph` 和 `fpt` 對應到 `free_pool` 的頭和尾 >`insert_pool` 將 `node` 放入 `free_pool` 的節尾 >`free_pool` 為釋放 `free_pool` 中的節點 `free_pool` 應是 [lfq.c](https://gist.github.com/jserv/128b735e8e73846d38bf1e98ba553607) 中的一個函式,如上面的例子,若你想表達是 retire list,你應該使用其他詞來表示,否則容易造成誤會。 > 收到!謝謝指正 ## TODO: 紀錄閱讀 [並行程式設計](https://hackmd.io/@sysprog/concurrency) 教材中遇到的問題 > 重現實驗並嘗試對內文做出貢獻 > [閱讀筆記](https://hackmd.io/@kkkkk1109/ry9tV2hzA/edit) #### 排程器 CPU 會為各個工作進行排程,來決定下一個工作,可以是 static 排程,也可以是 dynamic 的排程。 排程有多種演算法,依照時間分割(round-robin scheduling 或 time slicing) 用於處理相同重要性的工作,而任務優先順序的 (priority scheduling)則處理那些有不同時效性的任務,也就是 hard real-time 的工作。 :::info 排程器是在哪裡運行的 是會有一個CPU來完成這些事情嗎 排程這件事算一個task嗎 ::: #### 搶佔式與非強取式核心 兩者的核心差別為,工作交出CPU的使用權是強制性的或是非強制的。 非搶佔式(non-preemptive)不會依照工作的優先順序交出CPU的使用權,而是不定期的交出使用權。為了達到並行,因此頻率要夠快,否則讓使用者感受到等待時間,有下列好處 1. 實作單純 2. 工作中可使用**非再進入程式碼(non-reentrant code)**,換言之,每個工作不需擔心在程式未執行完畢時又重新進入。因此該工作本身所用的記憶區不會有被污染 (corruption) 的可能; 3. 對系統共用記憶區的保護動作可減至最少,因為每一工作在未使用完記憶區時不會放棄 CPU ,無須擔心會被其他工作在半途中修改; :::info 2、3項<s>不太懂</s> > 不懂就說不懂,不要說「不太懂」。教材有講解錄影,搭配課程相關教材。對照閱讀後,紀錄詳盡的問題。 ::: ## TODO: [第 9 週測驗題之 3](https://hackmd.io/@sysprog/linux2024-quiz9) > 解釋上述程式碼運作原理,包含延伸問題 此題是嘗試以 C11 Atomics 撰寫一 [work stealing](https://en.wikipedia.org/wiki/Work_stealing) 的程式碼 ![image](https://hackmd.io/_uploads/ByWOf_uUA.png) 可以把 work stealing 想成,兩執行緒各自有分配的任務,但若其中一方較早完成,便可以**偷走**另一方的任務,來避免空等的情況。 為了避免發生搶到另一方正在執行的任務,通常這種任務佇列會使用雙向佇列(double-ended queue),不同執行緒分別從佇列的頭和尾去完成任務。 接著介紹定義 * `work_t` ```c typedef struct work_internal { task_t code; atomic_int join_count; void *args[]; } work_t; ``` `task_t` 會回傳一 `work_t` 的指標,其定義如下,主要是回傳此工作的指標 ```c typedef struct work_internal *(*task_t)(struct work_internal *); ``` 而 `join_count` 為計算此任務還需要多少參數,`args` 即為此任務所需要的參數 ```c typedef struct { atomic_size_t size; _Atomic work_t *buffer[]; } array_t; ``` ```c typedef struct { /* Assume that they never overflow */ atomic_size_t top, bottom; _Atomic(array_t *) array; } deque_t; ``` 接著是工作佇列的定義,使用 `top` 和 `bottom` 指向佇列的頭和尾 接著來解釋各個操作 * `init` 使用 `atomic_init` 來初始 `deque_t` 的各個參數 * 將 `top` 、 `bottom` 設為 0 * `size_hint` 為 可放入幾項任務,並使用 malloc 配置記憶體空間 * 將 `size` 以 `size_hint` 初始化 * 將 `array` 指向 `a` 的指標 ```c void init(deque_t *q, int size_hint) { atomic_init(&q->top, 0); atomic_init(&q->bottom, 0); array_t *a = malloc(sizeof(array_t) + sizeof(work_t *) * size_hint); atomic_init(&a->size, size_hint); atomic_init(&q->array, a); } ``` 而主要操作有 `push` 、 `take` 和 `steal`,下列是一個 deque 的範例 ```graphviz digraph G { rankdir=LR; node[shape=record]; map [label="A[0]|<a1>A[1]|A[2]|A[3]|... |<an>A[n-2]|A[n-1] "]; node[shape=plaintext] top [fontcolor="red"] bottom [fontcolor="red"] top->map:a1 bottom->map:an } ``` * `push` 的操作為將新的任務放入 `bottom` 的位置,並使 `bottom += 1 ` ```c void push(deque_t *q, work_t *w) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); size_t t = atomic_load_explicit(&q->top, memory_order_acquire); array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); if (b - t > a->size - 1) { /* Full queue */ resize(q); a = atomic_load_explicit(&q->array, memory_order_relaxed); } atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed); atomic_thread_fence(memory_order_release); atomic_store_explicit(&q->bottom, DDDD, memory_order_relaxed); } ``` 這邊要注意,由於 `push` 可能會使佇列的空間改變,因此當佇列滿時會調用到 `resize` 來改變佇列大小 `atomic_thread_fence(memory_order_release)` 用來確保 fence 後的 `store` 必定排在 fence 前的操作 :::success `DDDD` 應為 `b+1` ::: * `take` 的操作為從自己的佇列中,將 `bottom - 1` 的工作取出執行,並將 `bottom -= 1` ,但要考慮到佇列內剩餘工作的數量,考量是否會發生和 `steal` 同時發生競爭的情況,主要分為兩個部分 1. 預設取得任務會成功,將 `bottom -1` 並存回 `bottom`,取得 `top` 和 `bottom` 的數值 ```c work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); work_t *x; ... } ``` 2. 判斷目前佇列的剩餘工作,由 `top` 和 `bottom` 的關係判斷 * `top` < `bottom` 代表還有工作,可直接取出 * `top` = `bottom` 代表只剩一個工作,可能造成和 `steal` 競爭最後一個工作,以 `atomic_compare_exchange_strong_explicit` 判斷是否失敗,失敗代表被 `steal` 搶走工作,復原 `bottom` ;反之則代表取得工作成功,將 `top += 1`, * `top` > `bottom` 代表沒有工作,需復原 `bottom` ```c ... if (t <= b) { /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ x = EMPTY; atomic_store_explicit(&q->bottom,b + 1, memory_order_relaxed); } } else { /* Empty queue */ x = EMPTY; atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } return x; ``` <s> `AAAA` 應為 `t+1` ,`BBBB` 應為 `b+1` ,`CCCC` 應為 `b+1` </s> :::danger 專注在程式碼本身,不用抄題目,更正上方程式碼的內容。 ::: * `steal` 從 `top` 的方向取得工作,取得 `top` 和 `bottom` 後,一樣使用 `atomic_compare_exchange_strong_explicit` 判斷是否會和 `take` 競爭 ```c work_t *steal(deque_t *q) { size_t t = atomic_load_explicit(&q->top, memory_order_acquire); atomic_thread_fence(memory_order_seq_cst); size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); work_t *x = EMPTY; if (t < b) { /* Non-empty queue */ array_t *a = atomic_load_explicit(&q->array, memory_order_consume); x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ return ABORT; } return x; } ``` <s> `EEEE` 應為 `t+1` </s> :::danger 專注在程式碼本身,不用抄寫題目。 不要急著解說程式碼,應揣摩程式開發者的本意,說明「如果是我設計這段程式碼,我會怎麼做?」 ::: * `resize` 的動作為將佇列的空間加大,會直接開兩倍的空間給新的佇列。 ```c void resize(deque_t *q) { array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); size_t old_size = a->size; size_t new_size = old_size * 2; array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size); atomic_init(&new->size, new_size); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); for (size_t i = t; i < b; i++) new->buffer[i % new_size] = a->buffer[i % old_size]; atomic_store_explicit(&q->array, new, memory_order_relaxed); ``` 接著是實際操作 `thread` 為每個 thread 各自處理自己的工作佇列,使用 `take` 取得任務 ```c void *thread(void *payload) { int id = *(int *) payload; deque_t *my_queue = &thread_queues[id]; while (true) { work_t *work = take(my_queue); if (work != EMPTY) { do_work(id, work); }else { ... ``` 當沒有工作後,則可以開始 `steal` 別個 thread 的工作 `ABORT` 代表搶工作失敗,試著在同個佇列中再搶一次;`EMPTY` 代表此佇列已空。只要偷到工作便會跳離迴圈 ```c for (int i = 0; i < N_THREADS; ++i) { if (i == id) continue; stolen = steal(&thread_queues[i]); if (stolen == ABORT) { i--; continue; /* Try again at the same i */ } else if (stolen == EMPTY) continue; /* Found some work to do */ break; } ``` 如果全部跑完後,都沒有偷到任務的話,則檢查 `done` ,代表所有任務都已做完,沒有的話再重新跑一次迴圈。 ```c if (stolen == EMPTY) { if (atomic_load(&done)) break; continue; } else { do_work(id, stolen); } printf("work item %d finished\n", id); return NULL; ``` --- ## TODO: [第 10 週測驗題之 1, 2, 3](https://hackmd.io/@sysprog/linux2024-quiz10) > 解釋上述程式碼運作原理,包含延伸問題 ### 測驗 `1` 此題延伸 [第 6 周測驗題](https://hackmd.io/@sysprog/linux2024-quiz6)中的 [bloom filter](https://en.wikipedia.org/wiki/Bloom_filter),發展在並行環境中的程式碼,觀察理論與實際效能。 Bloom Filter 利用雜湊函數,在不用走訪全部元素的前提,預測特定字串是否在資料結構中。 其架構如下 * n 個位元構成的 table * k 個雜湊函數:$h_1$、$h_2$、...、$h_k$ * 當有新的字串 s ,將 s 透過 k 個雜湊函數,會得到 k 個 `index` ,並將 `table[index]` 設為 `1` 當要檢查字串 s 是否存在,只需要透過這 k 個雜湊函數觀察 table 上的 index 是否為 1 即可。 ![image](https://hackmd.io/_uploads/rJ3g6OYUR.png) 不過此作法存在著錯誤,有可能字串 s1 和 s2 經過 k 個雜湊函數後的 `table` 一模一樣,實際上我們只有放入字串 s2 。因此我們只能確保說,此字串一定不在資料結構中,但不能確保一定存在。 此外,無法將字串刪除,因為刪除可能會影響到其他字串的 index ,因此 bloom filter **只能新增,不能移除**。 此題的 table 為 $2^{28}$ 個位元,並使用 2 個雜湊函數 * `bloom_s` 為 bloom filter 的 table,而 table size 為 `2^28` 個位元 ```c #define BLOOMFILTER_SIZE 268435456 (2^28) #define BLOOMFILTER_SIZE_BYTE BLOOMFILTER_SIZE / sizeof(volatile char) struct bloom_s { volatile char data[BLOOMFILTER_SIZE_BYTE]; }; ``` 並將 `bloom_s` 定義成 `bloom_t` ```c typedef struct bloom_s bloom_t; ``` * `get` 為獲得 table 中的 key 的 bit ```c static inline int get(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint8_t bit = 1u << (key % sizeof(char)); return (filter->data[index] & bit) != 0; } ``` * `set` 將 table 中的 key 的 bit 設為 1 ```c static inline int set(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint64_t bit = 1u << (key % sizeof(char)); return (atomic_fetch_or(&filter->data[index], bit) & bit) == 0; } ``` <s> `AAAA` 應為 `atomic_fetch_or` ,和 `bit` 做 `or` 運算 </s> :::danger 不用抄寫題目,專注在程式碼本身。 ::: * `bloom_new` 為開啟一個新的 bloom filter,使用 `memset` 將所有 bit 設為 0 ```c bloom_t *bloom_new(bloom_allocator allocator) { bloom_t *filter = allocator(sizeof(bloom_t)); memset(filter, 0, sizeof(bloom_t)); return filter; } ``` * `bloom_add` 將 key 透過 hash 產生 `hbase`,再將 `hbase` 分成 `h1` 和 `h2` 放入 table 中。 ```c void bloom_add(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; set(filter, h1); set(filter, h2); } ``` * `bloom_test` 測試 `key` 是否存在於資料結構中 ```c int bloom_test(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; return get(filter,h1) & get(filter,h2); } ``` * `bloom_destroy` 會釋放 bloom filter 中的記憶體 * `bloom_clear` 將 bloom filter 所有 bit 設為 0 接著看測試檔案 * `globals_t` ```c typedef struct { int parent_fd; bloom_t *filter; uint32_t op_counter; } globals_t; globals_t globals; ``` :::info `parent_fd` 可以讓 child process 和 parent process 透過 pipe 傳輸資料 `op_counter` 做了幾筆操作 ::: * `worker_loop` `key` 為放入的字串,`key_len` 為字串長度 ```c void worker_loop() { const u_int8_t key[] = "wiki.csie.ncku.edu.tw"; u_int64_t key_len = strlen((const char *) key); ... } ``` [`getpid`](https://man7.org/linux/man-pages/man2/getpid.2.html) 可以得到目前使用的 process id,[`srand`](https://man7.org/linux/man-pages/man3/rand.3p.html)可以產生亂數種子,使用 `rand` 基於產生的亂數種子產生亂數 ```c ... int *k = (int *) key; srand(getpid()); *k = rand(); ... ``` 根據 `k` 的數值,決定是要將 key 放入資料結構中或是測試,並將 `*k++` 重複迴圈 ```c while (1) { int n = (*k) % 100; if (n < CONTAINS_P) { bloom_test(globals.filter, key, key_len); } else { bloom_add(globals.filter, key, key_len); } (*k)++; globals.op_counter++; } ``` :::info 使用 85% 的機率進行測試,15%的機率放入 bloom filter ::: * `create_worker` 首先,使用 [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) 創造可以在 process 間傳送訊息的通道,而可以透過 `fd` 讀寫。使用 [`fork()`](https://man7.org/linux/man-pages/man2/fork.2.html) 創造 child process , 並運行 `worker_loop`。 `fork()` 會回傳 pid, `pid = 0` 代表為 child process, `pid = -1` 則為 fork 失敗。 ```c int create_worker(worker *wrk) { int fd[2]; int ret = pipe(fd); if (ret == -1) return -1; pid_t pid = fork(); if (!pid) { /* Worker */ close(fd[0]); globals.parent_fd = fd[1]; worker_loop(); exit(0); } if (pid < 0) { printf("ERROR[%d]:%s", errno, strerror(errno)); close(fd[0]); close(fd[1]); return -1; } close(fd[1]); wrk->pid = pid; wrk->fd = fd[0]; return 0; } ``` 在 `main` 中,會創建 `N_WORKERS` 個 `create_worker` ,最後再透過 [kill](https://man7.org/linux/man-pages/man2/kill.2.html)去中止 process ```c for (int i = 0; i < N_WORKERS; i++) { uint32_t worker_out = 0; if (kill(workers[i].pid, SIGKILL)) { bloom_destroy(&globals.filter, bloom_free); printf("ERROR[%d]:%s", errno, strerror(errno)); exit(1); } (void) read(workers[i].fd, &worker_out, sizeof(uint32_t)); globals.op_counter += worker_out; } ``` ### 使用 [Counting Bloom filter](https://en.wikipedia.org/wiki/Counting_Bloom_filter) 來刪除字串 原先的 Bloom Filter 中,若隨意刪除字串,可能會影響其他字串的在 table 中的 index,Counting Bloom filter 使用了 counter 來計算每個 index 被使用了幾次,當要進行刪除時,只需要將其對應的 counter 減一即可。 在 filter 結構中,加入 counter ```c struct bloom_s { volatile char data[BLOOMFILTER_SIZE_BYTE]; uint8_t counter[BLOOMFILTER_SIZE]; }; ``` 在 `set` 中,將其對應的 counter 加一 ```c static inline int set(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint64_t bit = 1u << (key % sizeof(char)); filter->counter[key]++; return (atomic_fetch_or(&filter->data[index], bit) & bit) == 0; } ``` 增加了 `bloom_delete`,首先獲得字串對應到的 index `h1`、`h2`,並確保此字串在 filter 中,再將其刪除。 ```c int bloom_delete(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; // make sure the delete string is in the table if(get(filter,h1) & get(filter,h2)) { filter->counter[h1]--; filter->counter[h2]--; if(filter->counter[h1] == 0 ) set_zero(filter,h1); if(filter->counter[h2] == 0 ) set_zero(filter,h2); return 1; } return 0; } ``` 使用 `set_zero` 刪除 table 中的 index ```c static inline int set_zero(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint64_t bit = 1u << (key % sizeof(char)); uint64_t mask = 0xffffffffffffffff ^ bit; filter->data[index] &= mask; return 0; } ``` ### 測驗 `2` `lfq` 嘗試實作精簡的並行佇列 (concurrent queue),運用 [hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 來釋放並行處理過程中的記憶體。 首先,先介紹 Hazard pointer,於〈[並行程式設計: Hazard pointer](https://hackmd.io/@sysprog/concurrency-hazard-pointer#%E4%B8%A6%E8%A1%8C%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88-Hazard-pointer)〉中提到 > 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。 > 使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。 因此使用了 Hazard pointer 的設計,其架構如下 ![image](https://hackmd.io/_uploads/HkjJTvcL0.png) 每個 thread 都有自己的 hazard pointer 和 retire list。 設想以下情境,若同個物件, thread 1 正在讀取,但 thread 2 要釋放此記憶體,若是 thread 2 先進行釋放的動作,那麼 thread 1 便會出錯。因此要進行釋放前,要先確保無人讀取或已經讀取完畢,才能進行釋放。 * Hazard pointer : 此 thread 正在讀取的物件指標 * Retire list : 此 thread 即將釋放的物件指標 因此在要釋放前,會加入 retire list,並查看每個 thread 的 hazard pointer 是否指向要釋放的物件,若有,則等到其讀取完成,最後再進行釋放。 而 hazard 涉及為單一 thread 寫入,多個 thread 讀取,才能符合以上情境。 接著來看 `lfq` 中的各種定義 * `lfq_node` `data` 指向物件,`next ` 指向下個節點, `can_free` 表示這個物件是否可以被釋放 :::success `free_next` 指向 `free pool` 中的下個節點,應該就是 `retire list` ::: ```c struct lfq_node { void *data; union { struct lfq_node *next; struct lfq_node *free_next; }; bool can_free; }; ``` * `lfq_ctx` * `head` 和 `tail` 分別指向佇列的頭和尾,使用 `alignas(64)` 避免造成 false sharing * `HP` 即為 hazard pointer,`MAX_HP_SIZE` 為最多可以有幾個 `HP` ? * `fph` 和 `fpt` 對應到 `free pool` 的頭和尾 * `bool is_freeing` 確認是否正在釋放 ```c struct lfq_ctx { alignas(64) struct lfq_node *head; int count; struct lfq_node **HP; /* hazard pointers */ int *tid_map; bool is_freeing; struct lfq_node *fph, *fpt; /* free pool head/tail */ /* FIXME: get rid of struct. Make it configurable */ int MAX_HP_SIZE; /* avoid cacheline contention */ alignas(64) struct lfq_node *tail; }; ``` :::success `tid_map` 目前不清楚 thread id, ::: 接下來是 `lfq` 的各項操作 * `lfq_init` 會初始化整個 `lfq_ctx` * `lfq_release` 會釋放整個 `lfq_ctx` * `insert_pool` 將 `node` 放入 `free_pool` 的節尾 * 函式 `free_pool` 為釋放 `free pool` 中的節點 :::success `AAAA` 應為檢查目前是否有人正在`free` ,所以應為 ``,操作成功即可進入迴圈,失敗代表已經有人在操作。 ::: 獲得 `free_pool` 的 `head` 後,逐一檢查其中的節點是否可以釋放,若為以下條件 1. `(!atomic_load(&p->can_free)` 代表 `can_free = 0` 無法釋放 2. `(!atomic_load(&p->free_next))` 代表此節點已經是佇列末端,無法釋放 3. `in_hp(ctx, (struct lfq_node *) p)` 此節點被 `HP` 所指向,有 thread 正在讀取 則會跳出迴圈,並說明目前並無釋放記憶體,或者是已經釋放完成。 ```c static void free_pool(struct lfq_ctx *ctx, bool freeall) { bool old = 0; if (!atomic_compare_exchange_strong(&ctx->freeing,0,1)) return; for (int i = 0; i < MAX_FREE || freeall; i++) { struct lfq_node *p = ctx->fph; if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) || in_hp(ctx, (struct lfq_node *) p)) break; ctx->fph = p->free_next; free(p); } atomic_store(&ctx->is_freeing, false); atomic_thread_fence(memory_order_seq_cst); } ``` * `safe_free` 為釋放特定節點 首先,檢查此節點是可以釋放且不在 `HP` 中,接著判斷是否有人在釋放節點中 成功替換,則將此節點釋放,並將 `is_freeing` 改回 `false` 代表釋放完成。 若無法釋放,則加入 `free pool` 即可。 ```c static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node) { if (atomic_load(&node->can_free) && !in_hp(ctx, node)) { /* free is not thread-safe */ bool old = 0; if (atomic_compare_exchange_strong(&ctx->freeing,0,1)) { /* poison the pointer to detect use-after-free */ node->next = (void *) -1; free(node); /* we got the lock; actually free */ atomic_store(&ctx->is_freeing, false); atomic_thread_fence(memory_order_seq_cst); } else /* we did not get the lock; only add to a freelist */ insert_pool(ctx, node); } else insert_pool(ctx, node); free_pool(ctx, false); } ``` * `lfq_enqueue` 在 `lfq_ctx` 中新增節點,使用 `atomic_exchange` 和 `atomic_store` 來替換 `ctx->tail` 和 `old_tail->next`,並確保 `tail->next` 必為 NULL ```c int lfq_enqueue(struct lfq_ctx *ctx, void *data) { struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node)); if (!insert_node) return -errno; insert_node->data = data; struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node); assert(!old_tail->next && "old tail was not NULL"); atomic_store(&old_tail->next, insert_node); return 0; } ``` * `lfq_dequeue` 會移除節點 ```c void *lfq_dequeue(struct lfq_ctx *ctx) { int tid = alloc_tid(ctx); /* many thread race */ if (tid == -1) return (void *) -1; void *ret = lfq_dequeue_tid(ctx, tid); free_tid(ctx, tid); return ret; } ``` 首先呼叫 `alloc_tid`,會檢查目前有哪些 `tid` 正在運行,回傳 `-1` 代表所有的 thread 都正在運行;非 `-1` 回傳剛開始運行的 thread ```c static int alloc_tid(struct lfq_ctx *ctx) { for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { if (ctx->tid_map[i] == 0) { int old = 0; if (atomic_compare_exchange_strong(&ctx->tid_map[i], &old, 1)) return i; } } return -1; } ``` 接著呼叫 `lfq_dequeue_tid`,移除其中一個節點,並記錄是 `tid` 正在操作 存取 `old_head` 、 `new_head` ```c void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid) { struct lfq_node *old_head, *new_head; ... } ``` `old_head` 為目前的 `head` ,並將 `HP[tid]` 指向此,表示正在讀取此節點。接著再 `atomic_load(&ctx->head)` 檢查是否有被其他 thread 移除,被移除再到 `retry` 重新執行。 ```c do { retry: old_head = atomic_load(&ctx->head); atomic_store(&ctx->HP[tid], old_head); atomic_thread_fence(memory_order_seq_cst); if (old_head != atomic_load(&ctx->head)) goto retry; ... } while (!atomic_compare_exchange_strong(CCCC)); ``` 成功後,以 `new_head` 載入 `old_head->next`,若 `new_head` 為 0 ,則代表為最後一個節點,返回 NULL ```c do { ... new_head = atomic_load(&old_head->next); if (new_head == 0) { atomic_store(&ctx->HP[tid], 0); return NULL; } while (!atomic_compare_exchange_strong(&ctx->head,&old_head,new_head)); ``` 最後要再將 `ctx->head` 換成 `new_head` * `lfg_count_free_list` 計算 `free pool` 中有幾個節點 * `lfq_release` 為釋放整個 `lfq`,分成三個部分釋放 1. 釋放佇列 `ctx` 中的節點 檢查佇列內部是否有資料 ```c if (ctx->tail && ctx->head) { /* if we have data in queue */ while ((struct lfq_node *) ctx->head) { /* while still have node */ struct lfq_node *tmp = (struct lfq_node *) ctx->head->next; safe_free(ctx, (struct lfq_node *) ctx->head); ctx->head = tmp; } ctx->tail = 0; } ``` 2. 釋放佇列 `free pool` 中的節點 ```c if (ctx->fph && ctx->fpt) { free_pool(ctx, true); if (ctx->fph != ctx->fpt) return -1; free(ctx->fpt); /* free the empty node */ ctx->fph = ctx->fpt = 0; } ``` 3. 檢查函式 `free_pool` 是否成功,最後再釋放 `HP` 和 `tid_map`。 ```c if (ctx->fph || ctx->fpt) return -1; free(ctx->HP); free(ctx->tid_map); memset(ctx, 0, sizeof(struct lfq_ctx)); return 0; ``` ## TODO: [第 12 週測驗題之 2, 3](https://hackmd.io/@sysprog/linux2024-quiz12) > 解釋上述程式碼運作原理,包含延伸問題 ### 測驗 `1` 此題希望修改以下程式碼,使用 futex 的方式,達到等待 3 秒的效果 ```c #include <linux/futex.h> #include <stdint.h> #include <stdio.h> #include <sys/syscall.h> #include <time.h> #include <unistd.h> int futex_sleep(time_t sec, long ns) { uint32_t futex_word = 0; struct timespec timeout = {sec, ns}; return syscall(SYS_futex, AAAA, BBBB, futex_word, &timeout, NULL, 0); } int main() { time_t secs = 3; printf("Before futex_sleep for %ld seconds\n", secs); futex_sleep(secs, 0); printf("After futex_sleep\n"); return 0; } ``` 首先,我們先看 [`timespec` ](https://man7.org/linux/man-pages/man3/timespec.3type.html) ,它表示了一個時間,並且精度可以達到奈秒。 接著查看 `futex` 的格式和參數說明 ```c long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, /* or: uint32_t val2 */ uint32_t *uaddr2, uint32_t val3); ``` Futex (Fast userspace mutex) 藉由使用一個 32 位元變數`futex word`,來和 Kernal 中的 wait queue 來互動,而需要進行同步的執行緒則會共享此變數。 * `uaddr` 指向著 `futexword` * `futex_op` 則表示著不同的 futex 操作 * `val` 對應著 `futex_op` 中的數值 * `timeout` 、 `uaddr2` 和 `val3` 只有特定的 `futex_op` 會使用到,其餘則可以忽略 FUTEX_WAIT 會查看 `uaddr` 所指向的值,是否和 `val` 相同,此作法是為了避免被其他執行緒改變 `futex_word` 的值,而造成無法進入 sleep。 由於預期是等待 3 秒,因此 `futex_op` 應為 `FUTEX_WAIT` ,而 `*uaddr` 的部份為 `&futex_word` 。 ### 測驗 `2` 此題使用 C11 Atomics 和 Linux 提供的 futex 系統呼叫,來模擬 Go 程式語言的 [goroutine](https://go.dev/tour/concurrency/1) 和 [channel](https://go.dev/tour/concurrency/2) 機制 goroutine 可以想像成一個較輕量的 Thread ,而goroutine共享著同樣的地址,因此同步地訪問共同記憶體也非常重要 ```go func main() { go say("world") say("hello") } ``` ![image](https://hackmd.io/_uploads/rySrlptEA.png) 原先的 goroutine 會呼叫另一個 goroutine ,當原先也就是 main 的 goroutine 結束,其他的 goroutine也會跟著結束。 而 channel 為各個 goroutine 間的通訊,我們可以以此來完成執行緒間的 wait 和同步。 ```go func say(s string, c chan string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } c <- "FINISH" } func main() { ch := make(chan string) go say("world", ch) go say("hello", ch) <-ch <-ch } ``` ![image](https://hackmd.io/_uploads/HJUS-vj40.png) 在 `main` 中,創建了 channel ,表示用來傳送字串。 ```go ch := make(chan string) ``` 接著創建了兩個 goroutine ,其結束後會往 channel 傳送 "Finish" ```go func say(s string, c chan string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } c <- "FINISH" } ``` 直到接收到兩個 "Finish" 後,才會結束程式 ```go func main() { ch := make(chan string) go say("world", ch) go say("hello", ch) <-ch <-ch } ``` 接著是使用 C11 Atomics 和 futex 實作 GO channel 首先是 `mutex_unlock` ,使用 atomic 操作對 mutex 減一並解鎖,確保不被其他執行緒影響,並查看目前是否有執行緒在等待,有的話就喚醒。 ```c void mutex_unlock(struct mutex *mu) { uint32_t orig = atomic_fetch_sub_explicit(&mu->val, 1, memory_order_relaxed); if (orig != LOCKED_NO_WAITER) { mu->val = UNLOCKED; futex_wake(&mu->val, 1); } } ``` 使用 [`CAS` ](https://en.wikipedia.org/wiki/Compare-and-swap)操作,會比較 `ptr` 和 `expect` 的值。 **相同**,則將 `new` 放入 `ptr`; **不同的話**,則將 `ptr` 的值放入 `expect` ,最後回傳 expect 。 ```c static uint32_t cas(_Atomic uint32_t *ptr, uint32_t expect, uint32_t new) { atomic_compare_exchange_strong_explicit( ptr, &expect, new, memory_order_acq_rel, memory_order_acquire); return expect; } ``` `mutex_lock` 用於將 lock 上鎖,先使用 `CAS` 看目前的 val 值是否為 `UNLOCKED` ,是的話則替換成 `LOCKED_NO_WAITER` ;否的話代表目前鎖有人在使用,因此會進入 futex_wait 直到解鎖。 ```c void mutex_lock(struct mutex *mu) { uint32_t val = cas(&mu->val, UNLOCKED, LOCKED_NO_WAITER); if (val != UNLOCKED) { do { if (val == LOCKED || cas(&mu->val, LOCKED_NO_WAITER, LOCKED) != UNLOCKED) futex_wait(&mu->val, LOCKED); } while ((val = cas(&mu->val, UNLOCKED, LOCKED)) != UNLOCKED); } } ``` 接著是 `channel` 的宣告,在此有兩種 channel ,分別是 `unbuffer` 和 `ring buffer` 。 ```c struct chan { _Atomic bool closed; /* Unbuffered channels only: the pointer used for data exchange. */ _Atomic(void **) datap; /* Unbuffered channels only: guarantees that at most one writer and one * reader have the right to access. */ struct mutex send_mtx, recv_mtx; /* For unbuffered channels, these futexes start from 1 (CHAN_NOT_READY). * They are incremented to indicate that a thread is waiting. * They are decremented to indicate that data exchange is done. * * For buffered channels, these futexes represent credits for a reader or * write to retry receiving or sending. */ _Atomic uint32_t send_ftx, recv_ftx; /* Buffered channels only: number of waiting threads on the futexes. */ _Atomic size_t send_waiters, recv_waiters; /* Ring buffer */ size_t cap; _Atomic uint64_t head, tail; struct chan_item ring[0]; }; ``` * `closed` 表示通道的開啟與否 ```c _Atomic bool closed; ``` * `datap` 用以指向需要交換的資料 (unbuffered) ```c _Atomic(void **) datap; ``` * `send_mtx` 、` recv_mtx`: 確保只有一位 writer 和 一位 reader (unbuffered) ```c struct mutex send_mtx, recv_mtx; ``` * `send_ftx` 、` recv_ftx` * unbuffered :會從 1 開始,表示 channel 尚未完成。增加代表有執行緒在等待,完成傳輸則減少。 * ring buffer :表示可供給傳送和接收的數量 ```c _Atomic uint32_t send_ftx, recv_ftx; ``` * `send_waiters` 、` recv_waiters` 目前正在等待傳送和接收的 Thread (buffered) ```c _Atomic size_t send_waiters, recv_waiters; ``` * 最後是定義了 ring buffer 的長度 `cap`,並使用 `lap` 來計數。 ```c size_t cap; _Atomic uint64_t head, tail; struct chan_item ring[0]; ``` ```c struct chan_item { _Atomic uint32_t lap; void *data; }; ``` 接著是 channel 的相關操作 ```c enum { CHAN_READY = 0, CHAN_NOT_READY = 1, CHAN_WAITING = 2, CHAN_CLOSED = 3, }; ``` * `chan_init` 初始化一個 channel , 由 `cap` 決定為 unbuffered 還是 buffered channel。 若為 buffered channel ,則使用 `memset` 將所設定的 buffer 以 0 填滿。 ```c static void chan_init(struct chan *ch, size_t cap) { ch->closed = false; ch->datap = NULL; mutex_init(&ch->send_mtx), mutex_init(&ch->recv_mtx); if (!cap) ch->send_ftx = ch->recv_ftx = CHAN_NOT_READY; else ch->send_ftx = ch->recv_ftx = 0; ch->send_waiters = ch->recv_waiters = 0; ch->cap = cap; ch->head = (uint64_t) 1 << 32; ch->tail = 0; if (ch->cap > 0) memset(ch->ring, 0, cap * sizeof(struct chan_item)); } ``` :::info 目前還不清楚為何 head = 1 << 32 ::: * `chan_make` 創建一個 channel ,並以 `alloc` 分配空間給 channel 。 ```c struct chan *chan_make(size_t cap, chan_alloc_func_t alloc) { struct chan *ch; if (!alloc || !(ch = alloc(sizeof(*ch) + cap * sizeof(struct chan_item)))) return NULL; chan_init(ch, cap); return ch; } ``` :::info 可以使用 malloc 嗎 ::: 以下是 buffered 的通道 * `chan_trysend_buf` 執行以下事項 1. 先檢查了 channel 是否開啟 ```c if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } ``` 2. 存取 tail 的位置,並存取目前即將寫入的位置。檢查 `item` 的 `lap` 和 `tail` 的 `lap` 是否一致,表示可寫入狀態 ,並檢查這次寫入後 tail 是否達到 buffer 的最後位置,達到的話就回到第一個位置。 ```c uint64_t tail, new_tail; struct chan_item *item; // check if the tail is the same do { tail = atomic_load_explicit(&ch->tail, memory_order_acquire); uint32_t pos = tail, lap = tail >> 32; item = ch->ring + pos; if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) { errno = EAGAIN; return -1; } if (pos + 1 == ch->cap) new_tail = (uint64_t)(lap + 2) << 32; else new_tail = tail + 1; } while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail, ``` 6. `!atomic_compare_exchange_weak_explicit` 檢查 `tail` 和 `ch->tail` 是否相同,若失敗則代表有被打斷,重新執行 1 ~ 4 步驟 7. 成功後,將資料放入buffer, ```c item->data = data; atomic_fetch_add_explicit(&item->lap, 1, memory_order_release); ``` * `chan_send_buf` 使用 `chan_trysend_buf` 檢查是否可以傳送,若為 `-1` 則代表目前無法傳送。此時由於交換失敗,因此將 `&ch->send_ftx` 存入 `v` ,因此若 `v` 為零,代表可傳送餘額為 0 進入等待,直到被喚醒。 ```c static int chan_send_buf(struct chan *ch, void *data) { //ready to send while (chan_trysend_buf(ch, data) == -1) { if (errno != EAGAIN) return -1; // uint32_t v = 1; while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1, memory_order_acq_rel, memory_order_acquire)) { // v is zero only when &ch->send_ftx is not equal to &v if (v == 0) { atomic_fetch_add_explicit(&ch->send_waiters, 1, memory_order_acq_rel); futex_wait(&ch->send_ftx, 0); atomic_fetch_sub_explicit(&ch->send_waiters, 1, memory_order_acq_rel); v = 1; } } } ``` 接著, `&ch->recv_ftx` 加一,增加可接收的餘額,並叫醒正在等待接收的 Thread。 ```c // recv_ftx + 1 atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel); // if there are recv_waiters, wake up one thread if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0) futex_wake(&ch->recv_ftx, 1); return 0; } ``` * `chan_tryrecv_buf` 、`chan_recv_buf` 行為和 `chan_trysendbuf` 、`chan_send_buf` 原理相同。 接下來是 unbuffered 的通道。 * `int chan_send_unbuf(struct chan *ch, void *data)` 先檢查通道是否開啟 ```c if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } ``` 使用 mutex 上鎖 `&ch->send_mtx` ```c mutex_lock(&ch->send_mtx); ``` 接著嘗試將資料放入 channel 中,成功的話,代表`ch->data` 和 `ptr` 一樣,此時 `ch->data` 就會是 `data` ; 失敗的話,則 `ptr` 會被替換成 `ch->datap`。 失敗時,由於目前 `ptr` 為 `ch->datap` ,為 channel 的 data 交換區的指標的指標,使用 `*ptr` 將指標內容置換成 `data` ,接著再將 `ch->datap` 指向 NULL。 使用`atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_WAITING` 查看是否接收端在等待接收,有的話則喚醒。 ```c void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data, memory_order_acq_rel, memory_order_acquire)) { *ptr = data; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->recv_ftx, CCCC = 1); } ``` 替換成功,則代表目前 `ch->data` 指向 `data`,使用 atomic 指令將 `&ch->send_ftx` +1,並回傳 `&ch->send_ftx` 初始值是否為 `CHAN_NOT_READY = 1`,是的話就持續等到接收端接收。 `futex_wait(&ch->send_ftx, CHAN_WAITING)` 會持續等待,直到 `&ch->send_ftx` 不等於 `CHAN_WAITING`。 最後再歸還鎖。 ```c else { if (atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acquire) == CHAN_NOT_READY) { do { futex_wait(&ch->send_ftx, CHAN_WAITING); } while (atomic_load_explicit( &ch->send_ftx, memory_order_acquire) == CHAN_WAITING); if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } } } mutex_unlock(&ch->send_mtx); return 0; } ``` * `chan_recv_unbuf` 和`chan_send_unbuf` 概念類似。 * `static int chan_recv_unbuf(struct chan *ch, void **data)` 先檢查要接收的位置 data 是否存在,和檢查 channel 是否開啟 ```c if (!data) { errno = EINVAL; return -1; } if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } ``` 將上鎖 `ch->recv_mtx` 上鎖,避免同時執行緒進行接收。 ```c mutex_lock(&ch->recv_mtx); ``` 檢查 `ch->datap` 是否為 NULL,是的話代表還**沒有傳送端傳送資料**;否的話代表已有**接收端傳送資料**。 已有傳送資料的情況下,會將要資料接收的位置 `data` 替換成 `ptr` ,而目前 `ptr` = `ch->datap` ,為傳送端傳送的資料。再將 `ch->datap` 指向 NULL,代表資料接收已完成,將傳送端的 futex 減 1 並喚醒先前等待的傳送端執行緒。 ```c void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data, memory_order_acq_rel, memory_order_acquire)) { *data = *ptr; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->send_ftx, 1); } ``` 因為目前傳送端尚未傳送資料,接下來會將 `ch->recv_ftx` 加一表示等待,直到傳送端喚醒此執行緒。 ```c else { if (atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_NOT_READY) { do { futex_wait(&ch->recv_ftx, CHAN_WAITING); } while (atomic_load_explicit( &ch->recv_ftx, memory_order_acquire) == CHAN_WAITING); if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } } } ``` 最後歸還鎖 ```c mutex_unlock(&ch->recv_mtx); ``` 我們可以歸納出,如果傳送端先傳送資料, `ch->datap` 不為 NULL ,而接收端會進入 CAS 失敗的情況,喚醒傳送端的執行緒;反之,接收端先等待資料, `ch->datap` 不為 NULL,傳送端會進入 CAS 失敗,喚醒等待的接收端執行緒。 * `chan_close` 根據 `ch->cap` 決定要關閉的 channel 種類 ```c void chan_close(struct chan *ch) { ch->closed = true; if (!ch->cap) { atomic_store(&ch->recv_ftx, CHAN_CLOSED); atomic_store(&ch->send_ftx, CHAN_CLOSED); } futex_wake(&ch->recv_ftx, INT_MAX); futex_wake(&ch->send_ftx, INT_MAX); } ``` ### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案 ThreadSanitizer 是一個可以檢查各個執行續間有沒有發生 Data race,並且會在編譯時進行偵測程式碼,並在程式運行時紀錄每個執行緒的行為,如造訪的記憶體位置、讀寫行為等,最後再進行報告。 我們可以寫一個會造成 Data race 的程式 `data_race.c` ```c #include <stdio.h> #include <pthread.h> pthread_mutex_t lock; int shared_var = 0; int times = 1000000; void* increment(void* arg) { for (int i = 0; i < times; ++i) { ++shared_var; } return NULL; } int main() { pthread_t t1, t2; pthread_create(&t1, NULL, increment, NULL); pthread_create(&t2, NULL, increment, NULL); pthread_join(t1, NULL); pthread_join(t2, NULL); printf("Final value: %d\n", shared_var); return 0; } ``` 並使用 ThreadSanitizer 進行編譯 ``` gcc -g -fsanitize=thread -o data_race data_race.c ``` 執行 `data_race.c` ``` FATAL: ThreadSanitizer: unexpected memory mapping 0x63faef1d9000-0x63faef1da000 ``` 反而出現的不是 Data race 的問題,而是 memory mapping 的問題,在 [Thread Sanitizer FATAL error on kernel version](https://github.com/google/sanitizers/issues/1716) 說明到可能是 ASLR(Address space layout randomization) 的問題。 ASLR 通過隨機地分配程序的記憶體位置,防止惡意程式或攻擊者利用已知的記憶體位置來進行攻擊,預設是 32 bit,我們將其設置為 28 bit。 ```shell $ sudo sysctl -w vm.mmap_rnd_bits=28 vm.mmap_rnd_bits = 28 ``` 再執行一次 `data_race.c` ```c ================== WARNING: ThreadSanitizer: data race (pid=43074) Read of size 4 at 0x55939c8a7018 by thread T2: #0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x129d) Previous write of size 4 at 0x55939c8a7018 by thread T1: #0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x12b5) Location is global '<null>' at 0x000000000000 (data_race+0x000000004018) Thread T2 (tid=43077, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:26 (data_race+0x134e) Thread T1 (tid=43076, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:25 (data_race+0x1331) SUMMARY: ThreadSanitizer: data race /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 in increment ================== Final value: 2000000 ThreadSanitizer: reported 1 warnings ``` 說明發生了 data race。 接著在程式中加入 mutex 進行保護 ::: spoiler `data_race_protect.c` ```diff #include <stdio.h> #include <pthread.h> ++pthread_mutex_t lock; int shared_var = 0; int times = 1000000; void* increment(void* arg) { for (int i = 0; i < times; ++i) { ++ pthread_mutex_lock(&lock); shared_var; ++ pthread_mutex_unlock(&lock); } return NULL; } int main() { pthread_t t1, t2; ++ pthread_mutex_init(&lock, NULL); pthread_create(&t1, NULL, increment, NULL); pthread_create(&t2, NULL, increment, NULL); pthread_join(t1, NULL); pthread_join(t2, NULL); ++ pthread_mutex_destroy(&lock); printf("Final value: %d\n", shared_var); return 0; } ``` ::: ThreadSanitizer 就沒有報錯了 ``` $ ./data_race_protect Final value: 2000000 ``` 接著在測驗 `2` 加入 ThreadSanitizer 進行編譯 ```c CFLAGS = -std=c11 -Wall -Wextra -pthread -fsanitize=thread ``` 也出現了 data race 的問題,發現是 unbuffered 發生了問題 ```c WARNING: ThreadSanitizer: data race (pid=77546) Read of size 8 at 0x7f96901be1c8 by thread T1: #0 reader <null> (exam2+0x16a0) Previous write of size 8 at 0x7f96901be1c8 by thread T81: #0 chan_send_unbuf <null> (exam2+0x2e58) #1 chan_send <null> (exam2+0x3477) #2 writer <null> (exam2+0x15b2) Location is stack of thread T1. Thread T1 (tid=77548, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 create_threads <null> (exam2+0x18b0) #2 test_chan <null> (exam2+0x1b81) #3 main <null> (exam2+0x1d3b) Thread T81 (tid=77628, finished) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 create_threads <null> (exam2+0x18b0) #2 test_chan <null> (exam2+0x1baf) #3 main <null> (exam2+0x1d3b) ``` 執行緒 `T1` 在 `reader` 和執行緒 `T81` 在 `chan_send_unbuf` 同時存取同個記憶體位置 `0x7f96901be1c8` ,不過只靠此訊息無法得知實際是哪個變數被同時存取,因此我使用了 [Helgrind](https://valgrind.org/docs/manual/hg-manual.html) 來輔助進行 data race 的除錯。 Helgind 是 Valgrind 的其中一個工具,可以檢測使用以下事項 * POSIX thread API 的使用錯誤 * deadlock * data race 使用 helgrind 來檢查 data race ``` $ valgrind --tool=helgrind ./exam2 ``` :::spoiler ```c Possible data race during write of size 8 at 0x4AA1048 by thread #12 ==7850== Locks held: none ==7850== at 0x10A22E: chan_send_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x10A5F6: chan_send (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x1092A5: writer (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==7850== by 0x4909AC2: start_thread (pthread_create.c:442) ==7850== by 0x499AA03: clone (clone.S:100) ==7850== ==7850== This conflicts with a previous read of size 8 by thread #2 ==7850== Locks held: none ==7850== at 0x10A3AF: chan_recv_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x10A641: chan_recv (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x109329: reader (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==7850== by 0x4909AC2: start_thread (pthread_create.c:442) ==7850== by 0x499AA03: clone (clone.S:100) ==7850== Address 0x4aa1048 is 8 bytes inside a block of size 80 alloc'd ==7850== at 0x484A919: malloc (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==7850== by 0x109BCE: chan_make (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x1095B3: test_chan (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== by 0x1097DA: main (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==7850== Block was alloc'd by thread #1 ``` ::: 說明是 `chan_send_unbuf` 和 `chan_recv_unbuf` 中同時寫入和讀取了一個 `size = 8` 的變數,而此變數是在 72byte 的結構中宣告的,推測是在 channel 中的變數,而透過觀察`chan_send_unbuf` 和 `chan_recv_unbuf` ```c static int chan_send_unbuf(struct chan *ch, void *data) { ... void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data, memory_order_acq_rel, memory_order_acquire)) { *ptr = data; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->recv_ftx, CCCC); } ... } ``` ```c static int chan_recv_unbuf(struct chan *ch, void **data) { ... void **ptr = NULL; if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data, memory_order_acq_rel, memory_order_acquire)) { *data = *ptr; atomic_store_explicit(&ch->datap, NULL, memory_order_release); if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) == CHAN_WAITING) futex_wake(&ch->send_ftx, 1); } ... } ``` 推測可能在 `*ptr = data` 和 `*data = *ptr` 沒有保護,可能造成同時寫入和讀取的問題 嘗試加入一個新的 mutex `data_mtx` ,在要修改前先使用鎖保護 ```diff static int chan_recv_unbuf(struct chan *ch, void **data) { ... ++ mutex_lock(&ch->data_mtx) *data = *ptr; ++ mutex_unlock(&ch->data_mtx); ... } static int chan_send_unbuf(struct chan *ch, void *data) { ... ++ mutex_lock(&ch->data_mtx) *ptr = data; ++ mutex_unlock(&ch->data_mtx); ... } ``` 仍然沒有解決 data race 的問題。 ::: info 但後來思考,不應該兩者同時進入此區塊,因為在 `chan_send_unbuf` 和 `chan_recv_unbuf` 中,比較式為 atomic 操作,若一方替換成功,則另一方必定去到替換失敗區,可能問題不出在這? ::: 透過讓程式 print 出 目前所在的區塊,發現確實不會有兩者同時在 cas 成功或 cas 失敗的區塊 ``` ... recv in cas success send in cas failed send in cas success recv in cas failed ... ``` 改成使用 `atomic` 指令進行 `*ptr = data` 和 `*data = *ptr`。 ```diff static int chan_send_unbuf(struct chan *ch, void *data) { ... - //*ptr = data; + atomic_store_explicit(ptr, data, memory_order_release); ... } static int chan_recv_unbuf(struct chan *ch, void *data) { ... - //*data= *ptr; + atomic_store_explicit(data, *ptr, memory_order_release); ... } ``` 還有其他可能發生 data race 的兩個地方,在 `reader` 和 `chan_send_unbuf` :::spoiler ```c ==12438== Possible data race during read of size 8 at 0x56A0DB8 by thread #2 ==12438== Locks held: none ==12438== at 0x10932F: reader (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==12438== by 0x4909AC2: start_thread (pthread_create.c:442) ==12438== by 0x499AA03: clone (clone.S:100) ==12438== ==12438== This conflicts with a previous write of size 8 by thread #12 ==12438== Locks held: none ==12438== at 0x10A1FC: chan_send_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x10A5F6: chan_send (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x1092A5: writer (in /home/kkkkk1109/2024q1week12/exam2/exam2) ==12438== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so) ==12438== by 0x4909AC2: start_thread (pthread_create.c:442) ==12438== by 0x499AA03: clone (clone.S:100) ==12438== Address 0x56a0db8 is on thread #2's stack ==12438== in frame #0, created by reader (???:) ``` ::: 以下是 `reader` 程式碼 ```c void *reader(void *arg) { struct thread_arg *a = arg; size_t msg = 0, received = 0, expect = a->to - a->from; printf("address of msg is %lx\n",&msg); while (received < expect) { if (chan_recv(a->ch, (void **) &msg) == -1) break; atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed); ++received; } return 0; } ``` 推測是 `reader` 創造的 `msg` 發生了 data race,將 `msg` 的記憶體位置 `print` 出 ```c address of msg is 7fecce6be1c8 ================== WARNING: ThreadSanitizer: data race (pid=14787) Read of size 8 at 0x7fecce6be1c8 by thread T1: #0 reader <null> (exam2+0x16cf) Previous atomic write of size 8 at 0x7fecce6be1c8 by thread T11: ``` 確實是 msg 的問題 :::info 懷疑是 `reader` 中的 `msg_count[msg]` 的 `msg`,需要先經過 `atomic` 的處理 ::: 將 `msg` 的讀取改成以變數 `count` 進行 atomic 操作 ```diff static void *reader(void *arg) { ... + count = atomic_load(&msg); + atomic_fetch_add_explicit(&msg_count[count], 1, memory_order_relaxed); - atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed); ++received; } return 0; } ``` 便成功沒有報錯! > [commit](https://github.com/kkkkk1109/2024q1week12/commit/188319d5598c10b58f3c2c0280d1e0aa88e402e8) ### 測驗 `3` 此題為實作一個 lock-free 的 single-producer/single-consumer,使用 ring buffer 且避免造成 false sharing。 定義了 `counter_t` 用為計數,注意到這邊使用 `union` ,並用 `w` 和 `r` 區分寫入和讀取。 ```c typedef union { volatile uint32_t w; volatile const uint32_t r; } counter_t; ``` 接著看 `spsc_queue` ```c typedef struct spsc_queue { counter_t head; /* Mostly accessed by producer */ volatile uint32_t batch_head; counter_t tail __ALIGN; /* Mostly accessed by consumer */ volatile uint32_t batch_tail; unsigned long batch_history; /* For testing purpose */ uint64_t start_c __ALIGN; uint64_t stop_c; element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */ } __ALIGN spsc_queue_t; ``` 可以看到使用了 `head` 、 `tail` 作為 ring buffer 的開頭和結尾。而這邊使用了 ` __ALIGN`,其定義為 ```c #define __ALIGN __attribute__((aligned(64))) ``` 由於題目有說明要避免 false sharing,因此將常使用到的物件以 64byte 為一個單位,如此一來就不會被存在同個 cacheline,而不會造成頻繁的 cache coherence。 :::info `batch_head` 、`batch_tail` 和 `batch_history` 為批次處理,是一次讀取 `batch_size` 的量嗎 ::: `element_t data[SPSC_QUEUE_SIZE] __ALIGN` 宣告了此 ring buffer 的大小。 接著是關於 queue 的操作 `queue_init` 使用 [`memset`](https://man7.org/linux/man-pages/man3/memset.3.html) 將 queue 中的各個數值以 0 填滿。 ```c static void queue_init(spsc_queue_t *self) { memset(self, 0, sizeof(spsc_queue_t)); self->batch_history = SPSC_BATCH_SIZE; } ``` * `SPSC_QUEUE_SIZE = (1024 * 8)` 為 ring buffer 的大小 * `SPSC_BATCH_SIZE` 為 queue_size 除以 16 * `SPSC_BATCH_INCREAMENT = (SPSC_BATCH_SIZE / 2)` 為 batch 一次增加的量 * `SPSC_CONGESTION_PENALTY (1000)` 作為等待的時間 接著看 `dequeue` 首先,先取得上一次放入的 batch 大小, `*val_ptr` 為將取出資料的放入的空間。 :::info 若是第一次執行的話,batch_history = SPSC_BATCH_SIZE ::: ```c static int dequeue(spsc_queue_t *self, element_t *val_ptr) { unsigned long batch_size = self->batch_history; *val_ptr = SPSC_QUEUE_ELEMENT_ZERO; ... } ``` :::success 第一次執行的話, tail.r = batch_tail = 0 ::: 會判斷目前 `tail` 是否達到 `batch_tail` ,達到的話就要增加 `bathc_tail` 的量,先以 `tmp_tail` 去判斷要增加的量。 檢查 `tmp_tail` 是否達到 ring buffer 結尾,接著若 `batch_history` 小於 `SPSC_BATCH_SIZE` 的話,則要增加 `batch_history` 的大小,有兩個選項,比較 `SPSC_BATCH_SIZE` 和 `batch_history + SPSC_BATCH_INCREAMENT` ,選擇較小的一個做為新的 `batch_history` ```c /* Try to zero in on the next batch tail */ if (self->tail.r == self->batch_tail) { uint32_t tmp_tail = self->tail.r + SPSC_BATCH_SIZE; // check if the batch_tail meet the end of ring buffer if (tmp_tail >= SPSC_QUEUE_SIZE) { tmp_tail = 0; // determine the batch_history if (self->batch_history < SPSC_BATCH_SIZE) { self->batch_history = (SPSC_BATCH_SIZE < (self->batch_history + SPSC_BATCH_INCREAMENT)) ? SPSC_BATCH_SIZE : (self->batch_history + SPSC_BATCH_INCREAMENT); } } ... } ``` 目前已經決定 `tmp_tail` ,不過並非直接指定這樣大小的 batch_size,會先等待後續的空間已經有資料可以讀取,因此以 `while (!(self->data[tmp_tail]))` 判斷是否有資料,沒有的話便等待 producer 產生資料,並將 `batch_size` 縮小,重複步驟直到有 producer 產生的資料。 ```c if (self->tail.r == self->batch_tail) { ... batch_size = self->batch_history; while (!(self->data[tmp_tail])) { wait_ticks(SPSC_CONGESTION_PENALTY); batch_size >>= 1; if (batch_size == 0) return SPSC_Q_EMPTY; tmp_tail = self->tail.r + batch_size; if (tmp_tail >= SPSC_QUEUE_SIZE) tmp_tail = 0; } self->batch_history = batch_size; if (tmp_tail == self->tail.r) tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1; self->batch_tail = tmp_tail; } ``` :::info 最後 `if (tmp_tail == self->tail.r)` 猜測為判斷是否回到 tail.r 因為 ring buffer 為環形結構。 ::: 接著是實際取出資料,使用 `val_ptr` 獲得資料 `self->data[self->tail.r]` ```c *val_ptr = self->data[self->tail.r]; self->data[self->tail.r] = SPSC_QUEUE_ELEMENT_ZERO; self->tail.w++; if (self->tail.r >= SPSC_QUEUE_SIZE) self->tail.w = 0; return SPSC_OK; ``` 接下來看如何使用 `enqueue` 放入資料,若 `head` 達到 `batch_head`,則使用 tmp_head 去決定下次 batch_head 的位置。 這邊和 consumer 不同的是,每次都是以 `SPSC_BATCH_SIZE` 來增加。 若 `tmp_head` 的部分有資料,則代表已經 ring buffer 已經滿了,無法再放入資料。 ```c static int enqueue(spsc_queue_t *self, element_t value) { /* Try to zero in on the next batch head. */ if (self->head.r == self->batch_head) { uint32_t tmp_head = self->head.r + SPSC_BATCH_SIZE; if (tmp_head >= SPSC_QUEUE_SIZE) tmp_head = 0; // wait if (self->data[tmp_head]) { /* run spin cycle penality */ wait_ticks(SPSC_CONGESTION_PENALTY); return SPSC_Q_FULL; } self->batch_head = tmp_head; } // put value in ring buffer self->data[self->head.r] = value; self->head.w++; if (self->head.r >= SPSC_QUEUE_SIZE) self->head.w = 0; return SPSC_OK; } ``` 接著看對應的 consumer 和 producer 操作 定義了 `struct init_info_t` ```c typedef struct { uint32_t cpu_id; pthread_barrier_t *barrier; } init_info_t; ``` `cpu_id` 對應到不同的 cpu ,`barrier` 作為一個同步的手段,會進入等待其他執行緒完成特定事項。 * consumer ```c void *consumer(void *arg) { element_t value = 0, old_value = 0; init_info_t *init = (init_info_t *) arg; uint32_t cpu_id = init->cpu_id; pthread_barrier_t *barrier = init->barrier; /* user needs tune this according to their machine configurations. */ cpu_set_t cur_mask; CPU_ZERO(&cur_mask); CPU_SET(cpu_id * 2, &cur_mask); printf("consumer %d: ---%d----\n", cpu_id, 2 * cpu_id); if (sched_setaffinity(0, sizeof(cur_mask), &cur_mask) < 0) { printf("Error: sched_setaffinity\n"); return NULL; } ... } ``` 首先,[`cpu_set_t`](https://man7.org/linux/man-pages/man3/CPU_SET.3.html) 定義為多個 cpu 的集合,而使用 `CPU_ZERO` 將 `cur_mask` 設為沒有 CPU,再使用 `CPU_SET` 加入 `cpu_id * 2` 的 CPU。 接著,使用 [`sched_setaffinity`](https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html) 將 thread 運行在特定的 CPU ```c int sched_setaffinity(pid_t pid, size_t cpusetsize, const cpu_set_t *mask); ``` 如圖 ![image](https://hackmd.io/_uploads/H1QPqSTI0.png) ``` consumer 1: ---2---- consumer 2: ---4---- Consumer created... consumer 3: ---6---- producer 0: ---1---- Consumer created... consumer 5: ---10---- Consumer created... Consumer created... consumer 4: ---8---- Consumer created... consumer: 27 cycles/op consumer: 27 cycles/op consumer: 27 cycles/op consumer: 27 cycles/op consumer: 27 cycles/op producer 5 cycles/op Done! ``` 若 pid 為零,則會選擇呼叫此函式的 thread ```c ... printf("Consumer created...\n"); pthread_barrier_wait(barrier); queues[cpu_id].start_c = read_tsc(); ... ``` 接著,使用 `barrier` 等到所有 `consumer` 完成 `sched_setaffinity` `read_tsc()` 對應如下,主要為獲取時間 ```c static inline uint64_t read_tsc() { uint64_t time; uint32_t msw, lsw; __asm__ __volatile__( "rdtsc\n\t" "movl %%edx, %0\n\t" "movl %%eax, %1\n\t" : "=r"(msw), "=r"(lsw) : : "%edx", "%eax"); time = ((uint64_t) msw << 32) | lsw; return time; } ``` ```c for (uint64_t i = 1; i <= TEST_SIZE; i++) { while (dequeue(&queues[cpu_id], &value) != 0) ; assert((old_value + 1) == value); old_value = value; } queues[cpu_id].stop_c = read_tsc(); printf( "consumer: %ld cycles/op\n", ((queues[cpu_id].stop_c - queues[cpu_id].start_c) / (TEST_SIZE + 1))); pthread_barrier_wait(barrier); return NULL; ``` 接著,執行 `dequeue` ,並記錄完成時間,使用 `barrier` 等到所有 consumer 完成。 * producer ```c void producer(void *arg, uint32_t num) { ... for (uint64_t i = 1; i <= TEST_SIZE + SPSC_BATCH_SIZE; i++) { for (int32_t j = 1; j < num; j++) { element_t value = i; while (enqueue(&queues[j], value) != 0) ; } } ... } ``` 內容和 `consumer` 僅有從 `dequeue` 換成 `enqueue`。 ### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案 在編譯時加入 ```shell gcc -Wall -O2 -I. -o main main.c -lpthread -fsanitize=thread ``` 產生錯誤訊息 ```c ================== WARNING: ThreadSanitizer: thread leak (pid=6908) Thread T1 (tid=6910, finished) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main <null> (exam3+0x1293) And 1 more similar thread leaks. SUMMARY: ThreadSanitizer: thread leak (/home/kkkkk1109/2024q1week12/exam3/exam3+0x1293) in main ``` 說明發生 thread leak ,可能有 thread 沒有適當的釋放,猜測是 `consumer_thread` 沒有被安全釋放,加上 `pthread_detach` 來安全釋放 thread ```c for (int i = 1; i < max_threads; i++) { INIT_CPU_ID(i) = i; INIT_BARRIER(i) = &barrier; error = pthread_create(&consumer_thread, &consumer_attr, consumer, INIT_PTR(i)); pthread_detach(consumer_thread); } ``` 便沒有報錯訊息了 > [commit](https://github.com/kkkkk1109/2024q1week12/commit/4672babc762546b51f7cba571619225979a5db9b)