contributed by < fdgkhdkgh >
linux2021
kernel
/* This returns the same value that is passed as ptr.
* Progress condition: wait-free population oblivious.
*/
uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr)
{
atomic_store(&hp->hp[tid()][ihp], ptr);
return ptr;
}
每當有一個 thread 想要使用某個 pointer 時,就需要將此 pointer 放進 hazard point 做保護,避免在 "list_hp_retire" 階段,將這一個 pointer 給 free 掉
/* Clear all hazard pointers in the array for the current thread.
* Progress condition: wait-free bounded (by max_hps)
*/
void list_hp_clear(list_hp_t *hp)
{
for (int i = 0; i < hp->max_hps; i++)
atomic_store_explicit(&hp->hp[tid()][i], 0, memory_order_release);
}
當某一個 thread 使用完某一個 pointer 後,將其從 hazard point 移除,表示我們不需要再保護這個 pointer 了。若有人想要 free 掉這一塊記憶體,就請將它 free 掉吧。
bool list_delete(list_t *list, list_key_t key)
{
list_node_t *curr, *next;
atomic_uintptr_t *prev;
while (true) {
// 在 list 裡尋找有沒有符合鍵值為 key 的節點,若無則直接返回刪除失敗。
if (!__list_find(list, &key, &prev, &curr, &next)) {
list_hp_clear(list->hp);
return false;
}
uintptr_t tmp = get_unmarked(next);
// 假如 curr->next 已經被 mark 了,
// 那就表示這個節點已經準備被其他 thread 給刪除了。
// 我們需要從頭開始進行新的一輪
// 刪除節點的動作
// 假如 curr->next 沒有被 mark
// 那我們便將它 mark ,並準備將此
// 節點刪除
if (!CAS(&curr->next, &tmp, get_marked(next))) {
TRACE(del);
continue;
}
// 將 curr 指向的節點從 list
// 中脫開鏈結 ( 但不會直接 free 掉此 pointer )
// 會藉由 list_hp_retire 來將 curr 丟進當前 thread 的 retire list。
tmp = get_unmarked(curr);
if (CAS(prev, &tmp, get_unmarked(next))) {
list_hp_clear(list->hp);
list_hp_retire(list->hp, get_unmarked(curr));
} else {
list_hp_clear(list->hp);
}
return true;
}
}
根據論文 第 4.3 小節可以得知,當我們想要刪除一個節點的時候,需要在該節點的 next 指標打上標記 ( 在 lowest bit 設 1 )。
以避免其他 thread 將新的節點與此待刪除的節點進行連接。
bool list_insert(list_t *list, list_key_t key)
{
list_node_t *curr = NULL, *next = NULL;
atomic_uintptr_t *prev = NULL;
list_node_t *node = list_node_new(key);
while (true) {
// 插入一個節點到 list 內。
// 假如使用 "__list_find" 找到了相同鍵值 ( key ) 的節點,
// 就宣告插入失敗。
if (__list_find(list, &key, &prev, &curr, &next)) {
list_node_destroy(node);
list_hp_clear(list->hp);
return false;
}
ATOMIC_STORE_EXPLICIT(&node->next, (uintptr_t) curr,
memory_order_relaxed);
// 假如 list 內沒有相同鍵值的節點,
// 就表示我們可以進行插入了。
// 在進行插入時,會用 "CAS" 再做一次檢查,
// 假如該點被 mark 的話,
// 表示該節點有其他 thread 正在進行刪除,
// 所以我們必須重新從 "__list_find" 開始新的一輪插入節點的動作。
uintptr_t tmp = get_unmarked(curr);
if (CAS(prev, &tmp, (uintptr_t) node)) {
list_hp_clear(list->hp);
return true;
}
TRACE(ins);
}
}
static bool __list_find(list_t *list,
list_key_t *key,
atomic_uintptr_t **par_prev,
list_node_t **par_curr,
list_node_t **par_next)
{
atomic_uintptr_t *prev = NULL;
list_node_t *curr = NULL, *next = NULL;
try_again:
prev = &list->head;
curr = (list_node_t *) ATOMIC_LOAD(prev);
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
if (ATOMIC_LOAD(prev) != get_unmarked(curr)) {
TRACE(retry);
goto try_again;
}
while (true) {
if (is_marked(curr))
TRACE(contains);
next = (list_node_t *) ATOMIC_LOAD(&get_unmarked_node(curr)->next);
(void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
/* On a CAS failure, the search function, "__list_find," will simply
* have to go backwards in the list until an unmarked element is found
* from which the search in increasing key order can be started.
*/
/*
* CAS 失敗,或是遇到任何被 mark 的節點,都會重頭開始搜尋,
* 直到我們搜尋到沒有被 mark 的節點為止 ( 不論有無搜尋到符合鍵值的節點 )。
*/
if (ATOMIC_LOAD(&get_unmarked_node(curr)->next) != (uintptr_t) next) {
TRACE(retry);
goto try_again;
}
if (ATOMIC_LOAD(prev) != get_unmarked(curr)) {
TRACE(retry);
goto try_again;
}
if (get_unmarked_node(next) == next) {
if (!(get_unmarked_node(curr)->key < *key)) {
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return (get_unmarked_node(curr)->key == *key);
}
prev = &get_unmarked_node(curr)->next;
(void) list_hp_protect_release(list->hp, HP_PREV,
get_unmarked(curr));
} else {
uintptr_t tmp = get_unmarked(curr);
if (!CAS(prev, &tmp, get_unmarked(next))) {
TRACE(retry);
goto try_again;
}
list_hp_retire(list->hp, get_unmarked(curr));
}
curr = next;
(void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
TRACE(traversal);
}
}
/* Retire an object that is no longer in use by any thread, calling
* the delete function that was specified in list_hp_new().
*
* Progress condition: wait-free bounded (by the number of threads squared)
*/
void list_hp_retire(list_hp_t *hp, uintptr_t ptr)
{
// 用 CLPAD 是為了 cache friendly
retirelist_t *rl = hp->rl[tid() * CLPAD];
rl->list[rl->size++] = ptr;
assert(rl->size < HP_MAX_RETIRED);
if (rl->size < HP_THRESHOLD_R)
return;
// 遍尋該 thread 的 retire list
for (size_t iret = 0; iret < rl->size; iret++) {
uintptr_t obj = rl->list[iret];
bool can_delete = true;
// 遍尋每一個 thread 的 hazard pointers
// 假如我們的 retire list 裡有 pointer 不是其他 thread 的 hazard pointer 就可以
// 成功的被 free 掉
// 反之,假如我們的 retire list 裡的 pointer 是其他 thread
// 的 hazard pointer ,則不能被 free 掉
for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) {
for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
if (atomic_load(&hp->hp[itid][ihp]) == obj) {
can_delete = false;
break;
}
}
}
if (can_delete) {
size_t bytes = (rl->size - iret) * sizeof(rl->list[0]);
memmove(&rl->list[iret], &rl->list[iret + 1], bytes);
rl->size--;
hp->deletefunc((void *) obj);
}
}
}
int compareHazardPointers(const void *a, const void *b) {
return *(atomic_uintptr_t *)a - *(atomic_uintptr_t *)b;
}
void list_hp_retire(list_hp_t *hp, uintptr_t ptr)
{
int i = 0;
int num_of_delete = 0;
retirelist_t *rl = hp->rl[tid() * CLPAD];
rl->list[rl->size++] = ptr;
assert(rl->size < HP_MAX_RETIRED);
if (rl->size < HP_THRESHOLD_R)
return;
atomic_uintptr_t *tempUintptrArray = (atomic_uintptr_t *)malloc(sizeof(atomic_uintptr_t) * HP_MAX_THREADS * hp->max_hps);
i = 0;
for (int itid = 0; itid < HP_MAX_THREADS; itid++) {
for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
tempUintptrArray[i++] = atomic_load(&hp->hp[itid][ihp]);
}
}
qsort(tempUintptrArray, HP_MAX_THREADS * hp->max_hps, sizeof(atomic_uintptr_t), compareHazardPointers);
i = 0;
for(size_t iret = 0; iret < rl->size; iret++) {
int left = 0;
int right = HP_MAX_THREADS * hp->max_hps - 1;
int found = 0;
uintptr_t obj = rl->list[iret];
while(left <= right) {
int mid = (left + right) / 2;
if(tempUintptrArray[mid] > obj) {
left = mid + 1;
} else if(tempUintptrArray[mid] < obj) {
right = mid - 1;
} else {
found = 1;
}
}
if(found == 1) {
rl->list[i++] = rl->list[iret];
} else {
num_of_delete++;
hp->deletefunc((void *) obj);
}
}
rl->size -= num_of_delete;
free(tempUintptrArray);
}
可先收集所有的 Hazard Pointers , 經排序後,便可用二分搜尋來查看 retire list 內所有 pointer 是否為 hazard pointer。
令 Harzard Pointer 的數量為 M,retire list 裡 pointer 的數量為 N。
原本的複雜度為 O(M*N) ,經排序改良後可達到 O((M + N) * log(M))。 但因為我這邊使用 quick sort 作為簡單的展示,所以最差的複雜度仍需要 O(M * M + (M + N) * log(M))
且原本會在每次成功 free 一個 retire list 上的 pointer 時,都會用 memmove 去挪動全部 retire list 上被刪除的 pointer 後的所有 pointer,而這裡也省略了這一步驟。
不每次 "list_hp_retire" 都進行 GC,雖然可以提昇運行效率,但會浪費記憶體的使用量
這個選項老師已經實作在 HP_THRESHOLD_R 這個 macro , 可供我們調整 retire list 最多可容納多少個 pointer
根據論文 3.1 小節 ,約可設置成 R = (Thread 數目) * (每個 thread 所擁有的 hazard pointer 數目)
writer | reader | reclaim memory | |
---|---|---|---|
RCU | 只能有一個執行 | 使用 rcu_lock 以及 rcu_unlock 來幫助檢測寬限期 ( grace period ) 是否已經結束 | grace period |
Hazard Pointer | 可以同時執行,多個 writer 有機會同時修改 | 使用 hazard pointer 來保護使用中的 pointer | retire list |
// TODO , 還在閱讀程式碼
// 在 deadline 前沒看完 QQ ,目前陷入了加班地獄,先繼續從作業三開始做