contributed by < cyrong
>
AAA
= ?&node->back
BBB
= ?&node->back
CCC
= ?1
DDD
= ?&node->front
EEE
= ?&node->front
實做 SPSC (single-producer and single-consumer)
struct
spmc_node_t
spmc queue 中操作的 node
cap
: 在這個 node 中可以容納多少個 elementfront
: node 中會實做 ring buffer , front
用來指明在此 ring 中的開頭 index ,用來 dequeueback
: 與 front
配合用來指名 ring 中的結尾,用來 enqueuenext
: 在 spmc queue 中用來鏈接,指向下一個 spmc nodespmc_base
spmc queue 的本體
curr_enqueue
: 目前用來 enqueue 的 spmc nodecurr_dequeue
: 目前用來 dequeue 的 spmc nodelast_power
: 當 enqueue 數量越來越多,建立新 node 時會將 last_power
加一,讓新 node 能夠容納更多的 elementdestructor
: 在 spmc_delete
時會用到static void init_node()
初始化一個新的 spmc queue node
在建立 spmc 時用來建立 head
也會在 enqueue 發現前一個 node 空間不足時呼叫
spmc_ref_t spmc_new()
建立一個 spmc queue
建立 spmc_base
時也建立一個 spmc_node
作為這個 spmc queue 的開頭 head
此 head
會在 spmc_base
後一位的記憶體
將 curr_enqueue
與 curr_dequeue
初始為 head
void spmc_delete()
消除整個 spmc queue
bool spmc_enqueue()
spmc queue 的 enqueue 機制
先取得 curr_enqueue
後進行操作
若是 enqueue node 中 buffer 並沒有超過容量上限
則直接將 element
放入 buf[node->back]
並且將 node->back
增加一
但若是 buffer 滿了
先確認下一個 node (node->next
) 是不是還有空間可以 enqueue (IS_WRITEABLE
)
若是在確認 node_next
時遇到了 curr_dequeue
,為了不同時修改正在被讀的 node (curr_dequeue
)
建立一個新的 node ,並且將 last_power
加一
建立完了之後將 element
放進新的 node 中 (buf[node->back]
)
bool spmc_dequeue()
spmc queue 的 dequeue 機制
先取得 curr_dequeue
後進行操作
和 enqueue 類似的判斷
先判斷此 node 的 buffer 中是否有 element 可以用來 dequeue (IS_READABLE
)
若是有,則把 buf[node->front]
放入 *slot
中
若是沒有,處理手法和 enqueue 類似,原則是不去 read / write 同一個 node ,去 node->next
判斷 READABLE
最後使用 atomic_compare_exchange_weak
將 node->front
變更為 idx+1
若是此變更失敗,代表有別的 consumer 拿走了剛剛拿的 element
就再實行一次上述動作
static void *producer_thread()
producer task
enqueue N_ITEM
個 element 到 spmc 設定為 1024UL * 8
static void *mc_thread()
consumer task
在執行下會有三種錯誤訊息
Falied to dequeue in mc_thread.
consumed twice!
observed_count
"%zu after %zu; bad order!\n", (size_t) element, (size_t) greatest
element
< greatest
N_MC_ITEM - 1
)element + 1
用作通知下一個應該結束的 consumerint main()
測試整個 spmc
先建立一個 spmc 一個 producer
再建立 N_MC_THREAD
個 consumer
所有 thread 結束後 join
檢驗 observe_count
是否每個 element 都被 consume
兩個最大的差別還是 single consumer 跟 multiple consumer
程式實作方面 ringbuffer 沒有用到多執行緒
ringbuffer 流程是 producer 先填滿 buffer
再換 consumer 拿空 buffer
spsc 在 enqueue 時只有檢查 buffer size 是否還夠
spmc 則是會檢查 curr_enqueue
是否有和 curr_dequeue
重疊到,用意在於不會同時修改和讀取同一個 node
和 enqueue 類似,不會同時修改和讀取同一個 node
在程式中有用到 atomic instruction
其中可以設定這些操作的 memory order 來避免 data race
使用 relaxed 的話,這個指令不會和其他 atomic 指令有順序關係,因此在 load 和 store 到同一個記憶體位址可能會造成 data race,因此要避免這種情況
load 使用的 order ,在 current thread 中 dependant on 此 atomic variable 的 讀寫不能被 reorder 到此 load 之前。
還有在別的 threads 中對此 atomic variable 的 release 在此 thread 變成 visible,不會被亂改到
也是 load 使用的 order ,跟 consume 的差別在不管什麼讀寫都不能被 reorder 到此 load 之前
store 用的 order ,通常跟 consume, release 一起使用
release 的 reorder 限制和 acquire 一樣是對所有讀寫
read-modify-write 用的 oreder
同時擁有 acquire 和 release 的特性
可以當成 load, store, read-modify-write 的指令
這個 order 可以用在 acquire, release, acq_rel
多加了一條特性是,在所有 thread 中看到的 seq_cst 的 order 都要是相同
使用以下方式編譯
產生的問題數量不是定值,但是有在 Observed 63. 處一定會出現
這邊看到出問題的地方會是在第65個 observed
這邊剛好是預設的 cap
再多一個
也就是 producer 要建立一個新的 node 的時候
兩邊 thread 會 race 的地方
consumer :
spmc_dequeue :
__tsan_atomic64_load
是在 14 行 load &node->next
producer :
spmc_enqueue :
malloc 在 24 行
不過這裡指的是上次 write 的地方
真正會發生的跟 consumer 存取到同一個地址的地方是下方
29 行 store new_node
到 &node->next
其中 node->next
就是 data race 的中心
這邊 store 用的 memory order 是 release
而在 consumer 中 load 部份的 memory order 是 relaxed
這樣子這兩條指令因為 relaxed 的關係,所以不會有 order constraint ,就可能發生 data race
於是將 spmc_dequeue
14 行的 memory order 改成 consume
這樣就可以解決 Observed 64 處的 data race