--- ###### tags: `sysprog2021q1` --- # 2021q3 Homework2 (quiz2) contributed by <`93i7xo2`> > Source: [quiz2](https://hackmd.io/@sysprog/linux2021-summer-quiz2) # Lock-Free Data Structures with Hazard Pointers > [Source](https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.112.6406&rep=rep1&type=pdf) 本偏論文旨在提出一個 lock-free "Write-Rarely-Read-Many" maps,基於 [Lock-Free Data Structures](http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.98.3363) 提出的方法進行改良。 後者的設計是 writer 在寫入時 `map` 產生一份 `map` 副本,再修改副本對應的值;reader 讀取時增加型態為 `map` 指標的 reference count,相對的於結束時減少,為 0 時更新至新的 `map` 並刪除舊 `map`。之所以要用指標而非 `map` 本身,原因在於 `map` 大小不定而指標是固定的。 原本只要使用 CAS 操作 reference count,但為了保證不會遇到執行緒 A 判斷要刪除 `map`、而 B 正要讀取同一份 `map` 的情況,有幾種解決方式: 1. Wait & delete: 等待一段時間再進行刪除 `map`,無法保證該等待多久,糟糕的設計。 2. 使用 DCAS 將 reference count 與 `map` 進行關聯,使得在操作同樣 `map` 的執行緒知道 reference count 經過更動再重新嘗試。 DCAS 使用兩個不相鄰的 words ,然而實作 DCAS 的硬體非常少見且毫無效率。另一種是 CAS2 ,採用相鄰 words (應該是指 double-width compare-and-swap, [DWCAS](https://en.wikipedia.org/wiki/Double_compare-and-swap)),同樣也是缺乏實作。 雖然 DCAS 能用 CAS 合成,CAS 也有許多把戲,例如指標後方幾位元塞 reference count,但 Hazard Pointer 直接提出新的架構解決且**保證 wait-free**。 ## Data structure 在 不考慮 compiler reordering 和 out-of-ouder execution 的前提下,Hazard Pointers 概念是:reader 將欲讀寫資料 `map` 使用 hazard pointer 包裝,加入與 writer 共享的 list (`pHead_`),此 list 內的元素只增不減,因此排除 ABA problem。writer 在複製一份副本 `map` 進行增減,將舊 `map` 放到所處執行緒的 retired list,待舊 `map` 累積至一定數量,執行緒再掃描將 retired list 內無 reader 使用的 `map` 刪除。 hazard pointer 結構如下: ```cpp HPRecType { static HPRecType *pHead_; static int listLen_; int active_; HPRecType * pNext_; public: void * pHazard_; } ``` - `pHead_` & `listLen_`: 紀錄 hazard pointer 及長度 - `pNext_`: 指向下一個 hazard pointer - `active_`: 設計上為可回收使用,由 reader 結束時標記為 0 而不釋放,供下一個 reader 使用。 - `pHazard_`: 指向資料 `map` 的指標 `Scan` 用來釋放沒有 hazard pointer 指向的 `map`。 ```cpp void Scan(HPRecType * head){ // 複製一份 hazard pointer 內的 non-null ptr: pHazard_ vector<void*> hp; while (head) { void * p = head->pHazard_; if (p) hp.push_back(p); head = head->pNext_; } // 排序 // Complexity: O(n log n) sort(hp.begin(), hp.end()), less<void*>()); // 走訪 retired list,若不存在於任何一份 hazard pointer 內則釋放 // 這邊因為刪除後 *i 會是 dangling pointer,於是替換成 rlist.back() // Complexity: O(n log n) vector<Map<K, V>*>::iterator i = rlist.begin(); while (i != rlist.end()) { if(!binary_search(hp.begin(), hp.end(), *i)){ delete *i; if (&*i != &rlist.back()){ *i = rlist.back(); } rlist.pop_back(); }else{ ++i; } } } ``` `Acqurie` & `Release` 為 reader 建立及釋放 hazard pointer 的函式。 ```cpp static HPRecType *Acquire(){ HPRecType *p = pHead_; for (;p;p = p->pNext_){ if (p->active_ || !CAS(&p->active_, 0, 1)) continue; // 使用既有 hazard pointer return p; } int oldLen; do{ oldLen = listLen_; }while(!CAS(&listLen_, oldLen, oldLen+1)); // 建立新的 hazard pointer HPRecType *p = new HPRecType; p->active_ = 1; p->pHazard_ = 0; do{ old = pHead_; p->pNext_ = old; }while(!CAS(&pHead_, old, p)); return p; } static void Release(HPRecType *p){ p->pHazard_ = 0; p->active_ = 0; } ``` 當任一執行緒想釋放 `map`,呼叫 `Retire`。等到 retired list 的 old `map` 達到一定數量 `Retire` 才會呼叫 `Scan` 進行清理。 ```cpp static void Retire(Map<K, V> *pOld){ rlist.push_back(pOld); if (rlist.size() >= R){ Scan(HPRecType::Head()); } } ``` ## WRRMMap 實作 利用 `HPRecType` 所提供的方法實作 `WRRMMap` 兩種操作 - `Update` & `Lookup` ```cpp void Update(const K&k, const V&v){ // 替換 pMap_,將舊有 pOld 塞進 retired list Map<K, V> *pNew = 0; do { Map<K, V> *pOld = pMap_; delete pNew; pNew = new Map<K, V>(*pOld); (*pNew)[k] = v; }while(!CAS(&pMap_, pOld, pNew)); Retire(pOld); } V Lookup(const K&k){ HPRecType *pRec = HPRecType::Acquire(); Map<K, V> *ptr; do{ // writer 有可能將 map 加入 retired list // 因 pRec->pHazard_ 尚未指向 map,writer 有可能沒看到 hazard pointer 而釋放掉 map // 但只要確立 pMap_ = ptr,就能保證 hazard pointer 成功建立 // 使得 reader 能正常讀取 map ptr = pMap_; pRec->pHazard_ = ptr; }while(pMap_ != ptr); V result = (*ptr)[k]; // 釋放 hazard pointer HPRecType::Release(pRec); return result; } ``` 可能會好奇短短一行 ```cpp pRec->pHazard_ = pMap_; ``` 為何要大費周章寫成 ```cpp do{ ptr = pMap_; pRec->pHazard_ = ptr; }while(pMap_ != ptr); ``` 原因是只要保證指向 map 的 hazard pointer 建立,writer 就不可能在 `Retire` 時將其釋放,reader 可對其進行存取。另一方面 `ptr` 是 thread local variable,減少頻繁存取 `pRec->pHazard_` 的機會。 ## `Scan` 的 wait-free 屬性 節錄 [wiki](https://en.wikipedia.org/wiki/Non-blocking_algorithm) > An algorithm is **wait-free** if every operation has a bound on the number of steps the algorithm will take before the operation completes. 原文假設 `Scan` 使用 hashtable 來儲存 hazard pointer,使其複雜度降到 $O(R)$。$R$ 為 retired list 進行清理的臨界值,N 個 writer 情況下最多有 $N\cdot R$ 個需要刪除。挑選 $R=(1+k)H$,$H$ 為 `listLen` 也就是 hazard pointer 總量,定 $k=0.25$,這樣一來每次 `Scan` 只會有 $R-H$ 個被刪除。 `Scan` 也不需要仰賴其他執行緒的行為,本身而言也能在有限時間內完成(wait-free)。最後總結 `Lookup` 和 `Update` 都是 lock-free,reader 不干擾 writer ,而 writer 也能保證運行,更提到 `Lookup` 有 wait-free 的解法。 # A Pragmatic Implementation of Non-Blocking Linked-Lists > [Source](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf) > Cited by 584 考慮一帶有 dummy node (`head`, `tail`) 的 ordered list,要進行 non-blocking insertion/deletion,每個 node 結構如下: ```cpp class Node<KeyType>{ KeyType key; Node *next; } ``` - Insertion 先看 insertion,在 10 和 30 間想插入 20,得先將節點指向 30 (左),再將 10 的 next field 以 CAS 替換成節點 20 (右)。 ![](https://i.imgur.com/zaGMaSJ.png) - Deletion 在 `head` 和 30 間刪除 10,若以 CAS 檢查 `head.next == 10` 然後替換成 30 ,無法預防在 10 和 30 間有新節點 20 插入,從而導致節點遺失。 ![](https://i.imgur.com/kUFl6ud.png) 本篇論文提出新的演算法為了解決此問題,將欲刪除的節點標記(mark),稱作 logically delete,之後再進行刪除(physically delete),而插入時所找到的左右節點彼此不能為被標記(marked)。論文使用節點的 `next` 儲存狀態,在 pointer 對齊的狀態下可使用 next 的 LSB,mark/unmark 都是操作此 bit。 以下就論文提出的四種操作進行說明。 ## Insert, Delete, Find and Search 考慮到一集合有三種操作 - Insert(k) - Delete(k) - Find(k) 三種操作的返回值代表成功與否,在這裡用 single linked list 呈現集合,而在 list 的實作中三種操作都會使用到 Search 方法。 ### list 初始化 ```cpp class Node<KeyType>{ KeyType key; Node *next; Node (KeyType key){ this.key = key; } } class List<KeyType> { Node<KeyType> *head; Node<KeyType> *tail; List() { head = new Node<KeyType>(); tail = new Node<KeyType>(); head.next = tail; } } ``` ### Search ```cpp private Node *List::search (KeyType search_key, Node **left_node); ``` `search` 返回 `left_node` 及 `right_node`,用來 - 在 `left_node` 及 `right_node` 間插入節點 或是 - 刪除 `right_node` 無論用途為何,需滿足以下條件 1. `left_node.key` $<$ `search_key` $\leq$ `right_node.key` 2. `left_node` 與 `right_node` 皆為 unmarked 3. `left_node` 相鄰 `right_node` 程式碼細分為 3 個 Stage 說明 - Stage 1: - 從 `list->head` 找尋 `left_node`, `right_node` - 分別為第 1、2 個遇到的 unmarked node - `list->head`, `list->tail` 皆為 unmarked,因此最差情況就是返回 `head` & `tail` - 限制 `search_key` $\leq$ `right_node.key` ```cpp /* 1: Find left_node and right_node */ do { if (!is_marked_reference(t_next)) { (*left_node) = t; left_node_next = t_next; } t = get_unmarked_reference(t_next); if (t == tail) break; t_next = t.next; } while (is_marked_reference(t_next) || (t.key < search_key)); /*B1*/ right_node = t; ``` - Stage 2: - `search` 的另一個目的是找出相鄰 node,這一步是檢查是否相鄰。`right_node` 可能中途被其他 processer 標記 marked,因此再檢查一次。 `left_node` 的檢查,就留到使用到 `search` 的函式。 > Conditions Maintained by Search: > ...Nodes never become unmarked and so we may deduce that the right node was unmarked at that earlier point. ```cpp /* 2: Check nodes are adjacent */ if (left_node_next == right_node) if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G1*/ else return right_node; /*R1*/ ``` - Stage 3: - 若不相鄰,則將 `left_node` 與 `right_node` 間的 logically deleted 節點清除,作法是將 `left_node.next` 指向 `right_node`。 - 先檢查 `left_node.next` 是否有更改過,只要 `left_node.next` 不變就能保證先前檢查 `left_node` 和 `right_node` 間都是 logically deleted node,而可安心鏈結至 `right_node` ```cpp CAS(&(*left_node)->next, left_node_next, right_node) 寫法似乎有誤? ``` - 之後仿照 Stage 2. 檢查 `right_node` 的狀態 ```cpp /* 3: Remove one or more marked nodes */ if (CAS(&(left_node.next), left_node_next, right_node)) /*C1*/ if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G2*/ else return right_node; /*R2*/ ``` Stage 1+2+3 ```cpp= private Node *List::search(KeyType search_key, Node **left_node) { Node *left_node_next, *right_node; search_again: do { Node *t = head; Node *t_next = head.next; /* 1: Find left_node and right_node */ do { if (!is_marked_reference(t_next)) { (*left_node) = t; left_node_next = t_next; } t = get_unmarked_reference(t_next); if (t == tail) break; t_next = t.next; } while (is_marked_reference(t_next) || (t.key < search_key)); /*B1*/ right_node = t; /* 2: Check nodes are adjacent */ if (left_node_next == right_node) if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G1*/ else return right_node; /*R1*/ /* 3: Remove one or more marked nodes */ if (CAS(&(left_node.next), left_node_next, right_node)) /*C1*/ if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G2*/ else return right_node; /*R2*/ } while (true); /*B2*/ } ``` ### Insert 在 `List::search` 找到的 `left_node`, `right_node` 間插入新節點,行 6 檢查右節點 `key` 是否與新節點重複 - 返回 false 代表重複插入 ```cpp= public boolean List::insert(KeyType key) { Node *new_node = new Node(key); Node *right_node, *left_node; do { right_node = search(key, &left_node); if ((right_node != tail) && (right_node.key == key)) /*T1*/ return false; new_node.next = right_node; if (CAS(&(left_node.next), right_node, new_node)) /*C2*/ return true; } while (true); /*B3*/ } ``` ### Find ```cpp public boolean List::find(KeyType search_key) { Node *right_node, *left_node; right_node = search(search_key, &left_node); if ((right_node =! tail) || (right_node.key != search_key)) return false; else return true; } ``` ### Delete 返回 false 代表無對應 key 的節點存在 - 行 4-6 重複 `List::find` 尋找節點的過程 - 行 8-11 (C3) 先確定 `right_node` 非 logically deleted 再用 CAS 標記,與 `List::insert` 的 (C2) 一樣都是確保 `search` 之後獲得的節點在中途不會被其他 processor 所標記。 - 最後 (C4) 使用 CAS 將 `left_node` 連結到 `right_node_next` 完成移除(remove),可能成功或失敗。失敗原因之一可能如下 1. Thread A 想刪除 R、Thread B 想刪除 N1 ```graphviz digraph L { rankdir=LR node [style="record", shape="box"] node1 [label="L"] node2 [label="R"] node3 [label="N1"] node4 [label="N2"] node1 -> node2 -> node3 -> node4 } ``` 2. Thread A 想刪除 R、Thread B 想刪除 N1,各自標記對應節點 ```graphviz digraph L { rankdir=LR node [style="record", shape="box"] node1 [label="L"] node2 [label="R" fillcolor=gray, style="filled"] node3 [label="N1" fillcolor=gray, style="filled"] node4 [label="N2"] node1 -> node2 -> node3 -> node4 } ``` 3. 假設 Thread A 先成功執行 CAS,Thread B 因此執行 CAS 失敗 ```graphviz digraph L { rankdir=LR node [style="record", shape="box"] node1 [label="L"] node2 [label="R" fillcolor=gray, style="filled"] node3 [label="N1" fillcolor=gray, style="filled"] node4 [label="N2"] node1 -> node2 -> node3 [color="none"] node1 -> node3 -> node4 } ``` - **沒有在 `List::delete` 實作刪除節點 `R`** 4. Thread B 在 CAS 失敗後反而呼叫 `search`,在 (C1) 將 L1 連結至 N2 ```graphviz digraph L { rankdir=LR node [style="record", shape="box"] node1 [label="L"] node3 [label="N1" fillcolor=gray, style="filled"] node4 [label="N2"] node1 -> node3 -> node4 [color="none"] node1 -> node4 } ``` - **沒有在 `List::search` 實作刪除節點 `N1`** 5. 綜合以上,我們能看到在 quiz2 範例程式碼中將欲刪除的 `right_node` 放進 retire list,補足 `List::delete` 和 `List::search` 沒有實作到的部份。論文並沒有針對這部份進行推演,而是用了其他方法證明正確性。 ```cpp= public boolean List::delete (KeyType search_key) { Node *right_node, *right_node_next, *left_node; do { right_node = search(search_key, &left_node); if ((right_node == tail) || (right_node.key != search_key)) /*T1*/ return false; right_node_next = right_node.next; if (!is_marked_reference(right_node_next)) if (CAS(&(right_node.next), /*C3*/ right_node_next, get_marked_reference(right_node_next))) break; } while (true); /*B4*/ if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search(right_node.key, &left_node); return true; } ``` # quiz2 範例程式碼 > [Source](https://hackmd.io/@sysprog/linux2021-summer-quiz2) 基於上述兩篇論文,很清楚的看到 quiz2 是基於 Non-Blocking Linked-Lists 的實作,而在刪除節點時採用 Hazard Pointers 的作法。從 `list_find` 函數可以看出一些蹤跡。 ### __list_find `__list_find` 函數對應到 `List::search`,除了 `left node` 和 `right node` 移到引數,原函式外的 `right_node.key != search_key` 檢查也實作在函數內,返回 boolean。 ```cpp=29 if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return *key == get_unmarked_node(curr)->key; } ``` 而符號與原論文些許不同外,型態也有所不同,**但節點的狀態還是儲存在 `next` 欄位。** 基於 hazard pointer 的規則,用到的節點均以 `list_hp_protect_ptr` 保護。 Thesis | quiz2 -|- left_node | prev right_node | curr t_next | next 我們將原函式拆成幾個部份探討 ```cpp= static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: ``` 初始化 `prev` 及 `curr`,想像 `prev` 是指位在 `head` 前面的 unmarked node 底下的 `next` 欄位,而 `curr` 指向 `head`。 ```cpp=11 prev = &list->head; curr = (list_node_t *) atomic_load(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (atomic_load(prev) != get_unmarked(curr)) goto try_again; ``` 14, 15 行的作用如在確保 `curr` 成功放入 hazard pointer,對應到如下的實作。之後多次見到此種用法。 ```cpp do{ ptr = pMap_; pRec->pHazard_ = ptr; }while(pMap_ != ptr); ``` 再來找下一個 unmarked node ```cpp=16 while (true) { if (!get_unmarked_node(curr)) return false; next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) break; if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) break; if (atomic_load(prev) != get_unmarked(curr)) goto try_again; ``` - 21-22: `get_unmarked(next)` 沒放入 hazard pointer 則跳出迴圈,不懂這裡的邏輯 :confused: - 23-24: 走到 `list->tail` 前一個節點跳出迴圈 :confused: - 25-26: 確保 `curr` 放入 hazard pointer :::info 21-24 行的問題後來由 RinHizakura 在 commit [83ef91](https://github.com/sysprog21/concurrent-programs/pull/8/) 修正: ```diff @@ -230,14 +230,14 @@ try_again: if (atomic_load(prev) != get_unmarked(curr)) goto try_again; while (true) { - if (!get_unmarked_node(curr)) - return false; next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); + /* On a CAS failure, the search function, "__list_find," will simply + * have to go backwards in the list until an unmarked element is found + * from which the search in increasing key order can be started. + */ if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) - break; - if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) - break; + goto try_again; if (atomic_load(prev) != get_unmarked(curr)) goto try_again; if (get_unmarked_node(next) == next) { @@ -259,11 +259,6 @@ try_again: curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } - *par_curr = curr; - *par_prev = prev; - *par_next = next; - - return false; } ``` ::: 當 ```cpp=19 next = atomic_load(&get_unmarked_node(curr)->next) ``` 是 unmarked,就確定 `curr` 是 unmarked node,加上 `prev` 總共有兩個 unmarked node。 緊接著 29~34 行檢查 `key`,若檢查沒過,繼續尋找下一個 unmarked node,同時將 `prev` 替換成 `curr` 這個最近發現的的 unmarked node。 ```cpp=27 if (get_unmarked_node(next) == next) { if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return *key == get_unmarked_node(curr)->key; } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else { ``` 原論文的方法是將 `left_node` 連結至 `right_node` 並沒有刪除節點,這裡在遍歷的過程中逐一刪除節點。 ```cpp=37 uintptr_t tmp = get_unmarked(curr); if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; list_hp_retire(list->hp, get_unmarked(curr)); } ``` ```cpp=44 curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } // ok *par_curr = curr; *par_prev = prev; *par_next = next; return false; } ``` ### list_insert 對照以下 Non-blocking Linked List 的實作 ```cpp public boolean List::insert(KeyType key) { Node *new_node = new Node(key); Node *right_node, *left_node; do { right_node = search(key, &left_node); if ((right_node != tail) && (right_node.key == key)) /*T1*/ return false; new_node.next = right_node; if (CAS(&(left_node.next), right_node, new_node)) /*C2*/ return true; } while (true); /*B3*/ } ``` 不難發現 `list_insert` 其實就是 `List::insert` ,除了 `List:search` 被 `__list_find` 取代而稍做調整。 - quiz2 另外加入 `list_hp_clear()` 來釋放那些在 `__list_find` 加入 hazard pointer 的節點 ```cpp= bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); // ensure curr is unmarked if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } } } ``` ### list_delete 論文實作: ```cpp= public boolean List::delete (KeyType search_key) { Node *right_node, *right_node_next, *left_node; do { right_node = search(search_key, &left_node); if ((right_node == tail) || (right_node.key != search_key)) /*T1*/ return false; right_node_next = right_node.next; if (!is_marked_reference(right_node_next)) if (CAS(&(right_node.next), /*C3*/ right_node_next, get_marked_reference(right_node_next))) break; } while (true); /*B4*/ if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search(right_node.key, &left_node); return true; } ``` quiz2 實作: ```cpp= bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } } ``` 對照發現 quiz2 只是把原本迴圈外的移除節點的部份搬進迴圈,在移除成功時呼叫 `retire` 等待未來刪除節點。 ```cpp=20 list_hp_retire(list->hp, get_unmarked(curr)); ``` 分成 2 個 stage: - Stage 1: 標記 unmarked `right_node` (`curr`) ```graphviz digraph R { rankdir=LR node [style="record", shape="box"] node1 [label="prev"] node2 [label="curr" fillcolor=gray, style="filled"] node3 [label="next"] node1 -> node2 -> node3 } ``` - original ```cpp=8 right_node_next = right_node.next; if (!is_marked_reference(right_node_next)) if (CAS(&(right_node.next), /*C3*/ right_node_next, get_marked_reference(right_node_next))) break; ``` - quiz2 ```cpp=11 uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; ``` - Stage 2: ```graphviz digraph R { rankdir=LR node [style="record", shape="box"] node1 [label="prev"] node2 [label="curr" fillcolor=gray, style="filled"] node3 [label="next"] node1 -> node2 -> node3 [color="none"] node1 -> node3 } ``` - original ```cpp=13 if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search(right_node.key, &left_node); return true; ``` - quiz2 ```cpp=17 tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; ``` # 指出改進空間並著手實作 # 對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?