# 2021q3 Homework2 (hp) contributed by < [`yian02`](https://github.com/yian02) > ###### tags: `linux2021` # 1. 解釋程式碼運作原理 ## 研讀 [A Pragmatic Implementation of Non Blocking Linked List](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf) ### `search` operation `insert` 和 `delete` 都會需要先使用 `search` 找到插入位置或要刪除的 node 後,再進行相對應的操作。 在論文中, search 結束後會得到兩個資訊: left node 和 right node,論文中有說明確保其符合以下條件: 1. left node 的 key 比要尋找的 key 小 2. right node 的 key 大於等於要尋找的 key 3. left node 和 right node 都要是 unmarked 的 4. left node 和 right node 要是緊鄰的 (也就是 search 的操作會去刪除在 left node 和 right node 中間被 marked 的 node) 在論文中,`search` 函式被分為三個部分: 1. 遍歷 list,找出第一個 node 中的 key 大於等於要搜尋的 key 的 node,此為 right node (left node 在此時是 right node 的前一個未被標記為刪除的 node) 2. 檢查 left node 是否緊鄰 right node,若是即回傳,若否則繼續到步驟 3. 3. 將 left node 和 right node 中間被標記為刪除的 node 刪除 :::info :bulb: 在此論文中,是使用 `node->next` 來標註此 node 是否邏輯上被刪除。 ::: ## `insert` operation 共需三個步驟: 1. 使用 search 找到要插入的位置 2. 將要插入的 node 的 `node->next` 設為 `right_node` (此步驟不會影響到原本的 list,因此用一般的賦值方式即可) 3. 使用 CAS 將 `left_node.next` 內儲存的記憶體位置從原本的 `right_node` 換成要插入的 node。(此步驟會影響到原本的 list,需要使用 CAS) ## `delete` operation `delete` 需要兩個 (或以上,如果第一次嘗試刪除失敗的話) 的 CAS 操作來完成: 1. 使用 `search` 找到要刪除的 node (若找到會儲存在 `right_node` 中) 2. 先使用 `right_node_next` 變數來儲存 `right_node->next` 的值,並確認其不是被標註為刪除的 3. 使用 CAS 標記 `right_node->next`,代表 `right_node` 在邏輯上已被刪除 (若此 CAS 操作失敗,則回到步驟 1. 重新開始) 4. 用 CAS 嘗試將 `left_node->next` 修改為之前儲存的 `right_node_next`,此操作成功的條件為 `left_node` 和 `right_node` 依舊是相鄰的。操作成功即回傳 5. 若步驟 4. 的 CAS 失敗,則會呼叫 `search` ,透過 `search` 會刪除找到的 `right_node` 和 `left_node` 間被標註的 node 的特性,嘗試將剛剛標註的 node 刪除。 ## 測驗題程式碼運作原理 ### Hazard Pointer Hazard pointer 負責管理與決定記憶體釋放的時機,在某個 thread 要使用一個特定的記憶體位置時,會將該記憶體位置加入自己的 hazard pointer list 中,一旦使用完畢則會將其清除。若某個特定的 thread 要釋放特定的記憶體空間,就要先去檢查是否有其他 thread 在使用該記憶體位置的資料,確定安全後才能真正的釋放空間。 先從 hazard pointer 中所使用到的資料結構來看: #### Retire List ```c typedef struct { int size; uintptr_t *list; } retirelist_t; ``` `retirelist_t` 會儲存已經被 delete 但空間還未被釋放的 pointer,一旦程式確定 retire list 中儲存的 pointer 已經沒有人在使用之後,就會釋放記憶體空間。在 `retire_list_t` 中,使用一個陣列 `unitptr_t *list` 來儲存準備要被釋放的 pointer 之外,也會儲存這個 retire list 目前的大小。 #### Hazard Pointer List ```c #define CLPAD (128 / sizeof(uintptr_t)) struct list_hp { int max_hps; alignas(128) atomic_uintptr_t *hp[HP_MAX_THREADS]; alignas(128) retirelist_t *rl[HP_MAX_THREADS * CLPAD]; list_hp_deletefunc_t *deletefunc; }; ``` `list_hp` 結構中,使用型態為 `atomic_unitptr_t` 的二維陣列來儲存 hazard pointer,並使用型態為 `retirelist_t` 的二維陣列來儲存 retire list。這個結構中也儲存了最大容許的 hazard pointer 數量,和在從記憶體中釋放 hazard pointer 所要呼叫的函式 `deletefunc` 的 function pointer。 此程式中的每一個 thread 都可以透過呼叫 `static inline tid()` 還取得自己的編號: ```c static thread_local int tid_v = TID_UNKNOWN; static atomic_int_fast32_t tid_v_base = ATOMIC_VAR_INIT(0); static inline int tid(void) { if (tid_v == TID_UNKNOWN) { tid_v = atomic_fetch_add(&tid_v_base, 1); assert(tid_v < HP_MAX_THREADS); } return tid_v; } ``` 這個函式的運作原理,是透過累加一個所有 thread 的共享 atomic 變數 `tid_v_base`,來將每個 thread 編號。每個 thread 在初始化時,其 thread local 的變數 `tid_v` 會被初始化為 `TID_UNKNOWN`,在呼叫 `static inline int tid(void)` 時,會使用 atomic 操作將 `tid_v_base` 加一,並用加完後的回傳值當作自己的 thread ID。 而將記憶體位置加入 hazard pointer 中的函式為以下兩者: ```c uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr) uintptr_t list_hp_protect_release(list_hp_t *hp, int ihp, uintptr_t ptr) ``` 兩個函式都使用 `atomic_store_explicit` 並根據所在的 thread 將 pointer 存入 hazard pointer 中。兩個函式的差異只是一個是使用預設最嚴格的 memeory order,另一個是使用指定的 `memory_order_release`。 負責將準備釋放空間的 pointer 放入 retire list 的,為以下函式: ```c void list_hp_retire(list_hp_t *hp, uintptr_t ptr) ``` 這個函式會將傳入的 `ptr` 根據所在的 thread,放入相對應的 retire list 中,並更新 retire list 中儲存的 pointer 個數,若該數目大於 `HP_THRESHOLD_R` ,則會試圖將該 thread 的 retire list 中的 pointer 釋放 (預設的 `HP_THRESHOLD_R` 為 0,代表每次只要將 pointer retire,就會試圖去釋放空間)。 上述函式是透過遍歷所有 thread 所擁有的 hazard pointer 陣列,確定自己準備釋放的記憶體位置沒有在任何 thread 的 hazard pointer 陣列中,即可執行預先給定的 call back 函式 `deletefunc` 來釋放記憶體空間。在確定要釋放記憶體空間時,會將在 retire list 的下一個 pointer 寫入目前的位置,就是將放在後面的人往前放的意思,可以加速之後的搜尋速度。 ## Linked list ### 結構 #### list node ```c typedef uintptr_t list_key_t; typedef struct list_node { alignas(128) uint32_t magic; alignas(128) atomic_uintptr_t next; list_key_t key; } list_node_t; /* Per list variables */ typedef struct list { atomic_uintptr_t head, tail; list_hp_t *hp; } list_t; ``` - `list_node_t` 中的 `next` 因為會在被(邏輯)上刪除時被標記,標記的動作需要使用到 atomic 操作,所以需使用 `atomic_uintpre_t` - `list_t` 中會儲存兩個 dummy heads `head` and `tail` ### 操作 #### mark ```c #define is_marked(p) (bool) ((uintptr_t)(p) &0x01) #define get_marked(p) ((uintptr_t)(p) | (0x01)) #define get_unmarked(p) ((uintptr_t)(p) & (~0x01)) ``` - 透過作業系統定址的特性,將永遠都是 0 的 LSB 拿來當作標記使用 #### insert `list_insert`  的函式會告如下: ```c=1 bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { /* If the key is already stored in the list, destroy the new node, clear * the hazard pointer, then return false. * (The insert operation fails if the key is already in the list) */ if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } } } ``` 透過 `__list_find` 函式,先搜尋要插入的 `key` 是否已經存在在 list 中,若存在即插入失敗。若不存在,則開始將新的 node 插入 list。 插入的步驟如下: 1. 先將新的 node 的 `next` 透過 atomic 操作變為 `__list_find` 所找到的 `curr` (此步驟不會變更原本的 list,因此可以使用 `relaxed`) (若 `__list_find` 尋找成功的話,會自動將找到的 pointer 加入 hp 中) 2. 檢查前一個 node 的指標是否還是指著 `tmp`,若是的話則將前一個 node 的指標指向的記憶體位置內的資料換成 node 的位置。 3. 若 2. 成功,則清除 hp 並回傳。 4. 若 2. 失敗,則重複直到成功。 #### delete ```c=1 bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } } ``` delete 先使用 `__list_find` 找出要刪除的 `key` 是否有在 list 中,若找到,則: 1. 確認 `curr->next` 和 `next` 是否還相同,若相同,則將 `curr->next` 內儲存的指標換成被標記的,代表 `curr` 在「邏輯上」已被刪除 2. 若 1. 失敗,則重複至成功為止 3. 確認 `curr` 的前一個 node 是否還指著 `curr` ,是的話就把前一個 node 指著的位置換成 `next`。 4. 若 3. 成功,則清除 hp ,並將刪除的 node 加入 retire list 中,回傳 true 5. 若 3. 失敗,則清除 hp,回傳 true (將 node 加入 retire list 的工作就交給之後的 `__list_find`) #### `__list_find` 這個程式中最重要的函式非 `__list_find` 莫屬了。 ```c=1 static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: // search from the head of the list prev = &list->head; curr = (list_node_t *) atomic_load(prev); // add the current to the list of hazard pointers (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (atomic_load(prev) != get_unmarked(curr)) // the list has been modified goto try_again; while (true) { if (!get_unmarked_node(curr)) return false; next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); // add the next node to the list of hazard pointers (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) break; if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) break; if (atomic_load(prev) != get_unmarked(curr)) goto try_again; // check if the node is marked or not if (get_unmarked_node(next) == next) { // the node isn't marked if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return *key == get_unmarked_node(curr)->key; } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else { // the node is marked uintptr_t tmp = get_unmarked(curr); // using CAS to remove the marked node if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; // add the removed node the retire list list_hp_retire(list->hp, get_unmarked(curr)); } curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } *par_curr = curr; *par_prev = prev; *par_next = next; return false; } ``` 這個函式中有三個重要的指標: - `curr`:目前的 node 的記憶體位置 - `next`:下一個 node 的記憶體位置 - `prev`:前一個 node 中指向目前的 node 的指標(也就是前一個 node 中的 `next`) 在第 18 行的 while loop 開始時,即會判斷目前這個 node 的前後是否有被更動過,若被更動過的話,即會需要重新來過一遍。 確認沒有被更動過後,會先判斷目前拿到的 node 是否是被標記刪除的 (也就是 `curr->next` 是否是被標記的),若是被標記的,就會呼叫 `list_hp_retire` 來將其放入 retire list 中,若條件許可也會將其空間釋放。 若 node 不是被標記刪除的,就會判對目前 node 中所儲存的 `key` 是否為目標,因為程式中的 list 是遞增排序,因此只有在現在找到的 node 中的 `key` 比目標的 `key` 還要小的時候,才會需要繼續往下一個 node 找。 若找到的 key 不小於目標的 `key` 時,就可以不用再往下找,只要判斷目前的 `key` 是否等於目標的 `key`,並回傳相對應的布林值。 --- ## 2. 指出改進空間並實作 TODO --- ## 3. 對比 [rcu_list](https://github.com/sysprog21/concurrent-programs/tree/master/rcu_list),解釋同為 lock-free 演算法,跟上述 [Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 手法有何異同?能否指出 [rcu_list](https://github.com/sysprog21/concurrent-programs/tree/master/rcu_list) 實作缺陷並改進? TODO ---