# 2024q1 Homework(quiz10)
contributed by < yourui1017 >
## 開發環境
``` shell
$ gcc --version
(Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 12
On-line CPU(s) list: 0-11
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
CPU family: 6
Model: 158
Thread(s) per core: 2
Core(s) per socket: 6
Socket(s): 1
Stepping: 10
CPU max MHz: 4100.0000
CPU min MHz: 800.0000
BogoMIPS: 4399.99
Virtualization: VT-x
Caches (sum of all):
L1d: 192 KiB (6 instances)
L1i: 192 KiB (6 instances)
L2: 1.5 MiB (6 instances)
L3: 9 MiB (1 instance)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0-11
```
## 第十週測驗題
### 測驗2
:::success
延伸問題:
1. 解釋上述程式碼運作原理,程式碼參照論文〈[A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue](https://drops.dagstuhl.de/storage/00lipics/lipics-vol146-disc2019/LIPIcs.DISC.2019.28/LIPIcs.DISC.2019.28.pdf)〉提及的 SCQ (Scalable Circular Queue)
2. 比照 [rcu-list](https://github.com/sysprog21/concurrent-programs/tree/master/rcu-list) 的實作手法,藉由 [thread-rcu](https://github.com/sysprog21/concurrent-programs/tree/master/thread-rcu) 來改寫上述使用到 hazard pointer (HP) 的場景,比較效能落差並解讀
3. 以 [userspace RCU](https://hackmd.io/@sysprog/ry_6RHgS3) 改寫上述程式碼並探討效能表現
4. 綜合上述實驗,提出改進 lfq 效能的方案
5. 在 Linux 核心找出類似 lfq 需求的程式碼和應用場景
:::
lock-free: 強調以系統的觀點來看,只要執行足夠長的時間,至少會有一個執行緒會有進展。因此不能夠恣意地使用 lock,否則就會違反 lock-free 的特性。
於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。
對於 C 這樣缺乏內建 concurrent GC 機制的程式語言來說,若要實作 lock-free 演算法,就要自行處理記憶體釋放的議題。[Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 是其中一種解決方案,其原理是讀取端執行緒對指標進行識別,指標 (特別是指向的記憶體區塊) 若要釋放時,會事先保存,延遲到確認沒有讀取端執行緒,才進行真正的釋放。Linux 核心的 [RCU 同步機制](https://hackmd.io/@sysprog/ry_6RHgS3)是另一種 lock-free 程式設計演算法和記憶體回收機制。
> 參考自 [並行程式設計](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-concepts): Hazard pointer
#### 重要結構體
```c
struct lfq_node {
void *data;
union {
struct lfq_node *next;
struct lfq_node *free_next;
};
bool can_free;
};
struct lfq_ctx {
alignas(64) struct lfq_node *head;
int count;
struct lfq_node **HP; /* hazard pointers */
int *tid_map;
bool is_freeing;
struct lfq_node *fph, *fpt; /* free pool head/tail */
/* FIXME: get rid of struct. Make it configurable */
int MAX_HP_SIZE;
/* avoid cacheline contention */
alignas(64) struct lfq_node *tail;
};
```
#### 程式碼運作原理
首先,會先建立 lfq_ctx ,並且初始化此 lfq_ctx 。
```c
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
if (!tmp)
return -errno;
struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
if (!node)
return -errno;
tmp->can_free = node->can_free = true;
memset(ctx, 0, sizeof(struct lfq_ctx));
ctx->MAX_HP_SIZE = max_consume_thread;
ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node));
ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node));
ctx->head = ctx->tail = tmp;
ctx->fph = ctx->fpt = node;
return 0;
}
```
並且我做了以下的改變:
```diff
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
if (!tmp)
return -errno;
struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
if (!node)
return -errno;
+ memset(ctx, 0, sizeof(struct lfq_ctx));
tmp->can_free = node->can_free = true;
- memset(ctx, 0, sizeof(struct lfq_ctx));
ctx->MAX_HP_SIZE = max_consume_thread;
ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node));
- ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node));
+ ctx->tid_map = calloc(max_consume_thread, sizeof(int));
ctx->head = ctx->tail = tmp;
ctx->fph = ctx->fpt = node;
return 0;
}
```
在原本的程式碼中會先將 tmp->can_free 和 node->can_free 設為 True ,但卻又在隨後把所有 ctx 中的位元組皆設為 0 ,讓前面的初始動作失去效用,因此把 `memset(ctx, 0, sizeof(struct lfq_ctx));` 移至 `tmp->can_free = node->can_free = true;` 之前。
另外,原始的程式碼配置記憶體大小為結構體 `struct lfq_node` 給 `ctx->tid_map` ,但是因為 `ctx->tid_map` 的資料型別是 `*int` ,因此更改為配置記憶體大小為 `int` 給 `ctx->tid_map` 。
接下來則是建立 consumer 和 producer 的 thread ,並且在沒有使用到 lock 的情況下,完成 [Producer–consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)。
其中會使用到 4 個變數儲存當前的狀態
* uint64_t cnt_added
* 紀錄 Producer 加入多少次資料
* uint64_t cnt_removed
* 紀錄 Consumer 拿走多少次資料
* int cnt_thread
* 紀錄當前 Consumer 的 thread ID
* int cnt_producer
* 紀錄當前 Producer 的數量
::: info
問題1: 為什麼在 main 函式中要先給定 consumer thread 的任務,而不是先給定 producer thread 的任務? 如此一來不是可以省去給定任務前後的 atomic_fetch_add(&cnt_producer, 1); 和 atomic_fetch_sub(&cnt_producer, 1); 操作嗎?
```c
atomic_fetch_add(&cnt_producer, 1);
for (int i = 0; i < MAX_CONSUMER; i++) {
pthread_create(&thread_cons[i], NULL, remove_queue, (void *) &ctx);
}
for (int i = 0; i < MAX_PRODUCER; i++) {
atomic_fetch_add(&cnt_producer, 1);
pthread_create(&thread_pros[i], NULL, add_queue, (void *) &ctx);
}
atomic_fetch_sub(&cnt_producer, 1);
```
問題2: 在 `lfq.c` 中有 `lfq_dequeue` 函式,但卻沒有用到,他的用途是什麼?
:::
**首先先了解 producer 的任務。**
producer 的任務是把資料放入佇列中,等待 consumer 提取資料。
執行函式如下:
```c
void *add_queue(void *data)
{
struct lfq_ctx *ctx = data;
long added;
for (added = 0; added < 500000; added++) {
struct user_data *p = malloc(sizeof(struct user_data));
p->data = SOME_ID;
int ret = 0;
if ((ret = lfq_enqueue(ctx, p)) != 0) {
printf("lfq_enqueue failed, reason:%s\n", strerror(-ret));
atomic_fetch_add(&cnt_added, added);
atomic_fetch_sub(&cnt_producer, 1);
return 0;
}
}
atomic_fetch_add(&cnt_added, added);
atomic_fetch_sub(&cnt_producer, 1);
printf("Producer thread [%lu] exited! Still %d running...\n",
pthread_self(), atomic_load(&cnt_producer));
return 0;
}
```
```c
int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
if (!insert_node)
return -errno;
insert_node->data = data;
struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node);
/* We have claimed our spot in the insertion order by modifying tail.
* we are the only inserting thread with a pointer to the old tail.
*
* Now we can make it part of the list by overwriting the NULL pointer in
* the old tail. This is safe whether or not other threads have updated
* ->next in our insert_node.
*/
assert(!old_tail->next && "old tail was not NULL");
atomic_store(&old_tail->next, insert_node);
return 0;
}
```
每個 producer thread 會有各自的區域參數 `added`,紀錄當前輸入資料的次數,並且在最後將每個 producer thread 輸入資料的次數總和儲存在主進程的 cnt_added。每個 producer thread 預設會輸入 500000 次資料進入佇列。因此,如果 producer thread 沒有完成指定的輸入資料次數,代表在建立 insert_node 時發生錯誤。
另外可以注意到在針對 `struct lfq_ctx *ctx` 的資料進行操作時,會使用到 atomic 操作,防止 condition variable 的發生,導致資料不正確。
**再來是 consumer 的任務。**
consumer 的任務是等待 producer 把資料放入佇列中,並提取資料。
執行函式如下:
```c
void *remove_queue(void *data)
{
struct lfq_ctx *ctx = data;
int tid = atomic_fetch_add(&cnt_thread, 1);
long deleted = 0;
while (1) {
struct user_data *p = lfq_dequeue_tid(ctx, tid);
if (p) {
if (p->data != SOME_ID) {
printf("data wrong!!\n");
exit(1);
}
free(p);
deleted++;
} else {
if (!ctx->count && !atomic_load(&cnt_producer))
break; /* queue is empty and no more producers */
sched_yield(); /* queue is empty, release CPU slice */
}
}
atomic_fetch_add(&cnt_removed, deleted);
printf("Consumer thread [%lu] exited %d\n", pthread_self(), cnt_producer);
return 0;
}
```
```c
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
struct lfq_node *old_head, *new_head;
/* HP[tid] is necessary for deallocation. */
do {
retry:
/* continue jumps to the bottom of the loop, and would attempt a
* atomic_compare_exchange_strong with uninitialized new_head.
*/
old_head = atomic_load(&ctx->head);
atomic_store(&ctx->HP[tid], old_head);
atomic_thread_fence(memory_order_seq_cst);
/* another thread freed it before seeing our HP[tid] store */
if (old_head != atomic_load(&ctx->head))
goto retry;
new_head = atomic_load(&old_head->next);
if (new_head == 0) {
atomic_store(&ctx->HP[tid], 0);
return NULL; /* never remove the last node */
}
} while (!atomic_compare_exchange_strong(&ctx->head, &old_head, new_head));
/* We have atomically advanced head, and we are the thread that won the race
* to claim a node. We return the data from the *new* head. The list starts
* off with a dummy node, so the current head is always a node that is
* already been read.
*/
atomic_store(&ctx->HP[tid], 0);
void *ret = new_head->data;
atomic_store(&new_head->can_free, true);
/* we need to avoid freeing until other readers are definitely not going to
* load its ->next in the atomic_compare_exchange_strong loop
*/
safe_free(ctx, (struct lfq_node *) old_head);
return ret;
}
```
每個 consumer thread 會有各自的區域參數 `deleted`,紀錄當前讀取資料的次數,並且在最後將每個 consumer thread 讀取資料的次數總和儲存在主進程的 cnt_removed。每個 consumer thread 預設會讀取 500000 次資料。如果 consumer thread 從佇列中得到空的資料,並且還有 producer thread 正在執行的話就讓出 CPU 的使用資源,否則退出 while 迴圈,完成 consumer thread 的任務。
另外,要特別注意的地方是 lfq_ctx 結構體的記憶體釋放。
> 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 [dangling pointer](https://www.wikiwand.com/en/Dangling_pointer)。
因為本程式碼旨在實作 lock free 演算法。因此沒有使用到 lock 保證沒有其他執行緒正在存取同一物件,並安全地釋放記憶體。為了要實現 lock free 演算法的實作,這邊使用的是 Hazard pointer 的實作方式。
並且可以在函式 `lfq_dequeue_tid` 中注意到只要是有關 `lfq_ctx` 結構體的操作,都需要使用 atomic 操作,確保資料的正確性。另外在 `lfq_dequeue_tid` 中,會把正在使用的節點儲存在 Hazard pointer 中。在此 `lfq_dequeue_tid` 函式中, `lfq_ctx` 的 head 會是 dummy head ,也就是目前的 head 早已經被讀取過了,head 的下一個 node 才是儲存需要的資料的節點,並因為已經取得需要的資料,所以在該執行緒中可以把 Hazard pointer(儲存 head 位址) 移除。
再來則是如何安全的釋放該記憶體位置的資料,執行函式如下:
```c
static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node)
{
for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
if (atomic_load(&ctx->HP[i]) == node)
return true;
}
return false;
}
/* add to tail of the free list */
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
{
atomic_store(&node->free_next, NULL);
struct lfq_node *old_tail = atomic_exchange(&ctx->fpt, node); /* seq_cst */
atomic_store(&old_tail->free_next, node);
}
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
bool old = 0;
if (!atomic_compare_exchange_strong(&ctx->is_freeing, &old, true))
return;
for (int i = 0; i < MAX_FREE || freeall; i++) {
struct lfq_node *p = ctx->fph;
if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) ||
in_hp(ctx, (struct lfq_node *) p))
break;
ctx->fph = p->free_next;
free(p);
}
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
}
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
/* free is not thread-safe */
bool old = 0;
if (atomic_compare_exchange_strong(&ctx->is_freeing, &old, true)) {
/* poison the pointer to detect use-after-free */
node->next = (void *) -1;
free(node); /* we got the lock; actually free */
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
} else /* we did not get the lock; only add to a freelist */
insert_pool(ctx, node);
} else
insert_pool(ctx, node);
free_pool(ctx, false);
}
```
在 `safe_free` 函式中,會確認該 node 的狀態是否可以 free ,並且確認是否還有其他執行緒正在使用此 node ,如果該 node 可以 free 並且沒有執行緒正在使用此 node ,就會再進一步判斷 lfq_ctx 是否正在 free ,避免有多個執行緒同時 free ,確認 lfq_ctx 可以 free 就會把該 node 的記憶體位置釋放,並且為了確保之後的執行緒會看到此變化,還加入 `atomic_thread_fence(memory_order_seq_cst);` 。相反的,如果上述條件不符合的話,就會把該 node 放入 free pool 中等待釋放。
並且再接下來會持續呼叫函式 `free_pool` ,確認 free pool 中的節點是否還有執行緒在使用,如果沒有就把節點的記憶體釋放,並且在最後要加入 memory barrier 確保其他執行緒在執行 store/load 動作時,會看到此改動。
:::info
`safe_free` 和 `free_pool` 的功用:
`safe_free` 的功用是會確認是否有執行緒正在使用該記憶體位置,如果沒有就馬上釋放剛剛存取過得記憶體位置,否則就把它加入 free pool 等待釋放。
`free_pool` 的功用則是隨時檢查 free pool 是否滿足可釋放的條件, free pool 中可能會存有時間叫久遠的記憶體位置,但因為還有執行緒在使用,所以無法釋放。
因此 `safe_free` 和 `free_pool` 的差別是釋放記憶體位置的時間。`safe_free` 會釋放該執行緒最近使用的記憶體,而 `free_pool` 則是會隨時查看 free pool 是否能夠釋放,不在乎記憶體最近一次被使用的時間。
:::
最後,當 producer 和 consumer 的任務都完成後,就會將先前配置的動態記憶體位置釋放,並將靜態記憶體位置初始化。