Try   HackMD

2021q3 Homework2 (hp)

contributed by < vectorr >

2021 年暑期 Linux 核心 第 2 週測驗題


1. 解釋上述程式碼運作原理

先看 list 提供給使用者的 API 如下。

list_new 會回傳一個可以使用的 list object ,使用完畢後要將此 list object 傳入 list_destroy 來釋放相關資源。

list_insert 會將傳入的 key 存放在此 list object 內,如果成功會回傳 true,失敗會回傳 false。 如果 list object 內已存在此 key ,算是失敗的一種狀況,會回傳 false

list_delete 會找此 list object 內是否有此 key,若有則把它從 list object 中移除。
有成功移除掉此 key 會回傳 true ,不然會回傳 false (包含 list object 中不存在此 key ,也是回傳 false)。

list_t *list_new(void);
void list_destroy(list_t *list);
bool list_insert(list_t *list, list_key_t key);
bool list_delete(list_t *list, list_key_t key);

一開始的 list object 會長的如下圖:







structs



s1

list_t

head

tail



s2

list_node_t

key  (0) 

next



s1:l1->s2





s3

list_node_t

key (MAX)

next



s1:l2->s3





s2:ln2->s3





list_insert 一開始會先呼叫 __list_find ,如果 list 內已經有要 insert 的 key 值,就回傳 false 。如果沒有,這時候回傳的 curr 就是指到含有 key 值大於要 insert 的 key 值裡最小的那個 list_node,prev 指到 curr 的前一個 list_node 的 next 指標 (即 *prev == curr) , next 指到 curr 的下一個 list_node。
__list_find 裡,已經將curr 的前一個 list_node (這個 list_node 包含的 next 的記憶體位置即是 prev) , currnext 指到的三個 list_node 的記憶體位置,存放在此 thread 所屬的 hazard pointers 裡,也就是這三個 list_nodes ,即使被其他的 thread retired ,記憶體位置也暫時不會被釋放,存取到這三個 list_nodes 暫時不會有危險。
接下來先將 curr 設到新創建出來的 list_node 的 next ,這時候還只是 thread local 的更動,還未公開讓其他 thread 知道這個新 list_node 的存在,可以使用 memory_orde_relaxed
接下來的 atomic_compare_exchange_strong 確保前一個 list_node 的 next 還是指到 curr (也是目前要插進來的新的 list_node 的 next), 是的話就把 *prev 設成這次要 insert 進來的新的 list_node,完成 insert。不是的話,代表執行到這裡,本來要插入的位置的前後關係有所改變了 (可能是其他 list_node 插入,或者是本來指到三個 list_node 有任一被刪除),整個流程從 __list_find 重頭來一次。

bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } } }

要 insert 一個新的 list_node 要比 delete 一個 list_node ,要來的單純。只要找到要 insert 的位置,譬如要 insert 一個新的 list_node C ,到原先的 list_node A 與 list_node B 之間,只要先把 C 的 next 設成 B ,然後在要把 A 的 next 設成 C 時,做一次 CAS (atomic_compare_exchange_strong) 確保當下的 A 的 next 確實還是 C 即可。如果不是,就是 A 或 C 的之間的關係有變化了,那麼重頭再找一次 C 要插入的位置。
要 delete 掉一個 list_node 就比較麻煩,不能只靠一次 CAS 來達到確保當下欲 delete 掉的 list_node 與前後 list_node 的關係。舉例來說, 要 delete 的 list_node B 目前在 list_node A 與 list_node C 之間,現在要做的是把 A 的 next 設成 C 即完成 delete B。
但是一次 CAS 只能拿來確認 A 跟 B 的關係是否維持,或者 B 跟 C 的關係是否維持,無法確保 A 與 B 與 C 的這三者關係的維持,所以在這裡引入一個機制,將欲被 delete 的 node 的 next 裡的值加了一個 mark bit 。
以上面的例子來說, B 的 next 裡的值會在用 CAS 確認當下是指到 C 後,被設一個 mark bit ,然後 B 跟 C 的關係就會在此時被維持住。
為什麼呢? 如果在 B 沒有被刪除的前提下, B 跟 C 的連結如果要被改變,只有兩種情況:

  1. 有新的 list_node 插入 B 跟 C 中間。
    此情況無法發生,因為 B 的 next 裡的值有 mark bit,所以在要插入新的 list_node 到 B 後面時,會在上面 list_insert 這個函式裡 line 17 的 CAS 失敗而無法插入。
  2. C 被刪除。
    此情況也無法發生,因為 B 的 next 裡的值有 mark bit,所以當要把 B 的 next 值設成 C 的 next 時,即 list_delete 函式 line 18 或 __list_find 函式 line 39 的 CAS 會回傳失敗而無法將 C 刪除。

當 B 與 C 的關係透過 mark bit 來維持住後,接下來要把 B 刪除掉,就只要再透過一次 CAS 確保當下 A 還是 B 的前一個 list_node。
是的話,就把 A 的 next 設成 C ,然後可以 retire B。不是的話也沒關係,因為 B 的 next 的 Mark bit 被設起來了,代表 B 是一個要被移除掉的 list_node,在之後其他的 list_insertlist_delete 呼叫 __list_find ,當輪詢經過 B 時即會將此 list_node 從 linked list 中移除,並將它 retired ( 即 __list_find 的 line 38 ~ 41 )。所以一旦 list_delete 執行到 line 18後,不管 CAS 成功與否,要被 delete 掉的 list_node 都不能再被使用,並且遲早會被從此 linked list 中移除掉,所以在 line 24 可以回傳 true 代表成功。

bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } }

__list_find 中, prev 代表的是存放 curr 位址的指標的位址 ( 即 *prev == curr ), 一開始的初始值是 &list->head ( __list_find line 11 )。 curr 一開始是從 prev 取得 ( __list_find line 12 ),一旦取得立刻存入此 thread 的 hazard pointer 的 HP_CURR 裡。 接著再確認一次 prev 跟 curr 的關係是否不變,因為如果 curr 在被設進 hazard pointer 前就被其他 thread retired 的話,此時 prev 跟 curr 的關係已經改變了 ( 會先將 list_node 從 linked list 移除,才會 retire 此 list_node ),那麼就重頭尋找一次,如果 prev 跟 curr 的關係目前 (__list_find line 14) 還是沒有改變的話,代表之後 curr 如果被其他 thread retired 的話, curr 已經存在此 thread 的 hazard pointer 紀錄裡, curr 的記憶體位置不會被釋放掉,下面存取 curr 裡的 next 值時 ( __list_find line 17 )是安全的。
我們在把 next 加入此 thread 的 hazard pointers 紀錄裡的 HP_NEXT 位置後,也要檢查 next 與 curr 的關係是否保持不變,還有再次檢查 prev 跟 curr 的關係是否保持不變,只要其中有一個有變動,就重頭開始尋找,因為此 linked list 的目前此 thread 關注的狀態已經跟此 thread local 端的認知不同了。
到了 __list_find line 27 ,就是上面在解釋 list_delete 時所說的,檢查 curr 的 next 存的值的 mark bit 是否被設定, 若沒有被設定就會執行 __list_find 的 line 28 ~ 36,看是找到要擁有要找的 key 值或第一個比要找的 key 值大的 list_node ( line 29 ~ 32) 回傳目前的 prev, curr, next 以及是否找到符合 key 的 list_node 結果給 caller ; 或把 prev 設成 &curr->next 繼續輪詢 ( line 34 ~ 36)。若 curr->next 的 mark bit 被設起來了,那麼就把 curr 從此 linked list 中移除,並 retire 它。
然後都把 curr 設成 next,繼續往下一個 list_node 輪詢下去。

static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: prev = &list->head; curr = (list_node_t *) atomic_load(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (atomic_load(prev) != get_unmarked(curr)) goto try_again; while (true) { next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); /* On a CAS failure, the search function, "__list_find," will simply * have to go backwards in the list until an unmarked element is found * from which the search in increasing key order can be started. */ if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) goto try_again; if (atomic_load(prev) != get_unmarked(curr)) goto try_again; if (get_unmarked_node(next) == next) { if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return (get_unmarked_node(curr)->key == *key); } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else { uintptr_t tmp = get_unmarked(curr); if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; list_hp_retire(list->hp, get_unmarked(curr)); } curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } }

2. 指出改進空間並著手實作

3. 對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?能否指出 rcu_list 實作缺陷並改進?