# 2021q3 Homework2 (hp)
contributed by < `mickey30606` >
## 程式運作原理
```cpp
static bool __list_find(list_t *list,
list_key_t *key,
atomic_uintptr_t **par_prev,
list_node_t **par_curr,
list_node_t **par_next)
{
atomic_uintptr_t *prev = NULL;
list_node_t *curr = NULL, *next = NULL;
try_again:
prev = &list->head;
curr = (list_node_t *) ATOMIC_LOAD(prev);
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
if (ATOMIC_LOAD(prev) != get_unmarked(curr)) {
TRACE(retry);
goto try_again;
}
while (true) {
if (is_marked(curr))
TRACE(contains);
next = (list_node_t *) ATOMIC_LOAD(&get_unmarked_node(curr)->next);
(void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
/* On a CAS failure, the search function, "__list_find," will simply
* have to go backwards in the list until an unmarked element is found
* from which the search in increasing key order can be started.
*/
if (ATOMIC_LOAD(&get_unmarked_node(curr)->next) != (uintptr_t) next) {
TRACE(retry);
goto try_again;
}
if (ATOMIC_LOAD(prev) != get_unmarked(curr)) {
TRACE(retry);
goto try_again;
}
if (get_unmarked_node(next) == next) {
if (!(get_unmarked_node(curr)->key < *key)) {
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return (get_unmarked_node(curr)->key == *key);
}
prev = &get_unmarked_node(curr)->next;
(void) list_hp_protect_release(list->hp, HP_PREV,
get_unmarked(curr));
} else {
uintptr_t tmp = get_unmarked(curr);
if (!CAS(prev, &tmp, get_unmarked(next))) {
TRACE(retry);
goto try_again;
}
list_hp_retire(list->hp, get_unmarked(curr));
}
curr = next;
(void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
TRACE(traversal);
}
}
```
我們先來看看 `__list_find()` 函式,他的重點是要去尋找目標 key 的節點,並且保證 `prev` 、 `curr` 與 `curr` 、 `next` 這兩個組合『曾經』是連續的,而且 prev 與 curr 在找到的當下,是沒有標記的,這邊之所以會講曾經是因為在回傳之後, prev 中的值隨時都有可能被標記或是不是指向 curr 。這邊可以看到收先去讀取 prev 與 curr 的值,並且把 curr 的地址放入 hazard pointer 當中保護,由於 ```prev = &list->head``` 所以可以直接保證 ```*prev``` 必定存在於 list 當中,不必去保護。在保護 curr 完成的當下,去檢查 curr 是否仍然存在於 list 當中,如果要確認,就必須同時確認 prev 是否也還存在於 list 當中,所以我們可以看到檢查的程式碼是 ```atomic_load(prev) != get_unmarked(curr)``` ,如果 prev 中的值已經被標記了,就代表 prev 這個節點有可能已經不存在於 list 當中,所以跟當時讀到的值 curr 進行 unmarked 來確認 prev 中的值有沒有被標記 ,而確認沒有被標記後,當 ```*prev == curr``` 的時候,保證 curr 此時仍存在於 list 當中。在這邊存在於 list 當中是非常重要的一個確認,因為這完全保證了我目前找到的這個節點並沒有被放入 ```hp_retire()``` 當中在執行,因為在 ```list_delete()``` 當中可以看到,他是先把 curr->next 裡面的值標記起來,再把節點移出 list ,再進入 ```hp_retire()``` 中釋放節點,所以當我找到的節點被保護了之後,還存在於 list 當中,即可以保證 ```list_delete()``` 還沒有把該點放入 ```hp_retire()``` 當中,所以也不會存在如果在掃 hazard pointer 的當下放入的話,那我保護的節點會無效的問題。
對於下面找的 next 也是以同樣的方式來確認與保護,不過這邊就不去確認 curr->next 裡面的值有沒有被標記了。在下一行在一次去做 prev 與 curr 的檢查是真的必要嘛?這個狀況下可以分成以下三種:
1. prev and curr->next is marked:
這種狀況下,下一個 ```if(get_unmarked_node_next == next)``` 的兩種狀況都有可能會發生,所以當 next is marked 的話,在要把 curr 移出的 CAS 會失敗,並且 ```goto try_again``` ,當 next is unmarked 的話,如果 curr 是我要找的點,不論是 insert 或是 delete 都會失敗,並在一次呼叫 ```__list_find()``` ,如果 curr 不是我要找的點的話,再跑下一個點,最後如果能夠跑回存在於 list 當中的節點當然最好,但是若回傳的 prev 或 curr is marked 的話,不論是 insert/delete 都會失敗。
2. perv is marked but curr->next is unmarked:
在這種狀況下,還有分如果 curr 是我們要找的點與不是我們要找的點,如果不是我們要找的點的話,就直接接到下面的 ```prev = &get_unmarked_node(curr)->next;``` 而因為 curr 是存在於 list 當中的,所以在 assign 完之後也能保證 prev 也是在 list 中的。不過如果 curr 是我們要找的點的話,就會使我回傳之後的 CAS 沒有辦法成功,導致重新在進行 ```__list_find()``` ,這也是我們最不樂見的狀況。
3. prev is unmarked:
代表 prev 存在於 list 當中,可正常執行
經由上面的討論,我們可以發現,如果沒有這個檢查而且運氣有夠差,每次都跑進 prev and curr->next is marked 但是 next is unmarked (assign 完之後 curr->next 被標記) 時,會一直搜尋到被刪除的節點,倒致 ```__list_find()``` 的結果會與我們的預期有所差距。
不過看到下一行,有會有 curr->next != next 的問題,所以我們也來討論一下:
1. curr->next == next:
這就是我們期望的狀況。
2. curr->next != next and next is unmarked:
這時,有可能是 curr 被其他的 thread 進行標記了或是 curr 與 next 當中被 insert 了其他的節點,如果此時 curr 是我們要找的節點的話,對於 insert 來說是沒有差的,但在 delete 的話,第一個 CAS 就會被擋下來並且進行重找。而如果 curr 不是我們要的節點的話,下一個 prev 中的值就會是 marked 的了,這時候就會回到上面討論的狀況,若 prev 中的值是 marked 的處理。
看到這裡我們可以發現,前面對於 prev 與 curr 的檢查對於不直接去讀 curr->next 中的值的話會是有必要的,而且我們能夠理解到,如果 prev 有可能不存在於 list 當中的話,對於尋訪下一個節點我也不能夠保證他是存在於 list 當中的,而如果不在 list 當中的話,我就不能夠保證依據我們的 key 找到的 prev 、 curr 、 next 是有意義的。
乍看之下,對於那行 curr 與 prev 的判斷不是一瞬間就能夠看出他的重要性,而且又是在 next 進行保護即確認之後,回頭看 insert 與 delete 的 function 發現 next 是否存在於 list 當中,甚至是 next 是否被釋放掉似乎沒有那麼的重要。對於 insert 來說 next 根本無用武之地,對於 delete 來說,只是用來確認 curr 後面的節點是不是沒有變化而已,所以我們真的有需要去保護 next 嘛?
## 改進空間
將 ```___list_find()``` 進行改寫:
```cpp
static bool __list_find(list_t *list,
list_key_t *key,
atomic_uintptr_t **par_prev,
list_node_t **par_curr,
list_node_t **par_next)
{
atomic_uintptr_t *prev = NULL;
list_node_t *curr = NULL, *next = NULL;
try_again:
prev = &list->head;
curr = (list_node_t *) ATOMIC_LOAD(prev);
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
if (atomic_load(prev) != get_unmarked(curr))
GOTO_TRY_AGAIN(tid(), OUT_SIDE_WHILE);
while (true) {
TRAVEL(tid());
KEEP_GOING(tid());
next = (list_node_t *) ATOMIC_LOAD(&get_unmarked_node(curr)->next);
if (atomic_load(prev) != get_unmarked(curr))
GOTO_TRY_AGAIN(tid(), NEXT_PROTECT_FAILED);
if (get_unmarked_node(next) == next) {
if (!(get_unmarked_node(curr)->key < *key)) {
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return (get_unmarked_node(curr)->key == *key);
}
prev = &get_unmarked_node(curr)->next;
(void) list_hp_protect_release(list->hp, HP_PREV,
get_unmarked(curr));
} else {
uintptr_t tmp = get_unmarked(curr);
if (!CAS(prev, &tmp, get_unmarked(next)))
GOTO_TRY_AGAIN(tid(), DEL_MARKED_FAILED);
list_hp_retire(list->hp, get_unmarked(curr));
HELP_RETIRE(tid());
}
curr = (list_node_t *) ATOMIC_LOAD(prev);
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
if (atomic_load(prev) != get_unmarked(curr))
GOTO_TRY_AGAIN(tid(), OUT_SIDE_WHILE);
}
}
```
經測試過後,程式可正常執行,而且可以將 hp_retire 中讀取 hazard pointer 的數量減少,使 hp_retire 的速度能更快。
將 ```hp_retire()``` 的函式改成使用 qsort 排序並使用 binary search 查找:
```cpp
int comp(const void *a, const void *b){
return *(uintptr_t *)a < *(uintptr_t *)b;
}
void list_hp_retire(list_hp_t *hp, uintptr_t ptr)
{
retirelist_t *rl = hp->rl[tid() * CLPAD];
rl->list[rl->size++] = ptr;
assert(rl->size < HP_MAX_RETIRED);
if (rl->size < HP_THRESHOLD_R)
return;
uintptr_t *hazard_pointer = calloc(HP_MAX_THREADS*HP_MAX_HPS, sizeof(uintptr_t));
size_t hp_size = 0;
bool can_delete = true;
for(int itid=0; itid < HP_MAX_THREADS; itid++){
//for(int ihp=hp->max_hps-1;ihp >=0; ihp--){
for(int ihp=1;ihp >=0; ihp--){
uintptr_t tmp = atomic_load(&hp->hp[itid][ihp]);
if(tmp){
hazard_pointer[hp_size++] = tmp;
}
}
}
qsort(hazard_pointer, hp_size, sizeof(uintptr_t), comp);
for(size_t iret=0; iret < rl->size; iret++){
uintptr_t obj = rl->list[iret];
long int middle, right = hp_size-1, left = 0;
while(left <= right){
middle = (right + left)/2;
if(hazard_pointer[middle] > obj){
left = middle + 1;
}else if(hazard_pointer[middle] < obj){
right = middle -1;
}else{
can_delete = false;
break;
}
}
if(can_delete){
size_t bytes = (rl->size - iret) * sizeof(rl->list[0]);
memmove(&rl->list[iret], &rl->list[iret + 1], bytes);
rl->size--;
hp->deletefunc((void*) obj);
RETIRE_TRACE(tid(), success);
}else{
RETIRE_TRACE(tid(), failed);
}
}
free(hazard_pointer);
}
```
依據論文 [Lock-Free Data Structures with Hazard Pointers](https://erdani.org/publications/cuj-2004-12.pdf) 的內容:
> A good choice for R is (1 + k)H, where H is the number of hazard pointers (listLen in our code, and equal to the number of readers in our example) and k is some small positive constant,
我們可以決定 threshold $R = (1 + k)H$ ,而 $k$ 是一個足夠小的正數,當我們取 $k = \frac{1}{4}$ 時, $R = (1+\frac{1}{4}) * (TotalThread * hpperThread)$ ,依據我的測試程式有 16 個 insert/delete thread 而每個 thread 都有兩個 hazardpointers 可以算出我的 $R = (1+
\frac{1}{4}) * 64 = 80$ 。這樣做的目的是為了在每次的 hp_retire 中,最少都可以刪掉 $R - H$ 數量的 node 。
測試運行 100 次並取 [retire times, retire success, retire failed] 平均:
```
[ 30.62 785.81 1000.98]
```
依據論文,每次最少都應該要成功釋放掉 $R-H = 24$ 個 node , $30.62 * (R - H) = 734.88 < 785.81$ 符合理論。
## 比對 rcu_list
先去大概了解 rcu 的原理,閱讀文章:[What is RCU, Fundamentally?](https://hackmd.io/@mickey30606/RCU)
程式分成 read-side 和 write-side 兩部份,對於 write-side 來說比較簡單,在要寫入的時候,把 `list->head` 使用 mutex lock ,等 push 完之後,在 mutex unlock 。而對於 read-side 來說,首先先 `rcu_read_lock()` , `rcu_read_lock()` 會建立一個 zombie node ,然後把這個 zombie push 入 `list->zombie_head` 中,然後開始讀取 list 尋找目標 node ,找到之後在 `rcu_read_unlock()` , `rcu_read_unlock()` 首先先去確定目前 `read_handle->zombie` 後面的 node->owner 是不是都是 NULL ,如果不是的話,就把自己的 `read_handle->zombie->owner = NULL` ,如果是的話,就逐一去釋放自己與在自己後面的 node 。不過我對 `rcu_read_unlock()` 的實做有些不解:
```cpp
void rcu_read_unlock(read_handle_t *handle)
{
zombie_node_t *z_node = handle->zombie;
zombie_node_t *cached_next;
__atomic_load(&z_node->next, &cached_next, __ATOMIC_SEQ_CST);
bool last = true;
/* walk through the zombie list to determine if this is the last active
* reader in the list.
*/
zombie_node_t *n = cached_next;
while (n) {
list_node_t *owner;
__atomic_load(&n->owner, &owner, __ATOMIC_SEQ_CST);
if (owner) {
last = false; /* this is not the last active reader */
break;
}
__atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
}
n = cached_next;
if (last) {
while (n) {
list_node_t *dead_node = n->zombie;
printf("%p\n", dead_node);
free(dead_node);
zombie_node_t *old_node = n;
__atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
free(old_node);
}
__atomic_store(&z_node->next, &n, __ATOMIC_SEQ_CST);
}
void *null = NULL;
__atomic_store(&z_node->owner, &null, __ATOMIC_SEQ_CST);
}
```
在 `free(dead_node)` 那邊,雖然程式裡面沒有去 assign 他在 zombie node 當中的值,不過在 read-side 的角度來看,他是怎麼知道那個 node 是可以 free 掉的?而且這個程式沒有在 thread 中使用 delete function ,也不知道實際的狀況會是如何。而且在這個函式中,如果運氣不好的話,是有可能 `list->zombie_head` 這條 linked list 是沒有辦法全部釋放完的。
這個程式在 write-side 那邊也是缺少寬限期的。不過換個角度想,如果持續有 read thread 的 zombie 寫入,那些 owner = NULL 的 node 遲早會被釋放掉的,而如果在 deletion 的時候也加入一個 zombie thread 代表版本的切換,這樣就可以達成在 release 版本之後所有 read-side 都已經離開時釋放節點了。
運作原理:
1. read-side: 以讀取的觀點來說很簡單,在要進入 critical section 之前,先創造一個 zombie node 並把 node push 入 zombie linked list head ,等 critical section 結束之後,從自己加入的 zombie node 順著 linked list 往後找,如果後面的 node 的 owner 都是 null 的話,就從自己後面的 node 全部釋放掉,最後將自己的 owner 設為 null 。
2. write-side:
對於 write-side 也必須要建立 zombie node 並加入節點,但是加入的時間點非常重要,以原本程式的實作,若是發生以下的狀況:
```graphviz
digraph p1{
rankdir=LR
node[shape=rect]
write, readA, readB
readB->readA->write
}
```
1. 假設 write thread 已經把 zombie node push 入 zombie list 中了,但是被 context switch 換成 readA thread 與 readB thread。
2. 而 readA and readB thread 在讀取的過程中被 context switch 回到 write thread 。
3. 此時 write thread 把節點移出 linked list ,並將 write zombie node 的 owner 設為 null ,再次 context switch 回 readA and readB thread 。
這時,雖然 readA and readB thread 的 zombie node 都在 write node 的 zombie thread 後面,但是他們原本讀取到的資料是舊的,若是此時 readA 先行完成,並且把 write zombie node 釋放掉的話,很有可能與 readB thread 形成 data race 。
1. push: 這個操作不需要去考慮到釋放節點的問題,所以只需要去確保對於 list head 的 atomic 與使用 mutex 來保護寫入就可以了。
2. delete: 如果要進行 delete 就要保證 zombie node push 進去的時間要比將 node 移出 linked list 還要晚,才能夠保證有正確的 grace peroid 。
實做:[Github](https://github.com/mickey30606/rcu_list)
測試部分結果:

修改內容:
1. 增加 ANALYZE 去紀錄 zombie list 的順序以及讀到的 head pointer 與 list 長度。
2. 將 writer mutex 的 scale 擴大避免 writer 的 zombie node 順序問題。
3. 新增 delete fucntion 可以刪除指定內容的節點。