# 2021q3 Homework2 (hp) contributed by < `u1f383` > ## 1. 解釋上述程式碼運作原理 首先介紹 macro 以及資料結構: ``` c #define HP_MAX_THREADS 128 /* hp 能 handle 的最大 thread 數量 */ #define HP_MAX_HPS 5 /* 最多可以維護多少個 hp */ /** * 為了避免 false sharing 的問題,必須將 struct padding 至 cacheline 的倍數 (64 * n) * CLPAD 應該是 cacheline padding 的縮寫 * > 看了人家的紀錄,intel 一次 cache 的資料量為 128 bytes */ #define CLPAD (128 / sizeof(uintptr_t)) // 16 #define HP_THRESHOLD_R 0 /* This is named 'R' in the HP paper */ /* 每個 thread 擁有的最大 retired objects 數量 */ #define HP_MAX_RETIRED (HP_MAX_THREADS * HP_MAX_HPS) #define TID_UNKNOWN -1 /* 每個 thread 會配發一個 tid,而在還沒配發之前都是 -1 */ typedef struct { int size; uintptr_t *list; } retirelist_t; typedef struct list_hp list_hp_t; typedef void(list_hp_deletefunc_t)(void *); /** * hazard pointer 的 list 結構 * @max_hps: 該 list 中最多能有多少 hp * @hp: 儲存指向 hp, array 的 index 代表對應的 thread * @rl: 指向 retire_list, array 的 index 代表對應的 thread * @deletefunc: 指向可以用來刪除 hp node 的 function */ struct list_hp { int max_hps; /* atomic_uintptr_t 可以想像成就是 unsigned long */ alignas(128) atomic_uintptr_t *hp[HP_MAX_THREADS]; /* 每個 element 間隔 128 bytes */ alignas(128) retirelist_t *rl[HP_MAX_THREADS * CLPAD]; list_hp_deletefunc_t *deletefunc; }; /** * 設定 tid_v 為 per-thread data,一開始的初始值即為 -1, * 用來代表每個 thread (類似 thread id) */ static thread_local int tid_v = TID_UNKNOWN; /* 此 function 用來初始化 atomic type variable */ static atomic_int_fast32_t tid_v_base = ATOMIC_VAR_INIT(0); ``` 新建一個大小為 `HP_MAX_HPS` 的 hp array,並且考慮到 cacheline 作了許多優化,可謂說以空間 (memory space) 換取時間 (no false sharing): ``` c /* Create a new hazard pointer array of size 'max_hps' (or a reasonable * default value if 'max_hps' is 0). The function 'deletefunc' will be * used to delete objects protected by hazard pointers when it becomes * safe to retire them. */ list_hp_t *list_hp_new(size_t max_hps, list_hp_deletefunc_t *deletefunc) { list_hp_t *hp = aligned_alloc(128, sizeof(*hp)); assert(hp); /* 如果沒有給定 max_hps,預設值就是 HP_MAX_HPS (5) */ if (max_hps == 0) max_hps = HP_MAX_HPS; /** * 初始化 list_hp 部分的 member,仍有 hp 以及 rl 尚未初始化 * 而這邊 deletefunc 會是 __list_node_delete */ *hp = (list_hp_t){.max_hps = max_hps, .deletefunc = deletefunc}; for (int i = 0; i < HP_MAX_THREADS; i++) { /** * 猜測 CLPAD * 2 的原因是 hp->hp[i][0] 為 uint (4 bytes), * 原本是以 sizeof(uintptr_t) 作為最小單位,現在必須改成 4 bytes, * 因此 padding 的 size 必須要 * 2 * * hp 一共有 [HP_MAX_THREADS][128] **/ hp->hp[i] = calloc(CLPAD * 2, sizeof(hp->hp[i][0])); /** * rl 每個 CLPAD 為一個單位去分配記憶體,而資料型態又是 uintptr_t, * 因此每個 rl element 間隔 128 bytes */ hp->rl[i * CLPAD] = calloc(1, sizeof(*hp->rl[0])); for (int j = 0; j < hp->max_hps; j++) /* 將初始值都設為 0 */ atomic_init(&hp->hp[i][j], 0); /* 每個 thread 分配 (最大 thread 數 * 最多 HP 數) 的記憶體空間 */ hp->rl[i * CLPAD]->list = calloc(HP_MAX_RETIRED, sizeof(uintptr_t)); } return hp; } ``` 當 hp array 使用完畢,在結束程式前必須將其刪除以釋放空間: ``` c /* Destroy a hazard pointer array and clean up all objects protected * by hazard pointers. */ void list_hp_destroy(list_hp_t *hp) { /** * 這邊就是清空每個 hp array element 使用到的資源, * 不過到這仍不確定每個 data struct 的使用方式 / 行為 */ for (int i = 0; i < HP_MAX_THREADS; i++) { free(hp->hp[i]); retirelist_t *rl = hp->rl[i * CLPAD]; for (int j = 0; j < rl->size; j++) { void *data = (void *) rl->list[j]; hp->deletefunc(data); } free(rl->list); free(rl); } free(hp); } ``` 當某個 thread 在 hp 使用完畢後,必須要清空 hp 而非刪除,只需要把 hp list 對到的 hp 設為 0 即可: ``` c /* Clear all hazard pointers in the array for the current thread. * Progress condition: wait-free bounded (by max_hps) */ void list_hp_clear(list_hp_t *hp) { for (int i = 0; i < hp->max_hps; i++) atomic_store_explicit(&hp->hp[tid()][i], 0, memory_order_release); } ``` 當某 thread 正在使用 ptr 時,可以將 ptr 加進 hp list 當中保護,其他 thread 在 release node 時會先檢查是否在其他 thread 的 hp 內,如果沒有的話才會 release: ``` c /* This returns the same value that is passed as ptr. * Progress condition: wait-free population oblivious. */ uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr) { /* ihp 為 hp index */ atomic_store(&hp->hp[tid()][ihp], ptr); return ptr; } ``` 操作完畢後,若 ptr 沒有要受到保護時就可以執行 `protect_release`,從 hp list 當中移除: ``` c /* Same as list_hp_protect_ptr(), but explicitly uses memory_order_release. * Progress condition: wait-free population oblivious. */ uintptr_t list_hp_protect_release(list_hp_t *hp, int ihp, uintptr_t ptr) { atomic_store_explicit(&hp->hp[tid()][ihp], ptr, memory_order_release); return ptr; } ``` 把一個不再需要用到的 ptr 給釋放,但是釋放前必須再三確定沒人在使用: ``` c /* Retire an object that is no longer in use by any thread, calling * the delete function that was specified in list_hp_new(). * * Progress condition: wait-free bounded (by the number of threads squared) */ void list_hp_retire(list_hp_t *hp, uintptr_t ptr) { /* 取得該 thread 的 retire list */ retirelist_t *rl = hp->rl[tid() * CLPAD]; /* 新增一個要被釋放的 object */ rl->list[rl->size++] = ptr; assert(rl->size < HP_MAX_RETIRED); /** * 當數量小於 HP_THRESHOLD_R,就不先 retire object * 而是等到 threshold 到達時在一次釋放 */ if (rl->size < HP_THRESHOLD_R) return; for (size_t iret = 0 /* iret == retire index (?) */; iret < rl->size; iret++) { uintptr_t obj = rl->list[iret]; bool can_delete = true; /* 遍歷每個 thread */ for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) { /* 遍歷每個 thread 的 hp list*/ for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { /* reverse order 檢查有沒有在 hp 當中,如果有的話就不能 delete */ if (atomic_load(&hp->hp[itid][ihp]) == obj) { can_delete = false; break; } } } if (can_delete) { size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); /** * 從 &rl->list[iret + 1] copy "bytes" 數量的資料到 &rl->list[iret], * 意思就是把之後的資料整個往前挪 */ memmove(&rl->list[iret], &rl->list[iret + 1], bytes); rl->size--; // RRR; hp->deletefunc((void *) obj); iret--; } } } ``` 而之後的部分屬於 linked list 的實作,一樣先看使用到哪些 macro、function prototype 以及 data structure: ``` c #include <pthread.h> #define N_ELEMENTS 128 /* list 最多的 element 數量 */ #define N_THREADS (64) /* 使用多少 thread */ /** * NNN = 64 的原因在於,輸出結果為 4098,而其中兩個 node 為頭尾, * 每個 thread 則會 insert 128 個,因此 (4098 - 2) / 128 = 32, * 但是實際上 delete 也占了一半,因此總共 thread 的數量為 64 */ #define MAX_THREADS 128 /* 用於紀錄刪除以及插入的次數 */ static atomic_uint_fast32_t deletes = 0, inserts = 0; /** * hp 的 enum,感覺應該要放在上面 (?),也有可能是因為現在這個部分 * 比較偏向 user 自己定義 */ enum { HP_NEXT = 0, HP_CURR = 1, HP_PREV }; #define is_marked(p) (bool) ((uintptr_t)(p) &0x01) #define get_marked(p) ((uintptr_t)(p) | (0x01)) #define get_unmarked(p) ((uintptr_t)(p) & (~0x01)) #define get_marked_node(p) ((list_node_t *) get_marked(p)) #define get_unmarked_node(p) ((list_node_t *) get_unmarked(p)) typedef uintptr_t list_key_t; typedef struct list_node { alignas(128) uint32_t magic; /* 放 magic number (0xdeadbeef) */ alignas(128) atomic_uintptr_t next; list_key_t key; } list_node_t; /* Per list variables */ typedef struct list { atomic_uintptr_t head, tail; /* 每個 linked list 都會 maintain 一個 hp array */ list_hp_t *hp; } list_t; #define LIST_MAGIC (0xDEADBEAF) /* list element 的 magic number */ ``` 接下來是一些常見的 linked list function,像是 delete / insert / find / new 等等。 刪除 node 使用的是 `list_node_destroy()`: ``` c void list_node_destroy(list_node_t *node) { if (!node) /* invalid */ return; /* 檢查 node 是否有被意外改寫 */ assert(node->magic == LIST_MAGIC); free(node); /* delete 成功時 delete counter + 1 */ (void) atomic_fetch_add(&deletes, 1); } ``` 而 `__list_node_delete()` 為 `list_node_destroy()` 的 wrapper function,同時也是 hp 的 deletefunc: ``` c static void __list_node_delete(void *arg) { list_node_t *node = (list_node_t *) arg; list_node_destroy(node); } ``` find node function 可以說是 hp 的關鍵,用來找 `key` 對應到的 node 是否存在於 list 當中,並且過程中並無使用 lock: ``` c static bool __list_find(list_t *list, list_key_t *key, atomic_uintptr_t **par_prev, list_node_t **par_curr, list_node_t **par_next) { atomic_uintptr_t *prev = NULL; list_node_t *curr = NULL, *next = NULL; try_again: prev = &list->head; curr = (list_node_t *) atomic_load(prev); /* 告訴大家正在使用 curr 此 ptr */ (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr); /** * 在過程中原本的 curr (old list->head) 被更新,必須重來, * 因為有可能此時 curr (old list->head) 已經被 delete 了 */ if (atomic_load(prev) != get_unmarked(curr)) goto try_again; /* 確保目前 curr 不會被動到 */ while (true) { /* curr 是 NULL ptr,代表整個 list 已經空了 */ if (!get_unmarked_node(curr)) return false; /* 取得 curr->next */ next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next); /* 一樣加到 hp array 當中保護 */ (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next)); /** * 結果在加到 hp 的過程中 next 又被更動,代表已經被 delete 掉, * 因為只有 list_delete() 會動到 curr->next * ...應該吧 @_@ */ if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next) break; /** * next 為 curr->next,也就是 list->head->next,如果接到 tail, * 代表 list 是空的 */ if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail)) break; /* 在過程中 list->head 又被更動,可能已經不一樣了,所以必須重新來過 */ /* 因為我們用 hp 保護 curr 以及 next,因此不需要考慮到他們被 delete 的狀況 */ if (atomic_load(prev) != get_unmarked(curr)) goto try_again; /* 當 next 被 marked 時 condition 為 false */ if (get_unmarked_node(next) == next) { /* 當前 node 的 key >= 要找的 key */ if (!(get_unmarked_node(curr)->key < *key)) { /* par 是 parallel 的意思嗎 ? */ *par_curr = curr; *par_prev = prev; *par_next = next; /* 看是不是要找的 node */ // return FFF; return get_unmarked_node(curr)->key == *key; } /* prev 更新成下一個 node @_@? */ prev = &get_unmarked_node(curr)->next; /* 將 curr 設為 hp array 中的 PREV 作為保護 */ (void) list_hp_protect_release(list->hp, HP_PREV, get_unmarked(curr)); } else { uintptr_t tmp = get_unmarked(curr); if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) goto try_again; list_hp_retire(list->hp, get_unmarked(curr)); } /* 更新 curr 成 next,代表要繼續 traverse */ curr = next; /** * 更新 curr 並用 hp array 作保護, * 而 release 代表用 memory_order_release */ (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next)); } /* 沒有很懂這三個是什麼 @_@ */ *par_curr = curr; *par_prev = prev; *par_next = next; return false; } ``` insert node 時呼叫 `list_insert()`: ``` c bool list_insert(list_t *list, list_key_t key) { list_node_t *curr = NULL, *next = NULL; atomic_uintptr_t *prev = NULL; /* 新增一個 node */ list_node_t *node = list_node_new(key); while (true) { if (__list_find(list, &key, &prev, &curr, &next)) { /* 如果 node 重複,就不新增,釋放 node 以及清空 hp */ list_node_destroy(node); list_hp_clear(list->hp); return false; } /* 如果沒找到對應 key 的 node,就將 node 加在 list 的一開始 */ atomic_store_explicit(&node->next, (uintptr_t) curr, memory_order_relaxed); uintptr_t tmp = get_unmarked(curr); /** * 如果 prev == curr 則 prev = node,不過 prev 也沒用到, * 為什麼不直接 compare 就好 ? */ if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) { /* 確實新增成功,清空 hp array 後 return */ list_hp_clear(list->hp); return true; } } } ``` 使用 `list_delete()` 刪除 node: ``` c bool list_delete(list_t *list, list_key_t key) { list_node_t *curr, *next; atomic_uintptr_t *prev; while (true) { if (!__list_find(list, &key, &prev, &curr, &next)) { /* 如果找不到對應的 node 也不需要 delete 了 */ list_hp_clear(list->hp); return false; } uintptr_t tmp = get_unmarked(next); /* 將 next mark 起來,代表上一個點有被動到 */ if (!atomic_compare_exchange_strong(&curr->next, &tmp, get_marked(next))) continue; tmp = get_unmarked(curr); if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) { list_hp_clear(list->hp); // DDD; /* 將當前的點 retire,表示刪除 */ list_hp_retire(list->hp, get_unmarked(curr)); } else { list_hp_clear(list->hp); } return true; } } ``` 新增新的 list 時呼叫 `list_new()`: ``` c list_t *list_new(void) { /* 分配一個 list 大小的空間 */ list_t *list = calloc(1, sizeof(*list)); assert(list); /* 新增頭尾兩個點 */ list_node_t *head = list_node_new(0), *tail = list_node_new(UINTPTR_MAX /* 0xffffffff */); assert(head), assert(tail); /* 新增一個 hp */ list_hp_t *hp = list_hp_new(3, __list_node_delete); /** * list->head -> head ---next-- * | * list->tail -> tail <--------| */ atomic_init(&head->next, (uintptr_t) tail); *list = (list_t){.hp = hp}; /* assign hp */ atomic_init(&list->head, (uintptr_t) head); atomic_init(&list->tail, (uintptr_t) tail); return list; } ``` 而 `insert_thread()` 以及 `delete_thread()` 為 thread 執行的兩個 function,主要就是持續新增以及刪除 node: ``` c static uintptr_t elements[MAX_THREADS + 1][N_ELEMENTS]; static void *insert_thread(void *arg) { list_t *list = (list_t *) arg; for (size_t i = 0; i < N_ELEMENTS; i++) (void) list_insert(list, (uintptr_t) &elements[tid()][i]); return NULL; } static void *delete_thread(void *arg) { list_t *list = (list_t *) arg; for (size_t i = 0; i < N_ELEMENTS; i++) (void) list_delete(list, (uintptr_t) &elements[tid()][i]); return NULL; } ``` `main()`: ``` c int main(void) { list_t *list = list_new(); /* 建立 + 初始化 list */ pthread_t thr[N_THREADS]; /* delete_thread 以及 insert_thread 各半 */ for (size_t i = 0; i < N_THREADS; i++) pthread_create(&thr[i], NULL, (i & 1) ? delete_thread : insert_thread, list); for (size_t i = 0; i < N_THREADS; i++) pthread_join(thr[i], NULL); /* 確實將每個 node delete 乾淨 */ for (size_t i = 0; i < N_ELEMENTS; i++) { for (size_t j = 0; j < tid_v_base; j++) list_delete(list, (uintptr_t) &elements[j][i]); } /* 釋放 list 的資源 */ list_destroy(list); fprintf(stderr, "inserts = %zu, deletes = %zu\n", atomic_load(&inserts), atomic_load(&deletes)); return 0; } ``` ## 2. 指出改進空間並著手實作 執行`list_hp_retire()` 時,會將即將被刪除的 node 之後的資料往前移動,不過 iret 在下一個 iteration 時還是會執行 `iret++`,可是 `rl->list[iret]` 還沒處理過,因此加上 `iret--` 使下次 iteration 時能走訪到還沒被處理的資料 ``` c hp->deletefunc((void *) obj); iret--; ``` P.S. `__list_find()` 的實作看不是很懂,所以也沒辦法做更深入的優化 ## 3. 對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?能否指出 rcu_list 實作缺陷並改進? 程式碼中定義 rcu 結構的部分: ``` c /* A concurrent linked list utilizing the simplified RCU algorithm */ #include <stdbool.h> typedef struct rcu_list rcu_list_t; /* 用來遍歷整個 list 的 iterator type */ typedef struct { struct list_node *entry; } iterator_t; typedef struct { struct rcu_list *list; struct zombie_node *zombie; } rcu_handle_t; typedef rcu_handle_t read_handle_t; typedef rcu_handle_t write_handle_t; typedef void (*deleter_func_t)(void *); typedef bool (*finder_func_t)(void *, void *); #define _GNU_SOURCE #include <pthread.h> #include <stdbool.h> #include <stdlib.h> /* list node 的資料結構,為雙向的 linked list */ typedef struct list_node { bool deleted; struct list_node *next, *prev; void *data; } list_node_t; /* 為單向的 linked list */ typedef struct zombie_node { struct zombie_node *next; struct list_node *zombie; rcu_handle_t *owner; } zombie_node_t; /** * rcu list 的資料結構 * @write_lock: 用來限制只能有一個 writer 的 mutex lock * @head, tail: 指向 list 頭尾的 pointer * @zombie_head: 指向 zombie list * @deleter: 為指向 delete function 的 function pointer */ struct rcu_list { pthread_mutex_t write_lock; /* exclusive lock acquired by writers */ list_node_t *head, *tail; /* head and tail of the "live" list */ zombie_node_t *zombie_head; /* head of the zombie list */ deleter_func_t deleter; }; static list_node_t *make_node(void *data); static zombie_node_t *make_zombie_node(void); static void lock_for_write(rcu_list_t *list); static void unlock_for_write(rcu_list_t *list); ``` 新增一個初始化 delete function pointer 的 list: ``` c static rcu_list_t *list_new_with_deleter(deleter_func_t deleter) { if (!deleter) return NULL; rcu_list_t *list = malloc(sizeof(rcu_list_t)); if (!list) return NULL; /* 初始化 mutex lock */ if (pthread_mutex_init(&list->write_lock, NULL) != 0) { free(list); return NULL; } list->head = list->tail = NULL; list->zombie_head = NULL; /* 初始化 delete func ptr 指向 deleter */ list->deleter = deleter; return list; } ``` 釋放 rcu list: ``` c static void list_free(void *arg) { rcu_list_t *list = arg; /* iterator 遍歷整格 list,一個個刪除 rcu 上的 node */ for (list_node_t *iter = list->head; iter;) { list_node_t *tmp = iter->next; free(iter->data); free(iter); iter = tmp; } free(list); } ``` 新增一個以 `list_free()` 作為 delete function 的 list,因此在這個例子中 delete == free: ``` c rcu_list_t *list_new(void) { return list_new_with_deleter(list_free); } ``` list delete wrapper: ``` c void list_delete(rcu_list_t *list) { /* 檢查 list 以及 list->deleter 存在 */ if (!list || !list->deleter) return; list->deleter(list); } ``` 新增一筆資料到 list 中,而過程中會先為資料新增一個 node,並以 node 的形式加到 list 當中: ``` c void list_push_front(rcu_list_t *list, void *data, write_handle_t *handle) { if (!list) return; /* 為 data 新增一個 node */ list_node_t *node = make_node(data); if (!node) return; /* 為 list 上鎖,確保一次只有一個 writer (updater) */ /* 都用 lock 了,為什麼還需要 atomic ? */ lock_for_write(list); list_node_t *old_head; __atomic_load(&list->head, &old_head, __ATOMIC_RELAXED); if (!old_head) { /* list is currently empty */ /* empty list,頭尾直接指向新的 node */ __atomic_store(&list->head, &node, __ATOMIC_SEQ_CST); __atomic_store(&list->tail, &node, __ATOMIC_SEQ_CST); } else { /* general case */ /* 將 node 加到 list 的開頭 */ __atomic_store(&node->next, &old_head, __ATOMIC_SEQ_CST); __atomic_store(&old_head->prev, &node, __ATOMIC_SEQ_CST); __atomic_store(&list->head, &node, __ATOMIC_SEQ_CST); } unlock_for_write(list); } ``` list 找是否有某個 node maintain 特定的 data: ``` c iterator_t list_find(rcu_list_t *list, void *data, finder_func_t finder, read_handle_t *handle) { iterator_t iter = {.entry = NULL}; /* initialize an invalid iterator */ if (!list) return iter; list_node_t *current; __atomic_load(&list->head, &current, __ATOMIC_SEQ_CST); /* traverse 所有的 list node,並利用 finder 來檢測資料是否相同 */ while (current) { if (finder(current->data, data)) { iter.entry = current; break; } __atomic_load(&current->next, &current, __ATOMIC_SEQ_CST); } return iter; } ``` 指向 list 開頭的 iterator: ``` c /* handler 根本沒被執行 @_@ */ iterator_t list_begin(rcu_list_t *list, read_handle_t *handle) { iterator_t iter = {.entry = NULL}; if (!list) return iter; list_node_t *head; __atomic_load(&list->head, &head, __ATOMIC_SEQ_CST); iter.entry = head; return iter; } ``` 回傳 iterator 指向的 data: ``` c void *iterator_get(iterator_t *iter) { /* 基本檢測 iter 以及 iter->entry 是否存在 */ return (iter && iter->entry) ? iter->entry->data : NULL; } ``` 為 list 註冊 reader / writer handler: ``` c read_handle_t list_register_reader(rcu_list_t *list) { read_handle_t handle = {.list = list, .zombie = NULL}; return handle; } write_handle_t list_register_writer(rcu_list_t *list) { write_handle_t handle = {.list = list, .zombie = NULL}; return handle; } ``` `rcu_read_lock()` 看起來是在新增一個 zombie node,猜測是用 zombie node 作為 reader 在過渡 update 的時候存取的值: ``` c void rcu_read_lock(read_handle_t *handle) { /* 新增一個 zombie node */ zombie_node_t *z_node = make_zombie_node(); /* 標記 zombie node 的主人 */ z_node->owner = handle; /* 更新該 handler 指向的 zombie node */ handle->zombie = z_node; rcu_list_t *list = handle->list; zombie_node_t *old_head; /* 取得當前的 zombie_head */ __atomic_load(&list->zombie_head, &old_head, __ATOMIC_SEQ_CST); do { /** * 將新增的 node 的 next 指向當前的 zombie_head, * 並且嘗試將自己更新成 zombie head */ __atomic_store(&z_node->next, &old_head, __ATOMIC_SEQ_CST); } while (!__atomic_compare_exchange(&list->zombie_head, &old_head, &z_node, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); } ``` unlock 呢,既然 lock 是以 zombie node 作為記號, unlock 應該是將 zombie node 移除: ``` c void rcu_read_unlock(read_handle_t *handle) { /* 取得 handle maintain 的 zombie node */ zombie_node_t *z_node = handle->zombie; zombie_node_t *cached_next; __atomic_load(&z_node->next, &cached_next, __ATOMIC_SEQ_CST); bool last = true; /* walk through the zombie list to determine if this is the last active * reader in the list. */ zombie_node_t *n = cached_next; /** * 走訪 handle->zombie 之後的 node,看是否 handle->zombie * 是最後一個 active 的 node */ while (n) { list_node_t *owner; __atomic_load(&n->owner, &owner, __ATOMIC_SEQ_CST); /* 後面還有 zombie 屬於某個 handler,代表不是最後一個 */ if (owner) { last = false; /* this is not the last active reader */ break; } __atomic_load(&n->next, &n, __ATOMIC_SEQ_CST); } n = cached_next; if (last) { /* 如果是最後一個,就能安全地將後面每個 zombie node delete */ while (n) { list_node_t *dead_node = n->zombie; free(dead_node); zombie_node_t *old_node = n; __atomic_load(&n->next, &n, __ATOMIC_SEQ_CST); free(old_node); } __atomic_store(&z_node->next, &n, __ATOMIC_SEQ_CST); } /* 如果還有人在 read,則就不需要將後面的 node delete */ void *null = NULL; __atomic_store(&z_node->owner, &null, __ATOMIC_SEQ_CST); } ``` write lock / unlock wrapper,但其實還是執行 read lock/unlock: ``` c void rcu_write_lock(write_handle_t *handle) { rcu_read_lock(handle); } void rcu_write_unlock(write_handle_t *handle) { rcu_read_unlock(handle); } ``` 新增一個 list node: ``` c static list_node_t *make_node(void *data) { list_node_t *node = malloc(sizeof(list_node_t)); if (!node) return NULL; node->data = data; node->next = node->prev = NULL; node->deleted = false; return node; } ``` 新增一個 zombie node: ``` c static zombie_node_t *make_zombie_node(void) { zombie_node_t *z_node = malloc(sizeof(zombie_node_t)); if (!z_node) return NULL; z_node->zombie = NULL; z_node->owner = NULL; z_node->next = NULL; return z_node; } ``` mutex lock wrapper: ``` c static void lock_for_write(rcu_list_t *list) { pthread_mutex_lock(&list->write_lock); } static void unlock_for_write(rcu_list_t *list) { pthread_mutex_unlock(&list->write_lock); } ``` 測試執行的部分,以 `dummy` struct 作為 data 的單位: ``` c #include <assert.h> #include <stdio.h> #include <stdlib.h> typedef struct dummy { int a, b; } dummy_t; static dummy_t *make_dummy(int a, int b) { dummy_t *dummy = malloc(sizeof(dummy_t)); dummy->a = a, dummy->b = b; return dummy; } static bool finder(void *x, void *y) { /* 比對 dummy 的 a b 值是否相同 */ dummy_t *dx = x, *dy = y; return (dx->a == dy->a) && (dx->b == dy->b); } ``` reader thread 要執行的 function: ``` c static void *reader_thread(void *arg) { rcu_list_t *list = arg; /* 註冊 reader handle,回傳一個 read_handle */ read_handle_t reader = list_register_reader(list); rcu_read_lock(&reader); /* read from list here */ /* 找有沒有 {1, 1} 的 dummy */ iterator_t iter = list_find(list, &(dummy_t){1, 1}, finder, &reader); void *data = iterator_get(&iter); assert(data); /* 找有沒有 {2, 2} 的 dummy */ iter = list_find(list, &(dummy_t){2, 2}, finder, &reader); data = iterator_get(&iter); assert(data); /* 取得第一個 list node */ iterator_t first = list_begin(list, &reader); data = iterator_get(&first); assert(data); dummy_t *as_d2 = data; assert(as_d2->a == 2); assert(as_d2->b == 2); assert(iter.entry == first.entry); rcu_read_unlock(&reader); return NULL; } ``` writer thread 要執行的 function: ``` c static void *writer_thread(void *arg) { dummy_t *d1 = make_dummy(1, 1); dummy_t *d2 = make_dummy(2, 2); rcu_list_t *list = arg; /* 為 list 註冊 writer handle,回傳一個 write_handle */ write_handle_t writer = list_register_writer(list); rcu_write_lock(&writer); /* write to list here */ /* writer 必須要 mutex,確保一次只會有一個 writer */ list_push_front(list, d1, &writer); list_push_front(list, d2, &writer); rcu_write_unlock(&writer); return NULL; } ``` main function: ``` c #define N_READERS 10 int main(void) { rcu_list_t *list = list_new(); dummy_t *d0 = make_dummy(0, 0); list_push_front(list, d0, NULL); pthread_t t0, t_r[N_READERS]; /* 一個 writer */ pthread_create(&t0, NULL, writer_thread, list); /* N_READERS 個 reader */ for (int i = 0; i < N_READERS; i++) pthread_create(&t_r[i], NULL, reader_thread, list); for (int i = 0; i < N_READERS; i++) pthread_join(t_r[i], NULL); pthread_join(t0, NULL); list_delete(list); return EXIT_SUCCESS; } ``` TODO