# 2024q1 Homework(quiz10) contributed by < yourui1017 > ## 開發環境 ``` shell $ gcc --version (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0 $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 12 On-line CPU(s) list: 0-11 Vendor ID: GenuineIntel Model name: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz CPU family: 6 Model: 158 Thread(s) per core: 2 Core(s) per socket: 6 Socket(s): 1 Stepping: 10 CPU max MHz: 4100.0000 CPU min MHz: 800.0000 BogoMIPS: 4399.99 Virtualization: VT-x Caches (sum of all): L1d: 192 KiB (6 instances) L1i: 192 KiB (6 instances) L2: 1.5 MiB (6 instances) L3: 9 MiB (1 instance) NUMA: NUMA node(s): 1 NUMA node0 CPU(s): 0-11 ``` ## 第十週測驗題 ### 測驗2 :::success 延伸問題: 1. 解釋上述程式碼運作原理,程式碼參照論文〈[A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue](https://drops.dagstuhl.de/storage/00lipics/lipics-vol146-disc2019/LIPIcs.DISC.2019.28/LIPIcs.DISC.2019.28.pdf)〉提及的 SCQ (Scalable Circular Queue) 2. 比照 [rcu-list](https://github.com/sysprog21/concurrent-programs/tree/master/rcu-list) 的實作手法,藉由 [thread-rcu](https://github.com/sysprog21/concurrent-programs/tree/master/thread-rcu) 來改寫上述使用到 hazard pointer (HP) 的場景,比較效能落差並解讀 3. 以 [userspace RCU](https://hackmd.io/@sysprog/ry_6RHgS3) 改寫上述程式碼並探討效能表現 4. 綜合上述實驗,提出改進 lfq 效能的方案 5. 在 Linux 核心找出類似 lfq 需求的程式碼和應用場景 ::: lock-free: 強調以系統的觀點來看,只要執行足夠長的時間,至少會有一個執行緒會有進展。因此不能夠恣意地使用 lock,否則就會違反 lock-free 的特性。 於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。 對於 C 這樣缺乏內建 concurrent GC 機制的程式語言來說,若要實作 lock-free 演算法,就要自行處理記憶體釋放的議題。[Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 是其中一種解決方案,其原理是讀取端執行緒對指標進行識別,指標 (特別是指向的記憶體區塊) 若要釋放時,會事先保存,延遲到確認沒有讀取端執行緒,才進行真正的釋放。Linux 核心的 [RCU 同步機制](https://hackmd.io/@sysprog/ry_6RHgS3)是另一種 lock-free 程式設計演算法和記憶體回收機制。 > 參考自 [並行程式設計](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-concepts): Hazard pointer #### 重要結構體 ```c struct lfq_node { void *data; union { struct lfq_node *next; struct lfq_node *free_next; }; bool can_free; }; struct lfq_ctx { alignas(64) struct lfq_node *head; int count; struct lfq_node **HP; /* hazard pointers */ int *tid_map; bool is_freeing; struct lfq_node *fph, *fpt; /* free pool head/tail */ /* FIXME: get rid of struct. Make it configurable */ int MAX_HP_SIZE; /* avoid cacheline contention */ alignas(64) struct lfq_node *tail; }; ``` #### 程式碼運作原理 首先,會先建立 lfq_ctx ,並且初始化此 lfq_ctx 。 ```c int lfq_init(struct lfq_ctx *ctx, int max_consume_thread) { struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node)); if (!tmp) return -errno; struct lfq_node *node = calloc(1, sizeof(struct lfq_node)); if (!node) return -errno; tmp->can_free = node->can_free = true; memset(ctx, 0, sizeof(struct lfq_ctx)); ctx->MAX_HP_SIZE = max_consume_thread; ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node)); ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node)); ctx->head = ctx->tail = tmp; ctx->fph = ctx->fpt = node; return 0; } ``` 並且我做了以下的改變: ```diff int lfq_init(struct lfq_ctx *ctx, int max_consume_thread) { struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node)); if (!tmp) return -errno; struct lfq_node *node = calloc(1, sizeof(struct lfq_node)); if (!node) return -errno; + memset(ctx, 0, sizeof(struct lfq_ctx)); tmp->can_free = node->can_free = true; - memset(ctx, 0, sizeof(struct lfq_ctx)); ctx->MAX_HP_SIZE = max_consume_thread; ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node)); - ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node)); + ctx->tid_map = calloc(max_consume_thread, sizeof(int)); ctx->head = ctx->tail = tmp; ctx->fph = ctx->fpt = node; return 0; } ``` 在原本的程式碼中會先將 tmp->can_free 和 node->can_free 設為 True ,但卻又在隨後把所有 ctx 中的位元組皆設為 0 ,讓前面的初始動作失去效用,因此把 `memset(ctx, 0, sizeof(struct lfq_ctx));` 移至 `tmp->can_free = node->can_free = true;` 之前。 另外,原始的程式碼配置記憶體大小為結構體 `struct lfq_node` 給 `ctx->tid_map` ,但是因為 `ctx->tid_map` 的資料型別是 `*int` ,因此更改為配置記憶體大小為 `int` 給 `ctx->tid_map` 。 接下來則是建立 consumer 和 producer 的 thread ,並且在沒有使用到 lock 的情況下,完成 [Producer–consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)。 其中會使用到 4 個變數儲存當前的狀態 * uint64_t cnt_added * 紀錄 Producer 加入多少次資料 * uint64_t cnt_removed * 紀錄 Consumer 拿走多少次資料 * int cnt_thread * 紀錄當前 Consumer 的 thread ID * int cnt_producer * 紀錄當前 Producer 的數量 ::: info 問題1: 為什麼在 main 函式中要先給定 consumer thread 的任務,而不是先給定 producer thread 的任務? 如此一來不是可以省去給定任務前後的 atomic_fetch_add(&cnt_producer, 1); 和 atomic_fetch_sub(&cnt_producer, 1); 操作嗎? ```c atomic_fetch_add(&cnt_producer, 1); for (int i = 0; i < MAX_CONSUMER; i++) { pthread_create(&thread_cons[i], NULL, remove_queue, (void *) &ctx); } for (int i = 0; i < MAX_PRODUCER; i++) { atomic_fetch_add(&cnt_producer, 1); pthread_create(&thread_pros[i], NULL, add_queue, (void *) &ctx); } atomic_fetch_sub(&cnt_producer, 1); ``` 問題2: 在 `lfq.c` 中有 `lfq_dequeue` 函式,但卻沒有用到,他的用途是什麼? ::: **首先先了解 producer 的任務。** producer 的任務是把資料放入佇列中,等待 consumer 提取資料。 執行函式如下: ```c void *add_queue(void *data) { struct lfq_ctx *ctx = data; long added; for (added = 0; added < 500000; added++) { struct user_data *p = malloc(sizeof(struct user_data)); p->data = SOME_ID; int ret = 0; if ((ret = lfq_enqueue(ctx, p)) != 0) { printf("lfq_enqueue failed, reason:%s\n", strerror(-ret)); atomic_fetch_add(&cnt_added, added); atomic_fetch_sub(&cnt_producer, 1); return 0; } } atomic_fetch_add(&cnt_added, added); atomic_fetch_sub(&cnt_producer, 1); printf("Producer thread [%lu] exited! Still %d running...\n", pthread_self(), atomic_load(&cnt_producer)); return 0; } ``` ```c int lfq_enqueue(struct lfq_ctx *ctx, void *data) { struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node)); if (!insert_node) return -errno; insert_node->data = data; struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node); /* We have claimed our spot in the insertion order by modifying tail. * we are the only inserting thread with a pointer to the old tail. * * Now we can make it part of the list by overwriting the NULL pointer in * the old tail. This is safe whether or not other threads have updated * ->next in our insert_node. */ assert(!old_tail->next && "old tail was not NULL"); atomic_store(&old_tail->next, insert_node); return 0; } ``` 每個 producer thread 會有各自的區域參數 `added`,紀錄當前輸入資料的次數,並且在最後將每個 producer thread 輸入資料的次數總和儲存在主進程的 cnt_added。每個 producer thread 預設會輸入 500000 次資料進入佇列。因此,如果 producer thread 沒有完成指定的輸入資料次數,代表在建立 insert_node 時發生錯誤。 另外可以注意到在針對 `struct lfq_ctx *ctx` 的資料進行操作時,會使用到 atomic 操作,防止 condition variable 的發生,導致資料不正確。 **再來是 consumer 的任務。** consumer 的任務是等待 producer 把資料放入佇列中,並提取資料。 執行函式如下: ```c void *remove_queue(void *data) { struct lfq_ctx *ctx = data; int tid = atomic_fetch_add(&cnt_thread, 1); long deleted = 0; while (1) { struct user_data *p = lfq_dequeue_tid(ctx, tid); if (p) { if (p->data != SOME_ID) { printf("data wrong!!\n"); exit(1); } free(p); deleted++; } else { if (!ctx->count && !atomic_load(&cnt_producer)) break; /* queue is empty and no more producers */ sched_yield(); /* queue is empty, release CPU slice */ } } atomic_fetch_add(&cnt_removed, deleted); printf("Consumer thread [%lu] exited %d\n", pthread_self(), cnt_producer); return 0; } ``` ```c void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid) { struct lfq_node *old_head, *new_head; /* HP[tid] is necessary for deallocation. */ do { retry: /* continue jumps to the bottom of the loop, and would attempt a * atomic_compare_exchange_strong with uninitialized new_head. */ old_head = atomic_load(&ctx->head); atomic_store(&ctx->HP[tid], old_head); atomic_thread_fence(memory_order_seq_cst); /* another thread freed it before seeing our HP[tid] store */ if (old_head != atomic_load(&ctx->head)) goto retry; new_head = atomic_load(&old_head->next); if (new_head == 0) { atomic_store(&ctx->HP[tid], 0); return NULL; /* never remove the last node */ } } while (!atomic_compare_exchange_strong(&ctx->head, &old_head, new_head)); /* We have atomically advanced head, and we are the thread that won the race * to claim a node. We return the data from the *new* head. The list starts * off with a dummy node, so the current head is always a node that is * already been read. */ atomic_store(&ctx->HP[tid], 0); void *ret = new_head->data; atomic_store(&new_head->can_free, true); /* we need to avoid freeing until other readers are definitely not going to * load its ->next in the atomic_compare_exchange_strong loop */ safe_free(ctx, (struct lfq_node *) old_head); return ret; } ``` 每個 consumer thread 會有各自的區域參數 `deleted`,紀錄當前讀取資料的次數,並且在最後將每個 consumer thread 讀取資料的次數總和儲存在主進程的 cnt_removed。每個 consumer thread 預設會讀取 500000 次資料。如果 consumer thread 從佇列中得到空的資料,並且還有 producer thread 正在執行的話就讓出 CPU 的使用資源,否則退出 while 迴圈,完成 consumer thread 的任務。 另外,要特別注意的地方是 lfq_ctx 結構體的記憶體釋放。 > 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 [dangling pointer](https://www.wikiwand.com/en/Dangling_pointer)。 因為本程式碼旨在實作 lock free 演算法。因此沒有使用到 lock 保證沒有其他執行緒正在存取同一物件,並安全地釋放記憶體。為了要實現 lock free 演算法的實作,這邊使用的是 Hazard pointer 的實作方式。 並且可以在函式 `lfq_dequeue_tid` 中注意到只要是有關 `lfq_ctx` 結構體的操作,都需要使用 atomic 操作,確保資料的正確性。另外在 `lfq_dequeue_tid` 中,會把正在使用的節點儲存在 Hazard pointer 中。在此 `lfq_dequeue_tid` 函式中, `lfq_ctx` 的 head 會是 dummy head ,也就是目前的 head 早已經被讀取過了,head 的下一個 node 才是儲存需要的資料的節點,並因為已經取得需要的資料,所以在該執行緒中可以把 Hazard pointer(儲存 head 位址) 移除。 再來則是如何安全的釋放該記憶體位置的資料,執行函式如下: ```c static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node) { for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { if (atomic_load(&ctx->HP[i]) == node) return true; } return false; } /* add to tail of the free list */ static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node) { atomic_store(&node->free_next, NULL); struct lfq_node *old_tail = atomic_exchange(&ctx->fpt, node); /* seq_cst */ atomic_store(&old_tail->free_next, node); } static void free_pool(struct lfq_ctx *ctx, bool freeall) { bool old = 0; if (!atomic_compare_exchange_strong(&ctx->is_freeing, &old, true)) return; for (int i = 0; i < MAX_FREE || freeall; i++) { struct lfq_node *p = ctx->fph; if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) || in_hp(ctx, (struct lfq_node *) p)) break; ctx->fph = p->free_next; free(p); } atomic_store(&ctx->is_freeing, false); atomic_thread_fence(memory_order_seq_cst); } static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node) { if (atomic_load(&node->can_free) && !in_hp(ctx, node)) { /* free is not thread-safe */ bool old = 0; if (atomic_compare_exchange_strong(&ctx->is_freeing, &old, true)) { /* poison the pointer to detect use-after-free */ node->next = (void *) -1; free(node); /* we got the lock; actually free */ atomic_store(&ctx->is_freeing, false); atomic_thread_fence(memory_order_seq_cst); } else /* we did not get the lock; only add to a freelist */ insert_pool(ctx, node); } else insert_pool(ctx, node); free_pool(ctx, false); } ``` 在 `safe_free` 函式中,會確認該 node 的狀態是否可以 free ,並且確認是否還有其他執行緒正在使用此 node ,如果該 node 可以 free 並且沒有執行緒正在使用此 node ,就會再進一步判斷 lfq_ctx 是否正在 free ,避免有多個執行緒同時 free ,確認 lfq_ctx 可以 free 就會把該 node 的記憶體位置釋放,並且為了確保之後的執行緒會看到此變化,還加入 `atomic_thread_fence(memory_order_seq_cst);` 。相反的,如果上述條件不符合的話,就會把該 node 放入 free pool 中等待釋放。 並且再接下來會持續呼叫函式 `free_pool` ,確認 free pool 中的節點是否還有執行緒在使用,如果沒有就把節點的記憶體釋放,並且在最後要加入 memory barrier 確保其他執行緒在執行 store/load 動作時,會看到此改動。 :::info `safe_free` 和 `free_pool` 的功用: `safe_free` 的功用是會確認是否有執行緒正在使用該記憶體位置,如果沒有就馬上釋放剛剛存取過得記憶體位置,否則就把它加入 free pool 等待釋放。 `free_pool` 的功用則是隨時檢查 free pool 是否滿足可釋放的條件, free pool 中可能會存有時間叫久遠的記憶體位置,但因為還有執行緒在使用,所以無法釋放。 因此 `safe_free` 和 `free_pool` 的差別是釋放記憶體位置的時間。`safe_free` 會釋放該執行緒最近使用的記憶體,而 `free_pool` 則是會隨時查看 free pool 是否能夠釋放,不在乎記憶體最近一次被使用的時間。 ::: 最後,當 producer 和 consumer 的任務都完成後,就會將先前配置的動態記憶體位置釋放,並將靜態記憶體位置初始化。