2021q1 quiz8-1

contributed by < cyrong >

quiz8-1

參考解答

AAA = ?

  • &node->back

BBB = ?

  • &node->back

CCC = ?

  • 1

DDD = ?

  • &node->front

EEE = ?

  • &node->front

程式運作原理

實做 SPSC (single-producer and single-consumer)

struct

spmc_node_t

typedef struct __spmc_node {
    size_t cap; /* One more than the number of slots available */
    _Atomic size_t front, back;
    struct __spmc_node *_Atomic next;
    uintptr_t buf[];
} spmc_node_t;

spmc queue 中操作的 node

  • cap : 在這個 node 中可以容納多少個 element
  • front : node 中會實做 ring buffer , front 用來指明在此 ring 中的開頭 index ,用來 dequeue
  • back : 與 front 配合用來指名 ring 中的結尾,用來 enqueue
  • next : 在 spmc queue 中用來鏈接,指向下一個 spmc node

spmc_base

struct spmc_base {
    /* current node which enqueues/dequeues */
    spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue;
    uint8_t last_power;
    spmc_destructor_t destructor;
};

spmc queue 的本體

  • curr_enqueue : 目前用來 enqueue 的 spmc node
  • curr_dequeue : 目前用來 dequeue 的 spmc node
  • last_power : 當 enqueue 數量越來越多,建立新 node 時會將 last_power 加一,讓新 node 能夠容納更多的 element
  • destructor : 在 spmc_delete 時會用到

static void init_node()

初始化一個新的 spmc queue node
在建立 spmc 時用來建立 head
也會在 enqueue 發現前一個 node 空間不足時呼叫

spmc_ref_t spmc_new()

建立一個 spmc queue
建立 spmc_base 時也建立一個 spmc_node 作為這個 spmc queue 的開頭 head
head 會在 spmc_base 後一位的記憶體
curr_enqueuecurr_dequeue 初始為 head

void spmc_delete()

消除整個 spmc queue

bool spmc_enqueue()

spmc queue 的 enqueue 機制

先取得 curr_enqueue 後進行操作
若是 enqueue node 中 buffer 並沒有超過容量上限
則直接將 element 放入 buf[node->back] 並且將 node->back 增加一
但若是 buffer 滿了
先確認下一個 node (node->next) 是不是還有空間可以 enqueue (IS_WRITEABLE)
若是在確認 node_next 時遇到了 curr_dequeue ,為了不同時修改正在被讀的 node (curr_dequeue)
建立一個新的 node ,並且將 last_power 加一
建立完了之後將 element 放進新的 node 中 (buf[node->back])

bool spmc_dequeue()

spmc queue 的 dequeue 機制

先取得 curr_dequeue 後進行操作
和 enqueue 類似的判斷
先判斷此 node 的 buffer 中是否有 element 可以用來 dequeue (IS_READABLE)
若是有,則把 buf[node->front] 放入 *slot
若是沒有,處理手法和 enqueue 類似,原則是不去 read / write 同一個 node ,去 node->next 判斷 READABLE
最後使用 atomic_compare_exchange_weaknode->front 變更為 idx+1
若是此變更失敗,代表有別的 consumer 拿走了剛剛拿的 element
就再實行一次上述動作

static void *producer_thread()

producer task

enqueue N_ITEM 個 element 到 spmc 設定為 1024UL * 8

static void *mc_thread()

consumer task

在執行下會有三種錯誤訊息

    1. Falied to dequeue in mc_thread.
      代表 dequeue function 失敗
      但是在 dequeue function 沒有設定什麼條件會 return false
      因此發生此錯誤時是因為呼叫程式時記憶體不足
    1. consumed twice!
      代表同一個 element 被 consume 兩次
      在上方有建立一個 array observed_count
      用來監測 element 是否有被 consume
      而預期結果是每個 element 只會被 consume 一次
    1. "%zu after %zu; bad order!\n", (size_t) element, (size_t) greatest
      代表 element < greatest
      預期的 consume 順序會是從小到大,這樣即是 consumer 回頭拿到了早應該被 consume 的元素
      最後設定一個 sentinal signal
      當 consume 的 element 已經到了 (N_MC_ITEM - 1)
      將會 enqueue element + 1 用作通知下一個應該結束的 consumer

int main()

測試整個 spmc

先建立一個 spmc 一個 producer
再建立 N_MC_THREAD 個 consumer
所有 thread 結束後 join
檢驗 observe_count 是否每個 element 都被 consume

問體討論

spsc vs spmc

兩個最大的差別還是 single consumer 跟 multiple consumer

程式實作方面 ringbuffer 沒有用到多執行緒
ringbuffer 流程是 producer 先填滿 buffer
再換 consumer 拿空 buffer

enqueue

spsc 在 enqueue 時只有檢查 buffer size 是否還夠
spmc 則是會檢查 curr_enqueue 是否有和 curr_dequeue 重疊到,用意在於不會同時修改和讀取同一個 node

dequeue

和 enqueue 類似,不會同時修改和讀取同一個 node

memory order

在程式中有用到 atomic instruction
其中可以設定這些操作的 memory order 來避免 data race

relaxed

使用 relaxed 的話,這個指令不會和其他 atomic 指令有順序關係,因此在 load 和 store 到同一個記憶體位址可能會造成 data race,因此要避免這種情況

consume

load 使用的 order ,在 current thread 中 dependant on 此 atomic variable 的 讀寫不能被 reorder 到此 load 之前。
還有在別的 threads 中對此 atomic variable 的 release 在此 thread 變成 visible,不會被亂改到

aquire

也是 load 使用的 order ,跟 consume 的差別在不管什麼讀寫都不能被 reorder 到此 load 之前

release

store 用的 order ,通常跟 consume, release 一起使用
release 的 reorder 限制和 acquire 一樣是對所有讀寫

acq_rel

read-modify-write 用的 oreder
同時擁有 acquire 和 release 的特性

seq_cst

可以當成 load, store, read-modify-write 的指令
這個 order 可以用在 acquire, release, acq_rel
多加了一條特性是,在所有 thread 中看到的 seq_cst 的 order 都要是相同

thread sanitizer

使用以下方式編譯

gcc -fsanitize=thread -fPIE -pie -Wall -Wextra -o spmc spmc.c -lpthread

產生的問題數量不是定值,但是有在 Observed 63. 處一定會出現

==================
WARNING: ThreadSanitizer: data race (pid=108857)
  Atomic read of size 8 at 0x7b6400000008 by thread T2:
    #0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7)
    #1 spmc_dequeue <null> (spmc+0x1de8)
    #2 mc_thread <null> (spmc+0x216c)
    #3 <null> <null> (libtsan.so.0+0x2d1af)

  Previous write of size 8 at 0x7b6400000008 by thread T1:
    #0 malloc <null> (libtsan.so.0+0x30343)
    #1 spmc_enqueue <null> (spmc+0x1be1)
    #2 producer_thread <null> (spmc+0x2097)
    #3 <null> <null> (libtsan.so.0+0x2d1af)

  Location is heap block of size 1056 at 0x7b6400000000 allocated by thread T1:
    #0 malloc <null> (libtsan.so.0+0x30343)
    #1 spmc_enqueue <null> (spmc+0x1be1)
    #2 producer_thread <null> (spmc+0x2097)
    #3 <null> <null> (libtsan.so.0+0x2d1af)

  Thread T2 (tid=108860, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x5ea99)
    #1 main <null> (spmc+0x23d9)

  Thread T1 (tid=108859, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x5ea99)
    #1 main <null> (spmc+0x239a)

SUMMARY: ThreadSanitizer: data race (/lib/x86_64-linux-gnu/libtsan.so.0+0x7cec7) in __tsan_atomic64_load
==================

這邊看到出問題的地方會是在第65個 observed
這邊剛好是預設的 cap 再多一個
也就是 producer 要建立一個新的 node 的時候

兩邊 thread 會 race 的地方
consumer :

    #0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7)
    #1 spmc_dequeue <null> (spmc+0x1de8)

spmc_dequeue :

程式片段
/* Recieve (dequeue) an item from the SPMC */ bool spmc_dequeue(spmc_ref_t spmc, uintptr_t *slot) { spmc_node_t *node = atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume); size_t idx; no_increment: do { idx = atomic_load_explicit(&node->front, memory_order_consume); if (!IS_READABLE(idx, node)) { if (node != spmc->curr_enqueue) atomic_compare_exchange_strong( &spmc->curr_dequeue, &node, atomic_load_explicit(&node->next, memory_order_relaxed)); goto no_increment; } else *slot = node->buf[INDEX_OF(idx, node)]; } while ( !atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1)); return true; }

__tsan_atomic64_load 是在 14 行 load &node->next

producer :

    #0 malloc <null> (libtsan.so.0+0x30343)
    #1 spmc_enqueue <null> (spmc+0x1be1)

spmc_enqueue :

程式片段
/* Send (enqueue) an item onto the SPMC */ bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element) { spmc_node_t *node = atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed); size_t idx; retry: idx = atomic_load_explicit(&node->back, memory_order_consume); if (!IS_WRITABLE(idx, node)) { spmc_node_t *const next = atomic_load_explicit(&node->next, memory_order_relaxed); /* Never move to write on top of the node that is currently being read; * In that case, items would be read out of order they were enqueued. */ if (next != atomic_load_explicit(&spmc->curr_dequeue, memory_order_relaxed)) { node = next; goto retry; } const uint8_t power = ++spmc->last_power; assert(power < sizeof(size_t) * CHAR_BIT); const size_t cap = 1 << power; spmc_node_t *new_node = malloc(SIZE_FROM_CAP(cap, sizeof(spmc_node_t))); if (!new_node) return false; init_node(new_node, next, cap); atomic_store_explicit(&node->next, new_node, memory_order_release); idx = 0; node = new_node; } node->buf[INDEX_OF(idx, node)] = element; atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed); atomic_fetch_add_explicit(&node->back, 1, memory_order_release); return true; }

malloc 在 24 行
不過這裡指的是上次 write 的地方

真正會發生的跟 consumer 存取到同一個地址的地方是下方
29 行 store new_node&node->next
其中 node->next 就是 data race 的中心
這邊 store 用的 memory order 是 release
而在 consumer 中 load 部份的 memory order 是 relaxed

這樣子這兩條指令因為 relaxed 的關係,所以不會有 order constraint ,就可能發生 data race

於是將 spmc_dequeue 14 行的 memory order 改成 consume
這樣就可以解決 Observed 64 處的 data race