owned this note
owned this note
Published
Linked with GitHub
# 2021q1 quiz8-1
contributed by < `cyrong` >
> [quiz8-1](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FB1pE7D-8d#undefined)
::: spoiler 參考解答
#### `AAA` = ?
- [x] `&node->back`
#### `BBB` = ?
- [x] `&node->back`
#### `CCC` = ?
- [x] `1`
#### `DDD` = ?
- [x] `&node->front`
#### `EEE` = ?
- [x] `&node->front`
:::
## 程式運作原理
實做 SPSC (single-producer and single-consumer)
### `struct`
#### `spmc_node_t`
```c
typedef struct __spmc_node {
size_t cap; /* One more than the number of slots available */
_Atomic size_t front, back;
struct __spmc_node *_Atomic next;
uintptr_t buf[];
} spmc_node_t;
```
spmc queue 中操作的 node
- `cap` : 在這個 node 中可以容納多少個 element
- `front` : node 中會實做 ring buffer , `front` 用來指明在此 ring 中的開頭 index ,用來 dequeue
- `back` : 與 `front` 配合用來指名 ring 中的結尾,用來 enqueue
- `next` : 在 spmc queue 中用來鏈接,指向下一個 spmc node
#### `spmc_base`
```c
struct spmc_base {
/* current node which enqueues/dequeues */
spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue;
uint8_t last_power;
spmc_destructor_t destructor;
};
```
spmc queue 的本體
- `curr_enqueue` : 目前用來 enqueue 的 spmc node
- `curr_dequeue` : 目前用來 dequeue 的 spmc node
- `last_power` : 當 enqueue 數量越來越多,建立新 node 時會將 `last_power` 加一,讓新 node 能夠容納更多的 element
- `destructor` : 在 `spmc_delete` 時會用到
### `static void init_node()`
初始化一個新的 spmc queue node
在建立 spmc 時用來建立 head
也會在 enqueue 發現前一個 node 空間不足時呼叫
### `spmc_ref_t spmc_new()`
建立一個 spmc queue
建立 `spmc_base` 時也建立一個 `spmc_node` 作為這個 spmc queue 的開頭 `head`
此 `head` 會在 `spmc_base` 後一位的記憶體
將 `curr_enqueue` 與 `curr_dequeue` 初始為 `head`
### `void spmc_delete()`
消除整個 spmc queue
### `bool spmc_enqueue()`
spmc queue 的 enqueue 機制
先取得 `curr_enqueue` 後進行操作
若是 enqueue node 中 buffer 並沒有超過容量上限
則直接將 `element` 放入 `buf[node->back]` 並且將 `node->back` 增加一
但若是 buffer 滿了
先確認下一個 node (`node->next`) 是不是還有空間可以 enqueue (`IS_WRITEABLE`)
若是在確認 `node_next` 時遇到了 `curr_dequeue` ,為了不同時修改正在被讀的 node (`curr_dequeue`)
建立一個新的 node ,並且將 `last_power` 加一
建立完了之後將 `element` 放進新的 node 中 (`buf[node->back]`)
### `bool spmc_dequeue()`
spmc queue 的 dequeue 機制
先取得 `curr_dequeue` 後進行操作
和 enqueue 類似的判斷
先判斷此 node 的 buffer 中是否有 element 可以用來 dequeue (`IS_READABLE`)
若是有,則把 `buf[node->front]` 放入 `*slot` 中
若是沒有,處理手法和 enqueue 類似,原則是不去 read / write 同一個 node ,去 `node->next` 判斷 `READABLE`
最後使用 `atomic_compare_exchange_weak` 將 `node->front` 變更為 `idx+1`
若是此變更失敗,代表有別的 consumer 拿走了剛剛拿的 element
就再實行一次上述動作
### `static void *producer_thread()`
producer task
enqueue `N_ITEM` 個 element 到 spmc 設定為 `1024UL * 8`
### `static void *mc_thread()`
consumer task
在執行下會有三種錯誤訊息
- 1. `Falied to dequeue in mc_thread.`
代表 dequeue function 失敗
但是在 dequeue function 沒有設定什麼條件會 return false
因此發生此錯誤時是因為呼叫程式時記憶體不足
- 2. `consumed twice!`
代表同一個 element 被 consume 兩次
在上方有建立一個 array `observed_count`
用來監測 element 是否有被 consume
而預期結果是每個 element 只會被 consume 一次
- 3. `"%zu after %zu; bad order!\n", (size_t) element, (size_t) greatest`
代表 `element` < `greatest`
預期的 consume 順序會是從小到大,這樣即是 consumer 回頭拿到了早應該被 consume 的元素
最後設定一個 sentinal signal
當 consume 的 element 已經到了 (`N_MC_ITEM - 1`)
將會 enqueue `element + 1` 用作通知下一個應該結束的 consumer
### `int main()`
測試整個 spmc
先建立一個 spmc 一個 producer
再建立 `N_MC_THREAD` 個 consumer
所有 thread 結束後 join
檢驗 `observe_count` 是否每個 element 都被 consume
## 問體討論
### [spsc](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FB1pE7D-8d#undefined) vs spmc
兩個最大的差別還是 single consumer 跟 multiple consumer
程式實作方面 ringbuffer 沒有用到多執行緒
ringbuffer 流程是 producer 先填滿 buffer
再換 consumer 拿空 buffer
#### enqueue
spsc 在 enqueue 時只有檢查 buffer size 是否還夠
spmc 則是會檢查 `curr_enqueue` 是否有和 `curr_dequeue` 重疊到,用意在於不會同時修改和讀取同一個 node
#### dequeue
和 enqueue 類似,不會同時修改和讀取同一個 node
#### memory order
在程式中有用到 atomic instruction
其中可以設定這些操作的 memory order 來避免 data race
##### relaxed
使用 relaxed 的話,這個指令不會和其他 atomic 指令有順序關係,因此在 load 和 store 到同一個記憶體位址可能會造成 data race,因此要避免這種情況
##### consume
load 使用的 order ,在 current thread 中 dependant on 此 atomic variable 的 讀寫不能被 reorder 到此 load 之前。
還有在別的 threads 中對此 atomic variable 的 release 在此 thread 變成 visible,不會被亂改到
##### aquire
也是 load 使用的 order ,跟 consume 的差別在不管什麼讀寫都不能被 reorder 到此 load 之前
##### release
store 用的 order ,通常跟 consume, release 一起使用
release 的 reorder 限制和 acquire 一樣是對所有讀寫
##### acq_rel
read-modify-write 用的 oreder
同時擁有 acquire 和 release 的特性
##### seq_cst
可以當成 load, store, read-modify-write 的指令
這個 order 可以用在 acquire, release, acq_rel
多加了一條特性是,在所有 thread 中看到的 seq_cst 的 order 都要是相同
#### thread sanitizer
使用以下方式編譯
```c
gcc -fsanitize=thread -fPIE -pie -Wall -Wextra -o spmc spmc.c -lpthread
```
產生的問題數量不是定值,但是有在 Observed 63. 處一定會出現
```c
==================
WARNING: ThreadSanitizer: data race (pid=108857)
Atomic read of size 8 at 0x7b6400000008 by thread T2:
#0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7)
#1 spmc_dequeue <null> (spmc+0x1de8)
#2 mc_thread <null> (spmc+0x216c)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Previous write of size 8 at 0x7b6400000008 by thread T1:
#0 malloc <null> (libtsan.so.0+0x30343)
#1 spmc_enqueue <null> (spmc+0x1be1)
#2 producer_thread <null> (spmc+0x2097)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Location is heap block of size 1056 at 0x7b6400000000 allocated by thread T1:
#0 malloc <null> (libtsan.so.0+0x30343)
#1 spmc_enqueue <null> (spmc+0x1be1)
#2 producer_thread <null> (spmc+0x2097)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Thread T2 (tid=108860, running) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x5ea99)
#1 main <null> (spmc+0x23d9)
Thread T1 (tid=108859, running) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x5ea99)
#1 main <null> (spmc+0x239a)
SUMMARY: ThreadSanitizer: data race (/lib/x86_64-linux-gnu/libtsan.so.0+0x7cec7) in __tsan_atomic64_load
==================
```
這邊看到出問題的地方會是在第65個 observed
這邊剛好是預設的 `cap` 再多一個
也就是 producer 要建立一個新的 node 的時候
兩邊 thread 會 race 的地方
consumer :
```c
#0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7)
#1 spmc_dequeue <null> (spmc+0x1de8)
```
spmc_dequeue :
:::spoiler 程式片段
```c=
/* Recieve (dequeue) an item from the SPMC */
bool spmc_dequeue(spmc_ref_t spmc, uintptr_t *slot)
{
spmc_node_t *node =
atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume);
size_t idx;
no_increment:
do {
idx = atomic_load_explicit(&node->front, memory_order_consume);
if (!IS_READABLE(idx, node)) {
if (node != spmc->curr_enqueue)
atomic_compare_exchange_strong(
&spmc->curr_dequeue, &node,
atomic_load_explicit(&node->next, memory_order_relaxed));
goto no_increment;
} else
*slot = node->buf[INDEX_OF(idx, node)];
} while (
!atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1));
return true;
}
```
:::
`__tsan_atomic64_load` 是在 14 行 load `&node->next`
producer :
```c
#0 malloc <null> (libtsan.so.0+0x30343)
#1 spmc_enqueue <null> (spmc+0x1be1)
```
spmc_enqueue :
:::spoiler 程式片段
```c=
/* Send (enqueue) an item onto the SPMC */
bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element)
{
spmc_node_t *node =
atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed);
size_t idx;
retry:
idx = atomic_load_explicit(&node->back, memory_order_consume);
if (!IS_WRITABLE(idx, node)) {
spmc_node_t *const next =
atomic_load_explicit(&node->next, memory_order_relaxed);
/* Never move to write on top of the node that is currently being read;
* In that case, items would be read out of order they were enqueued.
*/
if (next !=
atomic_load_explicit(&spmc->curr_dequeue, memory_order_relaxed)) {
node = next;
goto retry;
}
const uint8_t power = ++spmc->last_power;
assert(power < sizeof(size_t) * CHAR_BIT);
const size_t cap = 1 << power;
spmc_node_t *new_node = malloc(SIZE_FROM_CAP(cap, sizeof(spmc_node_t)));
if (!new_node)
return false;
init_node(new_node, next, cap);
atomic_store_explicit(&node->next, new_node, memory_order_release);
idx = 0;
node = new_node;
}
node->buf[INDEX_OF(idx, node)] = element;
atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed);
atomic_fetch_add_explicit(&node->back, 1, memory_order_release);
return true;
}
```
:::
malloc 在 24 行
不過這裡指的是上次 write 的地方
真正會發生的跟 consumer 存取到同一個地址的地方是下方
29 行 store `new_node` 到 `&node->next`
其中 `node->next` 就是 data race 的中心
這邊 store 用的 memory order 是 release
而在 consumer 中 load 部份的 memory order 是 relaxed
這樣子這兩條指令因為 relaxed 的關係,所以不會有 order constraint ,就可能發生 data race
於是將 `spmc_dequeue` 14 行的 memory order 改成 consume
這樣就可以解決 Observed 64 處的 data race