contributed by < u1f383
>
首先介紹 macro 以及資料結構:
#define HP_MAX_THREADS 128 /* hp 能 handle 的最大 thread 數量 */
#define HP_MAX_HPS 5 /* 最多可以維護多少個 hp */
/**
* 為了避免 false sharing 的問題,必須將 struct padding 至 cacheline 的倍數 (64 * n)
* CLPAD 應該是 cacheline padding 的縮寫
* > 看了人家的紀錄,intel 一次 cache 的資料量為 128 bytes
*/
#define CLPAD (128 / sizeof(uintptr_t)) // 16
#define HP_THRESHOLD_R 0 /* This is named 'R' in the HP paper */
/* 每個 thread 擁有的最大 retired objects 數量 */
#define HP_MAX_RETIRED (HP_MAX_THREADS * HP_MAX_HPS)
#define TID_UNKNOWN -1 /* 每個 thread 會配發一個 tid,而在還沒配發之前都是 -1 */
typedef struct {
int size;
uintptr_t *list;
} retirelist_t;
typedef struct list_hp list_hp_t;
typedef void(list_hp_deletefunc_t)(void *);
/**
* hazard pointer 的 list 結構
* @max_hps: 該 list 中最多能有多少 hp
* @hp: 儲存指向 hp, array 的 index 代表對應的 thread
* @rl: 指向 retire_list, array 的 index 代表對應的 thread
* @deletefunc: 指向可以用來刪除 hp node 的 function
*/
struct list_hp {
int max_hps;
/* atomic_uintptr_t 可以想像成就是 unsigned long */
alignas(128) atomic_uintptr_t *hp[HP_MAX_THREADS];
/* 每個 element 間隔 128 bytes */
alignas(128) retirelist_t *rl[HP_MAX_THREADS * CLPAD];
list_hp_deletefunc_t *deletefunc;
};
/**
* 設定 tid_v 為 per-thread data,一開始的初始值即為 -1,
* 用來代表每個 thread (類似 thread id)
*/
static thread_local int tid_v = TID_UNKNOWN;
/* 此 function 用來初始化 atomic type variable */
static atomic_int_fast32_t tid_v_base = ATOMIC_VAR_INIT(0);
新建一個大小為 HP_MAX_HPS
的 hp array,並且考慮到 cacheline 作了許多優化,可謂說以空間 (memory space) 換取時間 (no false sharing):
/* Create a new hazard pointer array of size 'max_hps' (or a reasonable
* default value if 'max_hps' is 0). The function 'deletefunc' will be
* used to delete objects protected by hazard pointers when it becomes
* safe to retire them.
*/
list_hp_t *list_hp_new(size_t max_hps, list_hp_deletefunc_t *deletefunc)
{
list_hp_t *hp = aligned_alloc(128, sizeof(*hp));
assert(hp);
/* 如果沒有給定 max_hps,預設值就是 HP_MAX_HPS (5) */
if (max_hps == 0)
max_hps = HP_MAX_HPS;
/**
* 初始化 list_hp 部分的 member,仍有 hp 以及 rl 尚未初始化
* 而這邊 deletefunc 會是 __list_node_delete
*/
*hp = (list_hp_t){.max_hps = max_hps, .deletefunc = deletefunc};
for (int i = 0; i < HP_MAX_THREADS; i++) {
/**
* 猜測 CLPAD * 2 的原因是 hp->hp[i][0] 為 uint (4 bytes),
* 原本是以 sizeof(uintptr_t) 作為最小單位,現在必須改成 4 bytes,
* 因此 padding 的 size 必須要 * 2
*
* hp 一共有 [HP_MAX_THREADS][128]
**/
hp->hp[i] = calloc(CLPAD * 2, sizeof(hp->hp[i][0]));
/**
* rl 每個 CLPAD 為一個單位去分配記憶體,而資料型態又是 uintptr_t,
* 因此每個 rl element 間隔 128 bytes
*/
hp->rl[i * CLPAD] = calloc(1, sizeof(*hp->rl[0]));
for (int j = 0; j < hp->max_hps; j++)
/* 將初始值都設為 0 */
atomic_init(&hp->hp[i][j], 0);
/* 每個 thread 分配 (最大 thread 數 * 最多 HP 數) 的記憶體空間 */
hp->rl[i * CLPAD]->list = calloc(HP_MAX_RETIRED, sizeof(uintptr_t));
}
return hp;
}
當 hp array 使用完畢,在結束程式前必須將其刪除以釋放空間:
/* Destroy a hazard pointer array and clean up all objects protected
* by hazard pointers.
*/
void list_hp_destroy(list_hp_t *hp)
{
/**
* 這邊就是清空每個 hp array element 使用到的資源,
* 不過到這仍不確定每個 data struct 的使用方式 / 行為
*/
for (int i = 0; i < HP_MAX_THREADS; i++) {
free(hp->hp[i]);
retirelist_t *rl = hp->rl[i * CLPAD];
for (int j = 0; j < rl->size; j++) {
void *data = (void *) rl->list[j];
hp->deletefunc(data);
}
free(rl->list);
free(rl);
}
free(hp);
}
當某個 thread 在 hp 使用完畢後,必須要清空 hp 而非刪除,只需要把 hp list 對到的 hp 設為 0 即可:
/* Clear all hazard pointers in the array for the current thread.
* Progress condition: wait-free bounded (by max_hps)
*/
void list_hp_clear(list_hp_t *hp)
{
for (int i = 0; i < hp->max_hps; i++)
atomic_store_explicit(&hp->hp[tid()][i], 0, memory_order_release);
}
當某 thread 正在使用 ptr 時,可以將 ptr 加進 hp list 當中保護,其他 thread 在 release node 時會先檢查是否在其他 thread 的 hp 內,如果沒有的話才會 release:
/* This returns the same value that is passed as ptr.
* Progress condition: wait-free population oblivious.
*/
uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr)
{
/* ihp 為 hp index */
atomic_store(&hp->hp[tid()][ihp], ptr);
return ptr;
}
操作完畢後,若 ptr 沒有要受到保護時就可以執行 protect_release
,從 hp list 當中移除:
/* Same as list_hp_protect_ptr(), but explicitly uses memory_order_release.
* Progress condition: wait-free population oblivious.
*/
uintptr_t list_hp_protect_release(list_hp_t *hp, int ihp, uintptr_t ptr)
{
atomic_store_explicit(&hp->hp[tid()][ihp], ptr, memory_order_release);
return ptr;
}
把一個不再需要用到的 ptr 給釋放,但是釋放前必須再三確定沒人在使用:
/* Retire an object that is no longer in use by any thread, calling
* the delete function that was specified in list_hp_new().
*
* Progress condition: wait-free bounded (by the number of threads squared)
*/
void list_hp_retire(list_hp_t *hp, uintptr_t ptr)
{
/* 取得該 thread 的 retire list */
retirelist_t *rl = hp->rl[tid() * CLPAD];
/* 新增一個要被釋放的 object */
rl->list[rl->size++] = ptr;
assert(rl->size < HP_MAX_RETIRED);
/**
* 當數量小於 HP_THRESHOLD_R,就不先 retire object
* 而是等到 threshold 到達時在一次釋放
*/
if (rl->size < HP_THRESHOLD_R)
return;
for (size_t iret = 0 /* iret == retire index (?) */; iret < rl->size; iret++) {
uintptr_t obj = rl->list[iret];
bool can_delete = true;
/* 遍歷每個 thread */
for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) {
/* 遍歷每個 thread 的 hp list*/
for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
/* reverse order 檢查有沒有在 hp 當中,如果有的話就不能 delete */
if (atomic_load(&hp->hp[itid][ihp]) == obj) {
can_delete = false;
break;
}
}
}
if (can_delete) {
size_t bytes = (rl->size - iret) * sizeof(rl->list[0]);
/**
* 從 &rl->list[iret + 1] copy "bytes" 數量的資料到 &rl->list[iret],
* 意思就是把之後的資料整個往前挪
*/
memmove(&rl->list[iret], &rl->list[iret + 1], bytes);
rl->size--;
// RRR;
hp->deletefunc((void *) obj); iret--;
}
}
}
而之後的部分屬於 linked list 的實作,一樣先看使用到哪些 macro、function prototype 以及 data structure:
#include <pthread.h>
#define N_ELEMENTS 128 /* list 最多的 element 數量 */
#define N_THREADS (64) /* 使用多少 thread */
/**
* NNN = 64 的原因在於,輸出結果為 4098,而其中兩個 node 為頭尾,
* 每個 thread 則會 insert 128 個,因此 (4098 - 2) / 128 = 32,
* 但是實際上 delete 也占了一半,因此總共 thread 的數量為 64
*/
#define MAX_THREADS 128
/* 用於紀錄刪除以及插入的次數 */
static atomic_uint_fast32_t deletes = 0, inserts = 0;
/**
* hp 的 enum,感覺應該要放在上面 (?),也有可能是因為現在這個部分
* 比較偏向 user 自己定義
*/
enum { HP_NEXT = 0, HP_CURR = 1, HP_PREV };
#define is_marked(p) (bool) ((uintptr_t)(p) &0x01)
#define get_marked(p) ((uintptr_t)(p) | (0x01))
#define get_unmarked(p) ((uintptr_t)(p) & (~0x01))
#define get_marked_node(p) ((list_node_t *) get_marked(p))
#define get_unmarked_node(p) ((list_node_t *) get_unmarked(p))
typedef uintptr_t list_key_t;
typedef struct list_node {
alignas(128) uint32_t magic; /* 放 magic number (0xdeadbeef) */
alignas(128) atomic_uintptr_t next;
list_key_t key;
} list_node_t;
/* Per list variables */
typedef struct list {
atomic_uintptr_t head, tail;
/* 每個 linked list 都會 maintain 一個 hp array */
list_hp_t *hp;
} list_t;
#define LIST_MAGIC (0xDEADBEAF) /* list element 的 magic number */
接下來是一些常見的 linked list function,像是 delete / insert / find / new 等等。
刪除 node 使用的是 list_node_destroy()
:
void list_node_destroy(list_node_t *node)
{
if (!node) /* invalid */
return;
/* 檢查 node 是否有被意外改寫 */
assert(node->magic == LIST_MAGIC);
free(node);
/* delete 成功時 delete counter + 1 */
(void) atomic_fetch_add(&deletes, 1);
}
而 __list_node_delete()
為 list_node_destroy()
的 wrapper function,同時也是 hp 的 deletefunc:
static void __list_node_delete(void *arg)
{
list_node_t *node = (list_node_t *) arg;
list_node_destroy(node);
}
find node function 可以說是 hp 的關鍵,用來找 key
對應到的 node 是否存在於 list 當中,並且過程中並無使用 lock:
static bool __list_find(list_t *list,
list_key_t *key,
atomic_uintptr_t **par_prev,
list_node_t **par_curr,
list_node_t **par_next)
{
atomic_uintptr_t *prev = NULL;
list_node_t *curr = NULL, *next = NULL;
try_again:
prev = &list->head;
curr = (list_node_t *) atomic_load(prev);
/* 告訴大家正在使用 curr 此 ptr */
(void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
/**
* 在過程中原本的 curr (old list->head) 被更新,必須重來,
* 因為有可能此時 curr (old list->head) 已經被 delete 了
*/
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
/* 確保目前 curr 不會被動到 */
while (true) {
/* curr 是 NULL ptr,代表整個 list 已經空了 */
if (!get_unmarked_node(curr))
return false;
/* 取得 curr->next */
next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next);
/* 一樣加到 hp array 當中保護 */
(void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
/**
* 結果在加到 hp 的過程中 next 又被更動,代表已經被 delete 掉,
* 因為只有 list_delete() 會動到 curr->next
* ...應該吧 @_@
*/
if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next)
break;
/**
* next 為 curr->next,也就是 list->head->next,如果接到 tail,
* 代表 list 是空的
*/
if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail))
break;
/* 在過程中 list->head 又被更動,可能已經不一樣了,所以必須重新來過 */
/* 因為我們用 hp 保護 curr 以及 next,因此不需要考慮到他們被 delete 的狀況 */
if (atomic_load(prev) != get_unmarked(curr))
goto try_again;
/* 當 next 被 marked 時 condition 為 false */
if (get_unmarked_node(next) == next) {
/* 當前 node 的 key >= 要找的 key */
if (!(get_unmarked_node(curr)->key < *key)) {
/* par 是 parallel 的意思嗎 ? */
*par_curr = curr;
*par_prev = prev;
*par_next = next;
/* 看是不是要找的 node */
// return FFF;
return get_unmarked_node(curr)->key == *key;
}
/* prev 更新成下一個 node @_@? */
prev = &get_unmarked_node(curr)->next;
/* 將 curr 設為 hp array 中的 PREV 作為保護 */
(void) list_hp_protect_release(list->hp, HP_PREV,
get_unmarked(curr));
} else {
uintptr_t tmp = get_unmarked(curr);
if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next)))
goto try_again;
list_hp_retire(list->hp, get_unmarked(curr));
}
/* 更新 curr 成 next,代表要繼續 traverse */
curr = next;
/**
* 更新 curr 並用 hp array 作保護,
* 而 release 代表用 memory_order_release
*/
(void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
}
/* 沒有很懂這三個是什麼 @_@ */
*par_curr = curr;
*par_prev = prev;
*par_next = next;
return false;
}
insert node 時呼叫 list_insert()
:
bool list_insert(list_t *list, list_key_t key)
{
list_node_t *curr = NULL, *next = NULL;
atomic_uintptr_t *prev = NULL;
/* 新增一個 node */
list_node_t *node = list_node_new(key);
while (true) {
if (__list_find(list, &key, &prev, &curr, &next)) {
/* 如果 node 重複,就不新增,釋放 node 以及清空 hp */
list_node_destroy(node);
list_hp_clear(list->hp);
return false;
}
/* 如果沒找到對應 key 的 node,就將 node 加在 list 的一開始 */
atomic_store_explicit(&node->next, (uintptr_t) curr,
memory_order_relaxed);
uintptr_t tmp = get_unmarked(curr);
/**
* 如果 prev == curr 則 prev = node,不過 prev 也沒用到,
* 為什麼不直接 compare 就好 ?
*/
if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) {
/* 確實新增成功,清空 hp array 後 return */
list_hp_clear(list->hp);
return true;
}
}
}
使用 list_delete()
刪除 node:
bool list_delete(list_t *list, list_key_t key)
{
list_node_t *curr, *next;
atomic_uintptr_t *prev;
while (true) {
if (!__list_find(list, &key, &prev, &curr, &next)) {
/* 如果找不到對應的 node 也不需要 delete 了 */
list_hp_clear(list->hp);
return false;
}
uintptr_t tmp = get_unmarked(next);
/* 將 next mark 起來,代表上一個點有被動到 */
if (!atomic_compare_exchange_strong(&curr->next, &tmp,
get_marked(next)))
continue;
tmp = get_unmarked(curr);
if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) {
list_hp_clear(list->hp);
// DDD;
/* 將當前的點 retire,表示刪除 */
list_hp_retire(list->hp, get_unmarked(curr));
} else {
list_hp_clear(list->hp);
}
return true;
}
}
新增新的 list 時呼叫 list_new()
:
list_t *list_new(void)
{
/* 分配一個 list 大小的空間 */
list_t *list = calloc(1, sizeof(*list));
assert(list);
/* 新增頭尾兩個點 */
list_node_t *head = list_node_new(0),
*tail = list_node_new(UINTPTR_MAX /* 0xffffffff */);
assert(head), assert(tail);
/* 新增一個 hp */
list_hp_t *hp = list_hp_new(3, __list_node_delete);
/**
* list->head -> head ---next--
* |
* list->tail -> tail <--------|
*/
atomic_init(&head->next, (uintptr_t) tail);
*list = (list_t){.hp = hp}; /* assign hp */
atomic_init(&list->head, (uintptr_t) head);
atomic_init(&list->tail, (uintptr_t) tail);
return list;
}
而 insert_thread()
以及 delete_thread()
為 thread 執行的兩個 function,主要就是持續新增以及刪除 node:
static uintptr_t elements[MAX_THREADS + 1][N_ELEMENTS];
static void *insert_thread(void *arg)
{
list_t *list = (list_t *) arg;
for (size_t i = 0; i < N_ELEMENTS; i++)
(void) list_insert(list, (uintptr_t) &elements[tid()][i]);
return NULL;
}
static void *delete_thread(void *arg)
{
list_t *list = (list_t *) arg;
for (size_t i = 0; i < N_ELEMENTS; i++)
(void) list_delete(list, (uintptr_t) &elements[tid()][i]);
return NULL;
}
main()
:
int main(void)
{
list_t *list = list_new(); /* 建立 + 初始化 list */
pthread_t thr[N_THREADS];
/* delete_thread 以及 insert_thread 各半 */
for (size_t i = 0; i < N_THREADS; i++)
pthread_create(&thr[i], NULL, (i & 1) ? delete_thread : insert_thread,
list);
for (size_t i = 0; i < N_THREADS; i++)
pthread_join(thr[i], NULL);
/* 確實將每個 node delete 乾淨 */
for (size_t i = 0; i < N_ELEMENTS; i++) {
for (size_t j = 0; j < tid_v_base; j++)
list_delete(list, (uintptr_t) &elements[j][i]);
}
/* 釋放 list 的資源 */
list_destroy(list);
fprintf(stderr, "inserts = %zu, deletes = %zu\n", atomic_load(&inserts),
atomic_load(&deletes));
return 0;
}
執行list_hp_retire()
時,會將即將被刪除的 node 之後的資料往前移動,不過 iret 在下一個 iteration 時還是會執行 iret++
,可是 rl->list[iret]
還沒處理過,因此加上 iret--
使下次 iteration 時能走訪到還沒被處理的資料
hp->deletefunc((void *) obj); iret--;
P.S. __list_find()
的實作看不是很懂,所以也沒辦法做更深入的優化
程式碼中定義 rcu 結構的部分:
/* A concurrent linked list utilizing the simplified RCU algorithm */
#include <stdbool.h>
typedef struct rcu_list rcu_list_t;
/* 用來遍歷整個 list 的 iterator type */
typedef struct {
struct list_node *entry;
} iterator_t;
typedef struct {
struct rcu_list *list;
struct zombie_node *zombie;
} rcu_handle_t;
typedef rcu_handle_t read_handle_t;
typedef rcu_handle_t write_handle_t;
typedef void (*deleter_func_t)(void *);
typedef bool (*finder_func_t)(void *, void *);
#define _GNU_SOURCE
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
/* list node 的資料結構,為雙向的 linked list */
typedef struct list_node {
bool deleted;
struct list_node *next, *prev;
void *data;
} list_node_t;
/* 為單向的 linked list */
typedef struct zombie_node {
struct zombie_node *next;
struct list_node *zombie;
rcu_handle_t *owner;
} zombie_node_t;
/**
* rcu list 的資料結構
* @write_lock: 用來限制只能有一個 writer 的 mutex lock
* @head, tail: 指向 list 頭尾的 pointer
* @zombie_head: 指向 zombie list
* @deleter: 為指向 delete function 的 function pointer
*/
struct rcu_list {
pthread_mutex_t write_lock; /* exclusive lock acquired by writers */
list_node_t *head, *tail; /* head and tail of the "live" list */
zombie_node_t *zombie_head; /* head of the zombie list */
deleter_func_t deleter;
};
static list_node_t *make_node(void *data);
static zombie_node_t *make_zombie_node(void);
static void lock_for_write(rcu_list_t *list);
static void unlock_for_write(rcu_list_t *list);
新增一個初始化 delete function pointer 的 list:
static rcu_list_t *list_new_with_deleter(deleter_func_t deleter)
{
if (!deleter)
return NULL;
rcu_list_t *list = malloc(sizeof(rcu_list_t));
if (!list)
return NULL;
/* 初始化 mutex lock */
if (pthread_mutex_init(&list->write_lock, NULL) != 0) {
free(list);
return NULL;
}
list->head = list->tail = NULL;
list->zombie_head = NULL;
/* 初始化 delete func ptr 指向 deleter */
list->deleter = deleter;
return list;
}
釋放 rcu list:
static void list_free(void *arg)
{
rcu_list_t *list = arg;
/* iterator 遍歷整格 list,一個個刪除 rcu 上的 node */
for (list_node_t *iter = list->head; iter;) {
list_node_t *tmp = iter->next;
free(iter->data);
free(iter);
iter = tmp;
}
free(list);
}
新增一個以 list_free()
作為 delete function 的 list,因此在這個例子中 delete == free:
rcu_list_t *list_new(void)
{
return list_new_with_deleter(list_free);
}
list delete wrapper:
void list_delete(rcu_list_t *list)
{
/* 檢查 list 以及 list->deleter 存在 */
if (!list || !list->deleter)
return;
list->deleter(list);
}
新增一筆資料到 list 中,而過程中會先為資料新增一個 node,並以 node 的形式加到 list 當中:
void list_push_front(rcu_list_t *list, void *data, write_handle_t *handle)
{
if (!list)
return;
/* 為 data 新增一個 node */
list_node_t *node = make_node(data);
if (!node)
return;
/* 為 list 上鎖,確保一次只有一個 writer (updater) */
/* 都用 lock 了,為什麼還需要 atomic ? */
lock_for_write(list);
list_node_t *old_head;
__atomic_load(&list->head, &old_head, __ATOMIC_RELAXED);
if (!old_head) {
/* list is currently empty */
/* empty list,頭尾直接指向新的 node */
__atomic_store(&list->head, &node, __ATOMIC_SEQ_CST);
__atomic_store(&list->tail, &node, __ATOMIC_SEQ_CST);
} else {
/* general case */
/* 將 node 加到 list 的開頭 */
__atomic_store(&node->next, &old_head, __ATOMIC_SEQ_CST);
__atomic_store(&old_head->prev, &node, __ATOMIC_SEQ_CST);
__atomic_store(&list->head, &node, __ATOMIC_SEQ_CST);
}
unlock_for_write(list);
}
list 找是否有某個 node maintain 特定的 data:
iterator_t list_find(rcu_list_t *list,
void *data,
finder_func_t finder,
read_handle_t *handle)
{
iterator_t iter = {.entry = NULL}; /* initialize an invalid iterator */
if (!list)
return iter;
list_node_t *current;
__atomic_load(&list->head, ¤t, __ATOMIC_SEQ_CST);
/* traverse 所有的 list node,並利用 finder 來檢測資料是否相同 */
while (current) {
if (finder(current->data, data)) {
iter.entry = current;
break;
}
__atomic_load(¤t->next, ¤t, __ATOMIC_SEQ_CST);
}
return iter;
}
指向 list 開頭的 iterator:
/* handler 根本沒被執行 @_@ */
iterator_t list_begin(rcu_list_t *list, read_handle_t *handle)
{
iterator_t iter = {.entry = NULL};
if (!list)
return iter;
list_node_t *head;
__atomic_load(&list->head, &head, __ATOMIC_SEQ_CST);
iter.entry = head;
return iter;
}
回傳 iterator 指向的 data:
void *iterator_get(iterator_t *iter)
{
/* 基本檢測 iter 以及 iter->entry 是否存在 */
return (iter && iter->entry) ? iter->entry->data : NULL;
}
為 list 註冊 reader / writer handler:
read_handle_t list_register_reader(rcu_list_t *list)
{
read_handle_t handle = {.list = list, .zombie = NULL};
return handle;
}
write_handle_t list_register_writer(rcu_list_t *list)
{
write_handle_t handle = {.list = list, .zombie = NULL};
return handle;
}
rcu_read_lock()
看起來是在新增一個 zombie node,猜測是用 zombie node 作為 reader 在過渡 update 的時候存取的值:
void rcu_read_lock(read_handle_t *handle)
{
/* 新增一個 zombie node */
zombie_node_t *z_node = make_zombie_node();
/* 標記 zombie node 的主人 */
z_node->owner = handle;
/* 更新該 handler 指向的 zombie node */
handle->zombie = z_node;
rcu_list_t *list = handle->list;
zombie_node_t *old_head;
/* 取得當前的 zombie_head */
__atomic_load(&list->zombie_head, &old_head, __ATOMIC_SEQ_CST);
do {
/**
* 將新增的 node 的 next 指向當前的 zombie_head,
* 並且嘗試將自己更新成 zombie head
*/
__atomic_store(&z_node->next, &old_head, __ATOMIC_SEQ_CST);
} while (!__atomic_compare_exchange(&list->zombie_head, &old_head, &z_node,
true, __ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST));
}
unlock 呢,既然 lock 是以 zombie node 作為記號, unlock 應該是將 zombie node 移除:
void rcu_read_unlock(read_handle_t *handle)
{
/* 取得 handle maintain 的 zombie node */
zombie_node_t *z_node = handle->zombie;
zombie_node_t *cached_next;
__atomic_load(&z_node->next, &cached_next, __ATOMIC_SEQ_CST);
bool last = true;
/* walk through the zombie list to determine if this is the last active
* reader in the list.
*/
zombie_node_t *n = cached_next;
/**
* 走訪 handle->zombie 之後的 node,看是否 handle->zombie
* 是最後一個 active 的 node
*/
while (n) {
list_node_t *owner;
__atomic_load(&n->owner, &owner, __ATOMIC_SEQ_CST);
/* 後面還有 zombie 屬於某個 handler,代表不是最後一個 */
if (owner) {
last = false; /* this is not the last active reader */
break;
}
__atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
}
n = cached_next;
if (last) {
/* 如果是最後一個,就能安全地將後面每個 zombie node delete */
while (n) {
list_node_t *dead_node = n->zombie;
free(dead_node);
zombie_node_t *old_node = n;
__atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
free(old_node);
}
__atomic_store(&z_node->next, &n, __ATOMIC_SEQ_CST);
}
/* 如果還有人在 read,則就不需要將後面的 node delete */
void *null = NULL;
__atomic_store(&z_node->owner, &null, __ATOMIC_SEQ_CST);
}
write lock / unlock wrapper,但其實還是執行 read lock/unlock:
void rcu_write_lock(write_handle_t *handle)
{
rcu_read_lock(handle);
}
void rcu_write_unlock(write_handle_t *handle)
{
rcu_read_unlock(handle);
}
新增一個 list node:
static list_node_t *make_node(void *data)
{
list_node_t *node = malloc(sizeof(list_node_t));
if (!node)
return NULL;
node->data = data;
node->next = node->prev = NULL;
node->deleted = false;
return node;
}
新增一個 zombie node:
static zombie_node_t *make_zombie_node(void)
{
zombie_node_t *z_node = malloc(sizeof(zombie_node_t));
if (!z_node)
return NULL;
z_node->zombie = NULL;
z_node->owner = NULL;
z_node->next = NULL;
return z_node;
}
mutex lock wrapper:
static void lock_for_write(rcu_list_t *list)
{
pthread_mutex_lock(&list->write_lock);
}
static void unlock_for_write(rcu_list_t *list)
{
pthread_mutex_unlock(&list->write_lock);
}
測試執行的部分,以 dummy
struct 作為 data 的單位:
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
typedef struct dummy {
int a, b;
} dummy_t;
static dummy_t *make_dummy(int a, int b)
{
dummy_t *dummy = malloc(sizeof(dummy_t));
dummy->a = a, dummy->b = b;
return dummy;
}
static bool finder(void *x, void *y)
{
/* 比對 dummy 的 a b 值是否相同 */
dummy_t *dx = x, *dy = y;
return (dx->a == dy->a) && (dx->b == dy->b);
}
reader thread 要執行的 function:
static void *reader_thread(void *arg)
{
rcu_list_t *list = arg;
/* 註冊 reader handle,回傳一個 read_handle */
read_handle_t reader = list_register_reader(list);
rcu_read_lock(&reader);
/* read from list here */
/* 找有沒有 {1, 1} 的 dummy */
iterator_t iter = list_find(list, &(dummy_t){1, 1}, finder, &reader);
void *data = iterator_get(&iter);
assert(data);
/* 找有沒有 {2, 2} 的 dummy */
iter = list_find(list, &(dummy_t){2, 2}, finder, &reader);
data = iterator_get(&iter);
assert(data);
/* 取得第一個 list node */
iterator_t first = list_begin(list, &reader);
data = iterator_get(&first);
assert(data);
dummy_t *as_d2 = data;
assert(as_d2->a == 2);
assert(as_d2->b == 2);
assert(iter.entry == first.entry);
rcu_read_unlock(&reader);
return NULL;
}
writer thread 要執行的 function:
static void *writer_thread(void *arg)
{
dummy_t *d1 = make_dummy(1, 1);
dummy_t *d2 = make_dummy(2, 2);
rcu_list_t *list = arg;
/* 為 list 註冊 writer handle,回傳一個 write_handle */
write_handle_t writer = list_register_writer(list);
rcu_write_lock(&writer);
/* write to list here */
/* writer 必須要 mutex,確保一次只會有一個 writer */
list_push_front(list, d1, &writer);
list_push_front(list, d2, &writer);
rcu_write_unlock(&writer);
return NULL;
}
main function:
#define N_READERS 10
int main(void)
{
rcu_list_t *list = list_new();
dummy_t *d0 = make_dummy(0, 0);
list_push_front(list, d0, NULL);
pthread_t t0, t_r[N_READERS];
/* 一個 writer */
pthread_create(&t0, NULL, writer_thread, list);
/* N_READERS 個 reader */
for (int i = 0; i < N_READERS; i++)
pthread_create(&t_r[i], NULL, reader_thread, list);
for (int i = 0; i < N_READERS; i++)
pthread_join(t_r[i], NULL);
pthread_join(t0, NULL);
list_delete(list);
return EXIT_SUCCESS;
}
TODO