# Linux 核心專題: 並行佇列設計和實作
> 執行人: Shiang1212
> [專題解說錄影](https://youtu.be/CB06kFPb754)
### Reviewed by `allenliao666`
"最後,再執行 free_pool 將 retire list 裡的所有節點 free 掉。"這句話中英夾雜,應修正為正體中文。
> [name=Shiang1212]
> 已修改,感謝建議。
### Reviewed by `Petakuo`
"首先會判斷該節點是否能被 free 且不在任何消費者的 hazard pointer 裡的話" 這句話不太通順,能再描述清楚一點嗎?
> [name=Shiang1212]
> 原本打算直翻程式碼的意思,沒想到會不便於閱讀,已修改,感謝建議。
### Reviewed by `weihsinyeh`
問題 1 : 執行時間中的 hazard pointer 的時間測量是在 `lfq_dequeue_tid` 中加入 `usleep(10)` 並等待`free_pool` 節點數量到達 20 個以上才釋放,即在「提出效能改進方案中的情況二」測量出來的實驗結果嗎?
> [name=Shiang1212]
> 不是,是沒有加 `usleep` 的版本,但確實可以測量一下「提出效能改進方案中的情況二」的執行時間。
問題 2 : 延續問題 1 ,因為如果是用原本的程式碼,依據仍可改進的地方的描述:「但該何時釋放 retire list 裡的節點呢?」,這個意思是指從程式開始執行到程式結束,`retire list` 節點中的節點都不會被釋放嗎?
> [name=Shiang1212]
> [第十週測驗](https://hackmd.io/@sysprog/linux2024-quiz10)的測驗二中,每次讀取端結束共享資料的存取時,就會執行一次 `safe_free` 釋放 retire list 中節點記憶體。原本的描述確實容易讓人混淆,已修改,感謝建議。
### Reviewed by `Ackerman666`
我覺得這句話能再嚴謹一點,應是寫入端修改資料完畢後,一直到等待讀取端結束存取舊資料,之間所經歷的時間為寬限期。
> 如果讀取端存取共享資料的同時,寫入端也想修改這份資料的話,就會進入寬限期 (Grace Period)
> [name=Shiang1212]
> 已修改,感謝建議。
## Reviewed by `yuyuan0625`
想請問為何 hazard pointer 相較於 RCU 會對於並行佇列能有更好的效能呢,是因為其回收機制嗎?
> [name=Shiang1212]
> 從執行結果來看,當生產者多、消費者多的情況下,RCU 的執行時間比 hazard pointer 短,我認為 RCU 的回收機制確實比 hazard pointer 好,畢竟省去了將節點丟進 retire list 等待釋放的這個步驟。但實際上為何 RCU 的表現比 hazard pointer 好,可能還要進行相關實驗才能驗證。
### Review by 'Ken-LuWeiRu'
'線程運行結束前使用', thread 是「執行緒」而非線程。參見: https://hackmd.io/@sysprog/it-vocabulary
> [name=Shiang1212]
> 已修改,感謝建議。
## 任務簡介
從並行佇列的實作出發,探討 hazard pointer 和 RCU 對其整合的影響,評估執行效益,過程中參照經典論文,並予以落實,最終體會 Linux 核心相關的設計考量。
## TODO: 設計精簡的並行佇列
> 重做第 10 週測驗的測驗 2 (lfq: 精簡的並行佇列),包含延伸問題,務必探究其理論分析。
> 比較 hazard pointer 和 RCU 對其整合的影響,評估執行效益。
在並行程式設計中,假設今天執行緒 A 正在存取某個共享的記憶體內容,但同時執行緒 B 也正在修改這份共享資料,那這個情況下,可能就會導致執行緒 A 資訊不同步,或是共享資料的記憶體已被釋放等等狀況,因而導致程式運行錯誤,例如 [dangling pointer](https://www.wikiwand.com/en/Dangling_pointer)。因此,本專題採用 hazard pointer 以及 RCU 同步機制來解決上述問題,並針對兩者的性能進行評估。
### 使用 [hazard pointer](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-hazard-pointer#%E5%AF%A6%E4%BD%9C%E7%9A%84%E6%94%B9%E9%80%B2) 實作 lock-free queue ([第十週測驗](https://hackmd.io/@sysprog/linux2024-quiz10)的測驗二)
#### 概述
hazard pointer 是一種可以控管共享資料的記憶體修改與釋放的方法。其基本的概念是允許共享資料可以被一個寫入端修改而多個讀取端存取,當存在讀取端正在存取該共享資料時,hazard pointer 會將其標註,於是寫入端可得知此訊息並延遲對其釋放。
根據 〈[Lock-Free Data Structures with Hazard Pointers](http://erdani.org/publications/cuj-2004-12.pdf)〉 這篇論文提出的架構,在 hazard pointer 架構中,每個執行緒需要維護的兩個關鍵集合:
* hazard pointer:儲存正在被存取的共享資料,因此該資料的記憶體空間不能直接被釋放
* retire list:也可稱為 free pool,儲存被要求釋放的共享資料,但實際尚未釋放
此外,hazard pointer 架構會在特定條件下 (請見[呼叫 free_pool 的時機](https://hackmd.io/sevSO8DbQXW5s8LIB2mX6g#%E5%91%BC%E5%8F%AB-free_pool-%E7%9A%84%E6%99%82%E6%A9%9F)) 檢查 retire list,計算執行緒的 retire list 與其他執行緒的 hazard pointer 之差集,找出實際上能釋放的共享資料並真正釋放其記憶體。
因此 hazard pointer 架構能安全的釋放記憶體的基本想法就是:
* 被執行緒放在其 hazard pointer 中的共享資料尚不能被釋放
* 被執行緒要求釋放的共享資料必須先放在 retire list 中
* 透過定期檢查 retire list 並釋放其中共享內容的記憶體
而第十週測驗的測驗二對 hazard pointer 的實作做了改良,透過一個全域的管理者,僅使用指標的指標紀錄每個執行緒正在使用的共享資料,與一個 retire list 紀錄所有執行緒要求釋放的共享資料。
#### 流程
main thread 使用 `lfq_init` 初始化 handler `ctx`,用來管理上述的 hazard pointer 與 retire list。前者使用指標的指標 `HP` 來紀錄每個讀取端正在使用的節點。後者使用 `fph` 和 `fpt` 將每個寫入端要求釋放的節點串起來形成一個鏈結串列,等某個特定時機再一次釋放。再按照輸入的參數產生 `MAX_PRODUCER` 個生產者以及 `MAX_CONSUMER` 個消費者。
其中,在 `ctx` 使用 `head` 和 `tail` 指標維護用來存放資料的佇列,生產者執行 `lfq_enqueue` 將東西放進佇列,消費者執行 `lfq_dequeue_tid` 將東西從佇列移除。
在消費者執行 `lfq_dequeue_tid` 時,最後會使用 `safe_free` 釋放節點的記憶體,此時必須考慮到是否還有其他 thread 正在使用該節點,這裡採用 hazard pointer 解決存取共用記憶體可能導致的問題。使用 `ctx->HP` 紀錄每個消費者正在使用的指標,讓每個 thread 在執行 `safe_free` 釋放節點記憶體前,先透過 `ctx->HP` 檢查是否還有其他 thread 正在存取該節點,藉此避免不小心釋放掉別人正在使用的節點,導致程式運行發生錯誤。
在使用 `safe_free` 釋放節點記憶體時,會先判斷是否能釋放該節點的記憶體,如果可以,就直接釋放;如果不行,就執行 `insert_pool` 將該節點放進 retire list,等待後續呼叫 `free_pool` 再一口氣釋放 retire list 裡所有節點的記憶體。
等到生產者結束生產,消費者也結束運行了,程式最後會執行 `lfq_release` 將還存在佇列裡的節點、retire list 裡的節點、`ctx` 本身的記憶體全部釋放。
#### 結構體介紹
##### `lfq_node`
lock-free queue node 的結構,也就是佇列裡的基本元素,包含資料、是否能釋放,以及一個 union 裝著指向下一個元素的指標 `next` 與 指向 retire list 的下一個節點的指標 `free_next`。
* next : 透過每個節點的 `next`,將節點串連起來形成佇列
* free_next : retire list 會透過每個節點的 `free_next`,將每個消費者要求釋放的節點串連起來形成一個鏈結串列
```c
struct lfq_node {
int data;
union {
struct lfq_node *next;
struct lfq_node *free_next;
};
bool can_free;
};
```
##### `lfq_ctx`
lock-free queue 的 handler,用來儲存佇列及 retire list 的頭尾,及 tid 的 map,還有個 bool 型態的變數用來確認是否正在執行釋放,此外,也儲存 hazard pointers 和他的最大長度。
* `head` / `tail`:指向 queue 中的首個 / 末個元素
* `HP`:使用指標的指標,紀錄每個消費者的 hazard pointer
* `fph` / `fpt`:retire list 的首個 / 末個元素
```c
struct lfq_ctx {
alignas(64) struct lfq_node *head;
int count;
struct lfq_node **HP; /* hazard pointers */
int *tid_map;
bool is_freeing;
struct lfq_node *fph, *fpt; /* retire list head/tail */
/* FIXME: get rid of struct. Make it configurable */
int MAX_HP_SIZE;
/* avoid cacheline contention */
alignas(64) struct lfq_node *tail;
}
```
示意圖:
![image](https://hackmd.io/_uploads/r1_zTloIC.png)
:::danger
使用 Graphviz 或其他向量製圖語法來繪製。
:::
* 藍色箭頭:節點 `lfq_node` 透過 `next` 與其他節點串連起來,形成佇列
* 紅色箭頭:retire list,當節點被要求釋放時,會透過該節點的 `free_next` 將該節點串在 `ctx->fpt` 後面,形成一條 retire list
* 綠色箭頭:HP,紀錄正在被存取的節點
#### 函式
##### `lfq_enqueue`
生產者透過 `lfq_enqueue` 將資料 push 進佇列中。
宣告欲新增的節點內容 `insert_head`,讓 `ctx->tail` 指向這個節點,並將 `old_tail` 的 `next` 指向 `insert_head`。
```c
int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
if (!insert_node)
return -errno;
insert_node->data = data;
struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node);
/* We have claimed our spot in the insertion order by modifying tail.
* we are the only inserting thread with a pointer to the old tail.
*
* Now we can make it part of the list by overwriting the NULL pointer in
* the old tail. This is safe whether or not other threads have updated
* ->next in our insert_node.
*/
assert(!old_tail->next && "old tail was not NULL");
atomic_store(&old_tail->next, insert_node);
return 0;
}
```
##### `lfq_dequeue_tid`
消費者透過 `lfq_dequeue_tid` 將資料從佇列中 pop 掉。
從佇列的前端讀取一個節點 `old_head`,並存入 `ctx->HP[tid]` 用來告知其他 thread 該節點目前有人使用。再用 `ctx_head` 指向 `new_head` (也就是 `old_head->next`),最後將 `ctx_HP[tid]` 重新設為 0,再用 `safe_free` 釋放 `old_head` 的記憶體。
```c
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
struct lfq_node *old_head, *new_head;
/* HP[tid] is necessary for deallocation. */
do {
retry:
/* continue jumps to the bottom of the loop, and would attempt a
* atomic_compare_exchange_strong with uninitialized new_head.
*/
old_head = atomic_load(&ctx->head);
atomic_store(&ctx->HP[tid], old_head);
atomic_thread_fence(memory_order_seq_cst);
/* another thread freed it before seeing our HP[tid] store */
if (old_head != atomic_load(&ctx->head))
goto retry;
new_head = atomic_load(&old_head->next);
if (new_head == 0) {
atomic_store(&ctx->HP[tid], 0);
return NULL; /* never remove the last node */
}
} while (!atomic_compare_exchange_strong(&ctx->head, &old_head, new_head));
// Do something
atomic_store(&ctx->HP[tid], 0);
void *ret = new_head->data;
atomic_store(&new_head->can_free, true);
/* we need to avoid freeing until other readers are definitely not going to
* load its ->next in the atomic_compare_exchange_strong loop
*/
safe_free(ctx, (struct lfq_node *) old_head);
return ret;
}
```
##### `safe_free`
`lfq_dequeue_tid` 裡使用 `safe_free` 嘗試釋放記憶體。首先判斷該節點是否存在 hazard pointer 裡,如果是,就代表有其他執行緒正在存取這個節點,執行 `insert_pool` 將該節點串連進 retire list 等待後續釋放;如果不是,則代表可以毫無顧慮的使用 `free`,將該節點的記憶體釋放。最後,執行 `free_pool` 將 retire list 裡的所有節點的記憶體釋放掉。
其中,可以看到呼叫 `safe_free` 嘗試釋放節點時,「每次」都會執行 `free_pool` 掃描一次 retire list 裡的節點並將其釋放,這樣做能確保被要求釋放的節點能很快獲得釋放,但過於頻繁的呼叫 `safe_free` 也有可能導致效能下降(明明 retire list 是空的但卻呼叫 `free_pool`,白呼叫了!)。
:::danger
避免只是張貼程式碼而沒有對等的解說和討論。HackMD 不該是張貼完整程式碼的地方,GitHub 才是。
改進你的漢語表達。
:::
```c
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
/* free is not thread-safe */
bool old = 0;
if (atomic_compare_exchange_strong(&ctx->is_freeing, &old, 1)) {
/* poison the pointer to detect use-after-free */
node->next = (void *) -1;
free(node); /* we got the lock; actually free */
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
} else /* we did not get the lock; only add to a freelist */
insert_pool(ctx, node);
} else
insert_pool(ctx, node);
free_pool(ctx, false);
}
```
##### `insert_pool` & `free_pool`
`insert_pool` 將尚有人使用的節點存進 retire list,以待後續使用 `free_pool` 釋放 retire list 裡節點的記憶體。
```c=
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
{
atomic_store(&node->free_next, NULL);
struct lfq_node *old_tail = atomic_exchange(&ctx->fpt, node);
atomic_store(&old_tail->free_next, node);
}
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
bool old = 0;
if (!atomic_compare_exchange_strong(&ctx->is_freeing, &old, 1))
return;
for (int i = 0; i < MAX_FREE || freeall; i++) {
struct lfq_node *p = ctx->fph;
if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) ||
in_hp(ctx, (struct lfq_node *) p))
break;
ctx->fph = p->free_next;
free(p);
}
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
}
```
其中,程式碼第 16 行的 `if` 判斷,我們使用這個判斷式推斷這個節點是否能正常釋放,如果判斷通過(也就是該節點現在不能被釋放),就會用 `break` 跳出迴圈,結束 retire list 的掃描與釋放,但這樣貿然結束掃描會導致 retire list 中的節點進行無謂的等待。
考慮到以下例子:retire list 中有 7 個節點,其中節點 3 被 hazard pointer 標注。
```
HP
|
|
V
retire list: 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7
```
這個情況下呼叫 `free_pool` 的話,`free_pool` 釋放完節點 1 和節點 2 的記憶體後,在判斷節點 3 是否能被釋放時,會因為該節點當前被 hazard pointer 標注而停止掃描,導致節點 3 後面的節點無法被釋放。
##### `lfq_init` & `lfq_release`
`lfq_init` 將 `lfq_ctx` 初始化,`lfq_release` 將還存在佇列裡的節點、retire list 裡的節點、ctx 本身的記憶體全部釋放。
```c
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
if (!tmp)
return -errno;
struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
if (!node)
return -errno;
tmp->can_free = node->can_free = true;
memset(ctx, 0, sizeof(struct lfq_ctx));
ctx->MAX_HP_SIZE = max_consume_thread;
ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node));
ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node));
ctx->head = ctx->tail = tmp;
ctx->fph = ctx->fpt = node;
return 0;
}
int lfq_release(struct lfq_ctx *ctx)
{
if (ctx->tail && ctx->head) { /* if we have data in queue */
while ((struct lfq_node *) ctx->head) { /* while still have node */
struct lfq_node *tmp = (struct lfq_node *) ctx->head->next;
safe_free(ctx, (struct lfq_node *) ctx->head);
ctx->head = tmp;
}
ctx->tail = 0;
}
if (ctx->fph && ctx->fpt) {
free_pool(ctx, true);
if (ctx->fph != ctx->fpt)
return -1;
free(ctx->fpt); /* free the empty node */
ctx->fph = ctx->fpt = 0;
}
if (ctx->fph || ctx->fpt)
return -1;
free(ctx->HP);
free(ctx->tid_map);
memset(ctx, 0, sizeof(struct lfq_ctx));
return 0;
}
```
:::danger
評估上述程式碼的正確性 (correctness) 和資料處理吞吐量 (throughput)
:::
#### 仍可改進的地方
* 從 `lfq_dequeue_tid` 可以看到,每次讀取端結束共享資料的存取時,會呼叫 `safe_free` 將 retire list 裡的節點記憶體釋放掉。但過於頻繁的呼叫 `safe_free`,可能會導致程式運行效率下降,所以該何時釋放 retire list 裡的節點比較好呢?
---
### 改用 [RCU](https://hackmd.io/@sysprog/linux-rcu) 實作 lock-free queue
接下來這裡使用 RCU (Read-Copy-Update) 機制來實現 lock-free queue 的實作,RCU 同步機制是另一種 lock-free 程式設計演算法和記憶體回收機制,透過其特有的「寬限期」,確保所有執行緒能正確且安全的使用共享記憶體內容。
#### 概述
RCU 同步機制於 2002 年新增至 Linux 核心,RCU 允許讀取與更新同時進行,提高並行程式的 Scalability。適合使用在多讀取少寫入,且對資料沒有 strong consistency 需求的場景。
讀取端紀錄讀取資料的開始與結束,以便寫入端能夠確定該資料可以被安全釋放的時機。一旦所有讀取端都傳送讀取結束的訊號給寫入端時,此時寫入端就能安全的釋放該資料的記憶體,在不使用鎖的情況下達成執行緒的同步。
在 RCU 架構下,寫入端打算修改資料時,會需要等待所有讀取端結束讀取,才能對其修改,而從寫入端打算修改資料,直到等待讀取端結束存取該資料,之間所經歷的時間為寬限期 (Grace Period)。在寬限期,寫入端會先將要寫入的資料修改在副本裡,等到所有讀取端結束存取時,再將用副本將共享資料覆蓋過去,這樣就可以在確保讀取端運行正確的同時,達成寫入端的安全修改。相比於 hazard pointer,使用 RCU 進行實作可以精確的得知何時能安全的釋放共享資料。
![圖片](https://hackmd.io/_uploads/SJZEhOtIR.png)
#### 流程
[code](https://github.com/Shiang1212/rcu/commit/7a9f5d89cdc290a792fcb7518ffabee2dde5f778)
這裡使用 [Userspace RCU library](https://github.com/datacratic/userspace-rcu) 來實作,執行前請先確認系統已完成下載。
```shell
sudo apt-get install liburcu-dev
```
這份程式碼參考 [sample code](https://github.com/urcu/userspace-rcu/blob/master/doc/examples/rculfqueue/cds_lfq_dequeue.c),如同 hazard pointer 版本,改用 RCU 進行 lock-free queue 的實作。main thread 使用 `urcu_memb_register_thread` 初始化一個 userspace RCU ,然後按照輸入的參數產生 `MAX_PRODUCER` 個生產者以及 `MAX_CONSUMER` 個消費者,並宣告佇列的開頭 `queue`。
其中,在 `cds_lfq_queue_rcu` 裡面存放受 RCU 保護的節點 `node`,透過 `node` 裡的 `cds_lfq_node_rcu` 將節點串成一個佇列。
所有打算存取受 RCU 保護資料的 thread,都要先使用 `urcu_memb_register_thread` 將該 thread 註冊到 RCU 中,這樣這個 thread 就可以使用 RCU 的讀取和更新操作。運行結束前再用 `urcu_memb_unregister_thread` 註銷該 thread。
生產者透過 `cds_lfq_queue_rcu` 將資料 push 進入佇列,消費者使用 `cds_lfq_dequeue_rcu` 將資料從佇列移除且釋放該資料的記憶體。而兩者在執行前後都要使用 `urcu_memb_read_lock` 和 `urcu_memb_read_unlock` 聲明該執行緒已進入臨界區間的開始與結束,以確保受 RCU 保護的資料的一致性。
最後,main thread 使用 `urcu_memb_unregister_thread` 註銷 RCU。
:::danger
選擇 Userspace RCU 裡頭 flavor 的考慮因素是什麼?針對並行佇列,該怎麼選擇?
:::
>待完成:聽完 [yeh-sudo](https://hackmd.io/@sysprog/B1W2HKTER) 的報告,有五種 flavor。
>這份程式碼的讀寫行為接近 1:1
#### 結構體
##### `node`
佇列中的基本元素,`value` 用來儲存資料,`next` 用來指向佇列中的下一個元素。
```c
struct node {
int value;
struct cds_lfq_node_rcu next;
};
```
##### `cds_lfq_node_rcu`
* next:用來指向下一個節點的指標
```c
struct cds_lfq_node_rcu {
struct cds_lfq_node_rcu *next;
};
```
##### `cds_lfq_queue_rcu`
* head / tail:指向佇列首個 / 末個節點的指標
* queue_call_rcu:這是一個函數指標,用於在 RCU 機制下使用回調函數(RCU Callback)。當需要釋放或清理某個節點時,會使用這個回調函數
```c
struct cds_lfq_queue_rcu {
struct cds_lfq_node_rcu *head, *tail;
void (*queue_call_rcu)(struct rcu_head *head,
void (*func)(struct rcu_head *head));
};
```
示意圖:
![image](https://hackmd.io/_uploads/B1vfdxsIC.png)
#### 函式
##### `urcu_memb_register_thread` & `urcu_memb_unregister_thread`
在 RCU 同步機制下,打算存取受 RCU 保護的資料的 thread,需要使用 `urcu_memb_register_thread` 向 RCU 註冊,當前 thread 被註冊之後,該 thread 就能夠安全地進行 RCU 讀取操作,並且 RCU 會追蹤這個 thread 的狀態,以便在進行更新操作時確保資料的一致性。
如下方範例,thread 使用 `urcu_memb_register_thread` 向 RCU 註冊自己的 thread,執行緒運行結束前使用 `urcu_memb_unregister_thread` 向 RCU 註銷。
```c
void *thread_function() {
urcu_memb_register_thread();
// From now on, this thread can use RCU safely
do_something();
...
urcu_memb_register_thread();
}
```
##### `urcu_memb_read_lock` & `urcu_memb_read_unlock`
除了向 RCU 註冊自己的 thread,在存取 RCU 保護的資料前,還需要使用 `urcu_memb_read_lock` 以及 `urcu_memb_read_unlock` 告知 RCU 該 thread 進行讀取操作的時間區段,以便資料能在安全的時機被修改。
如下方範例,在進入 critical section 前使用 `urcu_memb_read_lock`,在離開 critical section 前使用 `urcu_memb_read_unlock`。
```c
void *thread_function() {
urcu_memb_register_thread();
// From now on, this thread can use RCU safely
urcu_memb_read_lock();
// enter critical section
do_something();
urcu_memb_read_unlock();
// exit critical section
urcu_memb_register_thread();
}
```
##### `cds_lfq_enqueue_rcu` & `cds_lfq_dequeue_rcu`
生產者使用 `cds_lfq_enqueue_rcu` 將資料 push 進佇列,消費者使用 `cds_lfq_denqueue_rcu` 將資料從佇列中 pop 掉。兩者在使用前都要先用 `urcu_memb_read_lock` 以及 `urcu_memb_read_unlock` 告知 RCU,確保已進入 critical section 才能繼續執行。其他細節請見 [urcu/static/rculfqueue.h](https://github.com/datacratic/userspace-rcu/blob/master/urcu/static/rculfqueue.h)。
```c
static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
struct cds_lfq_node_rcu *node)
{
for (;;) {
struct cds_lfq_node_rcu *tail, *next;
tail = rcu_dereference(q->tail);
next = uatomic_cmpxchg(&tail->next, NULL, node);
if (next == NULL) {
(void) uatomic_cmpxchg(&q->tail, tail, node);
return;
} else {
(void) uatomic_cmpxchg(&q->tail, tail, next);
continue;
}
}
}
static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(
struct cds_lfq_queue_rcu *q)
{
for (;;) {
struct cds_lfq_node_rcu *head, *next;
head = rcu_dereference(q->head);
next = rcu_dereference(head->next);
if (head->dummy && next == NULL)
return NULL; /* empty */
if (!next) {
enqueue_dummy(q);
next = rcu_dereference(head->next);
}
if (uatomic_cmpxchg(&q->head, head, next) != head)
continue; /* Concurrently pushed. */
if (head->dummy) {
/* Free dummy after grace period. */
rcu_free_dummy(head);
continue; /* try again */
}
return head;
}
}
```
## TODO: 設計實驗探討並行佇列的效能表現
> 採用第 12 週測驗 3 提到的效能分析方法,量化實作表現 (without RCU vs. with RCU)。
> 務必確認執行結果的驗證,善用 ThreadSanitizer 一類的工具
### 編譯
#### hazard pointer
```shell
gcc -std=gnu99 -O3 -Wall -Wextra -o test -D MAX_PRODUCER=2 -D MAX_CONSUMER=10 lfq.c test.c -lpthread
```
#### RCU
編譯:
```shell
gcc -std=gnu99 -O3 -Wall -Wextra -o test -D MAX_PRODUCER=2 -D MAX_CONSUMER=10 rcu.c -lurcu-memb -lurcu-cds -lpthread
```
### 執行時間(ms)、吞吐量(個/s)、記憶體使用(Byte)
#### producer:2、consumer:10
##### 執行時間
![2_10](https://hackmd.io/_uploads/H171y_Nv0.png)
##### 吞吐量
| hazard pointer | RCU |
| :----: | :----: |
| 117647 | 108108 |
##### 記憶體使用狀況
```
--------------------------------------------------------------------------------
total(B) useful-heap(B) extra-heap(B) stacks(B)
--------------------------------------------------------------------------------
hazard pointer 16,736 9,392 7,344 0
RCU 11,744 8,424 3,320 0
```
#### producer:20、consumer:100
##### 執行時間
![20_100](https://hackmd.io/_uploads/ByxaAvNPC.png)
##### 吞吐量
| hazard pointer | RCU |
| :----: | :----: |
| 833333 | 909090 |
##### 記憶體使用狀況
```
--------------------------------------------------------------------------------
total(B) useful-heap(B) extra-heap(B) stacks(B)
--------------------------------------------------------------------------------
hazard pointer 112,096 57,728 54,368 0
RCU 83,000 52,392 30,608 0
```
#### producer:100、consumer:100
##### 執行時間
![100_100](https://hackmd.io/_uploads/rkYlyuVDA.png)
##### 吞吐量
| hazard pointer | RCU |
| :----: | :----: |
| 1538461 | 2352941 |
##### 記憶體使用狀況
```
--------------------------------------------------------------------------------
total(B) useful-heap(B) extra-heap(B) stacks(B)
--------------------------------------------------------------------------------
hazard pointer 673,176 312,320 360,856 0
RCU 435,120 274,152 160,968 0
```
以上述的結果來看,可以看到當生產者與消費者的數量小,hazard pointer 在執行時間與記憶體需求上的表現勝過 RCU,這是因為 hazard pointer 的架構較簡單輕便,不必嚴格紀錄每個執行緒的讀取時段的開端與結束,在執行緒數量小的情況能有比較好的效能。
隨著生產者與消費者的數量越來越大,RCU 在執行時間與記憶體需求上的表現逐漸勝過 hazard pointer,這是因為當執行緒數量上升,共享資料的存取情況會更加複雜,而在 RCU 架構下,透過寬限期我們可以更精確的得知何時能釋放共享記憶體,減少節點等待釋放的時間,進而降低執行時間與程式對於記憶體的需求。
### 記憶體釋放
使用 Valgrind 查看記憶體釋放狀況:producer:2、consumer:10
#### hazard pointer
```
==21129== HEAP SUMMARY:
==21129== in use at exit: 0 bytes in 0 blocks
==21129== total heap usage: 417 allocs, 417 frees, 10,416 bytes allocated
==21129==
==21129== All heap blocks were freed -- no leaks are possible
==21129==
==21129== For lists of detected and suppressed errors, rerun with: -s
==21129== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
可以看到所有從 heap 請求配置的記憶體空間都成功釋放掉了。
#### RCU
```
==21544== HEAP SUMMARY:
==21544== in use at exit: 456 bytes in 3 blocks
==21544== total heap usage: 216 allocs, 213 frees, 9,488 bytes allocated
==21544==
==21544== LEAK SUMMARY:
==21544== definitely lost: 0 bytes in 0 blocks
==21544== indirectly lost: 0 bytes in 0 blocks
==21544== possibly lost: 288 bytes in 1 blocks
==21544== still reachable: 168 bytes in 2 blocks
==21544== suppressed: 0 bytes in 0 blocks
==21544== Rerun with --leak-check=full to see details of leaked memory
==21544==
==21544== Use --track-origins=yes to see where uninitialised values come from
==21544== For lists of detected and suppressed errors, rerun with: -s
==21544== ERROR SUMMARY: 601 errors from 5 contexts (suppressed: 0 from 0)
```
可以看到程式配置了 216 個 block,直到程式結束運行,仍有 3 個 block 沒有被釋放,造成記憶體的浪費。我觀察程式碼,發現原來宣告的佇列的開頭 `queue` 沒有釋放,於是我做了以下改動:
```diff
int main() {
pthread_t thread_cons[MAX_CONSUMER], thread_pros[MAX_PRODUCER];
cds_lfq_init_rcu(&queue, urcu_memb_call_rcu);
for (int i = 0; i < MAX_CONSUMER; i++) {
pthread_create(&thread_cons[i], NULL, remove_queue, NULL);
}
for (int i = 0; i < MAX_PRODUCER; i++) {
// atomic_fetch_add(&cnt_producer, 1);
pthread_create(&thread_pros[i], NULL, add_queue, NULL);
}
for (int i = 0; i < MAX_PRODUCER; i++)
pthread_join(thread_pros[i], NULL);
for (int i = 0; i < MAX_CONSUMER; i++)
pthread_join(thread_cons[i], NULL);
+ int ret = cds_lfq_destroy_rcu(&queue);
+ if (ret) {
+ printf("Error destroying queue (non-empty)\n");
+ }
return 0;
}
```
結果:
```
==21457== HEAP SUMMARY:
==21457== in use at exit: 416 bytes in 2 blocks
==21457== total heap usage: 216 allocs, 214 frees, 9,488 bytes allocated
==21457==
==21457== LEAK SUMMARY:
==21457== definitely lost: 0 bytes in 0 blocks
==21457== indirectly lost: 0 bytes in 0 blocks
==21457== possibly lost: 288 bytes in 1 blocks
==21457== still reachable: 128 bytes in 1 blocks
==21457== suppressed: 0 bytes in 0 blocks
==21457== Rerun with --leak-check=full to see details of leaked memory
==21457==
==21457== Use --track-origins=yes to see where uninitialised values come from
==21457== For lists of detected and suppressed errors, rerun with: -s
==21457== ERROR SUMMARY: 601 errors from 5 contexts (suppressed: 0 from 0)
```
相比原本,又多釋放一個 block,但仍然有兩個 block 還沒釋放。
### 正確性
使 ThreadSanitizer 工具檢測 hazard pointer 與 RCU 兩個實作中,是否存在 data race。
> 一開始執行時會跳出錯誤訊息:
> ```
> FATAL: ThreadSanitizer: unexpected memory mapping
> ```
> 後來把 ASLR 關掉,就可以正常運行了。
>
> hazard pointer 與 RCU 皆正常執行,沒有跳出錯誤訊息
>
:::warning
後續使用 Helgrind 進行測試,檢測結果顯示有 data race 的情況發生。
:::
## TODO: 提出效能改進方案
### 呼叫 `free_pool` 的時機
前面提到,hazard pointer 機制會將 thread 要求釋放的節點存放於 retire list 裡,等待後續使用 `free_pool` 釋放。那麼,使用 `free_pool` 的時機是什麼呢?
很直觀地,我們會希望 retire list 裡的節點個數超過某個 threshold 時,再呼叫 `free_pool` 來清除所有節點的記憶體。為此我們需要先實作如何計算 retire list 裡的節點個數:增加下面兩行程式來追蹤 retire list 的節點個數。
```diff
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
{
+ atomic_fetch_add(&ctx->count, 1);
atomic_store(&node->free_next, NULL);
struct lfq_node *old_tail = atomic_exchange(&ctx->fpt, node); /* seq_cst */
atomic_store(&old_tail->free_next, node);
}
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
bool old = 0;
if (!atomic_compare_exchange_strong(&ctx->is_freeing, &old, 1))
return;
for (int i = 0; i < MAX_FREE || freeall; i++) {
struct lfq_node *p = ctx->fph;
if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) ||
in_hp(ctx, (struct lfq_node *) p))
break;
ctx->fph = p->free_next;
+ atomic_fetch_sub(&ctx->count, 1);
free(p);
}
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
}
```
為了配合實驗,我在 `lfq_dequeue_tid` 中加入 `usleep(10)`,延長消費者存取該節點的時間,讓該節點被紀錄在 `ctx->HP[tid]` 的時間久一點,這樣做也變相讓 `safe_free` 在執行時,比較傾向將待釋放的節點放進 retire list,而不是直接被釋放掉。
接下來,我修改 `safe_free` 函式,讓他可以依照 retire list 裡的節點個數來決定要不要執行 `free_pool`。
```diff
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
/* free is not thread-safe */
bool old = 0;
if (atomic_compare_exchange_strong(&ctx->is_freeing, &old, 1)) {
/* poison the pointer to detect use-after-free */
node->next = (void *) -1;
free(node); /* we got the lock; actually free */
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
} else { /* we did not get the lock; only add to a freelist */
insert_pool(ctx, node);
}
} else {
insert_pool(ctx, node);
}
printf("ctx->count = %d\n", ctx->count);
+ if (ctx->count > 20)
+ free_pool(ctx, true);
- free_pool(ctx, false);
}
```
> 使用 ThreadSanitizer 檢測修改過後的程式,發現加了這兩行判斷會導致 data race。
最後,分別比較 `free_pool` 在以下時機使用的效能:
* 情況一:每次 `safe_free` 都執行一次 `free_pool`
```
--------------------------------------------------------------------------------
n time(i) total(B) useful-heap(B) extra-heap(B) stacks(B)
--------------------------------------------------------------------------------
26 648,306 36,160 34,000 2,160 0
27 666,714 36,304 34,076 2,228 0
28 681,351 36,288 34,036 2,252 0
29 698,080 36,112 33,880 2,232 0
30 713,134 36,032 33,832 2,200 0
31 733,100 36,256 33,896 2,360 0
32 755,433 36,176 33,848 2,328 0
33 769,131 36,272 33,876 2,396 0
34 788,671 36,312 33,920 2,392 0
35 812,359 36,608 34,016 2,592 0
36 827,196 36,568 33,972 2,596 0
37 839,785 36,456 33,904 2,552 0
38 855,384 36,824 34,024 2,800 0
39 867,688 36,992 34,064 2,928 0
```
* 情況二:節點個數大於 20 時執行
```
--------------------------------------------------------------------------------
n time(i) total(B) useful-heap(B) extra-heap(B) stacks(B)
--------------------------------------------------------------------------------
36 807,658 36,800 34,232 2,568 0
37 832,040 37,048 34,300 2,748 0
38 844,492 37,360 34,416 2,944 0
```
* 情況三:不在 `safe_free` 裡執行 `free_pool`,等待程式運行結束才呼叫。
```
--------------------------------------------------------------------------------
n time(i) total(B) useful-heap(B) extra-heap(B) stacks(B)
--------------------------------------------------------------------------------
32 784,637 38,256 34,748 3,508 0
33 813,255 38,264 34,772 3,492 0
34 828,424 38,448 34,832 3,616 0
35 844,211 38,496 34,860 3,636 0
36 866,800 38,848 35,000 3,848 0
```
可以看到,情況一所需要的記憶體是最少的,因為執行 `free_pool` 的頻率最高,被要求釋放的節點很快就能獲得釋放,但也因為 `free_pool` 使用頻率最高,當 `free_pool` 執行成本很大時,這個情況下的程式運行成本也會隨之上升。反過來說,雖然情況三所需要的記憶體是最多的,但當 `free_pool` 執行成本很大時,這個情況下的程式運行成本反而不會那麼大。
而在〈[Lock-Free Data Structures with Hazard Pointers](http://erdani.org/publications/cuj-2004-12.pdf)〉提到:
> A good choice for $R$ is $(1 + k)H$, where $H$ is the number of hazard pointers and $k$ is some small positive constant, say $1/4$. In that case, each scan is guaranteed to delete $R − H$ node.
選擇一個比讀取端個數還大的值作為 threshold,能有效提高程式運行效率。假設今天有 100 個讀取端執行緒,如論文範例,將 threshold 設為 125。假設所有執行緒的 hazard pointer 各自為互斥集且其元素皆存在於 retire list,那此時呼叫 `safe_free`,就能確保每次都有 25 個共享資料能獲得釋放。這樣的設計在減少 `safe_free` 呼叫次數的同時,也提高 `safe_free` 的使用效率。
:::danger
換言之,你需要引入符合應用場景的實驗,分類探討。
:::
## 參考資料
* [Linux 核心專題: RCU 研究](https://hackmd.io/@sysprog/ry_6RHgS3)
* [並行程式設計: hazard pointer](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-hazard-pointer#%E5%AF%A6%E4%BD%9C%E7%9A%84%E6%94%B9%E9%80%B2)
* [What is RCU, Fundamentally?](https://lwn.net/Articles/262464/)
* [userspace-rcu/doc/examples/rculfqueue/cds_lfq_dequeue.c](https://github.com/urcu/userspace-rcu/blob/master/doc/examples/rculfqueue/cds_lfq_dequeue.c)
* [userspace-rcu/urcu/static/rculfqueue.h](https://github.com/datacratic/userspace-rcu/blob/master/urcu/static/rculfqueue.h)
* [Lock-Free Data Structures with Hazard Pointers](http://erdani.org/publications/cuj-2004-12.pdf)