# 2021q1 quiz10 contributed by < [Julian-Chu](https://github.com/Julian-Chu) > ###### tags: `linux2021` > [第 10 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz10) > [GitHub](https://github.com/Julian-Chu/linux2021-quizzes/tree/main/quiz10-golang_channel_in_c) 設計上區分成兩種 channel - unbuffered channel - buffered channel ## unbuffed channel 利用 futex lock 加上 enum 狀態來操作 chan 的 blocking ```cpp enum { CHAN_READY = 0, CHAN_NOT_READY = 1, CHAN_WAITING = 2, CHAN_CLOSED = 3, }; ``` ## buffed channel ### Data Structure ```cpp struct chan_item { _Atomic uint32_t lap; void *data; }; struct chan { _Atomic bool closed; /* For buffered channels, these futexes represent credits for a reader or * write to retry receiving or sending. */ _Atomic uint32_t send_ftx, recv_ftx; /* Buffered channels only: number of waiting threads on the futexes. */ _Atomic size_t send_waiters, recv_waiters; /* Ring buffer */ size_t cap; _Atomic uint64_t head, tail; struct chan_item ring[0]; }; ``` ### __sync 與 __atomic 在 CAS 行爲上的不同 注意 `__atomic_compare_exchange` 在回傳 false 的情況, 會將舊值放入比較值 :::spoiler `bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)` >type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...) > These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr. > > The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation. ::: :::spoiler `bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder)` > This built-in function implements an atomic compare and exchange operation. This compares the contents of *ptr with the contents of *expected. If equal, the operation is a read-modify-write operation that writes desired into *ptr. ==If they are not equal, the operation is a read and the current contents of *ptr are written into *expected.== weak is true for weak compare_exchange, which may fail spuriously, and false for the strong variation, which never fails spuriously. Many targets only offer the strong variation and ignore the parameter. When in doubt, use the strong variation. > > If desired is written into *ptr then true is returned and memory is affected according to the memory order specified by success_memorder. There are no restrictions on what memory order can be used here. > > Otherwise, false is returned and memory is affected according to failure_memorder. This memory order cannot be __ATOMIC_RELEASE nor __ATOMIC_ACQ_REL. It also cannot be a stronger order than that specified by success_memorder. ::: ### send to and recv from chan ```cpp // 負責 send data to chan 的 blocking/waiting static int chan_send_buf(struct chan *ch, void *data) { while (chan_trysend_buf(ch, data) == -1) { // 非 EAGAIN, 可能爲 closed channel 或是其他錯誤, 直接返回 if (errno != EAGAIN) return -1; uint32_t v = 1; // 檢查 channel 的 send futex 的狀態, 如果 send_ftx 爲 0, 則block 當前的 thread , 且將 blocked thread 計數加一 while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1, memory_order_acq_rel, memory_order_acquire)) { if (v == 0) { atomic_fetch_add_explicit(&ch->send_waiters, 1, memory_order_acq_rel); // if send_ftx equals 0, block thread futex_wait(&ch->send_ftx, 0); atomic_fetch_sub_explicit(&ch->send_waiters, 1, memory_order_acq_rel); v = 1; } } } atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel); // 如果有等待中的 recv, 隨機喚醒其一 if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0) futex_wake(&ch->recv_ftx, 1); return 0; } // 負責 receive data from chan 的 blocking/waiting static int chan_recv_buf(struct chan *ch, void **data) { while (chan_tryrecv_buf(ch, data) == -1) { // 非 EAGAIN, 可能爲 closed channel 或是其他錯誤, 直接返回 if (errno != EAGAIN) return -1; uint32_t v = 1; // 檢查 channel 的 recv futex 的狀態, 如果 recv_ftx 爲 0, 則block 當前的 thread , 且將 blocked thread 計數加一 while (!atomic_compare_exchange_weak_explicit(&ch->recv_ftx, &v, v - 1, memory_order_acq_rel, memory_order_acquire)) { if (v == 0) { atomic_fetch_add_explicit(&ch->recv_waiters, 1, memory_order_acq_rel); // if recv_ftx equals 0, block thread futex_wait(&ch->recv_ftx, 0); atomic_fetch_sub_explicit(&ch->recv_waiters, 1, memory_order_acq_rel); v = 1; } } } atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acq_rel); // 如果有等待中的 send, 隨機喚醒其一 if (atomic_load_explicit(&ch->send_waiters, memory_order_relaxed) > 0) futex_wake(&ch->send_ftx, 1); return 0; } ``` ### chan_items 操作 利用 ring buffer + atomic 避免 mutex lock 的使用 ```cpp /* * 嘗試對 channel 內部的 ring buffer 發送資料, * 檢查 ring buffer 的根據 tail , 檢查目前的 tail 指向 item 是否爲等待被外部發送過來資料或是等待被外部來收取資料的狀態 */ static int chan_trysend_buf(struct chan *ch, void *data) { // 檢查 channel 是否被關閉 if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } uint64_t tail, new_tail; struct chan_item *item; uint32_t lap; do { tail = atomic_load_explicit(&ch->tail, memory_order_acquire); uint32_t pos = tail, lap = tail >> 32; item = ch->ring + pos; // 目前 item 是等待被外部拿取的狀態, 無法往內部送資料, 利用 EAGAIN 通知外層的 chan_send_buf 稍後重試 if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) { errno = EAGAIN; return -1; } // ring buffer 已滿, 移到下一輪, 還有空間則移到下一位 if (pos + 1 == ch->cap) new_tail = (uint64_t)(lap + 2) << 32; else new_tail = tail + 1; // 確認 tail 在上述流程後沒有被其他 thread 修改過 } while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail, memory_order_acq_rel, memory_order_acquire)); item->data = data; // 資料放入 item 成功, 將 item 標記爲待外部拿取資料的狀態(waiting for recv) atomic_fetch_add_explicit(&item->lap, 1, memory_order_release); return 0; } /* * 嘗試從 channel 內部的 ring buffer 取得資料, * 檢查 ring buffer 的根據 head , 檢查目前的 head 指向 item 是否爲等待被外部發送過來資料或是等待被外部來收取資料的狀態 */ static int chan_tryrecv_buf(struct chan *ch, void **data) { if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) { errno = EPIPE; return -1; } uint64_t head, new_head; struct chan_item *item; do { head = atomic_load_explicit(&ch->head, memory_order_acquire); uint32_t pos = head, lap = head >> 32; item = ch->ring + pos; // 目前 item 是等待外部資料傳入的狀態, 無法從內部取資料, 利用 EAGAIN 通知外層的 chan_recv_buf 稍後重試 if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) { errno = EAGAIN; return -1; } // ring buffer 已滿, 移到下一輪起點, 還有空間則移到下一位 if (pos + 1 == ch->cap) /* new_head = (uint64_t)(XXX) << 32; */ new_head = (uint64_t)(lap+2) << 32; else new_head = head + 1; // 確認 head 在上述流程後沒有被其他 thread 修改過 } while (!atomic_compare_exchange_weak_explicit(&ch->head, &head, new_head, memory_order_acq_rel, memory_order_acquire)); *data = item->data; // 從 item 拿取資料成功, 將 item 標記爲待外部傳入資料的狀態(waiting for send) atomic_fetch_add_explicit(&item->lap, 1, memory_order_release); return 0; } ``` ### head 與 tail, 在設計上的巧思 head 跟 tail 高位 32 bits 紀錄 lap, 低位 32 bits 紀錄 pos, 由於 head 跟 tail 爲 uint64 型別, 可以適用 atomic operation ``` graphviz digraph structs { rankdir=LR node[shape=record] tail [label="tail|{<pos>lap|{<item>pos}}}"] head [label="head|{<pos>lap|{<item>pos}}}"] } ``` lap 標示 item 接收資料或等候發送的狀態: 可以從外部送入資料(send data to chan): `2n` 外部可以拿取資料(receive data from chan): `2n + 1` 利用這個設計, 剛好 `lap + 1` 後就可以在兩種狀態之間轉換 ### 示意流程 ![](https://i.imgur.com/8qMASFr.png) #### init: head lap 初始化爲 1, tail lap 爲 0 ``` graphviz digraph structs { rankdir=TB node[shape=record] ring [label="ring|{<pos>pos|{<item>item}}|{<p0>0|{<i0>lap:0 data}}|{<p1>1|{<i1>lap:0 data}}|{<p2>2|{<i2>lap:0 data}}|{<p3>3|{<i3>lap:0 data}}"] head[label="{head|{lap:1|pos:0}}"] tail[label="{tail|{lap:0|pos:0}}"] head->ring:p0 tail->ring:p0 } ``` ![](https://i.imgur.com/QcKPabF.png) #### first send: item lap 匹配 tail lap 可以被寫入, 寫入 data 且把 item 的 lap + 1 匹配 head tail, 把 tail pos 移到下一個 item ```graphviz digraph structs { rankdir=TB node[shape=record] ring [label="ring|{<pos>pos|{<item>item}}|{<p0>0|{<i0>lap:1 data}}|{<p1>1|{<i1>lap:0 data}}|{<p2>2|{<i2>lap:0 data}}|{<p3>3|{<i3>lap:0 data}}"] head[label="{head|{lap:1|pos:0}}"] tail[label="{tail|{lap:0|pos:0}}"] head->ring:p0 tail->ring:p1 } ``` ![](https://i.imgur.com/oh4qQLV.png) #### send until full ```graphviz digraph structs { rankdir=TB node[shape=record] ring [label="ring|{<pos>pos|{<item>item}}|{<p0>0|{<i0>lap:1 data}}|{<p1>1|{<i1>lap:1 data}}|{<p2>2|{<i2>lap:1 data}}|{<p3>3|{<i3>lap:1 data}}"] head[label="{head|{lap:1|pos:0}}"] tail[label="{tail|{lap:2|pos:0}}"] head->ring:p0 tail->ring:p0 } ``` 對已滿的 channel 持續 send data, 因為 item lap 不跟 tail 的 lap 相符, 判斷目前 item 是處於等待外部從 chan 接收資料的狀態, 暫時 block ![](https://i.imgur.com/cLMdS1I.png) #### receive from chan ```graphviz digraph structs { rankdir=TB node[shape=record] ring [label="ring|{<pos>pos|{<item>item}}|{<p0>0|{<i0>lap:2 data}}|{<p1>1|{<i1>lap:1 data}}|{<p2>2|{<i2>lap:1 data}}|{<p3>3|{<i3>lap:1 data}}"] head[label="{head|{lap:1|pos:0}}"] tail[label="{tail|{lap:2|pos:0}}"] head->ring:p1 tail->ring:p0 } ``` ![](https://i.imgur.com/wHpRPjr.png) ### 探討 lap 設計 是否可以利用其他方案取代 lap,需要滿足下面三個需求: - lock free in ring buffer: 不考慮 mutex - 需要區分 sendable 跟 receivable: 可以利用 boolean 或是 bitfield 取代來判斷 - 避免 ABA 問題: 利用 bool 或是 bit field 會在 CAS loop 造成 ABA 問題誤判。 ring buffer 下用以比較 tail 或是 head 的條件有 bool + pos, 相同 bool(同樣都是 sendable 或是 recvable) 比較 tail 或是 head 是否有被變更的的情況只有 pos 條件, 在 mpmc 的情況下會遇到 ABA 問題,以下圖為例, 在 lap 設計下不同 producer (lap1, lap2) 讀取同 pos 在 CAS loop 判斷是不相等, 但使用 boolean 的情況,當 producer1 在 CAS loop 中, producer2 更動了 item, 但 producer1 在 CAS loop 終止條件會判別成同一個 tail/head, 造成重複寫入讀出 ```graphviz digraph structs { rankdir=TB node[shape=record] ring [label="ring|{<pos>pos|{<item>item}}|{<p0>0|{<i0>data}}|{<p1>1|{<i1>data}}|{<p2>2|{<i2>data}}|{<p3>3|{<i3>data}}"] tail1[label="{tail|{{lap:1|sendable: true}|pos:1}}"] tail2[label="{tail|{{lap:2|sendable:true}|pos:1}}"] producer1[label="producer1"] producer2[label="producer2"] producer1->tail1 tail1->ring:p1 producer2->tail2 tail2->ring:p1 } ``` > [第 9 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz9)的 MPMC 明確區隔 reader/writer,從而避免上述問題,不過缺點就是同步物件更多且會有潛在的競爭。 > :notes: jserv