# Concurrency Study -- SPMC 參考: 1. [spmc](https://github.com/sysprog21/concurrent-programs/blob/master/spmc/spmc.c): A concurrent single-producer/multiple-consumer queue. ## introduction 參考的範例[[1](https://github.com/sysprog21/concurrent-programs/blob/master/spmc/spmc.c)]是實作一個queue,因為是queue所以必須符合==Fisrt In First Out== 的概念。只有一個producer thread,然後有N個comsumer thread。因為只有一個producer所以一定可以保證我們的測試data一定是依序寫入。==這邊的重點是N個consumer thread如何可以保證依序讀出資料。== 還有producer thread和consumer thread會不會互相影響。 測試資料為0~N_ITEMS - 1的連續數值,這樣我們就可以驗證是否也是連續讀出相對資料。 ```c= #define N_ITEMS (1024UL * 8) static void *producer_thread(void *arg) { spmc_ref_t spmc = arg; for (uintptr_t i = 0; i < N_ITEMS; ++i) { if (!spmc_enqueue(spmc, i)) // 推入data i(0 ~ N_ITEMS - 1) fprintf(stderr, "Failed to enqueue on %zu.\n", (size_t) i); } return NULL; } ``` ## 資料結構 smpc_base只要是來紀錄目前enqueue/dequeue所指向的queue。 __spmc_node為主要的queue結構。 ```c= typedef struct __spmc_node { size_t cap; /* One more than the number of slots available */ _Atomic size_t front, back; struct __spmc_node *_Atomic next; uintptr_t buf[]; } spmc_node_t; typedef void (*spmc_destructor_t)(uintptr_t); struct spmc_base { /* current node which enqueues/dequeues */ spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue; uint8_t last_power; spmc_destructor_t destructor; }; typedef struct spmc_base *spmc_ref_t; ``` ![](https://i.imgur.com/XWnt574.png) ## spmc_new 主要的initial function。在操作之前把queue準備好。 這邊會同時allocate一個連續的空間包括spmc_base和__spmc_node。 ```c= #define HEAD_OF(spmc) ((spmc_node_t *) (void *) ((spmc_ref_t)(spmc) + 1)) spmc_node_t *const head = HEAD_OF(spmc); ``` ==找出spmc_node_t的方法就是把spmc向下指到下一個。== ![](https://i.imgur.com/aAcsoyE.png) ## 執行序的分類 1. **main thread** : 負責產生producer thread和consumer thread然後就等這些thread執行完畢。 2. **producer thread** : 一個,負責把資料推進queue中。 3. **consumer thread** : N個,負責把資料拿出queue。 其中, producer thread 會==讀寫==的global variable: + spmc->curr_enqueue(目前要把資料推進去的queue) + spmc->curr_enqueue->back(欲寫入的位置) + spmc->curr_enqueue->next(如果buf滿了,產生下一個queue) + spmc->curr_enqueue->buf[]\(資料存放的地方) 只會讀取的global variable: + spmc->curr_dequeue(判斷是否同一個queue) + spmc->curr_enqueue->front(判斷是否已經滿了) + spmc->curr_enqueue->cap comsumer thread會==讀寫==的global variable: + spmc->curr_dequeue(目前要把資料讀出去的queue) + spmc->curr_dequeue->front(欲讀出的位置) 只會讀取的global variable + spmc->curr_dequeue->next(沒資料可以讀取,切換到下一個queue) + spmc->curr_dequeue->buf[]\(讀取的資料) + spmc->curr_enqueue(判斷是否為最後一個queue, 不是就可以切換過去) ## spmc_enqueue 使用兩個idx,分別代表enqueue和dequeue的位置。 back : enqueue的位置。 front : dequeue的位置。 ![](https://i.imgur.com/wyqX7Ox.png) 另外他使用了和[ringbuffer](/PcDLBa9kTluu9yXnF4SbgQ)一樣的概念。使用size_t來記錄front和back。 所以判斷有沒有資料,或是queue已經滿了可以用下面的方法。方便理解我修正了原始的程式碼如下: ==back - front 為已寫入的資料長度。== ```c= #define IS_READABLE(back, front) (back - front != 0) #define IS_WRITABLE(back, front) (back - front < (node)->cap) ``` 既然使用size_t來記錄,讀取和寫入時必須使用module來取得正確的index。因為cap為2的倍數所以可以用以下的方法取得。 index = back % (cap - 1); 程式碼如下: ```c= #define MODULO(lhs, rhs) ((lhs) & (rhs - 1)) /* Requires rhs is power of 2 */ #define INDEX_OF(idx, node) (MODULO((idx), (node)->cap)) node->buf[INDEX_OF(back, node)] = element; ``` 所以他會像ringbuffer一樣循環寫入,直到queue滿了為止。如果queue滿了,他就會建立一個新的queue必且大小為原來的兩倍。 ![](https://i.imgur.com/atFnj9S.png) 因為只有一個producer thread所以相對單純。不需要考慮兩個producer thread間的互相影響,只需考慮和consumer thread的影響。 > 1. 讀出curr_enqueue > 2. 讀出back > 3. 判斷queue是否可以寫入(滿了?),滿了建立一個新的queue > 4. 寫入資料 > 5. 更新curr_enqueue和back ```c= bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element) { spmc_node_t *node = atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed); size_t back; retry: back = atomic_load_explicit(&node->back, memory_order_consume); if (!IS_WRITABLE(back, node)) { // queue is full ... } node->buf[INDEX_OF(back, node)] = element; atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed); atomic_fetch_add_explicit(&node->back, 1, memory_order_release); return true; } ``` ## spmc_dequeue 讀取資料的基本架構,先讀取目前要讀取的queue,然後再讀取接下來要dequeue的位置,==把資料讀出之後判斷是否為原本的front。== ```c= spmc_node_t *node = atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume); size_t front; no_increment: do { front = atomic_load_explicit(&node->front, memory_order_consume); if (!IS_READABLE(front, node)) { if (node != spmc->curr_enqueue) atomic_compare_exchange_strong( &spmc->curr_dequeue, &node, atomic_load_explicit(&node->next, memory_order_relaxed)); goto no_increment; } else *slot = node->buf[INDEX_OF(front, node)]; } while ( !atomic_compare_exchange_weak(&node->front, &(size_t){front}, front + 1)); ``` 這邊會存取兩個gloobal variable: spmc->curr_dequeue和 spmc->curr_dequeue->front。 ==如果其他mc_thread修改了front,因為被讀走資料了。則CAS就不會成立,目前的thread就會reload front然後繼續讀下一個。== ==如果其他mc_thread修改了curr_enqueue, 因為這個queue已經沒資料可以讀了。回到原本的thread之後因為front被改變了所以CAS不成立,然後因為沒資料可以讀取,所以也會切到下一個queue。== 其中這邊是判斷queue有沒有資料可以讀取。如果沒有且不是最後一個queue,則切到下一個queue。 ```c= if (!IS_READABLE(front, node)) { //不能讀取,就是queue空了 if (node != spmc->curr_enqueue) //不是最後一個queue atomic_compare_exchange_strong( &spmc->curr_dequeue, &node, // 是否已被切換過? atomic_load_explicit(&node->next, memory_order_relaxed)); //切換到一下個 goto no_increment; // 重跑CAS } ``` 這邊if loop使用strong function避免判斷錯誤,而while loop使用weak function即使判斷錯誤還是有機會重來。 ## 測試 判斷queue是否運作正確。 > 1. 是否同一個數值被讀出兩次。 > 2. 因為是queue使否有依序被讀出。 ```c= for (;;) { // 把最大的數值存起來 greatest = (greatest > element) ? greatest : element; if (!spmc_dequeue(spmc, &element)) fprintf(stderr, "Failed to dequeue in mc_thread.\n"); // 重複consume else if (observed_count[element]++) fprintf(stderr, "Consumed twice!\n"); // 順序不對,因為queue會依序讀出 else if (element < greatest) fprintf(stderr, "%zu after %zu; bad order!\n", (size_t) element, (size_t) greatest); //printf("Observed %zu.\n", (size_t) element); /* Test for sentinel signalling termination */ //收到最後一個資料後,把element + 1推進去讓下一個thread也中斷迴圈。 if (element >= (N_MC_ITEMS - 1)) { spmc_enqueue(spmc, element + 1); /* notify other threads */ break; } } ``` ## 自我檢驗 ### 1. 不同thread間存取的相同資源是什麼?? producer thread和consumer thread間會共同存取curr_enqueue, curr_dequeue, front, back, next這些指標。還有buf[]。 producer thread之間會共同存取curr_enqueue, curr_dequeue, front, back, next, buf[]這些資料。 ### 2. 有沒有ABA的問題? 這個例子沒有,因為沒有真的allocate/release記憶體空間。==ABA的問題發生在CAS的時候obj或expect被換成不同的值,導致結果錯誤。== 範例中,只有兩個地方有CAS。 ```c=149 if (node != spmc->curr_enqueue) atomic_compare_exchange_strong( &spmc->curr_dequeue, &node, atomic_load_explicit(&node->next, memory_order_relaxed)); goto no_increment; ``` 其中的expect為node是local variable不可能被其他thread換掉。obj為curr_dequeue可能會被其他thread改掉,但是只會改成下一個queue會不出現A->B->A的狀況。 ```c= do { idx = atomic_load_explicit(&node->front, memory_order_consume); // 省略.... *slot = node->buf[INDEX_OF(idx, node)]; } while ( !atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1)); ``` 另一個CAS的obj為node->front,會被不同的thread更改,但是這個值只會一直往上累加直到overflow。因為front為size_t除非producer thread很多在目前的thread被切走之後==A(目前的thread)->B(其他thread)->A(其他thread),thread的數量會很大幾乎不可能產生這麼多的thread。== ### 3. 資料間有沒有false sharing問題?(performance issue) > **On a 32-bit system size_t will take 32 bits, on a 64-bit one 64 bits.** In other words, a variable of size_t type can safely store a pointer. front, back使用size_t會根據32bits系統還是64bits系統長度分別為4bytes和8bytes, >To maximize the performance of the FairCom Server under multi-CPU systems, ensure the cache-line setting matches the setting for your equipment. A cache-line is the smallest amount of memory a processor will retrieve and store in its highest speed cache. Using an appropriate CACHE_LINE setting helps reduce false sharing in CPU caches. **Typical cache-line sizes are 32, 64, or 128 bytes.** 因為cacheline最少為32bytes,所以最好使用alignas來對資料做排列。如果不知道要對齊多少就使用128比較保險。但是缺點是data會變得很大。所以最好用在不常allocate的資料結構。 ### 4. 單一資料有沒有跨過兩個cahce line? 從宣告的struct來看,因為spmc_base大小應該在32byts以下,所以不會有跨cacheline的問題。spmc_node都是size_t和pointer不考慮buf[]其他所佔的大小剛好是cacheline的一半,所以不會有一個變數跨過cacheline。 ### 5. 共同變數上使用volatile來防止compiler優化。 目前的例子裏面**沒有使用volatile來保護被thread間共同存取的變數**﹑所以可以加上來改進。 ###### tags: `linux2021`