Try   HackMD

2021q3 Homework2 (quiz2)

contributed by <93i7xo2>

Source: quiz2

Lock-Free Data Structures with Hazard Pointers

Source

本偏論文旨在提出一個 lock-free "Write-Rarely-Read-Many" maps,基於 Lock-Free Data Structures 提出的方法進行改良。

後者的設計是 writer 在寫入時 map 產生一份 map 副本,再修改副本對應的值;reader 讀取時增加型態為 map 指標的 reference count,相對的於結束時減少,為 0 時更新至新的 map 並刪除舊 map。之所以要用指標而非 map 本身,原因在於 map 大小不定而指標是固定的。

原本只要使用 CAS 操作 reference count,但為了保證不會遇到執行緒 A 判斷要刪除 map、而 B 正要讀取同一份 map 的情況,有幾種解決方式:

  1. Wait & delete: 等待一段時間再進行刪除 map,無法保證該等待多久,糟糕的設計。
  2. 使用 DCAS 將 reference count 與 map 進行關聯,使得在操作同樣 map 的執行緒知道 reference count 經過更動再重新嘗試。 DCAS 使用兩個不相鄰的 words ,然而實作 DCAS 的硬體非常少見且毫無效率。另一種是 CAS2 ,採用相鄰 words (應該是指 double-width compare-and-swap, DWCAS),同樣也是缺乏實作。

雖然 DCAS 能用 CAS 合成,CAS 也有許多把戲,例如指標後方幾位元塞 reference count,但 Hazard Pointer 直接提出新的架構解決且保證 wait-free

Data structure

在 不考慮 compiler reordering 和 out-of-ouder execution 的前提下,Hazard Pointers 概念是:reader 將欲讀寫資料 map 使用 hazard pointer 包裝,加入與 writer 共享的 list (pHead_),此 list 內的元素只增不減,因此排除 ABA problem。writer 在複製一份副本 map 進行增減,將舊 map 放到所處執行緒的 retired list,待舊 map 累積至一定數量,執行緒再掃描將 retired list 內無 reader 使用的 map 刪除。

hazard pointer 結構如下:

HPRecType {
    static HPRecType *pHead_;
    static int listLen_;
    int active_;
    HPRecType * pNext_;

public:
    void * pHazard_;
}
  • pHead_ & listLen_: 紀錄 hazard pointer 及長度
  • pNext_: 指向下一個 hazard pointer
  • active_: 設計上為可回收使用,由 reader 結束時標記為 0 而不釋放,供下一個 reader 使用。
  • pHazard_: 指向資料 map 的指標

Scan 用來釋放沒有 hazard pointer 指向的 map

void Scan(HPRecType * head){
    // 複製一份 hazard pointer 內的 non-null ptr: pHazard_
    vector<void*> hp;
    while (head) {
        void * p = head->pHazard_;
        if (p) hp.push_back(p);
        head = head->pNext_;
    }

    // 排序
    // Complexity: O(n log n)
    sort(hp.begin(), hp.end()), less<void*>());

    // 走訪 retired list,若不存在於任何一份 hazard pointer 內則釋放
    // 這邊因為刪除後 *i 會是 dangling pointer,於是替換成 rlist.back()
    // Complexity: O(n log n)
    vector<Map<K, V>*>::iterator i = rlist.begin();
    while (i != rlist.end()) {
        if(!binary_search(hp.begin(), hp.end(), *i)){
            delete *i;
            if (&*i != &rlist.back()){
                *i = rlist.back();
            }
            rlist.pop_back();
        }else{
            ++i;
        }
    }
}

Acqurie & Release 為 reader 建立及釋放 hazard pointer 的函式。

static HPRecType *Acquire(){
    HPRecType *p = pHead_;
    for (;p;p = p->pNext_){
        if (p->active_ || !CAS(&p->active_, 0, 1))
            continue;
        // 使用既有 hazard pointer
        return p;
    }

    int oldLen;
    do{
        oldLen = listLen_;
    }while(!CAS(&listLen_, oldLen, oldLen+1));
    
    // 建立新的 hazard pointer
    HPRecType *p = new HPRecType;
    p->active_ = 1;
    p->pHazard_ = 0;
    do{
        old = pHead_;
        p->pNext_ = old;
    }while(!CAS(&pHead_, old, p));
    return p;
}

static void Release(HPRecType *p){
    p->pHazard_ = 0;
    p->active_ = 0;
}

當任一執行緒想釋放 map,呼叫 Retire。等到 retired list 的 old map 達到一定數量 Retire 才會呼叫 Scan 進行清理。

static void Retire(Map<K, V> *pOld){
    rlist.push_back(pOld);
    if (rlist.size() >= R){
        Scan(HPRecType::Head());
    }
}

WRRMMap 實作

利用 HPRecType 所提供的方法實作 WRRMMap 兩種操作 - Update & Lookup

void Update(const K&k, const V&v){
    // 替換 pMap_,將舊有 pOld 塞進 retired list
    Map<K, V> *pNew = 0;
    do {
        Map<K, V> *pOld = pMap_;
        delete pNew;
        pNew = new Map<K, V>(*pOld);
        (*pNew)[k] = v;
    }while(!CAS(&pMap_, pOld, pNew));
    Retire(pOld);
}

V Lookup(const K&k){
    HPRecType *pRec = HPRecType::Acquire();
    Map<K, V> *ptr;
    do{
        // writer 有可能將 map 加入 retired list
        // 因 pRec->pHazard_ 尚未指向 map,writer 有可能沒看到 hazard pointer 而釋放掉 map
        // 但只要確立 pMap_ = ptr,就能保證 hazard pointer 成功建立
        // 使得 reader 能正常讀取 map
        ptr = pMap_;
        pRec->pHazard_ = ptr;
    }while(pMap_ != ptr);
    
    V result = (*ptr)[k];
    
    // 釋放 hazard pointer
    HPRecType::Release(pRec);
    return result;
}

可能會好奇短短一行

pRec->pHazard_ = pMap_;

為何要大費周章寫成

do{
    ptr = pMap_;
    pRec->pHazard_ = ptr;
}while(pMap_ != ptr);

原因是只要保證指向 map 的 hazard pointer 建立,writer 就不可能在 Retire 時將其釋放,reader 可對其進行存取。另一方面 ptr 是 thread local variable,減少頻繁存取 pRec->pHazard_ 的機會。

Scan 的 wait-free 屬性

節錄 wiki

An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes.

原文假設 Scan 使用 hashtable 來儲存 hazard pointer,使其複雜度降到

O(R)
R
為 retired list 進行清理的臨界值,N 個 writer 情況下最多有
NR
個需要刪除。挑選
R=(1+k)H
H
listLen 也就是 hazard pointer 總量,定
k=0.25
,這樣一來每次 Scan 只會有
RH
個被刪除。

Scan 也不需要仰賴其他執行緒的行為,本身而言也能在有限時間內完成(wait-free)。最後總結 LookupUpdate 都是 lock-free,reader 不干擾 writer ,而 writer 也能保證運行,更提到 Lookup 有 wait-free 的解法。

A Pragmatic Implementation of Non-Blocking Linked-Lists

Source
Cited by 584

考慮一帶有 dummy node (head, tail) 的 ordered list,要進行 non-blocking insertion/deletion,每個 node 結構如下:

class Node<KeyType>{
    KeyType key;
    Node *next;
}
  • Insertion
    先看 insertion,在 10 和 30 間想插入 20,得先將節點指向 30 (左),再將 10 的 next field 以 CAS 替換成節點 20 (右)。

    Image Not Showing Possible Reasons
    • The image file may be corrupted
    • The server hosting the image is unavailable
    • The image path is incorrect
    • The image format is not supported
    Learn More →

  • Deletion
    head 和 30 間刪除 10,若以 CAS 檢查 head.next == 10 然後替換成 30 ,無法預防在 10 和 30 間有新節點 20 插入,從而導致節點遺失。

本篇論文提出新的演算法為了解決此問題,將欲刪除的節點標記(mark),稱作 logically delete,之後再進行刪除(physically delete),而插入時所找到的左右節點彼此不能為被標記(marked)。論文使用節點的 next 儲存狀態,在 pointer 對齊的狀態下可使用 next 的 LSB,mark/unmark 都是操作此 bit。

以下就論文提出的四種操作進行說明。

考慮到一集合有三種操作

  • Insert(k)
  • Delete(k)
  • Find(k)

三種操作的返回值代表成功與否,在這裡用 single linked list 呈現集合,而在 list 的實作中三種操作都會使用到 Search 方法。

list 初始化

class Node<KeyType>{
    KeyType key;
    Node *next;
    
    Node (KeyType key){
        this.key = key;
    }
}

class List<KeyType> {
    Node<KeyType> *head;
    Node<KeyType> *tail;

    List() {
        head = new Node<KeyType>();
        tail = new Node<KeyType>();
        head.next = tail;
    }
}
private Node *List::search (KeyType search_key, Node **left_node);

search 返回 left_noderight_node,用來

  • left_noderight_node 間插入節點

或是

  • 刪除 right_node

無論用途為何,需滿足以下條件

  1. left_node.key
    <
    search_key
    right_node.key
  2. left_noderight_node 皆為 unmarked
  3. left_node 相鄰 right_node

程式碼細分為 3 個 Stage 說明

  • Stage 1:

    • list->head 找尋 left_node, right_node
      • 分別為第 1、2 個遇到的 unmarked node
      • list->head, list->tail 皆為 unmarked,因此最差情況就是返回 head & tail
    • 限制 search_key
      right_node.key
    ​​​​/* 1: Find left_node and right_node */
    ​​​​do {
    ​​​​  if (!is_marked_reference(t_next)) {
    ​​​​    (*left_node) = t;
    ​​​​    left_node_next = t_next;
    ​​​​  }
    ​​​​  t = get_unmarked_reference(t_next);
    ​​​​  if (t == tail)
    ​​​​    break;
    ​​​​  t_next = t.next;
    ​​​​} while (is_marked_reference(t_next) || (t.key < search_key)); /*B1*/
    ​​​​right_node = t;
    
  • Stage 2:

    • search 的另一個目的是找出相鄰 node,這一步是檢查是否相鄰。right_node 可能中途被其他 processer 標記 marked,因此再檢查一次。 left_node 的檢查,就留到使用到 search 的函式。

      Conditions Maintained by Search:
      Nodes never become unmarked and so we may deduce that the right node was unmarked at that earlier point.

    ​​​​/* 2: Check nodes are adjacent */
    ​​​​if (left_node_next == right_node)
    ​​​​  if ((right_node != tail) && is_marked_reference(right_node.next))
    ​​​​    goto search_again; /*G1*/
    ​​​​  else
    ​​​​    return right_node; /*R1*/
    
  • Stage 3:

    • 若不相鄰,則將 left_noderight_node 間的 logically deleted 節點清除,作法是將 left_node.next 指向 right_node

      • 先檢查 left_node.next 是否有更改過,只要 left_node.next 不變就能保證先前檢查 left_noderight_node 間都是 logically deleted node,而可安心鏈結至 right_node
        ​​​​​​​​​​​​CAS(&(*left_node)->next, left_node_next, right_node) 寫法似乎有誤?
        
    • 之後仿照 Stage 2. 檢查 right_node 的狀態

    ​​​​/* 3: Remove one or more marked nodes */
    ​​​​if (CAS(&(left_node.next), left_node_next, right_node)) /*C1*/
    ​​​​  if ((right_node != tail) && is_marked_reference(right_node.next))
    ​​​​    goto search_again; /*G2*/
    ​​​​  else
    ​​​​    return right_node; /*R2*/
    

Stage 1+2+3

private Node *List::search(KeyType search_key, Node **left_node) { Node *left_node_next, *right_node; search_again: do { Node *t = head; Node *t_next = head.next; /* 1: Find left_node and right_node */ do { if (!is_marked_reference(t_next)) { (*left_node) = t; left_node_next = t_next; } t = get_unmarked_reference(t_next); if (t == tail) break; t_next = t.next; } while (is_marked_reference(t_next) || (t.key < search_key)); /*B1*/ right_node = t; /* 2: Check nodes are adjacent */ if (left_node_next == right_node) if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G1*/ else return right_node; /*R1*/ /* 3: Remove one or more marked nodes */ if (CAS(&(left_node.next), left_node_next, right_node)) /*C1*/ if ((right_node != tail) && is_marked_reference(right_node.next)) goto search_again; /*G2*/ else return right_node; /*R2*/ } while (true); /*B2*/ }

Insert

List::search 找到的 left_node, right_node 間插入新節點,行 6 檢查右節點 key 是否與新節點重複

  • 返回 false 代表重複插入
public boolean List::insert(KeyType key) { Node *new_node = new Node(key); Node *right_node, *left_node; do { right_node = search(key, &left_node); if ((right_node != tail) && (right_node.key == key)) /*T1*/ return false; new_node.next = right_node; if (CAS(&(left_node.next), right_node, new_node)) /*C2*/ return true; } while (true); /*B3*/ }

Find

public boolean List::find(KeyType search_key) {
  Node *right_node, *left_node;
  right_node = search(search_key, &left_node);
  if ((right_node =! tail) || (right_node.key != search_key))
    return false;
  else
    return true;
}

Delete

返回 false 代表無對應 key 的節點存在

  • 行 4-6 重複 List::find 尋找節點的過程
  • 行 8-11 (C3) 先確定 right_node 非 logically deleted 再用 CAS 標記,與 List::insert 的 (C2) 一樣都是確保 search 之後獲得的節點在中途不會被其他 processor 所標記。
  • 最後 (C4) 使用 CAS 將 left_node 連結到 right_node_next 完成移除(remove),可能成功或失敗。失敗原因之一可能如下
    1. Thread A 想刪除 R、Thread B 想刪除 N1
      
      
      
      
      
      
      L
      
      
      
      node1
      
      L
      
      
      
      node2
      
      R
      
      
      
      node1->node2
      
      
      
      
      
      node3
      
      N1
      
      
      
      node2->node3
      
      
      
      
      
      node4
      
      N2
      
      
      
      node3->node4
      
      
      
      
      
      
    2. Thread A 想刪除 R、Thread B 想刪除 N1,各自標記對應節點
      
      
      
      
      
      
      L
      
      
      
      node1
      
      L
      
      
      
      node2
      
      R
      
      
      
      node1->node2
      
      
      
      
      
      node3
      
      N1
      
      
      
      node2->node3
      
      
      
      
      
      node4
      
      N2
      
      
      
      node3->node4
      
      
      
      
      
      
    3. 假設 Thread A 先成功執行 CAS,Thread B 因此執行 CAS 失敗
      
      
      
      
      
      
      L
      
      
      
      node1
      
      L
      
      
      
      node2
      
      R
      
      
      
      node1->node2
      
      
      
      
      
      node3
      
      N1
      
      
      
      node1->node3
      
      
      
      
      
      node2->node3
      
      
      
      
      
      node4
      
      N2
      
      
      
      node3->node4
      
      
      
      
      
      
      • 沒有在 List::delete 實作刪除節點 R
    4. Thread B 在 CAS 失敗後反而呼叫 search,在 (C1) 將 L1 連結至 N2
      
      
      
      
      
      
      L
      
      
      
      node1
      
      L
      
      
      
      node3
      
      N1
      
      
      
      node1->node3
      
      
      
      
      
      node4
      
      N2
      
      
      
      node1->node4
      
      
      
      
      
      node3->node4
      
      
      
      
      
      
      • 沒有在 List::search 實作刪除節點 N1
    5. 綜合以上,我們能看到在 quiz2 範例程式碼中將欲刪除的 right_node 放進 retire list,補足 List::deleteList::search 沒有實作到的部份。論文並沒有針對這部份進行推演,而是用了其他方法證明正確性。
public boolean List::delete (KeyType search_key) { Node *right_node, *right_node_next, *left_node; do { right_node = search(search_key, &left_node); if ((right_node == tail) || (right_node.key != search_key)) /*T1*/ return false; right_node_next = right_node.next; if (!is_marked_reference(right_node_next)) if (CAS(&(right_node.next), /*C3*/ right_node_next, get_marked_reference(right_node_next))) break; } while (true); /*B4*/ if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search(right_node.key, &left_node); return true; }

quiz2 範例程式碼

Source

基於上述兩篇論文,很清楚的看到 quiz2 是基於 Non-Blocking Linked-Lists 的實作,而在刪除節點時採用 Hazard Pointers 的作法。從 list_find 函數可以看出一些蹤跡。

__list_find

__list_find 函數對應到 List::search,除了 left noderight node 移到引數,原函式外的 right_node.key != search_key 檢查也實作在函數內,返回 boolean。

if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return *key == get_unmarked_node(curr)->key; }

而符號與原論文些許不同外,型態也有所不同,但節點的狀態還是儲存在 next 欄位。 基於 hazard pointer 的規則,用到的節點均以 list_hp_protect_ptr 保護。

Thesis quiz2
left_node prev
right_node curr
t_next next

我們將原函式拆成幾個部份探討

static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again:

初始化 prevcurr,想像 prev 是指位在 head 前面的 unmarked node 底下的 next 欄位,而 curr 指向 head

prev = &list->head; curr = (list_node_t *) atomic_load(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (atomic_load(prev) != get_unmarked(curr)) goto try_again;

14, 15 行的作用如在確保 curr 成功放入 hazard pointer,對應到如下的實作。之後多次見到此種用法。

do{
    ptr = pMap_;
    pRec->pHazard_ = ptr;
}while(pMap_ != ptr);

再來找下一個 unmarked node

while (true) { if (!get_unmarked_node(curr)) return false; next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) break; if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) break; if (atomic_load(prev) != get_unmarked(curr)) goto try_again;
  • 21-22: get_unmarked(next) 沒放入 hazard pointer 則跳出迴圈,不懂這裡的邏輯
    Image Not Showing Possible Reasons
    • The image file may be corrupted
    • The server hosting the image is unavailable
    • The image path is incorrect
    • The image format is not supported
    Learn More →
  • 23-24: 走到 list->tail 前一個節點跳出迴圈
    Image Not Showing Possible Reasons
    • The image file may be corrupted
    • The server hosting the image is unavailable
    • The image path is incorrect
    • The image format is not supported
    Learn More →
  • 25-26: 確保 curr 放入 hazard pointer

21-24 行的問題後來由 RinHizakura 在 commit 83ef91 修正:

@@ -230,14 +230,14 @@ try_again:
     if (atomic_load(prev) != get_unmarked(curr))
         goto try_again;
     while (true) {
-        if (!get_unmarked_node(curr))
-            return false;
         next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next);
         (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
+        /* On a CAS failure, the search function, "__list_find," will simply
+         * have to go backwards in the list until an unmarked element is found
+         * from which the search in increasing key order can be started.
+         */
         if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next)
-            break;
-        if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail))
-            break;
+            goto try_again;
         if (atomic_load(prev) != get_unmarked(curr))
             goto try_again;
         if (get_unmarked_node(next) == next) {
@@ -259,11 +259,6 @@ try_again:
         curr = next;
         (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
     }
-    *par_curr = curr;
-    *par_prev = prev;
-    *par_next = next;
-
-    return false;
 }

next = atomic_load(&get_unmarked_node(curr)->next)

是 unmarked,就確定 curr 是 unmarked node,加上 prev 總共有兩個 unmarked node。
緊接著 29~34 行檢查 key,若檢查沒過,繼續尋找下一個 unmarked node,同時將 prev 替換成 curr 這個最近發現的的 unmarked node。

if (get_unmarked_node(next) == next) { if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return *key == get_unmarked_node(curr)->key; } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else {

原論文的方法是將 left_node 連結至 right_node 並沒有刪除節點,這裡在遍歷的過程中逐一刪除節點。

uintptr_t tmp = get_unmarked(curr); if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; list_hp_retire(list->hp, get_unmarked(curr)); }
curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } // ok *par_curr = curr; *par_prev = prev; *par_next = next; return false; }

list_insert

對照以下 Non-blocking Linked List 的實作

public boolean List::insert(KeyType key) {
  Node *new_node = new Node(key);
  Node *right_node, *left_node;
  do {
    right_node = search(key, &left_node);
    if ((right_node != tail) && (right_node.key == key)) /*T1*/
      return false;
    new_node.next = right_node;
    if (CAS(&(left_node.next), right_node, new_node)) /*C2*/
      return true;
  } while (true); /*B3*/
}

不難發現 list_insert 其實就是 List::insert ,除了 List:search__list_find 取代而稍做調整。

  • quiz2 另外加入 list_hp_clear() 來釋放那些在 __list_find 加入 hazard pointer 的節點
bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); // ensure curr is unmarked if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } } }

list_delete

論文實作:

public boolean List::delete (KeyType search_key) { Node *right_node, *right_node_next, *left_node; do { right_node = search(search_key, &left_node); if ((right_node == tail) || (right_node.key != search_key)) /*T1*/ return false; right_node_next = right_node.next; if (!is_marked_reference(right_node_next)) if (CAS(&(right_node.next), /*C3*/ right_node_next, get_marked_reference(right_node_next))) break; } while (true); /*B4*/ if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/ right_node = search(right_node.key, &left_node); return true; }

quiz2 實作:

bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } }

對照發現 quiz2 只是把原本迴圈外的移除節點的部份搬進迴圈,在移除成功時呼叫 retire 等待未來刪除節點。

list_hp_retire(list->hp, get_unmarked(curr));

分成 2 個 stage:

  • Stage 1: 標記 unmarked right_node (curr)

    
    
    
    
    
    
    R
    
    
    
    node1
    
    prev
    
    
    
    node2
    
    curr
    
    
    
    node1->node2
    
    
    
    
    
    node3
    
    next
    
    
    
    node2->node3
    
    
    
    
    
    
    • original
    ​​​​ right_node_next = right_node.next; ​​​​ if (!is_marked_reference(right_node_next)) ​​​​ if (CAS(&(right_node.next), /*C3*/ ​​​​ right_node_next, get_marked_reference(right_node_next))) ​​​​ break;
    • quiz2
    ​​​​ uintptr_t tmp = get_unmarked(next); ​​​​ if (!atomic_compare_exchange_strong(&curr->next, &tmp, ​​​​ get_marked(next))) ​​​​ continue;
  • Stage 2:

    
    
    
    
    
    
    R
    
    
    
    node1
    
    prev
    
    
    
    node2
    
    curr
    
    
    
    node1->node2
    
    
    
    
    
    node3
    
    next
    
    
    
    node1->node3
    
    
    
    
    
    node2->node3
    
    
    
    
    
    
    • original
    ​​​​if (!CAS(&(left_node.next), right_node, right_node_next)) /*C4*/ ​​​​ right_node = search(right_node.key, &left_node); ​​​​return true;
    • quiz2
    ​​​​ tmp = get_unmarked(curr); ​​​​ if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { ​​​​ list_hp_clear(list->hp); ​​​​ list_hp_retire(list->hp, get_unmarked(curr)); ​​​​ } else { ​​​​ list_hp_clear(list->hp); ​​​​ } ​​​​ return true;

指出改進空間並著手實作

對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?