---
###### tags: `sysprog2021q1`
---
# 2021q3 Homework2 (quiz2)
contributed by <`93i7xo2`>
> Source: [quiz2](https://hackmd.io/@sysprog/linux2021-summer-quiz2)
# Lock-Free Data Structures with Hazard Pointers
> [Source](https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.112.6406&rep=rep1&type=pdf)
本偏論文旨在提出一個 lock-free "Write-Rarely-Read-Many" maps,基於 [Lock-Free Data Structures](http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.98.3363) 提出的方法進行改良。
後者的設計是 writer 在寫入時 `map` 產生一份 `map` 副本,再修改副本對應的值;reader 讀取時增加型態為 `map` 指標的 reference count,相對的於結束時減少,為 0 時更新至新的 `map` 並刪除舊 `map`。之所以要用指標而非 `map` 本身,原因在於 `map` 大小不定而指標是固定的。
原本只要使用 CAS 操作 reference count,但為了保證不會遇到執行緒 A 判斷要刪除 `map`、而 B 正要讀取同一份 `map` 的情況,有幾種解決方式:
1. Wait & delete: 等待一段時間再進行刪除 `map`,無法保證該等待多久,糟糕的設計。
2. 使用 DCAS 將 reference count 與 `map` 進行關聯,使得在操作同樣 `map` 的執行緒知道 reference count 經過更動再重新嘗試。 DCAS 使用兩個不相鄰的 words ,然而實作 DCAS 的硬體非常少見且毫無效率。另一種是 CAS2 ,採用相鄰 words (應該是指 double-width compare-and-swap, [DWCAS](https://en.wikipedia.org/wiki/Double_compare-and-swap)),同樣也是缺乏實作。
雖然 DCAS 能用 CAS 合成,CAS 也有許多把戲,例如指標後方幾位元塞 reference count,但 Hazard Pointer 直接提出新的架構解決且**保證 wait-free**。
## Data structure
在 不考慮 compiler reordering 和 out-of-ouder execution 的前提下,Hazard Pointers 概念是:reader 將欲讀寫資料 `map` 使用 hazard pointer 包裝,加入與 writer 共享的 list (`pHead_`),此 list 內的元素只增不減,因此排除 ABA problem。writer 在複製一份副本 `map` 進行增減,將舊 `map` 放到所處執行緒的 retired list,待舊 `map` 累積至一定數量,執行緒再掃描將 retired list 內無 reader 使用的 `map` 刪除。
hazard pointer 結構如下:
```cpp
HPRecType {
static HPRecType *pHead_;
static int listLen_;
int active_;
HPRecType * pNext_;
public:
void * pHazard_;
}
```
- `pHead_` & `listLen_`: 紀錄 hazard pointer 及長度
- `pNext_`: 指向下一個 hazard pointer
- `active_`: 設計上為可回收使用,由 reader 結束時標記為 0 而不釋放,供下一個 reader 使用。
- `pHazard_`: 指向資料 `map` 的指標
`Scan` 用來釋放沒有 hazard pointer 指向的 `map`。
```cpp
void Scan(HPRecType * head){
// 複製一份 hazard pointer 內的 non-null ptr: pHazard_
vector<void*> hp;
while (head) {
void * p = head->pHazard_;
if (p) hp.push_back(p);
head = head->pNext_;
}
// 排序
// Complexity: O(n log n)
sort(hp.begin(), hp.end()), less<void*>());
// 走訪 retired list,若不存在於任何一份 hazard pointer 內則釋放
// 這邊因為刪除後 *i 會是 dangling pointer,於是替換成 rlist.back()
// Complexity: O(n log n)
vector<Map<K, V>*>::iterator i = rlist.begin();
while (i != rlist.end()) {
if(!binary_search(hp.begin(), hp.end(), *i)){
delete *i;
if (&*i != &rlist.back()){
*i = rlist.back();
}
rlist.pop_back();
}else{
++i;
}
}
}
```
`Acqurie` & `Release` 為 reader 建立及釋放 hazard pointer 的函式。
```cpp
static HPRecType *Acquire(){
HPRecType *p = pHead_;
for (;p;p = p->pNext_){
if (p->active_ || !CAS(&p->active_, 0, 1))
continue;
// 使用既有 hazard pointer
return p;
}
int oldLen;
do{
oldLen = listLen_;
}while(!CAS(&listLen_, oldLen, oldLen+1));
// 建立新的 hazard pointer
HPRecType *p = new HPRecType;
p->active_ = 1;
p->pHazard_ = 0;
do{
old = pHead_;
p->pNext_ = old;
}while(!CAS(&pHead_, old, p));
return p;
}
static void Release(HPRecType *p){
p->pHazard_ = 0;
p->active_ = 0;
}
```
當任一執行緒想釋放 `map`,呼叫 `Retire`。等到 retired list 的 old `map` 達到一定數量 `Retire` 才會呼叫 `Scan` 進行清理。
```cpp
static void Retire(Map<K, V> *pOld){
rlist.push_back(pOld);
if (rlist.size() >= R){
Scan(HPRecType::Head());
}
}
```
## WRRMMap 實作
利用 `HPRecType` 所提供的方法實作 `WRRMMap` 兩種操作 - `Update` & `Lookup`
```cpp
void Update(const K&k, const V&v){
// 替換 pMap_,將舊有 pOld 塞進 retired list
Map<K, V> *pNew = 0;
do {
Map<K, V> *pOld = pMap_;
delete pNew;
pNew = new Map<K, V>(*pOld);
(*pNew)[k] = v;
}while(!CAS(&pMap_, pOld, pNew));
Retire(pOld);
}
V Lookup(const K&k){
HPRecType *pRec = HPRecType::Acquire();
Map<K, V> *ptr;
do{
// writer 有可能將 map 加入 retired list
// 因 pRec->pHazard_ 尚未指向 map,writer 有可能沒看到 hazard pointer 而釋放掉 map
// 但只要確立 pMap_ = ptr,就能保證 hazard pointer 成功建立
// 使得 reader 能正常讀取 map
ptr = pMap_;
pRec->pHazard_ = ptr;
}while(pMap_ != ptr);
V result = (*ptr)[k];
// 釋放 hazard pointer
HPRecType::Release(pRec);
return result;
}
```
可能會好奇短短一行
```cpp
pRec->pHazard_ = pMap_;
```
為何要大費周章寫成
```cpp
do{
ptr = pMap_;
pRec->pHazard_ = ptr;
}while(pMap_ != ptr);
```
原因是只要保證指向 map 的 hazard pointer 建立,writer 就不可能在 `Retire` 時將其釋放,reader 可對其進行存取。另一方面 `ptr` 是 thread local variable,減少頻繁存取 `pRec->pHazard_` 的機會。
## `Scan` 的 wait-free 屬性
節錄 [wiki](https://en.wikipedia.org/wiki/Non-blocking_algorithm)
> An algorithm is **wait-free** if every operation has a bound on the number of steps the algorithm will take before the operation completes.
原文假設 `Scan` 使用 hashtable 來儲存 hazard pointer,使其複雜度降到 $O(R)$。$R$ 為 retired list 進行清理的臨界值,N 個 writer 情況下最多有 $N\cdot R$ 個需要刪除。挑選 $R=(1+k)H$,$H$ 為 `listLen` 也就是 hazard pointer 總量,定 $k=0.25$,這樣一來每次 `Scan` 只會有 $R-H$ 個被刪除。
`Scan` 也不需要仰賴其他執行緒的行為,本身而言也能在有限時間內完成(wait-free)。最後總結 `Lookup` 和 `Update` 都是 lock-free,reader 不干擾 writer ,而 writer 也能保證運行,更提到 `Lookup` 有 wait-free 的解法。
# A Pragmatic Implementation of Non-Blocking Linked-Lists
> [Source](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf)
> Cited by 584
考慮一帶有 dummy node (`head`, `tail`) 的 ordered list,要進行 non-blocking insertion/deletion,每個 node 結構如下:
```cpp
class Node<KeyType>{
KeyType key;
Node *next;
}
```
- Insertion
先看 insertion,在 10 和 30 間想插入 20,得先將節點指向 30 (左),再將 10 的 next field 以 CAS 替換成節點 20 (右)。

- Deletion
在 `head` 和 30 間刪除 10,若以 CAS 檢查 `head.next == 10` 然後替換成 30 ,無法預防在 10 和 30 間有新節點 20 插入,從而導致節點遺失。

本篇論文提出新的演算法為了解決此問題,將欲刪除的節點標記(mark),稱作 logically delete,之後再進行刪除(physically delete),而插入時所找到的左右節點彼此不能為被標記(marked)。論文使用節點的 `next` 儲存狀態,在 pointer 對齊的狀態下可使用 next 的 LSB,mark/unmark 都是操作此 bit。
以下就論文提出的四種操作進行說明。
## Insert, Delete, Find and Search
考慮到一集合有三種操作
- Insert(k)
- Delete(k)
- Find(k)
三種操作的返回值代表成功與否,在這裡用 single linked list 呈現集合,而在 list 的實作中三種操作都會使用到 Search 方法。
### list 初始化
```cpp
class Node<KeyType>{
KeyType key;
Node *next;
Node (KeyType key){
this.key = key;
}
}
class List<KeyType> {
Node<KeyType> *head;
Node<KeyType> *tail;
List() {
head = new Node<KeyType>();
tail = new Node<KeyType>();
head.next = tail;
}
}
```
### Search
```cpp
private Node *List::search (KeyType search_key, Node **left_node);
```
`search` 返回 `left_node` 及 `right_node`,用來
- 在 `left_node` 及 `right_node` 間插入節點
或是
- 刪除 `right_node`
無論用途為何,需滿足以下條件
1. `left_node.key` $<$ `search_key` $\leq$ `right_node.key`
2. `left_node` 與 `right_node` 皆為 unmarked
3. `left_node` 相鄰 `right_node`
程式碼細分為 3 個 Stage 說明
- Stage 1:
- 從 `list->head` 找尋 `left_node`, `right_node`
- 分別為第 1、2 個遇到的 unmarked node
- `list->head`, `list->tail` 皆為 unmarked,因此最差情況就是返回 `head` & `tail`
- 限制 `search_key` $\leq$ `right_node.key`
```cpp
/* 1: Find left_node and right_node */
do {
if (!is_marked_reference(t_next)) {
(*left_node) = t;
left_node_next = t_next;
}
t = get_unmarked_reference(t_next);
if (t == tail)
break;
t_next = t.next;
} while (is_marked_reference(t_next) || (t.key < search_key)); /*B1*/
right_node = t;
```
- Stage 2:
- `search` 的另一個目的是找出相鄰 node,這一步是檢查是否相鄰。`right_node` 可能中途被其他 processer 標記 marked,因此再檢查一次。 `left_node` 的檢查,就留到使用到 `search` 的函式。
> Conditions Maintained by Search:
> ...Nodes never become unmarked and so we may deduce that the right node was unmarked at that earlier point.
```cpp
/* 2: Check nodes are adjacent */
if (left_node_next == right_node)
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G1*/
else
return right_node; /*R1*/
```
- Stage 3:
- 若不相鄰,則將 `left_node` 與 `right_node` 間的 logically deleted 節點清除,作法是將 `left_node.next` 指向 `right_node`。
- 先檢查 `left_node.next` 是否有更改過,只要 `left_node.next` 不變就能保證先前檢查 `left_node` 和 `right_node` 間都是 logically deleted node,而可安心鏈結至 `right_node`
```cpp
CAS(&(*left_node)->next, left_node_next, right_node) 寫法似乎有誤?
```
- 之後仿照 Stage 2. 檢查 `right_node` 的狀態
```cpp
/* 3: Remove one or more marked nodes */
if (CAS(&(left_node.next), left_node_next, right_node)) /*C1*/
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G2*/
else
return right_node; /*R2*/
```
Stage 1+2+3
```cpp=
private Node *List::search(KeyType search_key, Node **left_node) {
Node *left_node_next, *right_node;
search_again:
do {
Node *t = head;
Node *t_next = head.next;
/* 1: Find left_node and right_node */
do {
if (!is_marked_reference(t_next)) {
(*left_node) = t;
left_node_next = t_next;
}
t = get_unmarked_reference(t_next);
if (t == tail)
break;
t_next = t.next;
} while (is_marked_reference(t_next) || (t.key < search_key)); /*B1*/
right_node = t;
/* 2: Check nodes are adjacent */
if (left_node_next == right_node)
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G1*/
else
return right_node; /*R1*/
/* 3: Remove one or more marked nodes */
if (CAS(&(left_node.next), left_node_next, right_node)) /*C1*/
if ((right_node != tail) && is_marked_reference(right_node.next))
goto search_again; /*G2*/
else
return right_node; /*R2*/
} while (true); /*B2*/
}
```
### Insert
在 `List::search` 找到的 `left_node`, `right_node` 間插入新節點,行 6 檢查右節點 `key` 是否與新節點重複
- 返回 false 代表重複插入
```cpp=
public boolean List::insert(KeyType key) {
Node *new_node = new Node(key);
Node *right_node, *left_node;
do {
right_node = search(key, &left_node);
if ((right_node != tail) && (right_node.key == key)) /*T1*/
return false;
new_node.next = right_node;
if (CAS(&(left_node.next), right_node, new_node)) /*C2*/
return true;
} while (true); /*B3*/
}
```
### Find
```cpp
public boolean List::find(KeyType search_key) {
Node *right_node, *left_node;
right_node = search(search_key, &left_node);
if ((right_node =! tail) || (right_node.key != search_key))
return false;
else
return true;
}
```
### Delete
返回 false 代表無對應 key 的節點存在
- 行 4-6 重複 `List::find` 尋找節點的過程
- 行 8-11 (C3) 先確定 `right_node` 非 logically deleted 再用 CAS 標記,與 `List::insert` 的 (C2) 一樣都是確保 `search` 之後獲得的節點在中途不會被其他 processor 所標記。
- 最後 (C4) 使用 CAS 將 `left_node` 連結到 `right_node_next` 完成移除(remove),可能成功或失敗。失敗原因之一可能如下
1. Thread A 想刪除 R、Thread B 想刪除 N1
```graphviz
digraph L {
rankdir=LR
node [style="record", shape="box"]
node1 [label="L"]
node2 [label="R"]
node3 [label="N1"]
node4 [label="N2"]
node1 -> node2 -> node3 -> node4
}
```
2. Thread A 想刪除 R、Thread B 想刪除 N1,各自標記對應節點
```graphviz
digraph L {
rankdir=LR
node [style="record", shape="box"]
node1 [label="L"]
node2 [label="R" fillcolor=gray, style="filled"]
node3 [label="N1" fillcolor=gray, style="filled"]
node4 [label="N2"]
node1 -> node2 -> node3 -> node4
}
```
3. 假設 Thread A 先成功執行 CAS,Thread B 因此執行 CAS 失敗
```graphviz
digraph L {
rankdir=LR
node [style="record", shape="box"]
node1 [label="L"]
node2 [label="R" fillcolor=gray, style="filled"]
node3 [label="N1" fillcolor=gray, style="filled"]
node4 [label="N2"]
node1 -> node2 -> node3 [color="none"]
node1 -> node3 -> node4
}
```
- **沒有在 `List::delete` 實作刪除節點 `R`**
4. Thread B 在 CAS 失敗後反而呼叫 `search`,在 (C1) 將 L1 連結至 N2
```graphviz
digraph L {
rankdir=LR
node [style="record", shape="box"]
node1 [label="L"]
node3 [label="N1" fillcolor=gray, style="filled"]
node4 [label="N2"]
node1 -> node3 -> node4 [color="none"]
node1 -> node4
}
```
- **沒有在 `List::search` 實作刪除節點 `N1`**
5. 綜合以上,我們能看到在 quiz2 範例程式碼中將欲刪除的 `right_node` 放進 retire list,補足 `List::delete` 和 `List::search` 沒有實作到的部份。論文並沒有針對這部份進行推演,而是用了其他方法證明正確性。
```cpp=
public boolean List::delete (KeyType search_key) {
Node *right_node, *right_node_next, *left_node;
do {
right_node = search(search_key, &left_node);
if ((right_node == tail) || (right_node.key != search_key)) /*T1*/
return false;
right_node_next = right_node.next;
if (!is_marked_reference(right_node_next))
if (CAS(&(right_node.next), /*C3*/
right_node_next, get_marked_reference(right_node_next)))
break;
} while (true); /*B4*/
if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/
right_node = search(right_node.key, &left_node);
return true;
}
```
# quiz2 範例程式碼
> [Source](https://hackmd.io/@sysprog/linux2021-summer-quiz2)
基於上述兩篇論文,很清楚的看到 quiz2 是基於 Non-Blocking Linked-Lists 的實作,而在刪除節點時採用 Hazard Pointers 的作法。從 `list_find` 函數可以看出一些蹤跡。
### __list_find
`__list_find` 函數對應到 `List::search`,除了 `left node` 和 `right node` 移到引數,原函式外的 `right_node.key != search_key` 檢查也實作在函數內,返回 boolean。
```cpp=29
if (!(get_unmarked_node(curr)->key < *key)) {
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return *key == get_unmarked_node(curr)->key;
}
```
而符號與原論文些許不同外,型態也有所不同,**但節點的狀態還是儲存在 `next` 欄位。** 基於 hazard pointer 的規則,用到的節點均以 `list_hp_protect_ptr` 保護。
Thesis | quiz2
-|-
left_node | prev
right_node | curr
t_next | next
我們將原函式拆成幾個部份探討
```cpp=
static bool __list_find(list_t *list,
list_key_t *key,
atomic_uintptr_t **par_prev,
list_node_t **par_curr,
list_node_t **par_next)
{
atomic_uintptr_t *prev = NULL;
list_node_t *curr = NULL, *next = NULL;
try_again:
```
初始化 `prev` 及 `curr`,想像 `prev` 是指位在 `head` 前面的 unmarked node 底下的 `next` 欄位,而 `curr` 指向 `head`。
```cpp=11
prev = &list->head;
curr = (list_node_t *) atomic_load(prev);
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
```
14, 15 行的作用如在確保 `curr` 成功放入 hazard pointer,對應到如下的實作。之後多次見到此種用法。
```cpp
do{
ptr = pMap_;
pRec->pHazard_ = ptr;
}while(pMap_ != ptr);
```
再來找下一個 unmarked node
```cpp=16
while (true) {
if (!get_unmarked_node(curr))
return false;
next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next);
(void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next)
break;
if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail))
break;
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
```
- 21-22: `get_unmarked(next)` 沒放入 hazard pointer 則跳出迴圈,不懂這裡的邏輯 :confused:
- 23-24: 走到 `list->tail` 前一個節點跳出迴圈 :confused:
- 25-26: 確保 `curr` 放入 hazard pointer
:::info
21-24 行的問題後來由 RinHizakura 在 commit [83ef91](https://github.com/sysprog21/concurrent-programs/pull/8/) 修正:
```diff
@@ -230,14 +230,14 @@ try_again:
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
while (true) {
- if (!get_unmarked_node(curr))
- return false;
next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next);
(void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
+ /* On a CAS failure, the search function, "__list_find," will simply
+ * have to go backwards in the list until an unmarked element is found
+ * from which the search in increasing key order can be started.
+ */
if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next)
- break;
- if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail))
- break;
+ goto try_again;
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
if (get_unmarked_node(next) == next) {
@@ -259,11 +259,6 @@ try_again:
curr = next;
(void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
}
- *par_curr = curr;
- *par_prev = prev;
- *par_next = next;
-
- return false;
}
```
:::
當
```cpp=19
next = atomic_load(&get_unmarked_node(curr)->next)
```
是 unmarked,就確定 `curr` 是 unmarked node,加上 `prev` 總共有兩個 unmarked node。
緊接著 29~34 行檢查 `key`,若檢查沒過,繼續尋找下一個 unmarked node,同時將 `prev` 替換成 `curr` 這個最近發現的的 unmarked node。
```cpp=27
if (get_unmarked_node(next) == next) {
if (!(get_unmarked_node(curr)->key < *key)) {
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return *key == get_unmarked_node(curr)->key;
}
prev = &get_unmarked_node(curr)->next;
(void) list_hp_protect_release(list->hp, HP_PREV,
get_unmarked(curr));
} else {
```
原論文的方法是將 `left_node` 連結至 `right_node` 並沒有刪除節點,這裡在遍歷的過程中逐一刪除節點。
```cpp=37
uintptr_t tmp = get_unmarked(curr);
if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next)))
goto try_again;
list_hp_retire(list->hp, get_unmarked(curr));
}
```
```cpp=44
curr = next;
(void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
}
// ok
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return false;
}
```
### list_insert
對照以下 Non-blocking Linked List 的實作
```cpp
public boolean List::insert(KeyType key) {
Node *new_node = new Node(key);
Node *right_node, *left_node;
do {
right_node = search(key, &left_node);
if ((right_node != tail) && (right_node.key == key)) /*T1*/
return false;
new_node.next = right_node;
if (CAS(&(left_node.next), right_node, new_node)) /*C2*/
return true;
} while (true); /*B3*/
}
```
不難發現 `list_insert` 其實就是 `List::insert` ,除了 `List:search` 被 `__list_find` 取代而稍做調整。
- quiz2 另外加入 `list_hp_clear()` 來釋放那些在 `__list_find` 加入 hazard pointer 的節點
```cpp=
bool list_insert(list_t *list, list_key_t key)
{
list_node_t *curr = NULL, *next = NULL;
atomic_uintptr_t *prev = NULL;
list_node_t *node = list_node_new(key);
while (true) {
if (__list_find(list, &key, &prev, &curr, &next)) {
list_node_destroy(node);
list_hp_clear(list->hp);
return false;
}
atomic_store_explicit(&node->next, (uintptr_t) curr,
memory_order_relaxed);
uintptr_t tmp = get_unmarked(curr);
// ensure curr is unmarked
if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) {
list_hp_clear(list->hp);
return true;
}
}
}
```
### list_delete
論文實作:
```cpp=
public boolean List::delete (KeyType search_key) {
Node *right_node, *right_node_next, *left_node;
do {
right_node = search(search_key, &left_node);
if ((right_node == tail) || (right_node.key != search_key)) /*T1*/
return false;
right_node_next = right_node.next;
if (!is_marked_reference(right_node_next))
if (CAS(&(right_node.next), /*C3*/
right_node_next, get_marked_reference(right_node_next)))
break;
} while (true); /*B4*/
if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/
right_node = search(right_node.key, &left_node);
return true;
}
```
quiz2 實作:
```cpp=
bool list_delete(list_t *list, list_key_t key)
{
list_node_t *curr, *next;
atomic_uintptr_t *prev;
while (true) {
if (!__list_find(list, &key, &prev, &curr, &next)) {
list_hp_clear(list->hp);
return false;
}
uintptr_t tmp = get_unmarked(next);
if (!atomic_compare_exchange_strong(&curr->next, &tmp,
get_marked(next)))
continue;
tmp = get_unmarked(curr);
if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) {
list_hp_clear(list->hp);
list_hp_retire(list->hp, get_unmarked(curr));
} else {
list_hp_clear(list->hp);
}
return true;
}
}
```
對照發現 quiz2 只是把原本迴圈外的移除節點的部份搬進迴圈,在移除成功時呼叫 `retire` 等待未來刪除節點。
```cpp=20
list_hp_retire(list->hp, get_unmarked(curr));
```
分成 2 個 stage:
- Stage 1: 標記 unmarked `right_node` (`curr`)
```graphviz
digraph R {
rankdir=LR
node [style="record", shape="box"]
node1 [label="prev"]
node2 [label="curr" fillcolor=gray, style="filled"]
node3 [label="next"]
node1 -> node2 -> node3
}
```
- original
```cpp=8
right_node_next = right_node.next;
if (!is_marked_reference(right_node_next))
if (CAS(&(right_node.next), /*C3*/
right_node_next, get_marked_reference(right_node_next)))
break;
```
- quiz2
```cpp=11
uintptr_t tmp = get_unmarked(next);
if (!atomic_compare_exchange_strong(&curr->next, &tmp,
get_marked(next)))
continue;
```
- Stage 2:
```graphviz
digraph R {
rankdir=LR
node [style="record", shape="box"]
node1 [label="prev"]
node2 [label="curr" fillcolor=gray, style="filled"]
node3 [label="next"]
node1 -> node2 -> node3 [color="none"]
node1 -> node3
}
```
- original
```cpp=13
if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/
right_node = search(right_node.key, &left_node);
return true;
```
- quiz2
```cpp=17
tmp = get_unmarked(curr);
if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) {
list_hp_clear(list->hp);
list_hp_retire(list->hp, get_unmarked(curr));
} else {
list_hp_clear(list->hp);
}
return true;
```
# 指出改進空間並著手實作
# 對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?