Try   HackMD

2021q3 Homework2 (hp)

contributed by < fdgkhdkgh >

tags: linux2021 kernel

第二週隨堂測驗題

解釋上述程式碼運作原理

/* This returns the same value that is passed as ptr. * Progress condition: wait-free population oblivious. */ uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr) { atomic_store(&hp->hp[tid()][ihp], ptr); return ptr; }

每當有一個 thread 想要使用某個 pointer 時,就需要將此 pointer 放進 hazard point 做保護,避免在 "list_hp_retire" 階段,將這一個 pointer 給 free 掉


/* Clear all hazard pointers in the array for the current thread. * Progress condition: wait-free bounded (by max_hps) */ void list_hp_clear(list_hp_t *hp) { for (int i = 0; i < hp->max_hps; i++) atomic_store_explicit(&hp->hp[tid()][i], 0, memory_order_release); }

當某一個 thread 使用完某一個 pointer 後,將其從 hazard point 移除,表示我們不需要再保護這個 pointer 了。若有人想要 free 掉這一塊記憶體,就請將它 free 掉吧。


bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { // 在 list 裡尋找有沒有符合鍵值為 key 的節點,若無則直接返回刪除失敗。 if (!__list_find(list, &key, &prev, &curr, &next)) { list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); // 假如 curr->next 已經被 mark 了, // 那就表示這個節點已經準備被其他 thread 給刪除了。 // 我們需要從頭開始進行新的一輪 // 刪除節點的動作 // 假如 curr->next 沒有被 mark // 那我們便將它 mark ,並準備將此 // 節點刪除 if (!CAS(&curr->next, &tmp, get_marked(next))) { TRACE(del); continue; } // 將 curr 指向的節點從 list // 中脫開鏈結 ( 但不會直接 free 掉此 pointer ) // 會藉由 list_hp_retire 來將 curr 丟進當前 thread 的 retire list。 tmp = get_unmarked(curr); if (CAS(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } }

根據論文 第 4.3 小節可以得知,當我們想要刪除一個節點的時候,需要在該節點的 next 指標打上標記 ( 在 lowest bit 設 1 )。

以避免其他 thread 將新的節點與此待刪除的節點進行連接。


bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; list_node_t *node = list_node_new(key); while (true) { // 插入一個節點到 list 內。 // 假如使用 "__list_find" 找到了相同鍵值 ( key ) 的節點, // 就宣告插入失敗。 if (__list_find(list, &key, &prev, &curr, &next)) { list_node_destroy(node); list_hp_clear(list->hp); return false; } ATOMIC_STORE_EXPLICIT(&node->next, (uintptr_t) curr, memory_order_relaxed); // 假如 list 內沒有相同鍵值的節點, // 就表示我們可以進行插入了。 // 在進行插入時,會用 "CAS" 再做一次檢查, // 假如該點被 mark 的話, // 表示該節點有其他 thread 正在進行刪除, // 所以我們必須重新從 "__list_find" 開始新的一輪插入節點的動作。 uintptr_t tmp = get_unmarked(curr); if (CAS(prev, &tmp, (uintptr_t) node)) { list_hp_clear(list->hp); return true; } TRACE(ins); } }

static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: prev = &list->head; curr = (list_node_t *) ATOMIC_LOAD(prev); (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); if (ATOMIC_LOAD(prev) != get_unmarked(curr)) { TRACE(retry); goto try_again; } while (true) { if (is_marked(curr)) TRACE(contains); next = (list_node_t *) ATOMIC_LOAD(&get_unmarked_node(curr)->next); (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); /* On a CAS failure, the search function, "__list_find," will simply * have to go backwards in the list until an unmarked element is found * from which the search in increasing key order can be started. */ /* * CAS 失敗,或是遇到任何被 mark 的節點,都會重頭開始搜尋, * 直到我們搜尋到沒有被 mark 的節點為止 ( 不論有無搜尋到符合鍵值的節點 )。 */ if (ATOMIC_LOAD(&get_unmarked_node(curr)->next) != (uintptr_t) next) { TRACE(retry); goto try_again; } if (ATOMIC_LOAD(prev) != get_unmarked(curr)) { TRACE(retry); goto try_again; } if (get_unmarked_node(next) == next) { if (!(get_unmarked_node(curr)->key < *key)) { *par_curr = curr; *par_prev = prev; *par_next = next; return (get_unmarked_node(curr)->key == *key); } prev = &get_unmarked_node(curr)->next; (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else { uintptr_t tmp = get_unmarked(curr); if (!CAS(prev, &tmp, get_unmarked(next))) { TRACE(retry); goto try_again; } list_hp_retire(list->hp, get_unmarked(curr)); } curr = next; (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); TRACE(traversal); } }

/* Retire an object that is no longer in use by any thread, calling * the delete function that was specified in list_hp_new(). * * Progress condition: wait-free bounded (by the number of threads squared) */ void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { // 用 CLPAD 是為了 cache friendly retirelist_t *rl = hp->rl[tid() * CLPAD]; rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); if (rl->size < HP_THRESHOLD_R) return; // 遍尋該 thread 的 retire list for (size_t iret = 0; iret < rl->size; iret++) { uintptr_t obj = rl->list[iret]; bool can_delete = true; // 遍尋每一個 thread 的 hazard pointers // 假如我們的 retire list 裡有 pointer 不是其他 thread 的 hazard pointer 就可以 // 成功的被 free 掉 // 反之,假如我們的 retire list 裡的 pointer 是其他 thread // 的 hazard pointer ,則不能被 free 掉 for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) { for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { if (atomic_load(&hp->hp[itid][ihp]) == obj) { can_delete = false; break; } } } if (can_delete) { size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); memmove(&rl->list[iret], &rl->list[iret + 1], bytes); rl->size--; hp->deletefunc((void *) obj); } } }

指出改進空間並著手實作

優化 list_hp_retire

int compareHazardPointers(const void *a, const void *b) { return *(atomic_uintptr_t *)a - *(atomic_uintptr_t *)b; } void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { int i = 0; int num_of_delete = 0; retirelist_t *rl = hp->rl[tid() * CLPAD]; rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); if (rl->size < HP_THRESHOLD_R) return; atomic_uintptr_t *tempUintptrArray = (atomic_uintptr_t *)malloc(sizeof(atomic_uintptr_t) * HP_MAX_THREADS * hp->max_hps); i = 0; for (int itid = 0; itid < HP_MAX_THREADS; itid++) { for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { tempUintptrArray[i++] = atomic_load(&hp->hp[itid][ihp]); } } qsort(tempUintptrArray, HP_MAX_THREADS * hp->max_hps, sizeof(atomic_uintptr_t), compareHazardPointers); i = 0; for(size_t iret = 0; iret < rl->size; iret++) { int left = 0; int right = HP_MAX_THREADS * hp->max_hps - 1; int found = 0; uintptr_t obj = rl->list[iret]; while(left <= right) { int mid = (left + right) / 2; if(tempUintptrArray[mid] > obj) { left = mid + 1; } else if(tempUintptrArray[mid] < obj) { right = mid - 1; } else { found = 1; } } if(found == 1) { rl->list[i++] = rl->list[iret]; } else { num_of_delete++; hp->deletefunc((void *) obj); } } rl->size -= num_of_delete; free(tempUintptrArray); }

可先收集所有的 Hazard Pointers , 經排序後,便可用二分搜尋來查看 retire list 內所有 pointer 是否為 hazard pointer。
令 Harzard Pointer 的數量為 M,retire list 裡 pointer 的數量為 N。
原本的複雜度為 O(M*N) ,經排序改良後可達到 O((M + N) * log(M))。 但因為我這邊使用 quick sort 作為簡單的展示,所以最差的複雜度仍需要 O(M * M + (M + N) * log(M))

且原本會在每次成功 free 一個 retire list 上的 pointer 時,都會用 memmove 去挪動全部 retire list 上被刪除的 pointer 後的所有 pointer,而這裡也省略了這一步驟。

不每次 "list_hp_retire" 都進行 Garbage collection

不每次 "list_hp_retire" 都進行 GC,雖然可以提昇運行效率,但會浪費記憶體的使用量

這個選項老師已經實作在 HP_THRESHOLD_R 這個 macro , 可供我們調整 retire list 最多可容納多少個 pointer

根據論文 3.1 小節 ,約可設置成 R = (Thread 數目) * (每個 thread 所擁有的 hazard pointer 數目)

對比 rcu_list 以及 Hazard pointer ?

對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?

writer reader reclaim memory
RCU 只能有一個執行 使用 rcu_lock 以及 rcu_unlock 來幫助檢測寬限期 ( grace period ) 是否已經結束 grace period
Hazard Pointer 可以同時執行,多個 writer 有機會同時修改 使用 hazard pointer 來保護使用中的 pointer retire list

能否指出 rcu_list 實作缺陷並改進?

// TODO , 還在閱讀程式碼
// 在 deadline 前沒看完 QQ ,目前陷入了加班地獄,先繼續從作業三開始做