# Concurrency Study -- MPSC 參考 1. [mpsc](https://github.com/sysprog21/concurrent-programs/blob/master/mpsc/mpsc.c) : An unbounded lockless single-consumer/multiple-producer FIFO queue. ## Introduce 參考的範例[[1](https://github.com/sysprog21/concurrent-programs/blob/master/mpsc/mpsc.c)]為 FIFO queue,所以建立出來的行為必須符合First IN First Out。這邊的範例是1個consumer thread然後搭配N個producer thread。==所以重點是N個producer thread如何不互相影響的把資料依序寫入。== 使用下列的方法把資料讀出,因為只有一個consumer thread,我覺得應該不需要使用atomic_load和atomic_fetch_add。 ```c= static void *test_consumer(void *arg) { queue_test_t *test = (queue_test_t *) arg; while (atomic_load(&test->consume_count) < THREAD_COUNT) { if (Queue.hasFront(test->q)) { atomic_fetch_add(&test->consume_count, 1); queue_result_t result = Queue.pop(test->q); assert(result == QUEUE_SUCCESS); } } return NULL; } ``` 另外會不會有Queue.hasFront成立,但是進到function內卻沒有data的情況??答案是不會。因為只有一個consumer thread,所以data只會更多而已,不影響後續的操作。 使用下列測試程式把資料寫入。因為有N個thread來寫入資料,所以只要判斷in > THREAD_COUNT就可以結束。每個thread取到的in都不一樣,然後把資料推入Queue。==但是資料不會依照順序寫入。因為取到in之後,可能被其他thread打斷,較大的in會先被寫入。== ```c= static void *test_producer(void *arg) { queue_test_t *test = (queue_test_t *) arg; assert(test->q); while (1) { int in = atomic_fetch_add(&test->producer_count, 1); // 保證每個thread都可以看到不同的值 if (in >= THREAD_COUNT) break; queue_result_t result = Queue.push(test->q, &in); assert(result == QUEUE_SUCCESS); } return NULL; } ``` ## 資料結構 這邊有兩個重要的資料結構。 + struct queue_internal 為主要結構,目的是在指enqueue/dequeue的位置。 + struct node 為queue中的一個node。 ```c= typedef struct node { atomic_uintptr_t next; } node; struct queue_internal { atomic_uintptr_t head, tail; size_t item_size; }; ``` ![queue_data_struct](https://i.imgur.com/M2FPt69.png) 另外這邊使用struct來把全部的function統整起來。是一種類似kernel的做法。好處是==可以有統一的介面==,裡面的實做可以不一樣。 ```c= typedef enum { QUEUE_FALSE, QUEUE_SUCCESS, QUEUE_TRUE, QUEUE_OUT_OF_MEMORY = -1, } queue_result_t; struct __QUEUE_API__ { queue_p (*create)(size_t size); queue_result_t (*push)(queue_p, void *data); queue_result_t (*hasFront)(queue_p); queue_result_t (*front)(queue_p, void *data); queue_result_t (*pop)(queue_p); queue_result_t (*clear)(queue_p); queue_result_t (*destroy)(queue_p); } Queue; /* API gateway */ struct __QUEUE_API__ Queue = { .create = queue_create, .hasFront = queue_has_front, .front = queue_front, .pop = queue_pop, .push = queue_push, .clear = queue_clear, .destroy = queue_destroy, }; ``` ## queue_create() 創建queue所以基本的結構。 ```c= static queue_p queue_create(size_t item_size) { size_t *ptr = calloc(sizeof(struct queue_internal) + alignment, 1); // calloc(n, size) is good expression assert(ptr); ptr[0] = sentinel; queue_p q = (queue_p)(ptr + 1); atomic_init(&q->head, 0); atomic_init(&q->tail, 0); q->item_size = item_size; //4(32bits) or 8(64bits) return q; } ``` 其中多allocate了一個alignment的大小是為了放sentinel。 再destory queue的時候,檢查是否有存取超過sentinel的問題。 ```c= static queue_result_t queue_destroy(queue_p q) { size_t *ptr = (size_t *) q - 1; assert(ptr[0] == sentinel); free(ptr); return QUEUE_SUCCESS; } ``` 因為多了一個sentinel所以取得queue為 ```c queue_p q = (queue_p)(ptr + 1); ``` 最後item_size為每個node除了,指向下一個node的pointer之外,存放資料的大小。 ![queue_create](https://i.imgur.com/7gXx6F2.png) ## queue_push() N個producer thread會執行這個function。依序把data放進去。這邊的步驟是 1. 準備新的node(create the new tail) 2. q->tail指向新的node(swap the new tail with the old) 3. old_tail->next指向新的node(link the old tail to the new) >每個thread分別執行步驟1準備自己的new node >==同時只有一個thread會執行到步驟2==,因為他是atomic操作。所以可以確定new node會被接到q->tail之後。 >因為old_tail為local variable所以執行完步驟2之後,再把old_tail->next連結到new node。 ```c= static queue_result_t queue_push(queue_p q, void *data) { assert(q); /* 1. create the new tail */ node *new_tail = malloc(sizeof(node) + q->item_size); if (!new_tail) return QUEUE_OUT_OF_MEMORY; atomic_init(&new_tail->next, 0); memcpy(new_tail + 1, data, q->item_size); /* 2. swap the new tail with the old */ node *old_tail = (node *) atomic_exchange(&q->tail, (atomic_uintptr_t) new_tail); // 使用exchange來取得舊的tail /* 3. link the old tail to the new */ atomic_store(old_tail ? &old_tail->next : &q->head, (atomic_uintptr_t) new_tail); return QUEUE_SUCCESS; } ``` ![queue_push_not_first_data](https://i.imgur.com/v9hFzCY.png) ```c= atomic_store(old_tail ? &old_tail->next : &q->head, (atomic_uintptr_t) new_tail); ``` 這邊有兩種情況, 1. queue沒有資料。則old_tail == NULL,只需要把q->head連結到new node。 2. queue已經有資料。則old_tail != NULL,只需要old_tail->next = new node。 ![queue_push_first_data](https://i.imgur.com/dx3K7yd.png) :::warning 但是這樣的設計不太好,因為q->head同時也被consumer thread存取,最好的設計是盡量做到資源切割,所以這邊可以使用dummy node來改善存取q->head的問題。因為有了dummy node就不會case1(queue為空)的狀況。 ::: ## queue_pop() 只有一個consumer thread會呼叫這個function,須注意producer thread和consumer thread會共同存取的變數,q->head和q->tail。 ```c= static queue_result_t queue_pop(queue_p q) { assert(q); assert(Queue.hasFront(q) == QUEUE_TRUE); /* get the head */ node *popped = (node *) atomic_load(&q->head); node *compare = popped; /* set the tail and head to nothing if they are the same */ if (atomic_compare_exchange_strong(&q->tail, &compare, 0)) { compare = popped; /* It is possible for another thread to have pushed after * we swap out the tail. In this case, the head will be different * then what was popped, so we just do a blind exchange regardless * of the result. */ atomic_compare_exchange_strong(&q->head, &compare, 0); } else { /* tail is different from head, set the head to the next value */ node *new_head = 0; while (!new_head) { /* It is possible that the next node has not been assigned yet, * so just spin until the pushing thread stores the value. */ new_head = (node *) atomic_load(&popped->next); } atomic_store(&q->head, (atomic_uintptr_t) new_head); } free(popped); return QUEUE_SUCCESS; } ``` 這邊也有兩種情況 1. pop後queue為空, 2. pop後queue不為空, ### case1. pop後queue為空 如果只剩一個資料,則下面就會成立。因為q->head和q->tail指向同一個node。因為是最後一個資料pop後head和tail都會指向NULL。 ```c= node *popped = (node *) atomic_load(&q->head); node *compare = popped; atomic_compare_exchange_strong(&q->tail, &compare, 0) ``` ![queue_pop_last_data](https://i.imgur.com/xSF0HUe.png) ==這邊程式碼的註解提到,如果這個時候又有新的資料被push進來,則tail和head就會指向新的node,因為tail為NULL。== ![](https://i.imgur.com/SJ5sLZu.png) ![queue_pop_last_data_add_one](https://i.imgur.com/ZDaqXjd.png) ==這時候因為有compare_exchange所以q->head,不會改變。== ```c= atomic_compare_exchange_strong(&q->head, &compare, 0); ``` ### case2. pop後queue不為空 就是queue中有兩筆以上的資料,因為consumer thread只有一個,所以資料只會更多,不會更少。只需要把head往後移動即可。 ![queue_pop_not_last](https://i.imgur.com/uxigNRz.png) 程式碼註解提到,為了避免剛好要pop的下一個節點還沒指定上去,所以必須busy wait直到下一個節點被接上去為止。 ```c= node *new_head = 0; while (!new_head) { /* It is possible that the next node has not been assigned yet, * so just spin until the pushing thread stores the value. */ new_head = (node *) atomic_load(&popped->next); } ``` ![queue_pop_case2_next_not_ready](https://i.imgur.com/Z9Nwe1z.png) :::warning 同理使用dummy node來改善存取q->tail的問題。因為有了dummy node就不會case1(pop完queue為空)的狀況。都可以視為同一種狀況處理。 ::: ## 自我檢查 ### 不同thread間存取的相同資源是什麼?? 有三個不同種類的thread,main thread,producer thread和consumer thread。main thread建立起producer和consumer之後就進入等待狀態。 存取的共同資源 producer thread : q->head, q->tail。 consumer thread : q->head, q->tail。 如果使用dummy node應該可以減少存取的共同資源。 ### 有沒有ABA的問題? 沒有。 即使使用了malloc和free。但是因為consumer thread只有一個,原本指到A node不會因為其他consumer thread被free掉,然後又被producer thread malloc回來。 ### 盡量做到資源切割,或是使用Thread local storage。 可以使用dummy node來改善共同存取資源的問題。 ### 資料間有沒有false sharing問題?? 共同存取的資料為q->head和q->tail。 ```c= struct queue_internal { atomic_uintptr_t head, tail; size_t item_size; }; ``` 其中atomic_uintptr_t在32bit和64bit系統間分別為32bits和64bits,剛好等於cache line的大小。 所以不會有false sharing的問題。 ### 單一資料有沒有跨過兩個cahce line? 沒有。 ### 共同變數上使用volatile來防止compiler優化。 沒使用volatile來保護thread間共同存取的變數。 ###### tags: `linux2021`