# Concurrency Study -- MPSC
參考
1. [mpsc](https://github.com/sysprog21/concurrent-programs/blob/master/mpsc/mpsc.c) : An unbounded lockless single-consumer/multiple-producer FIFO queue.
## Introduce
參考的範例[[1](https://github.com/sysprog21/concurrent-programs/blob/master/mpsc/mpsc.c)]為 FIFO queue,所以建立出來的行為必須符合First IN First Out。這邊的範例是1個consumer thread然後搭配N個producer thread。==所以重點是N個producer thread如何不互相影響的把資料依序寫入。==
使用下列的方法把資料讀出,因為只有一個consumer thread,我覺得應該不需要使用atomic_load和atomic_fetch_add。
```c=
static void *test_consumer(void *arg)
{
queue_test_t *test = (queue_test_t *) arg;
while (atomic_load(&test->consume_count) < THREAD_COUNT) {
if (Queue.hasFront(test->q)) {
atomic_fetch_add(&test->consume_count, 1);
queue_result_t result = Queue.pop(test->q);
assert(result == QUEUE_SUCCESS);
}
}
return NULL;
}
```
另外會不會有Queue.hasFront成立,但是進到function內卻沒有data的情況??答案是不會。因為只有一個consumer thread,所以data只會更多而已,不影響後續的操作。
使用下列測試程式把資料寫入。因為有N個thread來寫入資料,所以只要判斷in > THREAD_COUNT就可以結束。每個thread取到的in都不一樣,然後把資料推入Queue。==但是資料不會依照順序寫入。因為取到in之後,可能被其他thread打斷,較大的in會先被寫入。==
```c=
static void *test_producer(void *arg)
{
queue_test_t *test = (queue_test_t *) arg;
assert(test->q);
while (1) {
int in = atomic_fetch_add(&test->producer_count, 1); // 保證每個thread都可以看到不同的值
if (in >= THREAD_COUNT)
break;
queue_result_t result = Queue.push(test->q, &in);
assert(result == QUEUE_SUCCESS);
}
return NULL;
}
```
## 資料結構
這邊有兩個重要的資料結構。
+ struct queue_internal 為主要結構,目的是在指enqueue/dequeue的位置。
+ struct node 為queue中的一個node。
```c=
typedef struct node {
atomic_uintptr_t next;
} node;
struct queue_internal {
atomic_uintptr_t head, tail;
size_t item_size;
};
```

另外這邊使用struct來把全部的function統整起來。是一種類似kernel的做法。好處是==可以有統一的介面==,裡面的實做可以不一樣。
```c=
typedef enum {
QUEUE_FALSE,
QUEUE_SUCCESS,
QUEUE_TRUE,
QUEUE_OUT_OF_MEMORY = -1,
} queue_result_t;
struct __QUEUE_API__ {
queue_p (*create)(size_t size);
queue_result_t (*push)(queue_p, void *data);
queue_result_t (*hasFront)(queue_p);
queue_result_t (*front)(queue_p, void *data);
queue_result_t (*pop)(queue_p);
queue_result_t (*clear)(queue_p);
queue_result_t (*destroy)(queue_p);
} Queue;
/* API gateway */
struct __QUEUE_API__ Queue = {
.create = queue_create,
.hasFront = queue_has_front,
.front = queue_front,
.pop = queue_pop,
.push = queue_push,
.clear = queue_clear,
.destroy = queue_destroy,
};
```
## queue_create()
創建queue所以基本的結構。
```c=
static queue_p queue_create(size_t item_size)
{
size_t *ptr = calloc(sizeof(struct queue_internal) + alignment, 1); // calloc(n, size) is good expression
assert(ptr);
ptr[0] = sentinel;
queue_p q = (queue_p)(ptr + 1);
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
q->item_size = item_size; //4(32bits) or 8(64bits)
return q;
}
```
其中多allocate了一個alignment的大小是為了放sentinel。
再destory queue的時候,檢查是否有存取超過sentinel的問題。
```c=
static queue_result_t queue_destroy(queue_p q)
{
size_t *ptr = (size_t *) q - 1;
assert(ptr[0] == sentinel);
free(ptr);
return QUEUE_SUCCESS;
}
```
因為多了一個sentinel所以取得queue為
```c
queue_p q = (queue_p)(ptr + 1);
```
最後item_size為每個node除了,指向下一個node的pointer之外,存放資料的大小。

## queue_push()
N個producer thread會執行這個function。依序把data放進去。這邊的步驟是
1. 準備新的node(create the new tail)
2. q->tail指向新的node(swap the new tail with the old)
3. old_tail->next指向新的node(link the old tail to the new)
>每個thread分別執行步驟1準備自己的new node
>==同時只有一個thread會執行到步驟2==,因為他是atomic操作。所以可以確定new node會被接到q->tail之後。
>因為old_tail為local variable所以執行完步驟2之後,再把old_tail->next連結到new node。
```c=
static queue_result_t queue_push(queue_p q, void *data)
{
assert(q);
/* 1. create the new tail */
node *new_tail = malloc(sizeof(node) + q->item_size);
if (!new_tail)
return QUEUE_OUT_OF_MEMORY;
atomic_init(&new_tail->next, 0);
memcpy(new_tail + 1, data, q->item_size);
/* 2. swap the new tail with the old */
node *old_tail =
(node *) atomic_exchange(&q->tail, (atomic_uintptr_t) new_tail); // 使用exchange來取得舊的tail
/* 3. link the old tail to the new */
atomic_store(old_tail ? &old_tail->next : &q->head,
(atomic_uintptr_t) new_tail);
return QUEUE_SUCCESS;
}
```

```c=
atomic_store(old_tail ? &old_tail->next : &q->head,
(atomic_uintptr_t) new_tail);
```
這邊有兩種情況,
1. queue沒有資料。則old_tail == NULL,只需要把q->head連結到new node。
2. queue已經有資料。則old_tail != NULL,只需要old_tail->next = new node。

:::warning
但是這樣的設計不太好,因為q->head同時也被consumer thread存取,最好的設計是盡量做到資源切割,所以這邊可以使用dummy node來改善存取q->head的問題。因為有了dummy node就不會case1(queue為空)的狀況。
:::
## queue_pop()
只有一個consumer thread會呼叫這個function,須注意producer thread和consumer thread會共同存取的變數,q->head和q->tail。
```c=
static queue_result_t queue_pop(queue_p q)
{
assert(q);
assert(Queue.hasFront(q) == QUEUE_TRUE);
/* get the head */
node *popped = (node *) atomic_load(&q->head);
node *compare = popped;
/* set the tail and head to nothing if they are the same */
if (atomic_compare_exchange_strong(&q->tail, &compare, 0)) {
compare = popped;
/* It is possible for another thread to have pushed after
* we swap out the tail. In this case, the head will be different
* then what was popped, so we just do a blind exchange regardless
* of the result.
*/
atomic_compare_exchange_strong(&q->head, &compare, 0);
} else {
/* tail is different from head, set the head to the next value */
node *new_head = 0;
while (!new_head) {
/* It is possible that the next node has not been assigned yet,
* so just spin until the pushing thread stores the value.
*/
new_head = (node *) atomic_load(&popped->next);
}
atomic_store(&q->head, (atomic_uintptr_t) new_head);
}
free(popped);
return QUEUE_SUCCESS;
}
```
這邊也有兩種情況
1. pop後queue為空,
2. pop後queue不為空,
### case1. pop後queue為空
如果只剩一個資料,則下面就會成立。因為q->head和q->tail指向同一個node。因為是最後一個資料pop後head和tail都會指向NULL。
```c=
node *popped = (node *) atomic_load(&q->head);
node *compare = popped;
atomic_compare_exchange_strong(&q->tail, &compare, 0)
```

==這邊程式碼的註解提到,如果這個時候又有新的資料被push進來,則tail和head就會指向新的node,因為tail為NULL。==


==這時候因為有compare_exchange所以q->head,不會改變。==
```c=
atomic_compare_exchange_strong(&q->head, &compare, 0);
```
### case2. pop後queue不為空
就是queue中有兩筆以上的資料,因為consumer thread只有一個,所以資料只會更多,不會更少。只需要把head往後移動即可。

程式碼註解提到,為了避免剛好要pop的下一個節點還沒指定上去,所以必須busy wait直到下一個節點被接上去為止。
```c=
node *new_head = 0;
while (!new_head) {
/* It is possible that the next node has not been assigned yet,
* so just spin until the pushing thread stores the value.
*/
new_head = (node *) atomic_load(&popped->next);
}
```

:::warning
同理使用dummy node來改善存取q->tail的問題。因為有了dummy node就不會case1(pop完queue為空)的狀況。都可以視為同一種狀況處理。
:::
## 自我檢查
### 不同thread間存取的相同資源是什麼??
有三個不同種類的thread,main thread,producer thread和consumer thread。main thread建立起producer和consumer之後就進入等待狀態。
存取的共同資源
producer thread : q->head, q->tail。
consumer thread : q->head, q->tail。
如果使用dummy node應該可以減少存取的共同資源。
### 有沒有ABA的問題?
沒有。
即使使用了malloc和free。但是因為consumer thread只有一個,原本指到A node不會因為其他consumer thread被free掉,然後又被producer thread malloc回來。
### 盡量做到資源切割,或是使用Thread local storage。
可以使用dummy node來改善共同存取資源的問題。
### 資料間有沒有false sharing問題??
共同存取的資料為q->head和q->tail。
```c=
struct queue_internal {
atomic_uintptr_t head, tail;
size_t item_size;
};
```
其中atomic_uintptr_t在32bit和64bit系統間分別為32bits和64bits,剛好等於cache line的大小。
所以不會有false sharing的問題。
### 單一資料有沒有跨過兩個cahce line?
沒有。
### 共同變數上使用volatile來防止compiler優化。
沒使用volatile來保護thread間共同存取的變數。
###### tags: `linux2021`