# 2021q1 quiz8-1 contributed by < `cyrong` > > [quiz8-1](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FB1pE7D-8d#undefined) ::: spoiler 參考解答 #### `AAA` = ? - [x] `&node->back` #### `BBB` = ? - [x] `&node->back` #### `CCC` = ? - [x] `1` #### `DDD` = ? - [x] `&node->front` #### `EEE` = ? - [x] `&node->front` ::: ## 程式運作原理 實做 SPSC (single-producer and single-consumer) ### `struct` #### `spmc_node_t` ```c typedef struct __spmc_node { size_t cap; /* One more than the number of slots available */ _Atomic size_t front, back; struct __spmc_node *_Atomic next; uintptr_t buf[]; } spmc_node_t; ``` spmc queue 中操作的 node - `cap` : 在這個 node 中可以容納多少個 element - `front` : node 中會實做 ring buffer , `front` 用來指明在此 ring 中的開頭 index ,用來 dequeue - `back` : 與 `front` 配合用來指名 ring 中的結尾,用來 enqueue - `next` : 在 spmc queue 中用來鏈接,指向下一個 spmc node #### `spmc_base` ```c struct spmc_base { /* current node which enqueues/dequeues */ spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue; uint8_t last_power; spmc_destructor_t destructor; }; ``` spmc queue 的本體 - `curr_enqueue` : 目前用來 enqueue 的 spmc node - `curr_dequeue` : 目前用來 dequeue 的 spmc node - `last_power` : 當 enqueue 數量越來越多,建立新 node 時會將 `last_power` 加一,讓新 node 能夠容納更多的 element - `destructor` : 在 `spmc_delete` 時會用到 ### `static void init_node()` 初始化一個新的 spmc queue node 在建立 spmc 時用來建立 head 也會在 enqueue 發現前一個 node 空間不足時呼叫 ### `spmc_ref_t spmc_new()` 建立一個 spmc queue 建立 `spmc_base` 時也建立一個 `spmc_node` 作為這個 spmc queue 的開頭 `head` 此 `head` 會在 `spmc_base` 後一位的記憶體 將 `curr_enqueue` 與 `curr_dequeue` 初始為 `head` ### `void spmc_delete()` 消除整個 spmc queue ### `bool spmc_enqueue()` spmc queue 的 enqueue 機制 先取得 `curr_enqueue` 後進行操作 若是 enqueue node 中 buffer 並沒有超過容量上限 則直接將 `element` 放入 `buf[node->back]` 並且將 `node->back` 增加一 但若是 buffer 滿了 先確認下一個 node (`node->next`) 是不是還有空間可以 enqueue (`IS_WRITEABLE`) 若是在確認 `node_next` 時遇到了 `curr_dequeue` ,為了不同時修改正在被讀的 node (`curr_dequeue`) 建立一個新的 node ,並且將 `last_power` 加一 建立完了之後將 `element` 放進新的 node 中 (`buf[node->back]`) ### `bool spmc_dequeue()` spmc queue 的 dequeue 機制 先取得 `curr_dequeue` 後進行操作 和 enqueue 類似的判斷 先判斷此 node 的 buffer 中是否有 element 可以用來 dequeue (`IS_READABLE`) 若是有,則把 `buf[node->front]` 放入 `*slot` 中 若是沒有,處理手法和 enqueue 類似,原則是不去 read / write 同一個 node ,去 `node->next` 判斷 `READABLE` 最後使用 `atomic_compare_exchange_weak` 將 `node->front` 變更為 `idx+1` 若是此變更失敗,代表有別的 consumer 拿走了剛剛拿的 element 就再實行一次上述動作 ### `static void *producer_thread()` producer task enqueue `N_ITEM` 個 element 到 spmc 設定為 `1024UL * 8` ### `static void *mc_thread()` consumer task 在執行下會有三種錯誤訊息 - 1. `Falied to dequeue in mc_thread.` 代表 dequeue function 失敗 但是在 dequeue function 沒有設定什麼條件會 return false 因此發生此錯誤時是因為呼叫程式時記憶體不足 - 2. `consumed twice!` 代表同一個 element 被 consume 兩次 在上方有建立一個 array `observed_count` 用來監測 element 是否有被 consume 而預期結果是每個 element 只會被 consume 一次 - 3. `"%zu after %zu; bad order!\n", (size_t) element, (size_t) greatest` 代表 `element` < `greatest` 預期的 consume 順序會是從小到大,這樣即是 consumer 回頭拿到了早應該被 consume 的元素 最後設定一個 sentinal signal 當 consume 的 element 已經到了 (`N_MC_ITEM - 1`) 將會 enqueue `element + 1` 用作通知下一個應該結束的 consumer ### `int main()` 測試整個 spmc 先建立一個 spmc 一個 producer 再建立 `N_MC_THREAD` 個 consumer 所有 thread 結束後 join 檢驗 `observe_count` 是否每個 element 都被 consume ## 問體討論 ### [spsc](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FB1pE7D-8d#undefined) vs spmc 兩個最大的差別還是 single consumer 跟 multiple consumer 程式實作方面 ringbuffer 沒有用到多執行緒 ringbuffer 流程是 producer 先填滿 buffer 再換 consumer 拿空 buffer #### enqueue spsc 在 enqueue 時只有檢查 buffer size 是否還夠 spmc 則是會檢查 `curr_enqueue` 是否有和 `curr_dequeue` 重疊到,用意在於不會同時修改和讀取同一個 node #### dequeue 和 enqueue 類似,不會同時修改和讀取同一個 node #### memory order 在程式中有用到 atomic instruction 其中可以設定這些操作的 memory order 來避免 data race ##### relaxed 使用 relaxed 的話,這個指令不會和其他 atomic 指令有順序關係,因此在 load 和 store 到同一個記憶體位址可能會造成 data race,因此要避免這種情況 ##### consume load 使用的 order ,在 current thread 中 dependant on 此 atomic variable 的 讀寫不能被 reorder 到此 load 之前。 還有在別的 threads 中對此 atomic variable 的 release 在此 thread 變成 visible,不會被亂改到 ##### aquire 也是 load 使用的 order ,跟 consume 的差別在不管什麼讀寫都不能被 reorder 到此 load 之前 ##### release store 用的 order ,通常跟 consume, release 一起使用 release 的 reorder 限制和 acquire 一樣是對所有讀寫 ##### acq_rel read-modify-write 用的 oreder 同時擁有 acquire 和 release 的特性 ##### seq_cst 可以當成 load, store, read-modify-write 的指令 這個 order 可以用在 acquire, release, acq_rel 多加了一條特性是,在所有 thread 中看到的 seq_cst 的 order 都要是相同 #### thread sanitizer 使用以下方式編譯 ```c gcc -fsanitize=thread -fPIE -pie -Wall -Wextra -o spmc spmc.c -lpthread ``` 產生的問題數量不是定值,但是有在 Observed 63. 處一定會出現 ```c ================== WARNING: ThreadSanitizer: data race (pid=108857) Atomic read of size 8 at 0x7b6400000008 by thread T2: #0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7) #1 spmc_dequeue <null> (spmc+0x1de8) #2 mc_thread <null> (spmc+0x216c) #3 <null> <null> (libtsan.so.0+0x2d1af) Previous write of size 8 at 0x7b6400000008 by thread T1: #0 malloc <null> (libtsan.so.0+0x30343) #1 spmc_enqueue <null> (spmc+0x1be1) #2 producer_thread <null> (spmc+0x2097) #3 <null> <null> (libtsan.so.0+0x2d1af) Location is heap block of size 1056 at 0x7b6400000000 allocated by thread T1: #0 malloc <null> (libtsan.so.0+0x30343) #1 spmc_enqueue <null> (spmc+0x1be1) #2 producer_thread <null> (spmc+0x2097) #3 <null> <null> (libtsan.so.0+0x2d1af) Thread T2 (tid=108860, running) created by main thread at: #0 pthread_create <null> (libtsan.so.0+0x5ea99) #1 main <null> (spmc+0x23d9) Thread T1 (tid=108859, running) created by main thread at: #0 pthread_create <null> (libtsan.so.0+0x5ea99) #1 main <null> (spmc+0x239a) SUMMARY: ThreadSanitizer: data race (/lib/x86_64-linux-gnu/libtsan.so.0+0x7cec7) in __tsan_atomic64_load ================== ``` 這邊看到出問題的地方會是在第65個 observed 這邊剛好是預設的 `cap` 再多一個 也就是 producer 要建立一個新的 node 的時候 兩邊 thread 會 race 的地方 consumer : ```c #0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7) #1 spmc_dequeue <null> (spmc+0x1de8) ``` spmc_dequeue : :::spoiler 程式片段 ```c= /* Recieve (dequeue) an item from the SPMC */ bool spmc_dequeue(spmc_ref_t spmc, uintptr_t *slot) { spmc_node_t *node = atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume); size_t idx; no_increment: do { idx = atomic_load_explicit(&node->front, memory_order_consume); if (!IS_READABLE(idx, node)) { if (node != spmc->curr_enqueue) atomic_compare_exchange_strong( &spmc->curr_dequeue, &node, atomic_load_explicit(&node->next, memory_order_relaxed)); goto no_increment; } else *slot = node->buf[INDEX_OF(idx, node)]; } while ( !atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1)); return true; } ``` ::: `__tsan_atomic64_load` 是在 14 行 load `&node->next` producer : ```c #0 malloc <null> (libtsan.so.0+0x30343) #1 spmc_enqueue <null> (spmc+0x1be1) ``` spmc_enqueue : :::spoiler 程式片段 ```c= /* Send (enqueue) an item onto the SPMC */ bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element) { spmc_node_t *node = atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed); size_t idx; retry: idx = atomic_load_explicit(&node->back, memory_order_consume); if (!IS_WRITABLE(idx, node)) { spmc_node_t *const next = atomic_load_explicit(&node->next, memory_order_relaxed); /* Never move to write on top of the node that is currently being read; * In that case, items would be read out of order they were enqueued. */ if (next != atomic_load_explicit(&spmc->curr_dequeue, memory_order_relaxed)) { node = next; goto retry; } const uint8_t power = ++spmc->last_power; assert(power < sizeof(size_t) * CHAR_BIT); const size_t cap = 1 << power; spmc_node_t *new_node = malloc(SIZE_FROM_CAP(cap, sizeof(spmc_node_t))); if (!new_node) return false; init_node(new_node, next, cap); atomic_store_explicit(&node->next, new_node, memory_order_release); idx = 0; node = new_node; } node->buf[INDEX_OF(idx, node)] = element; atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed); atomic_fetch_add_explicit(&node->back, 1, memory_order_release); return true; } ``` ::: malloc 在 24 行 不過這裡指的是上次 write 的地方 真正會發生的跟 consumer 存取到同一個地址的地方是下方 29 行 store `new_node` 到 `&node->next` 其中 `node->next` 就是 data race 的中心 這邊 store 用的 memory order 是 release 而在 consumer 中 load 部份的 memory order 是 relaxed 這樣子這兩條指令因為 relaxed 的關係,所以不會有 order constraint ,就可能發生 data race 於是將 `spmc_dequeue` 14 行的 memory order 改成 consume 這樣就可以解決 Observed 64 處的 data race