# Concurrency Study -- SPMC
參考:
1. [spmc](https://github.com/sysprog21/concurrent-programs/blob/master/spmc/spmc.c): A concurrent single-producer/multiple-consumer queue.
## introduction
參考的範例[[1](https://github.com/sysprog21/concurrent-programs/blob/master/spmc/spmc.c)]是實作一個queue,因為是queue所以必須符合==Fisrt In First Out== 的概念。只有一個producer thread,然後有N個comsumer thread。因為只有一個producer所以一定可以保證我們的測試data一定是依序寫入。==這邊的重點是N個consumer thread如何可以保證依序讀出資料。== 還有producer thread和consumer thread會不會互相影響。
測試資料為0~N_ITEMS - 1的連續數值,這樣我們就可以驗證是否也是連續讀出相對資料。
```c=
#define N_ITEMS (1024UL * 8)
static void *producer_thread(void *arg)
{
spmc_ref_t spmc = arg;
for (uintptr_t i = 0; i < N_ITEMS; ++i) {
if (!spmc_enqueue(spmc, i)) // 推入data i(0 ~ N_ITEMS - 1)
fprintf(stderr, "Failed to enqueue on %zu.\n", (size_t) i);
}
return NULL;
}
```
## 資料結構
smpc_base只要是來紀錄目前enqueue/dequeue所指向的queue。
__spmc_node為主要的queue結構。
```c=
typedef struct __spmc_node {
size_t cap; /* One more than the number of slots available */
_Atomic size_t front, back;
struct __spmc_node *_Atomic next;
uintptr_t buf[];
} spmc_node_t;
typedef void (*spmc_destructor_t)(uintptr_t);
struct spmc_base {
/* current node which enqueues/dequeues */
spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue;
uint8_t last_power;
spmc_destructor_t destructor;
};
typedef struct spmc_base *spmc_ref_t;
```

## spmc_new
主要的initial function。在操作之前把queue準備好。
這邊會同時allocate一個連續的空間包括spmc_base和__spmc_node。
```c=
#define HEAD_OF(spmc) ((spmc_node_t *) (void *) ((spmc_ref_t)(spmc) + 1))
spmc_node_t *const head = HEAD_OF(spmc);
```
==找出spmc_node_t的方法就是把spmc向下指到下一個。==

## 執行序的分類
1. **main thread** : 負責產生producer thread和consumer thread然後就等這些thread執行完畢。
2. **producer thread** : 一個,負責把資料推進queue中。
3. **consumer thread** : N個,負責把資料拿出queue。
其中,
producer thread 會==讀寫==的global variable:
+ spmc->curr_enqueue(目前要把資料推進去的queue)
+ spmc->curr_enqueue->back(欲寫入的位置)
+ spmc->curr_enqueue->next(如果buf滿了,產生下一個queue)
+ spmc->curr_enqueue->buf[]\(資料存放的地方)
只會讀取的global variable:
+ spmc->curr_dequeue(判斷是否同一個queue)
+ spmc->curr_enqueue->front(判斷是否已經滿了)
+ spmc->curr_enqueue->cap
comsumer thread會==讀寫==的global variable:
+ spmc->curr_dequeue(目前要把資料讀出去的queue)
+ spmc->curr_dequeue->front(欲讀出的位置)
只會讀取的global variable
+ spmc->curr_dequeue->next(沒資料可以讀取,切換到下一個queue)
+ spmc->curr_dequeue->buf[]\(讀取的資料)
+ spmc->curr_enqueue(判斷是否為最後一個queue, 不是就可以切換過去)
## spmc_enqueue
使用兩個idx,分別代表enqueue和dequeue的位置。
back : enqueue的位置。
front : dequeue的位置。

另外他使用了和[ringbuffer](/PcDLBa9kTluu9yXnF4SbgQ)一樣的概念。使用size_t來記錄front和back。
所以判斷有沒有資料,或是queue已經滿了可以用下面的方法。方便理解我修正了原始的程式碼如下:
==back - front 為已寫入的資料長度。==
```c=
#define IS_READABLE(back, front) (back - front != 0)
#define IS_WRITABLE(back, front) (back - front < (node)->cap)
```
既然使用size_t來記錄,讀取和寫入時必須使用module來取得正確的index。因為cap為2的倍數所以可以用以下的方法取得。
index = back % (cap - 1);
程式碼如下:
```c=
#define MODULO(lhs, rhs) ((lhs) & (rhs - 1)) /* Requires rhs is power of 2 */
#define INDEX_OF(idx, node) (MODULO((idx), (node)->cap))
node->buf[INDEX_OF(back, node)] = element;
```
所以他會像ringbuffer一樣循環寫入,直到queue滿了為止。如果queue滿了,他就會建立一個新的queue必且大小為原來的兩倍。

因為只有一個producer thread所以相對單純。不需要考慮兩個producer thread間的互相影響,只需考慮和consumer thread的影響。
> 1. 讀出curr_enqueue
> 2. 讀出back
> 3. 判斷queue是否可以寫入(滿了?),滿了建立一個新的queue
> 4. 寫入資料
> 5. 更新curr_enqueue和back
```c=
bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element)
{
spmc_node_t *node =
atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed);
size_t back;
retry:
back = atomic_load_explicit(&node->back, memory_order_consume);
if (!IS_WRITABLE(back, node)) {
// queue is full
...
}
node->buf[INDEX_OF(back, node)] = element;
atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed);
atomic_fetch_add_explicit(&node->back, 1, memory_order_release);
return true;
}
```
## spmc_dequeue
讀取資料的基本架構,先讀取目前要讀取的queue,然後再讀取接下來要dequeue的位置,==把資料讀出之後判斷是否為原本的front。==
```c=
spmc_node_t *node =
atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume);
size_t front;
no_increment:
do {
front = atomic_load_explicit(&node->front, memory_order_consume);
if (!IS_READABLE(front, node)) {
if (node != spmc->curr_enqueue)
atomic_compare_exchange_strong(
&spmc->curr_dequeue, &node,
atomic_load_explicit(&node->next, memory_order_relaxed));
goto no_increment;
} else
*slot = node->buf[INDEX_OF(front, node)];
} while (
!atomic_compare_exchange_weak(&node->front, &(size_t){front}, front + 1));
```
這邊會存取兩個gloobal variable:
spmc->curr_dequeue和
spmc->curr_dequeue->front。
==如果其他mc_thread修改了front,因為被讀走資料了。則CAS就不會成立,目前的thread就會reload front然後繼續讀下一個。==
==如果其他mc_thread修改了curr_enqueue, 因為這個queue已經沒資料可以讀了。回到原本的thread之後因為front被改變了所以CAS不成立,然後因為沒資料可以讀取,所以也會切到下一個queue。==
其中這邊是判斷queue有沒有資料可以讀取。如果沒有且不是最後一個queue,則切到下一個queue。
```c=
if (!IS_READABLE(front, node)) { //不能讀取,就是queue空了
if (node != spmc->curr_enqueue) //不是最後一個queue
atomic_compare_exchange_strong(
&spmc->curr_dequeue, &node, // 是否已被切換過?
atomic_load_explicit(&node->next, memory_order_relaxed)); //切換到一下個
goto no_increment; // 重跑CAS
}
```
這邊if loop使用strong function避免判斷錯誤,而while loop使用weak function即使判斷錯誤還是有機會重來。
## 測試
判斷queue是否運作正確。
> 1. 是否同一個數值被讀出兩次。
> 2. 因為是queue使否有依序被讀出。
```c=
for (;;) {
// 把最大的數值存起來
greatest = (greatest > element) ? greatest : element;
if (!spmc_dequeue(spmc, &element))
fprintf(stderr, "Failed to dequeue in mc_thread.\n");
// 重複consume
else if (observed_count[element]++)
fprintf(stderr, "Consumed twice!\n");
// 順序不對,因為queue會依序讀出
else if (element < greatest)
fprintf(stderr, "%zu after %zu; bad order!\n", (size_t) element,
(size_t) greatest);
//printf("Observed %zu.\n", (size_t) element);
/* Test for sentinel signalling termination */
//收到最後一個資料後,把element + 1推進去讓下一個thread也中斷迴圈。
if (element >= (N_MC_ITEMS - 1)) {
spmc_enqueue(spmc, element + 1); /* notify other threads */
break;
}
}
```
## 自我檢驗
### 1. 不同thread間存取的相同資源是什麼??
producer thread和consumer thread間會共同存取curr_enqueue, curr_dequeue, front, back, next這些指標。還有buf[]。
producer thread之間會共同存取curr_enqueue, curr_dequeue, front, back, next, buf[]這些資料。
### 2. 有沒有ABA的問題?
這個例子沒有,因為沒有真的allocate/release記憶體空間。==ABA的問題發生在CAS的時候obj或expect被換成不同的值,導致結果錯誤。== 範例中,只有兩個地方有CAS。
```c=149
if (node != spmc->curr_enqueue)
atomic_compare_exchange_strong(
&spmc->curr_dequeue, &node,
atomic_load_explicit(&node->next, memory_order_relaxed));
goto no_increment;
```
其中的expect為node是local variable不可能被其他thread換掉。obj為curr_dequeue可能會被其他thread改掉,但是只會改成下一個queue會不出現A->B->A的狀況。
```c=
do {
idx = atomic_load_explicit(&node->front, memory_order_consume);
// 省略....
*slot = node->buf[INDEX_OF(idx, node)];
} while (
!atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1));
```
另一個CAS的obj為node->front,會被不同的thread更改,但是這個值只會一直往上累加直到overflow。因為front為size_t除非producer thread很多在目前的thread被切走之後==A(目前的thread)->B(其他thread)->A(其他thread),thread的數量會很大幾乎不可能產生這麼多的thread。==
### 3. 資料間有沒有false sharing問題?(performance issue)
> **On a 32-bit system size_t will take 32 bits, on a 64-bit one 64 bits.** In other words, a variable of size_t type can safely store a pointer.
front, back使用size_t會根據32bits系統還是64bits系統長度分別為4bytes和8bytes,
>To maximize the performance of the FairCom Server under multi-CPU systems, ensure the cache-line setting matches the setting for your equipment. A cache-line is the smallest amount of memory a processor will retrieve and store in its highest speed cache. Using an appropriate CACHE_LINE setting helps reduce false sharing in CPU caches. **Typical cache-line sizes are 32, 64, or 128 bytes.**
因為cacheline最少為32bytes,所以最好使用alignas來對資料做排列。如果不知道要對齊多少就使用128比較保險。但是缺點是data會變得很大。所以最好用在不常allocate的資料結構。
### 4. 單一資料有沒有跨過兩個cahce line?
從宣告的struct來看,因為spmc_base大小應該在32byts以下,所以不會有跨cacheline的問題。spmc_node都是size_t和pointer不考慮buf[]其他所佔的大小剛好是cacheline的一半,所以不會有一個變數跨過cacheline。
### 5. 共同變數上使用volatile來防止compiler優化。
目前的例子裏面**沒有使用volatile來保護被thread間共同存取的變數**﹑所以可以加上來改進。
###### tags: `linux2021`