--- tags: linux-summer-2021 --- # 2021q3 Homework2 (hp) contributed by < `RinHizakura` > > [第 2 週測驗題](https://hackmd.io/@sysprog/linux2021-summer-quiz2) > [GitHub](https://github.com/RinHizakura/concurrent-programs/tree/hp_list) ## Hazard pointer 在沒有 garbage collection 的語言環境中,lock-free data 的主動釋放需要考慮是否有其他執行緒也同時在持有的問題,而 [hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 是一種可以解決 lock-free data 在動態記憶體配置的問題之方法。其基本的概念是允許 hazard pointer 可以被一個 thread 寫而多個 thread 讀,當存在 reader thread 正在操作該 hazard pointer 時,hazard pointer 會被標註,於是 writer thread 可得知此訊息並延遲對其釋放。 在 hazard pointer 架構中,每個 thread 首先需要各自維護的兩個關鍵集合是: * hazard pointer: 儲存此 thread 正在訪問的指標,因此該指標不能直接被釋放 * retire list: 被這個 thread 要求釋放的指標,但實際尚未釋放 因此要安全的釋放記憶體,其基本想法就是: * 每個 thread 放在 hazard pointer 中的指標尚不能被釋放 * 每個 thread 要求釋放的指標先放在 retire list 中 * 掃描 retire list 可以得知所有 thread 皆不使用的指標,則可以將其真正的釋放給作業系統 :::success TODO: 需要閱讀論文以更詳細的認識 hazard pointer 中更多細節 - [x] [Lock-Free Data Structures with Hazard Pointers](https://erdani.org/publications/cuj-2004-12.pdf) - [ ] [Hazard pointers: safe memory reclamation for lock-free objects](https://ieeexplore.ieee.org/document/1291819) - [x] [A Pragmatic Implementation of Non-Blocking Linked-Lists](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf) ::: ## More hazard pointer 以下透過文章 [Lock-Free Data Structures with Hazard Pointers](https://erdani.org/publications/cuj-2004-12.pdf) 理解 hazard pointer 的更多細節,並摘錄關鍵的部份描述: ### 挑戰 文中提到,lock-free programing 為了保證持續進展的特性,每個 thread 都有機會在任意時間去操作一個 object。當 thread A 在釋放一個 object 時,需確保沒有另一個 thread B 仍取得該 object 的 reference 且正要存取。如果先行釋放此 object,另一個 thread 的存取就會出錯。有幾個比較經典的解決方式: 1. [Reference counting](https://en.wikipedia.org/wiki/Reference_counting): 基本想法是每當一個 thread 取得 reference 就將一個計數(其地址不同於 pointer)加一,解除 reference 時則減一。但是實際上其正確性要確保取得 pointer 和 reference count 加一是一個 atomic operation,需要使用特殊的 [DCAS (Double Compare And-Swap)](https://en.wikipedia.org/wiki/Double_compare-and-swap) 指令,但其效率不彰 2. Wait and delete: 就是 「等一段時間」 再刪除,但誰知道該等多久呢? ~~God knows~~ 3. Keep a reference count next to the pointer: 使用比起 DCAS 較合理且效率比較好的 CAS2,可以 atomic 的交換 memory 中的兩個相鄰的 words,不過 64 位元的處理器上一般不存在這種指令(但可以透過一些操作指標的技巧達到同等效果) 最後的方案看似最為可行,但實際上仍有缺陷,因為 CAS 語意要求 object 和 reference count 要同時一致才能替換上新的數值,write 操作需要等待到沒有 read 操作(reference count)才能進行。換句話說,在一個 read 操作結束前如果又有下一個 read 進來,write 就只能乾等。 :::danger 有一個疑惑是,後面一段提到可以將 reference count 和 pointer 分離來避免 writer 被 reader block 的問題 > A possible approach using reference counting is to separate the reference counter from the pointer. Now writers will not be blocked by readers anymore, but they can free replaced maps only after they observe the reference counter with a zero value [5]. Also, using this approach we only need single word CAS, which makes the technique portable to architectures that don’t support CAS2 但如果 pointer 和 reference 兩者不同時 CAS 不是就會發生錯誤嗎? 不太確定這裡描述的具體行為。 ::: ### Hazard pointer 的實現 為克服上述方法的問題,hazard pointer 是一個更佳方案(尤其針對 Write-Rarely-Read-Many 的情境)。內文中以 lock-free 的 map 為案例: ```cpp template <class K, class V> class WRRMMap { Map<K, V> * pMap_; ... }; ``` 當 `WRRMMap` 需要被更新時,writer thread 會去取得一個 `*pMap_` 指向 map 的副本並更改,然後再將 `*pMap_` 指向新的副本,並且回收舊的。麻煩的問題是這個舊的 map 可能有其他 thread 正嘗試讀取。 而 hazard pointer 的概念就是讓 reader 將正在存取的 map 加入到自己的單讀多寫(single-writer-multiple-reader)的 hazard pointer 中,其目的等同向其他 writer 宣告禁止回收該 map 。當 writer 把舊的 map 替換下來,map 先被放在一個 thread 獨立的 list 而不先釋放 ,直到 list 中的舊 map 達到一個上限,再去掃描每個 reader 的 hazard pointer 中是否有相匹配。如果某個舊 map 不與任何 reader 的 hazard pointer 匹配,那麼釋放該 map 就是安全的,否則就繼續將其留在 list 中,直到下次掃描。 ```cpp // Hazard pointer record class HPRecType { HPRecType * pNext_; int active_; static HPRecType * pHead_; static int listLen_; public: void * pHazard_; static HPRecType * Head() { return pHead_; } ... ``` hazard pointer 的基本結構由 linked list: * `*pNext` 指向下個節點 * `active_` 表示該節點被使用與否 * `pHead_` 是 linked list 的首個節點 * `listLen_` 則是 list 中的節點數量 * `pHazard_` 是物件的指標本體(例如 map) ```cpp // Acquires one hazard pointer static HPRecType * Acquire() { // Try to reuse a retired HP record HPRecType * p = pHead_; for (; p; p = p->pNext_) { if (p->active_ || !CAS(&p->active_, 0, 1)) continue; return p; } // Increment the list length int oldLen; do { oldLen = listLen_; } while (!CAS(&listLen_, oldLen, oldLen + 1)); // Allocate a new one HPRecType * p = new HPRecType; p->active_ = 1; p->pHazard_ = 0; // Push it to the front do { old = pHead_; p->pNext_ = old; } while (!CAS(&pHead_, old, p)); return p; } ... ``` 要將一個指標加入 hazard pointer 需要 acquire 一個可用的節點,首先先看看有沒有已經被建立的空節點(沒有對應要保護的 `pHazard` 實體)可以直接使用。如果沒有,則需要建立新的 node 插入到 hazard pointer linked list 的開頭,`listLen_` 也要相應的做加一 ```cpp // Releases a hazard pointer static void Release(HPRecType* p) { p->pHazard_ = 0; p->active_ = 0; } }; ``` 要解除對指標的保護則透過 release,單純的將該節點的 `pHazard_` 重置且 `active_` 允許重用即可。 ```cpp // Per-thread private variable __per_thread__ vector<Map<K, V> *> rlist; ``` 每個 thread 還有一個獨立存取的 retire list,存放該 thread 不再需要,若其它thread 也不再需要就可以被釋放 pointer。 retire list 因為只有每個 thread 自己可以存取的,因此不需要被同步。 ```cpp template <class K, class V> class WRRMMap { Map<K, V> * pMap_; ... private: static void Retire(Map<K, V> * pOld) { // put it in the retired list rlist.push_back(pOld); if (rlist.size() >= R) { Scan(HPRecType::Head()) } } }; ``` `Retire` 操作將要原本應該要釋放的 pointer 加入 vector 中,先儲存起來延遲釋放。當 vector 所放的 pointer 超過一個大小就呼叫 `Scan` 去掃描是否有可以真正釋放的 pointer。 ```cpp void Scan(HPRecType * head) { // Stage 1: Scan hazard pointers list // collecting all non-null ptrs vector<void*> hp; while (head) { void * p = head->pHazard_; if (p) hp.push_back(p); head = head->pNext_; } // Stage 2: sort the hazard pointers sort(hp.begin(), hp.end(), less<void*>()); // Stage 3: Search for'em! vector<Map<K, V>*>::iterator i = rlist.begin(); while (i != rlist.end()) { if (!binary_search(hp.begin(), hp.end(), *i) { // Aha! delete *i; if (&*i != &rlist.back()) { *i = rlist.back(); } rlist.pop_back(); } else { ++i; } } } ``` `Scan` 的任務是找出 retire list 集合和所有 thread 的 hazard pointer 集合的差集,那些 pointer 就是可以被釋放而且不會導致錯誤的。 值得關注的是實作上與本測驗題範例的差異: * 其一是我們也可以利用這裡對 retire list 的重排技巧做簡單的優化: 如果找到 retire list 中可以刪除的 pointer,刪除之後把 retire list 最尾端的 pointer 填充到這個位置而不需要測驗題中 `memmove` 作過度的記憶體搬動。 * 其二是這裡的比對過程中 hazard pointer 先被按照地址排序後再透過 binary search 搜尋,或者透過 hash table 來維護 hazard pointer 或者是更有效的作法,都是可以考慮去優化的。 文中還提到了最佳的 `HP_THRESHOLD_R` 設定,假設每個 retire list 大小為 `R` 時觸發 `Scan`,且有 `N` 個 writer thread,`H` 個 hazard pointer 存在(例如在本文中 `H` 是 reader thread 數量,因為每個 thread 同時只會存取一個 pointer),那 `R` 的最佳設定建議是 $(1 + \frac{1}{k})H$,k 是一個小的正數(例如 1/4)。這可以使 `R` 大於 `H` 且與 `H` 成正比,每次 `Scan` 都能保證釋放掉 `R-H` 個 pointer。 最後就可以把 hazard pointer 融合進本文的 `WRRMMap` 範例中,當然也可以使用在更廣泛的應用上。 ## 範例程式碼 以下先透過閱讀測驗題的程式碼以初步的理解 hazard pointer 的並行 linked list 實作想法: #### 資料結構 ```cpp typedef struct list_node { alignas(128) uint32_t magic; alignas(128) atomic_uintptr_t next; list_key_t key; } list_node_t; typedef struct list { atomic_uintptr_t head, tail; list_hp_t *hp; } list_t; ``` * `list_t` 是 linked list 的主結構,包含了與存取真正資料相關的 `head` / `tail` (雖然是 `uintptr_t` 型別但會被 cast 成 `list_node_t` 做使用) * `list_hp_t` 則與 hazard pointer 相關,其目的是確保正確操作記憶體 ```cpp typedef struct { int size; uintptr_t *list; } retirelist_t; typedef struct list_hp list_hp_t; typedef void(list_hp_deletefunc_t)(void *); struct list_hp { int max_hps; alignas(128) atomic_uintptr_t *hp[HP_MAX_THREADS]; alignas(128) retirelist_t *rl[HP_MAX_THREADS * CLPAD]; list_hp_deletefunc_t *deletefunc; }; ``` `list_hp_t` 中包含了一個 `uintptr_t` 的 hazard pointer array 和 `retirelist_t` 結構的 pointer array,此結構的詳細架構在 [`list_hp_new`](#list_hp_new) 章節更深入的揭露。 * [`alignas`](https://en.cppreference.com/w/c/language/_Alignas) 要求指標對齊 128 bytes * `CLPAD` 將每個 thread 所對應的 retire list 作 padding,其目的是減少 [false sharing](https://en.wikipedia.org/wiki/False_sharing) :::danger 但是為甚麼是對齊 128 bytes 呢,是否應該考慮不同架構的 cacheline 實際大小? hint: 課程中提到要看論文 ::: #### `tid` ```cpp static thread_local int tid_v = TID_UNKNOWN; static atomic_int_fast32_t tid_v_base = ATOMIC_VAR_INIT(0); static inline int tid(void) { if (tid_v == TID_UNKNOWN) { tid_v = atomic_fetch_add(&tid_v_base, 1); assert(tid_v < HP_MAX_THREADS); } return tid_v; } ``` `tid` 為每個 thread 配置一個獨立的編號。這裡的關鍵在於 `tid_v_base` 是 thread 共享的 id counter,而 [`thread_local`](https://en.cppreference.com/w/c/thread/thread_local) 的 `tid_v` 則每個 thread 各自獨立。 #### `list_new` ```cpp list_t *list_new(void) { list_t *list = calloc(1, sizeof(*list)); assert(list); list_node_t *head = list_node_new(0), *tail = list_node_new(UINTPTR_MAX); assert(head), assert(tail); list_hp_t *hp = list_hp_new(3, __list_node_delete); atomic_init(&head->next, (uintptr_t) tail); *list = (list_t){.hp = hp}; atomic_init(&list->head, (uintptr_t) head); atomic_init(&list->tail, (uintptr_t) tail); return list; } ``` `list_new` 建立一個 `list_t` 結構,其成員包含兩個 node `head` 和 `tail` 作為 linked list 頭尾的 dummy node,在透過 `list_node_new` 建立後,由 [`atomic_init`](https://en.cppreference.com/w/c/atomic/atomic_init) 去 assign 到正確位置: * 由於一般插入的節點值(來自 `insert_thread`) 是 global variable 的地址,且 list 的節點會按照 key 的大小進行排列,因此將 head 和 tail 的 key 初始為 key `0` 和 key `UINTPTR_MAX` 會是唯一值,可以避免被誤刪除,並從 key 區分兩者 以及由 `list_hp_new` 所配置的 `list_hp_t` 結構之成員。 * 其中參數的 3 表示可存取的 hazard pointer 數量,對應 `enum{HP_NEXT = 0, HP_CURR = 1, HP_PREV}` #### `list_node_new` ```cpp list_node_t *list_node_new(list_key_t key) { list_node_t *node = aligned_alloc(128, sizeof(*node)); assert(node); *node = (list_node_t){.magic = LIST_MAGIC, .key = key}; (void) atomic_fetch_add(&inserts, 1); return node; } ``` 配置一個 `list_node_t` 結構,並且將其 `list_key_t` (實際是 `uintptr_t` 型別透過 typedef 定義) 的值設置為 `key` * `inserts` 變數在函數被呼叫使用 atomic 加法([`atomic_fetch_add`](https://en.cppreference.com/w/c/atomic/atomic_fetch_add))加一 #### `list_hp_new` ```cpp= /* Create a new hazard pointer array of size 'max_hps' (or a reasonable * default value if 'max_hps' is 0). The function 'deletefunc' will be * used to delete objects protected by hazard pointers when it becomes * safe to retire them. */ list_hp_t *list_hp_new(size_t max_hps, list_hp_deletefunc_t *deletefunc) { list_hp_t *hp = aligned_alloc(128, sizeof(*hp)); assert(hp); if (max_hps == 0) max_hps = HP_MAX_HPS; *hp = (list_hp_t){.max_hps = max_hps, .deletefunc = deletefunc}; for (int i = 0; i < HP_MAX_THREADS; i++) { hp->hp[i] = calloc(CLPAD * 2, sizeof(hp->hp[i][0])); hp->rl[i * CLPAD] = calloc(1, sizeof(*hp->rl[0])); for (int j = 0; j < hp->max_hps; j++) atomic_init(&hp->hp[i][j], 0); hp->rl[i * CLPAD]->list = calloc(HP_MAX_RETIRED, sizeof(uintptr_t)); } return hp; } ``` 如註解所描述的,`list_hp_new` 將建立 `max_hps` 個 `list_hp_t` 單元,`deletefunc` 則是在釋放時要去執行的 destructor,負責 hazard pointer 所維護的 object (此案例中是 `list_node_t`)本身釋放前的前置作業(例如 object 中配置資源的先行釋放、記錄釋放的次數等)。 * [`aligned_alloc`](https://en.cppreference.com/w/c/memory/aligned_alloc) 要求系統配置對齊 128 bytes 的位址 * for 迴圈根據允許 program 的最大執行緒數量 `HP_MAX_THREADS` 去初始化兩個 pointer array 的每個元素 * 每個 pointer `hp->hp[i]` 各自是一個 thread `i` 所可以存取的 hazard pointer array,每個 thread 最多可以持有 `CLPAD * 2` 個 hazard pointer * 每個 pointer `hp->rl[i * CLPAD]` 是一個 thread `i` 所可以存取的 retired list,最多可以持有 `HP_MAX_RETIRED` 個 pointer :::danger 17 行的 hazard pointer array 的大小不是由 `max_hps` 決定嗎?為何牽涉 `CLPAD`? 這可能會導致 20 行的操作 overflow 嗎? * 因為這個範例只會用到至多 3 個 hazard pointer,而以 64 位元架構為例, `sizeof(uintptr_t) == 8` 因此 `CLPAD == 16` 不會 overflow。但考慮到擴充性仍應該修正 ::: #### `__list_find` ```cpp= static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: prev = &list->head; curr = (list_node_t *) atomic_load(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (atomic_load(prev) != get_unmarked(curr)) goto try_again; ``` `__list_find` 是整個實作的核心函數,其任務為嘗試在 `*list` 中找尋一個節點之 key `node->key` 與函式參數的 `*key` 相符。 * `prev` 是前一個節點的 `next` 指標,從 `&list->head` 開始 * `curr` 是 `prev` 指向的節點,透過 `atomic_load` 載入進來 * `next` 則是 `curr` 的下一個節點 `list_hp_protect_ptr()` 將對 `curr` 節點的存取進行標註,具體行為是將指標加入 `hp->hp[tid()][HP_CURR]`,因此在掃描 retire list 時就可以通過檢索每個 thread 的 `hp->hp` 知道是否可以是否可以釋放該記憶體。 * `prev` 一開始不用被 protect 是因為該節點只有在程式結束時才有釋放的可能 * 14, 15 行確認同步的正確 * 因為從更新 `curr` 到執行此行時,`prev` 指向的節點可能已經不再是 `curr` ```cpp=16 while (true) { if (!get_unmarked_node(curr)) return false; next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) break; if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) break; if (atomic_load(prev) != get_unmarked(curr)) goto try_again; if (get_unmarked_node(next) == next) { if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return get_unmarked_node(curr)->key == *key; } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } ``` * 17, 18 行檢查 `curr` 為 NULL 節點的情形 * 正確運行的話應該是不會發生的?(23 行) * 19, 20 行取得 `next` 並且也加入 hazard pointer 集合中 * 21, 22 行確認同步的正確(取得 `next` 仍然是 `curr->next`) ,若錯誤則跳出迴圈 :::danger 找不到相同的 key 節點與同步的錯誤都是回傳 false 處理,合理嗎? 在 `list_insert` 的視角中是否會無法區辨兩者導致節點其實沒有按照 key 的大小置放? * 修改成 `goto try_again` 是否正確? -> [修改 __list_find 之錯誤](#修改-__list_find-之錯誤) ::: * 23, 24 檢查若搜尋的下一個 node 是結尾則跳出迴圈 :::danger 1. break 之後會直接 return false,但當 `curr` 滿足 `get_unmarked_node(curr)->key == *key` 為 true 時不應該返回 true 嗎? 2. 如此一來,對於新插入的 node,表示其必然至多是 linked list 的倒數第三個 node(不可能是倒數第一或第二個),造成的問題是初始化時 `key == 0` 的 node 會被強行設置為 `key == UINTPTR_MAX` 的 `next` -> [修改 __list_find 之錯誤](#修改-__list_find-之錯誤) ::: * 25, 26 行再次確認同步的正確 * 27 行要確認 `next` 是否被標註,表示該狀態表示 `curr` 處於將被刪除的 mark 狀態 * 程式中的 marked (`ptr | 0x01`) 的目的應該是要標註對於 `node` 與其下個節點 `node->next`,因為 `node` 將被刪除,因此 `node->next` 的取得是無效的 * 如果 `curr` 沒有要被刪除,就比對 `*key` 和該 node 的 key 相比是否較大 * 如果否,回傳 `**par_curr` / `**par_prev` / `**par_next`,以及 key 相比應為較小或相等的資訊(true or false) 給 `list_insert` 和 `list_delete` 判別後續處理 * 如果是,則欲搜尋的節點可能還在 list 的後面,則更新 `prev` / `curr` 並且加入 hazard pointer 中進行保護,進入下一個 iteration 搜尋 ```cpp=38 else { uintptr_t tmp = get_unmarked(curr); if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; list_hp_retire(list->hp, get_unmarked(curr)); } curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } *par_curr = curr; *par_prev = prev; *par_next = next; return false; } ``` * 如果 `curr` 在其他的 thread 標示被刪除,則先確保同步的正確,比較值得關注的是被標註的 node 之刪除(更準確的說是透過 `list_hp_retire` 加入 retire list)會在此與 `list_delete` 中的刪除產生 race,因此可知一個 node 的刪除可能由一個 thread * 否則要再從頭搜尋一次 :::warning :warning: 個人見解,需要深入更多相關知識與實驗佐證: 原本我以為 retire list 的設計是針對 double free 的問題,但單就解決此問題來看,似乎只要直接讓 thread 競爭釋放節點的 free 呼叫即可,而不需要藉由 retire list 來延遲釋放。 * 各 thread 的 retire list 中的指標仍不允許重複的,否則 double free 問題會產生 hazard pointer 的重點應在於節點的釋放不能在其他 thread 的執行路徑上,可以想像如果某個 thread 在運行途中存取到已經釋放的節點,則錯誤將產生。正因如此需要 defer free 的機制,則個 thread 可以先存取該應該被釋放的節點,後續再透過其他檢查得知存取的無效性。 ::: #### `list_insert` ```cpp bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } } } ``` `prev` 指向新建立的 `node` 應該被放入的位址(pointer to pointer),該位址原本存放 `curr` ,但 `curr` 節點現在成為了 `node->next` 而其原本存在的位址將被 `node` 取代。然而實際上要確保 `curr` 並未同時正在被 `list_delete` 要求刪除。因此使用 [`atomic_compare_exchange_strong`](https://en.cppreference.com/w/c/atomic/atomic_compare_exchange) 確保置換時的同步正確。 * return 前呼叫 `list_hp_clear`,讓 retire list 知道哪些 pointer 不再有 thread 要存取而可以釋放 #### `list_delete` ```cpp bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } } ``` `list_delete` 的目的則是找到 `list` 中的節點與 `key` 相符者,將該節點從 linked list 中取下。 * `atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next)` 是關鍵,如前曾經提到的,被刪除的節點的 `next` 會先被 mark 起來, * 於是乎後續的刪除會與 `__list_find` 去競爭,也因此我們可以看到十分相像的操作: 將要刪除的節點所在位置用 unmark 的 next 取代,並且刪除的節點加入 retire list #### `list_hp_retire` ```cpp void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { retirelist_t *rl = hp->rl[tid() * CLPAD]; rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); if (rl->size < HP_THRESHOLD_R) return; ``` `list_hp_retire` 是了解 retire list 如何運作的核心。一開始根據 thread id,找到對應的 retire list 並將要釋放的 pointer 加入即可。 :::warning 可改進: retire list 的大小應該允許擴充 / 更適合用來維護 retire list 的資料結構? ::: ```cpp for (size_t iret = 0; iret < rl->size; iret++) { uintptr_t obj = rl->list[iret]; bool can_delete = true; for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) { for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { if (atomic_load(&hp->hp[itid][ihp]) == obj) { can_delete = false; break; } } } if (can_delete) { size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); memmove(&rl->list[iret], &rl->list[iret + 1], bytes); rl->size--; hp->deletefunc((void *)obj); } } } ``` 當 retire list 的大小 `rl->size` 超出上限,則遍歷 retire list 的每個 pointer,確保其不屬於任何一個 thread 的 hazard pointer 後,將其移出 retire list,而該 pointer 可以藉由 `deletefunc` 真正的將配置的資源進行回收。 ## 錯誤修正與程式改進 ### 修改 `__list_find` 之錯誤 list 的節點應該依據按照 `key` 的大小被依序排列,才能使得 `__list_find` 符合 `list_insert` / `list_delete` 的期待,不需要真正走完整個 linked list 就可以得知某個 key 是否存在 linked list 中,提早返回。然後目前實作的多個問題卻可能錯誤的擺放 node,造成錯誤排序。 為了方便檢驗 linked list 的順序,設計一個簡單的 function。可以放在 `pthread_join` 之後做檢查: ```cpp void list_order_check(list_t *list) { assert(list); list_node_t *node = (list_node_t *) atomic_load(&list->head); int error = 0; list_key_t prev_key = 0; while (node) { if(prev_key > node->key) error++; prev_key = node->key; node = (list_node_t *) atomic_load(&node->next); } printf("Find error ordering: %d\n", error); } ``` 為了實驗的清晰,你甚至可以先拿掉 `delete_thread` 來確認排序的錯誤有部份在只有 `list_insert` 下就會出現。以下是範例輸出,錯誤的數量每次的執行會有所不同。 ``` Find error ordering: 13 ``` 首先,在 `__list_find` 中,檢查同步錯誤的 ```cpp if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) break; ``` 會直接 break 出迴圈並且返回 false。這導致 `list_insert` 認為已經找出插入的正確位置,而錯誤的放置新節點。需改動成: ```cpp if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) goto try_again; ``` 修改之後重新執行程式,會發現錯誤的排序數量恆為 1: ``` Find error ordering: 1 ``` 如果 dump 出 linked list 的節點依序之 key 的話,會發現僅剩的 1 個排序錯誤在於原本初始時 `key == 0` 的節點必定會被擺到 linked list 中的倒數第二個位置,位於初始時 `key == UINTPTR_MAX` 的節點之前,導致 `key == 0` 的節點不恆為 linked list 之 head。而原因在於 `__list_find` 中,以下的檢查是多餘的: ```cpp if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) break; ``` 我們可以直接將此判斷式移除,因為不再有 break 出迴圈的操作了,順便也就將 `__list_find` 最後的以下幾行移除: ```cpp *par_curr = curr; *par_prev = prev; *par_next = next; return false; ``` 而下面的兩行實際上也不需要: ```cpp if (!get_unmarked_node(curr)) return false; ``` 總結來說,`__list_find` 目前被我更動成下面的樣子: ```cpp static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: prev = &list->head; curr = (list_node_t *) atomic_load(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (atomic_load(prev) != get_unmarked(curr)) goto try_again; while (true) { next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) goto try_again; if (atomic_load(prev) != get_unmarked(curr)) goto try_again; if (get_unmarked_node(next) == next) { if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return (get_unmarked_node(curr)->key == *key); } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else { uintptr_t tmp = get_unmarked(curr); if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; list_hp_retire(list->hp, get_unmarked(curr)); } curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } } ``` 這會不會導致存取到 null pointer 或者無法返回的錯誤呢? 仔細思考,正常情況中應該是不會的,這建立在 `key == UINTPTR_MAX` 的 tail 節點不會被刪除的前提下。因為 tail 節點不會被刪除,可以設想迴圈運行到 `curr` 為 tail 節點時,若同步錯誤,回到 `try_again` 重新搜尋,其他情況下,因為 tail 不被刪除,因此 ```cpp if (get_unmarked_node(next) == next) ``` 恆為 true 且 ```cpp if (!(get_unmarked_node(curr)->key < *key)) ``` 也恆為 true,因此最終得以返回。 ### 減少記憶體搬動 根據 [Hazard pointer 的實現](#Hazard-pointer-的實現)章節所提及的重排技巧做簡單的優化: 如果找到 retire list 中可以刪除的 pointer,刪除之後把 retire list 最尾端的 pointer 填充到這個位置而不需要 `memmove` 作過度的記憶體搬動。 ```diff void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { ... for (size_t iret = 0; iret < rl->size; iret++) { ... + // move the last retire pointer to current position + rl->list[iret] = rl->list[rl->size - 1]; + // check current position of the retire list again + iret--; - size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); - memmove(&rl->list[iret], &rl->list[iret + 1], bytes); ... } } ``` ### 減少 retry 次數 每次的同步錯誤(`__list_find` 中途操作的 `prev` / `curr` / `next` 彼此關係改變)都會觸發 `goto try_again` 來重新遍歷 linked list,思考是否有從 linked list 的中繼重新搜尋而非從頭開始的可能性。 考慮以下條件為 true 的可能狀況 ```cpp if (ATOMIC_LOAD(prev) != get_unmarked(curr)) ``` 1. `curr` 在其他 thread 被刪除 2. `prev` 所屬的節點(需注意到 `prev` 是某個節點的 next 而非節點主體)在其他 thread 被刪除 3. `prev` 所屬節點和 `curr` 之間出現新的節點 insert 其中,若情況屬於 1 和 3,因為 `prev` 所屬節點可以確保其 `key` 值小於所要搜尋的 key,因此可以嘗試將 `curr` 做更新再重啟搜尋的程序即可(嗎?) 以下為對應此想法的修改: 1. `prev` 所屬節點可以透過類似於 `container_of` 的方式從 `prev` 獲得 2. `prev` 所屬節點是否被刪除來自其 next member 是否被標註 ```diff try_again: prev = &list->head; +update_curr_from_prev: curr = (list_node_t *) ATOMIC_LOAD(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (ATOMIC_LOAD(prev) != get_unmarked(curr)) { TRACE(retry); ... if (ATOMIC_LOAD(prev) != get_unmarked(curr)) { + list_node_t *prev_node = (list_node_t *)((char *)prev - offsetof(list_node_t, next)); + if (!is_marked(ATOMIC_LOAD(&get_unmarked_node(prev_node)->next))) + goto update_curr_from_prev; TRACE(retry); goto try_again; } ``` :::danger 是否還有其他疏忽的情形? 如何確保更動後的程式之正確性(從目前的 `prev` 開始搜尋確實合法)? ::: 我們首先計算採用前述修改之前(`origin`)與之後(`fast_find`)的運行時間差異。幾個需要注意的實驗設計: 1. 關閉 thread sanitizer(避免檢驗兩個不同程式碼的時間差異產生的誤差) 2. 重複 100 次程式的運行並計算 簡單的將結果視覺化,呈現如下: ![](https://i.imgur.com/p2TTIMD.png) 也用 python 的 scipy 套件([`scipy.stats.ttest_ind`](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.ttest_ind.html))初略的分析一下兩者統計上是否有顯著差異: ``` Ttest_indResult(statistic=-0.7599871108808405, pvalue=0.4475628030221265) ``` 嘗試統計並比較兩者的 traversal 的次數: ![](https://i.imgur.com/j5Z1OKG.png) ``` Ttest_indResult(statistic=0.7760119889459149, pvalue=0.43867728413846696) ``` 直接從結果來看,這個改動似乎沒有造成實質的效益。然而,可能還需要考慮到 linked list 的實際長度,在兩個方法的走訪時間差異,是否足以用秒/走訪次數為單位看出端倪。 如果嘗試統計並比較兩者的 retry 的次數: ![](https://i.imgur.com/67OA0Ri.png) ``` Ttest_indResult(statistic=40.617863831954175, pvalue=2.158360840419251e-97) ``` 顯然這個差異來自 `goto update_curr_from_prev` 並不計算到一次 retry,是故 `fast_find` 的 retry 次數顯著的更少。然而這樣統計區分兩者優劣是否合理? 並且,假如 linked list 的長度增加,這個差異是否有機會明顯的(以秒為單位)反應於運行時間上? :::danger 直接使用原先範例程式碼的運行方式實驗兩個方法是否合理? 如何設計貼近實際應用案例的實驗? ::: ## RCU List