# 2021q3 Homework2 (hp) contributed by < `leon123858` > ## 1.解釋 Hazard pointer 程式碼運作原理 ### (1)函式功能分析 以下將把程式碼分成三段來進行分析,以此簡化每一部分的內容, 分別是 `insert_thread` `delete_thread` 以及 `底層方法` 三段。 1. insert_thread 查看 `insert_thread` 內部,發現核心函式就是以下這段,其中 `tid()` 的功能是取得 thread 的編號,( tid 原理後述) `list_insert` 可以給列表加入新節點。所以這段的主要功能就是不斷地加入新節點。 ```cpp for (size_t i = 0; i < N_ELEMENTS; i++){ (void) list_insert(list, (uintptr_t) &elements[tid()][i]); } ``` 查看 `tid()` 其重點在變數 `tid_v` 是 thread_local 變數, `tid_v_base` 是一般的穩態變數。 thread_local 變數表示期在每一個 thread 都會被初始化一次, 而一般穩態變數卻會在各個執行緒的影響下不斷累加。 ```cpp // 只有執行緒初始化時 tid_v == TID_UNKNOWN, 才會被觸發 if (tid_v == TID_UNKNOWN) { // 取得不斷累加的執行緒總數, 不會重複, 因為每條執行緒只會進入一次。 tid_v = atomic_fetch_add(&tid_v_base, 1); assert(tid_v < HP_MAX_THREADS); } // 回傳執行緒編號, 第一次進入拿到多少就多少。 return tid_v; ``` 查看 `list_insert` 函數。 傳入參數 key 為 elements (執行緒數*元素數) 二維陣列其中一格的地址, 這其實就是我們要存入與刪除的節點的數據。 所以利用 `list_node_new(key)` 創建新節點, 接著利用 `__list_find(list, &key, &prev, &curr, &next)` 查看該節點是否已存在, 如果已存在就釋放節點以及釋放標記節點(後述 `list_hp_clear(list->hp);` )。 之後就是新增節點以及釋放標記節點。 ```cpp list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } } ``` 查看 `list_hp_clear(list->hp)` 其作用為釋放標記節點, Hazard Pointers 可以避免 ABA 問題的主要原因在於, 其使用資料結構內的節點時會將被使用的節點地址記錄在一個臨時存放區, 等到使用結束就將存放區清空, 其意義在於當有其他執行緒想要刪除節點時需要先檢查所有執行緒的暫時存放區, 確認要刪除的節點沒有被使用才可以進行刪除。 ```cpp // hp->hp[執行緒編號] 歸0, 其本來儲存了該編號執行緒當下正在閱讀的節點地址。 for (int i = 0; i < hp->max_hps; i++) atomic_store_explicit(&hp->hp[tid()][i], 0, memory_order_release); ``` 2. delete_thread 直接查看 `list_delete` 內部, 可以發現會先檢查該節點是否存在, 若是存在則刪去該節點。 整個過程中被使用的節點地址要被記錄, 用完後要被釋放。 ```cpp list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp,get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } ``` 3. 底層方法 首先看 `list_hp_retire(list_hp_t *hp, uintptr_t ptr)` 其作用為確實釋放節點, 由於 hazardPointer 演算法在刪除節點時為了避免 ABA 問題, 不會即時刪除, 只會將其加入到待釋放列表, 須於特定事件時觸發本方法, 才能確實刪除, 以下註釋詳述刪除流程。 ```cpp retirelist_t *rl = hp->rl[tid() * CLPAD]; rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); if (rl->size < HP_THRESHOLD_R) return; // 掃描該thread待刪除節點 for (size_t iret = 0; iret < rl->size; iret++) { uintptr_t obj = rl->list[iret]; bool can_delete = true; // 掃描全部執行緒 for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) { // 掃描每條執行緒的使用中暫存區 for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { // 如果發現刪除列表中的節點被存在某條執行緒的使用中暫存區(正被使用)就放棄刪除 if (atomic_load(&hp->hp[itid][ihp]) == obj) { can_delete = false; break; } } } if (can_delete) { size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); memmove(&rl->list[iret], &rl->list[iret + 1], bytes); rl->size--; hp->deletefunc((void*) obj); } } ``` ### (2) 自己動手做做看 #### CAS 我發現程式中大量使用了 atomic 中 CAS 的技巧來確保其順利運行, 所以試著利用 CAS 寫了一個 [stack](https://github.com/leon123858/linuxClass/blob/main/atomicStack/atomicStack/atomicStack.cpp) 。 以下提供 CAS 在 stack 中的簡單用法。 可以發現 CAS 作為 atomic 程式核心函式的原因在於其可以在一個指令中完成包含比較以及設定兩件任務。 所以只要把整個任務流程設計到只有一處中斷會造成問題, 再將中斷會造成問題的地方利用 CAS 連接即可。 ```cpp void pushNode(int value) { node* n = new node(value); do { n->next = head.load(); }while (!head.compare_exchange_weak(n->next, n)); /* * 比較 head == n -> next 就令 head = n * 若 head != n , 改令 n -> next = head 然後在做下一輪, 做到一樣為止 */ } void popNode() { start: node* curHead = head.load(); do { if (curHead == nullptr) goto start; //一定要刪到, 不放棄 } while (!head.compare_exchange_weak(curHead, curHead->next)); retireList.pushNode(curHead); deletes.fetch_add(1); } ``` #### atomic link list 然而光學會 CAS 並不足以搞懂這份程式碼, 畢竟這 > *只是個 stack 而不是 linkList*, 要完成 linkList 需要一套較為複雜的流程。 所以藉著閱讀教授給的論文中的 pseudoCode , 我實作了一份 [linkList](https://github.com/leon123858/linuxClass/blob/main/atomicLinkListCpp/atomicLinkListCpp/atomicLinkListCpp.cpp) 。 但是 , 這份論文的 linkList 卻並不能解決我們全部的困境, 因為它僅僅做到了邏輯上刪除節點, > 卻沒辦法物理意義上的刪除節點, 因為她並不知道在自己刪除節點的那一刻是否有其他執行緒在對其讀取。 以下簡述我對這份 linkList 的部分理解 1. marked ```cpp /** * 標記 marked 的原理 : * 由於記憶體對齊的原因(朝4的倍數對), * 所以指標的第一個位元不包含資訊, 只要在使用時設為0 即可。 * 標記的目的在於使我們知道一個資料結構是不是已被刪除。 * 又不想用到多餘的空間, * 於是隨意找了資料結構中的一個指標, 其用不到的位元做標記。 */ // true : 被 marked 了 static inline bool is_marked_ref(void* i) { return (bool)((uintptr_t)i & 0x1L); } static inline void* get_unmarked_ref(void* w) { return (void*)((uintptr_t)w & ~0x1L); } static inline void* get_marked_ref(void* w) { return (void*)((uintptr_t)w | 0x1L); } ``` 2. 查找 此段程式碼可以拿來對應題目中的 `__list_find` , 其餘函式因難度不高所以略過不細講。 ```cpp // 獲取兩個相鄰(且未被刪除支節點), 右節點為目標 pointerPair getLeftNodeAndRightNode(int value) { pointerPair pointerPair; node* left_node_next = nullptr; search_again: do { node* tmpNode = head.load(); // 當前節點 node* tmpNode_next = tmpNode->next; // 用來查看當前節點有沒有被標註 // 查找目標節點, 且將其設為右節點。 do { // 當前節點沒被標註, 左節點前進到當前節點。 if (!is_marked_ref(tmpNode_next)) { pointerPair.leftNode = tmpNode; left_node_next = tmpNode_next; } // 當前節點往下走一個節點。 tmpNode = (node*)get_unmarked_ref(tmpNode_next); // 走到尾退出, leftNode為最後一個沒被marked的節點。 if (tmpNode == tail) break; // 得到新節點的標註 tmpNode_next = tmpNode->next; // 當節點被標註刪除或是數字還沒到就loop again } while (is_marked_ref(tmpNode_next) || (tmpNode ->value < value)); // 右節點為第一個數字大於目標且未被標註刪除的節點 , leftNode為最後一個沒被marked的節點。 pointerPair.rightNode = tmpNode; // 如果左右節點相鄰表示不用繞過, 可以回傳目標節點。 if (left_node_next == pointerPair.rightNode) // 回傳前檢查結果是否錯誤 (在查找的過程目標被刪除) #pragma warning(disable:6011) if ((pointerPair.rightNode != tail) && is_marked_ref(pointerPair.rightNode->next)) goto search_again; else return pointerPair; // 繞過被標註節點 if (pointerPair.leftNode->next.compare_exchange_weak(left_node_next, pointerPair.rightNode)) if ((pointerPair.rightNode != tail) && is_marked_ref(pointerPair.rightNode->next)) goto search_again; else return pointerPair; } while (true); } ``` #### 對 Hazard pointer 的假設進行猜測 最後引入 Hazard pointer 的演算法, 終於可以在程式運行的過程中釋放沒被使用的節點。 以下利用 c++ 完成了與範例程式碼同功能的[程式](https://github.com/leon123858/linuxClass/blob/main/hazardPointerCpp/hazardPointerCpp/hazardPointerCpp.cpp)。 以下想要聊聊 Hazard pointer 中最讓我困惑的部分 : 以範例程式碼做舉例, 我在其中的註解描述了會產生錯誤的可能 ```cpp void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { retirelist_t *rl = hp->rl[tid() * CLPAD]; rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); if (rl->size < HP_THRESHOLD_R) return; for (size_t iret = 0; iret < rl->size; iret++) { uintptr_t obj = rl->list[iret]; bool can_delete = true; // 開始遍歷, 如果有依樣的就跳出 for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) { for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { if (atomic_load(&hp->hp[itid][ihp]) == obj) { can_delete = false; break; } } } /* * 因為上下兩段並不是 atomic, * 若是在此處剛好有別的執行緒存入 hp->hp[itid][ihp] 要求不要刪除 * 且其剛好等於 obj (這裡決定要刪除的節點) * 是否會造成誤刪, 若是不會, 是因為甚麼呢? */ // 執行刪除 if (can_delete) { size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); memmove(&rl->list[iret], &rl->list[iret + 1], bytes); rl->size--; hp->deletefunc((void*) obj); } } } ``` 最終我透過[論文](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.395.378&rep=rep1&type=pdf)以及實驗, 做出了一個推測。 > For a correct algorithm for a dynamic lock-free object to use > the new methodology for memory reclamation and ABA > prevention, it must satisfy a certain condition 他說這個演算法有一個條件 > When a thread assigns a reference (i.e., a node’s address) to one of > its hazard pointers, it basically announces to other threads > that it may use that reference in a hazardous manner (e.g., > access the contents of the node without further validation of > the reference), so that other threads will refrain from > reclaiming or reusing the node until the reference is no > longer hazardous. This announcement (i.e., setting the > hazard pointer) must take place before the node is retired > and the hazard pointer must continue to hold that reference > until the reference is no longer hazardous. > This announcement must take place before the node is retired 重點在他假設所有 Hazard pointer 的設置都需要在節點退休階段之前, 在下方他提到了退休階段的意思, > 2. Reachable: n is reachable by following valid pointers > starting from the roots of an associated object. > 3. Removed: n is no longer reachable, but may still be in > use by the removing thread. > 4. Retired: n is already removed and consumed by the > removing thread, but not yet free 我推測這表示新 Hazard pointer 的加入不可以在確認可以釋放該節點, 與實際釋放該節點的中間發生。 最後我作了以下推測作為總結: **這個演算法做出了以下的假設來確保其正確性**, 使用者會利用某些演算法達成邏輯上刪除(置換)節點, 在這同時所有指針將無法進入該節點, 但可以移出該節點, 所以會發生上述錯誤的可能情況僅有**本來指向節點的指針在還未移出前節點就被釋放**且**加入禁止刪除標籤剛好在遍歷與刪除行為的中間**, 但因為這個條件困難, 發生可能性極低, 所以把這個錯誤假設為不會發生(但我不知道為甚麼, 我寫的版本的 hazard pointer 在特定條件下會一直發生, 所以有可能這份推測有誤, 尚需驗證)。 ## 2.指出 Hazard pointer 改進空間並著手實作 ### 變更方向 事實上這份程式若是要符合 Hazard pointer 的確還有一定的改進空間 : 1. 在 `list_hp_retire` 中遍歷所有正被使用的節點時間複雜度較高, 可以使用速度更快的資料結構來進行存儲。 2. 可以尋找更適合當前測試的釋放節點時機。預期可以提高速度與減少遍歷次數。 [code](https://github.com/leon123858/linuxClass/blob/main/hazardPointer/list.c) 以下將兩項變更整合至一起講述 ```cpp void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { retirelist_t *rl = hp->rl[tid() * CLPAD]; rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); // 不再是每次新增退休節點就釋放, 而是當累積到一定的數量才釋放 // 此處我將 HP_THRESHOLD_R 設為 3*HP_MAX_THREADS, 當釋放列表達到此長度才進行釋放。 // 公式為 (1+1/k)*(reader count) , k 屬於自訂數,需較小 if (rl->size < HP_THRESHOLD_R) return; // 收集所有正在被使用的節點 (Hazard Pointer) unsigned int array[HP_MAX_THREADS*3]; int len = 0; for (int itid = 0; itid < HP_MAX_THREADS; itid++) { for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { array[len++] = atomic_load(&hp->hp[itid][ihp]); } } // 快速排序法 O(nlogn) n 為 hazard pointer 的數量 qsort(array, len, sizeof(unsigned int), cmpfunc); // 遍歷待釋放列表, 每個節點用 binary search 查看是否匹配 // 時間複雜度為 O(mlogn) m 為待釋放列表的長度(就是HP_THRESHOLD_R) for (size_t iret = 0; iret < rl->size; iret++) { uintptr_t obj = rl->list[iret]; // 二元搜索法 O(logn) if(bsearch(&obj,array,len,sizeof(unsigned int),cmpfunc)) { size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); memmove(&rl->list[iret], &rl->list[iret + 1], bytes); rl->size--; hp->deletefunc((void*) obj); } } } ``` 最後經過分析發現運行速度從原版的 其中 m = 待釋放列表的長度, n = hazard pointer 的數量 $$ O(m*n) $$ 降為 $$ O(n + n*logn + m*logn) $$ 此處因為 HP_THRESHOLD_R 計算公式可以做以下化簡 ( N 為一正數 ) $$ O(n + n*logn + m*logn) = O( N*m*logn ) = O( m*logn ) $$ ## 3.對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?能否指出 rcu_list 實作缺陷並改進?