Try   HackMD

2021q3 Homework2 (hp)

contributed by < u1f383 >

1. 解釋上述程式碼運作原理

首先介紹 macro 以及資料結構:

#define HP_MAX_THREADS 128 /* hp 能 handle 的最大 thread 數量 */
#define HP_MAX_HPS 5 /* 最多可以維護多少個 hp */
/**
 * 為了避免 false sharing 的問題,必須將 struct padding 至 cacheline 的倍數 (64 * n)
 * CLPAD 應該是 cacheline padding 的縮寫
 * > 看了人家的紀錄,intel 一次 cache 的資料量為 128 bytes
 */
#define CLPAD (128 / sizeof(uintptr_t)) // 16
#define HP_THRESHOLD_R 0 /* This is named 'R' in the HP paper */

/* 每個 thread 擁有的最大 retired objects 數量 */
#define HP_MAX_RETIRED (HP_MAX_THREADS * HP_MAX_HPS)
#define TID_UNKNOWN -1 /* 每個 thread 會配發一個 tid,而在還沒配發之前都是 -1 */

typedef struct {
    int size;
    uintptr_t *list;
} retirelist_t;

typedef struct list_hp list_hp_t;
typedef void(list_hp_deletefunc_t)(void *);

/**
 * hazard pointer 的 list 結構
 * @max_hps: 該 list 中最多能有多少 hp
 * @hp: 儲存指向 hp, array 的 index 代表對應的 thread
 * @rl: 指向 retire_list, array 的 index 代表對應的 thread
 * @deletefunc: 指向可以用來刪除 hp node 的 function
 */
struct list_hp {
    int max_hps;
    /* atomic_uintptr_t 可以想像成就是 unsigned long */
    alignas(128) atomic_uintptr_t *hp[HP_MAX_THREADS];
    /* 每個 element 間隔 128 bytes */
    alignas(128) retirelist_t *rl[HP_MAX_THREADS * CLPAD];
    list_hp_deletefunc_t *deletefunc;
};

/**
 * 設定 tid_v 為 per-thread data,一開始的初始值即為 -1,
 * 用來代表每個 thread (類似 thread id)
 */
static thread_local int tid_v = TID_UNKNOWN;
/* 此 function 用來初始化 atomic type variable */
static atomic_int_fast32_t tid_v_base = ATOMIC_VAR_INIT(0);

新建一個大小為 HP_MAX_HPS 的 hp array,並且考慮到 cacheline 作了許多優化,可謂說以空間 (memory space) 換取時間 (no false sharing):

/* Create a new hazard pointer array of size 'max_hps' (or a reasonable
 * default value if 'max_hps' is 0). The function 'deletefunc' will be
 * used to delete objects protected by hazard pointers when it becomes
 * safe to retire them.
 */
list_hp_t *list_hp_new(size_t max_hps, list_hp_deletefunc_t *deletefunc)
{
    list_hp_t *hp = aligned_alloc(128, sizeof(*hp));
    assert(hp);

    /* 如果沒有給定 max_hps,預設值就是 HP_MAX_HPS (5) */
    if (max_hps == 0)
        max_hps = HP_MAX_HPS;

    /**
     * 初始化 list_hp 部分的 member,仍有 hp 以及 rl 尚未初始化
     * 而這邊 deletefunc 會是 __list_node_delete
     */
    
    *hp = (list_hp_t){.max_hps = max_hps, .deletefunc = deletefunc};

    for (int i = 0; i < HP_MAX_THREADS; i++) {
        /**
         * 猜測 CLPAD * 2 的原因是 hp->hp[i][0] 為 uint (4 bytes),
         * 原本是以 sizeof(uintptr_t) 作為最小單位,現在必須改成 4 bytes,
         * 因此 padding 的 size 必須要 * 2
         * 
         * hp 一共有 [HP_MAX_THREADS][128]
         **/
        hp->hp[i] = calloc(CLPAD * 2, sizeof(hp->hp[i][0]));
        /**
         * rl 每個 CLPAD 為一個單位去分配記憶體,而資料型態又是 uintptr_t,
         * 因此每個 rl element 間隔 128 bytes
         */
        hp->rl[i * CLPAD] = calloc(1, sizeof(*hp->rl[0]));
        for (int j = 0; j < hp->max_hps; j++)
            /* 將初始值都設為 0 */
            atomic_init(&hp->hp[i][j], 0);
        /* 每個 thread 分配 (最大 thread 數 * 最多 HP 數) 的記憶體空間 */
        hp->rl[i * CLPAD]->list = calloc(HP_MAX_RETIRED, sizeof(uintptr_t));
    }

    return hp;
}

當 hp array 使用完畢,在結束程式前必須將其刪除以釋放空間:

/* Destroy a hazard pointer array and clean up all objects protected
 * by hazard pointers.
 */
void list_hp_destroy(list_hp_t *hp)
{
    /**
     * 這邊就是清空每個 hp array element 使用到的資源,
     * 不過到這仍不確定每個 data struct 的使用方式 / 行為
     */
    for (int i = 0; i < HP_MAX_THREADS; i++) {
        free(hp->hp[i]);
        retirelist_t *rl = hp->rl[i * CLPAD];
        for (int j = 0; j < rl->size; j++) {
            void *data = (void *) rl->list[j];
            hp->deletefunc(data);
        }
        free(rl->list);
        free(rl);
    }
    free(hp);
}

當某個 thread 在 hp 使用完畢後,必須要清空 hp 而非刪除,只需要把 hp list 對到的 hp 設為 0 即可:

/* Clear all hazard pointers in the array for the current thread.
 * Progress condition: wait-free bounded (by max_hps)
 */
void list_hp_clear(list_hp_t *hp)
{
    for (int i = 0; i < hp->max_hps; i++)
        atomic_store_explicit(&hp->hp[tid()][i], 0, memory_order_release);
}

當某 thread 正在使用 ptr 時,可以將 ptr 加進 hp list 當中保護,其他 thread 在 release node 時會先檢查是否在其他 thread 的 hp 內,如果沒有的話才會 release:

/* This returns the same value that is passed as ptr.
 * Progress condition: wait-free population oblivious.
 */
uintptr_t list_hp_protect_ptr(list_hp_t *hp, int ihp, uintptr_t ptr)
{
    /* ihp 為 hp index */
    atomic_store(&hp->hp[tid()][ihp], ptr);
    return ptr;
}

操作完畢後,若 ptr 沒有要受到保護時就可以執行 protect_release,從 hp list 當中移除:

/* Same as list_hp_protect_ptr(), but explicitly uses memory_order_release.
 * Progress condition: wait-free population oblivious.
 */
uintptr_t list_hp_protect_release(list_hp_t *hp, int ihp, uintptr_t ptr)
{
    atomic_store_explicit(&hp->hp[tid()][ihp], ptr, memory_order_release);
    return ptr;
}

把一個不再需要用到的 ptr 給釋放,但是釋放前必須再三確定沒人在使用:

/* Retire an object that is no longer in use by any thread, calling
 * the delete function that was specified in list_hp_new().
 *
 * Progress condition: wait-free bounded (by the number of threads squared)
 */
void list_hp_retire(list_hp_t *hp, uintptr_t ptr)
{
    /* 取得該 thread 的 retire list */
    retirelist_t *rl = hp->rl[tid() * CLPAD];
    /* 新增一個要被釋放的 object */
    rl->list[rl->size++] = ptr;
    assert(rl->size < HP_MAX_RETIRED);

    /**
     * 當數量小於 HP_THRESHOLD_R,就不先 retire object
     * 而是等到 threshold 到達時在一次釋放
     */
    if (rl->size < HP_THRESHOLD_R)
        return;

    for (size_t iret = 0 /* iret == retire index (?) */; iret < rl->size; iret++) {
        uintptr_t obj = rl->list[iret];
        bool can_delete = true;
        /* 遍歷每個 thread */
        for (int itid = 0; itid < HP_MAX_THREADS && can_delete; itid++) {
            /* 遍歷每個 thread 的 hp list*/
            for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
                /* reverse order 檢查有沒有在 hp 當中,如果有的話就不能 delete */
                if (atomic_load(&hp->hp[itid][ihp]) == obj) {
                    can_delete = false;
                    break;
                }
            }
        }

        if (can_delete) {
            size_t bytes = (rl->size - iret) * sizeof(rl->list[0]);
            /**
             * 從 &rl->list[iret + 1] copy "bytes" 數量的資料到 &rl->list[iret],
             * 意思就是把之後的資料整個往前挪
             */
            memmove(&rl->list[iret], &rl->list[iret + 1], bytes);
            rl->size--;
            // RRR;
            hp->deletefunc((void *) obj); iret--;
        }
    }
}

而之後的部分屬於 linked list 的實作,一樣先看使用到哪些 macro、function prototype 以及 data structure:

#include <pthread.h>

#define N_ELEMENTS 128 /* list 最多的 element 數量 */
#define N_THREADS (64) /* 使用多少 thread */
/**
 * NNN = 64 的原因在於,輸出結果為 4098,而其中兩個 node 為頭尾,
 * 每個 thread 則會 insert 128 個,因此 (4098 - 2) / 128 = 32,
 * 但是實際上 delete 也占了一半,因此總共 thread 的數量為 64
 */
#define MAX_THREADS 128

/* 用於紀錄刪除以及插入的次數 */
static atomic_uint_fast32_t deletes = 0, inserts = 0;

/**
 * hp 的 enum,感覺應該要放在上面 (?),也有可能是因為現在這個部分
 * 比較偏向 user 自己定義
 */
enum { HP_NEXT = 0, HP_CURR = 1, HP_PREV };

#define is_marked(p) (bool) ((uintptr_t)(p) &0x01)
#define get_marked(p) ((uintptr_t)(p) | (0x01))
#define get_unmarked(p) ((uintptr_t)(p) & (~0x01))

#define get_marked_node(p) ((list_node_t *) get_marked(p))
#define get_unmarked_node(p) ((list_node_t *) get_unmarked(p))

typedef uintptr_t list_key_t;

typedef struct list_node {
    alignas(128) uint32_t magic; /* 放 magic number (0xdeadbeef) */
    alignas(128) atomic_uintptr_t next;
    list_key_t key;
} list_node_t;

/* Per list variables */

typedef struct list {
    atomic_uintptr_t head, tail;
    /* 每個 linked list 都會 maintain 一個 hp array */
    list_hp_t *hp;
} list_t;

#define LIST_MAGIC (0xDEADBEAF) /* list element 的 magic number */

接下來是一些常見的 linked list function,像是 delete / insert / find / new 等等。

刪除 node 使用的是 list_node_destroy():

void list_node_destroy(list_node_t *node)
{
    if (!node) /* invalid */
        return;
    /* 檢查 node 是否有被意外改寫 */
    assert(node->magic == LIST_MAGIC);
    free(node);
    /* delete 成功時 delete counter + 1 */
    (void) atomic_fetch_add(&deletes, 1);
}

__list_node_delete()list_node_destroy() 的 wrapper function,同時也是 hp 的 deletefunc:

static void __list_node_delete(void *arg)
{
    list_node_t *node = (list_node_t *) arg;
    list_node_destroy(node);
}

find node function 可以說是 hp 的關鍵,用來找 key 對應到的 node 是否存在於 list 當中,並且過程中並無使用 lock:

static bool __list_find(list_t *list,
                        list_key_t *key,
                        atomic_uintptr_t **par_prev,
                        list_node_t **par_curr,
                        list_node_t **par_next)
{
    atomic_uintptr_t *prev = NULL;
    list_node_t *curr = NULL, *next = NULL;

try_again:
    prev = &list->head;
    curr = (list_node_t *) atomic_load(prev);
    /* 告訴大家正在使用 curr 此 ptr */
    (void) list_hp_protect_ptr(list->hp, HP_CURR, (uintptr_t) curr);
    /**
     * 在過程中原本的 curr (old list->head) 被更新,必須重來,
     * 因為有可能此時 curr (old list->head) 已經被 delete 了
     */
    if (atomic_load(prev) != get_unmarked(curr))
        goto try_again;
    /* 確保目前 curr 不會被動到 */
    while (true) {
        /* curr 是 NULL ptr,代表整個 list 已經空了 */
        if (!get_unmarked_node(curr))
            return false;
        /* 取得 curr->next */
        next = (list_node_t *) atomic_load(&get_unmarked_node(curr)->next);
        /* 一樣加到 hp array 當中保護 */
        (void) list_hp_protect_ptr(list->hp, HP_NEXT, get_unmarked(next));
        /**
         * 結果在加到 hp 的過程中 next 又被更動,代表已經被 delete 掉,
         * 因為只有 list_delete() 會動到 curr->next
         * ...應該吧 @_@
         */
        if (atomic_load(&get_unmarked_node(curr)->next) != (uintptr_t) next)
            break;
        /**
         * next 為 curr->next,也就是 list->head->next,如果接到 tail,
         * 代表 list 是空的
         */
        if (get_unmarked(next) == atomic_load((atomic_uintptr_t *) &list->tail))
            break;
        /* 在過程中 list->head 又被更動,可能已經不一樣了,所以必須重新來過 */
        /* 因為我們用 hp 保護 curr 以及 next,因此不需要考慮到他們被 delete 的狀況 */
        if (atomic_load(prev) != get_unmarked(curr))
            goto try_again;
        /* 當 next 被 marked 時 condition 為 false */
        if (get_unmarked_node(next) == next) {
            /* 當前 node 的 key >= 要找的 key */
            if (!(get_unmarked_node(curr)->key < *key)) {
                /* par 是 parallel 的意思嗎 ? */
                *par_curr = curr;
                *par_prev = prev;
                *par_next = next;
                /* 看是不是要找的 node */
                // return FFF;
                return get_unmarked_node(curr)->key == *key;
            }
            /* prev 更新成下一個 node @_@? */
            prev = &get_unmarked_node(curr)->next;
            /* 將 curr 設為 hp array 中的 PREV 作為保護 */
            (void) list_hp_protect_release(list->hp, HP_PREV,
                                           get_unmarked(curr));
        } else {
            
            uintptr_t tmp = get_unmarked(curr);
            if (!atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next)))
                goto try_again;
            list_hp_retire(list->hp, get_unmarked(curr));
        }
        /* 更新 curr 成 next,代表要繼續 traverse */
        curr = next;
        /**
         * 更新 curr 並用 hp array 作保護,
         * 而 release 代表用 memory_order_release
         */
        (void) list_hp_protect_release(list->hp, HP_CURR, get_unmarked(next));
    }
    /* 沒有很懂這三個是什麼 @_@ */
    *par_curr = curr;
    *par_prev = prev;
    *par_next = next;

    return false;
}

insert node 時呼叫 list_insert():

bool list_insert(list_t *list, list_key_t key)
{
    list_node_t *curr = NULL, *next = NULL;
    atomic_uintptr_t *prev = NULL;
    /* 新增一個 node */
    list_node_t *node = list_node_new(key);

    while (true) {
        if (__list_find(list, &key, &prev, &curr, &next)) {
            /* 如果 node 重複,就不新增,釋放 node 以及清空 hp */
            list_node_destroy(node);
            list_hp_clear(list->hp);
            return false;
        }
        /* 如果沒找到對應 key 的 node,就將 node 加在 list 的一開始 */
        atomic_store_explicit(&node->next, (uintptr_t) curr,
                              memory_order_relaxed);
        uintptr_t tmp = get_unmarked(curr);
        /**
         * 如果 prev == curr 則 prev = node,不過 prev 也沒用到,
         * 為什麼不直接 compare 就好 ?
         */
        if (atomic_compare_exchange_strong(prev, &tmp, (uintptr_t) node)) {
            /* 確實新增成功,清空 hp array 後 return */
            list_hp_clear(list->hp);
            return true;
        }
    }
}

使用 list_delete() 刪除 node:

bool list_delete(list_t *list, list_key_t key)
{
    list_node_t *curr, *next;
    atomic_uintptr_t *prev;
    while (true) {
        if (!__list_find(list, &key, &prev, &curr, &next)) {
            /* 如果找不到對應的 node 也不需要 delete 了 */
            list_hp_clear(list->hp);
            return false;
        }

        uintptr_t tmp = get_unmarked(next);

        /* 將 next mark 起來,代表上一個點有被動到 */
        if (!atomic_compare_exchange_strong(&curr->next, &tmp,
                                            get_marked(next)))
            continue;

        tmp = get_unmarked(curr);
        if (atomic_compare_exchange_strong(prev, &tmp, get_unmarked(next))) {
            list_hp_clear(list->hp);
            // DDD;
            /* 將當前的點 retire,表示刪除 */
            list_hp_retire(list->hp, get_unmarked(curr));
        } else {
            list_hp_clear(list->hp);
        }
        return true;
    }
}

新增新的 list 時呼叫 list_new():

list_t *list_new(void)
{
    /* 分配一個 list 大小的空間 */
    list_t *list = calloc(1, sizeof(*list));
    assert(list);
    /* 新增頭尾兩個點 */
    list_node_t *head = list_node_new(0),
                *tail = list_node_new(UINTPTR_MAX /* 0xffffffff */);
    assert(head), assert(tail);
    /* 新增一個 hp */
    list_hp_t *hp = list_hp_new(3, __list_node_delete);

    /**
     * list->head -> head ---next--
     *                             |
     * list->tail -> tail <--------|
     */
    atomic_init(&head->next, (uintptr_t) tail);
    *list = (list_t){.hp = hp}; /* assign hp */
    atomic_init(&list->head, (uintptr_t) head);
    atomic_init(&list->tail, (uintptr_t) tail);

    return list;
}

insert_thread() 以及 delete_thread() 為 thread 執行的兩個 function,主要就是持續新增以及刪除 node:

static uintptr_t elements[MAX_THREADS + 1][N_ELEMENTS];

static void *insert_thread(void *arg)
{
    list_t *list = (list_t *) arg;

    for (size_t i = 0; i < N_ELEMENTS; i++)
        (void) list_insert(list, (uintptr_t) &elements[tid()][i]);
    return NULL;
}

static void *delete_thread(void *arg)
{
    list_t *list = (list_t *) arg;

    for (size_t i = 0; i < N_ELEMENTS; i++)
        (void) list_delete(list, (uintptr_t) &elements[tid()][i]);
    return NULL;
}

main():

int main(void)
{
    list_t *list = list_new(); /* 建立 + 初始化 list */

    pthread_t thr[N_THREADS];

    /* delete_thread 以及 insert_thread 各半 */
    for (size_t i = 0; i < N_THREADS; i++)
        pthread_create(&thr[i], NULL, (i & 1) ? delete_thread : insert_thread,
                       list);

    for (size_t i = 0; i < N_THREADS; i++)
        pthread_join(thr[i], NULL);

    /* 確實將每個 node delete 乾淨 */
    for (size_t i = 0; i < N_ELEMENTS; i++) {
        for (size_t j = 0; j < tid_v_base; j++)
            list_delete(list, (uintptr_t) &elements[j][i]);
    }
    
    /* 釋放 list 的資源 */
    list_destroy(list);

    fprintf(stderr, "inserts = %zu, deletes = %zu\n", atomic_load(&inserts),
            atomic_load(&deletes));

    return 0;
}

2. 指出改進空間並著手實作

執行list_hp_retire() 時,會將即將被刪除的 node 之後的資料往前移動,不過 iret 在下一個 iteration 時還是會執行 iret++,可是 rl->list[iret] 還沒處理過,因此加上 iret-- 使下次 iteration 時能走訪到還沒被處理的資料

hp->deletefunc((void *) obj); iret--;

P.S. __list_find() 的實作看不是很懂,所以也沒辦法做更深入的優化

3. 對比 rcu_list,解釋同為 lock-free 演算法,跟上述 Hazard pointer 手法有何異同?能否指出 rcu_list 實作缺陷並改進?

程式碼中定義 rcu 結構的部分:

/* A concurrent linked list utilizing the simplified RCU algorithm */

#include <stdbool.h>

typedef struct rcu_list rcu_list_t;

/* 用來遍歷整個 list 的 iterator type */
typedef struct {
    struct list_node *entry;
} iterator_t;

typedef struct {
    struct rcu_list *list;
    struct zombie_node *zombie;
} rcu_handle_t;

typedef rcu_handle_t read_handle_t;
typedef rcu_handle_t write_handle_t;

typedef void (*deleter_func_t)(void *);
typedef bool (*finder_func_t)(void *, void *);

#define _GNU_SOURCE
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* list node 的資料結構,為雙向的 linked list */
typedef struct list_node {
    bool deleted;
    struct list_node *next, *prev;
    void *data;
} list_node_t;

/* 為單向的 linked list */
typedef struct zombie_node {
    struct zombie_node *next;
    struct list_node *zombie;
    rcu_handle_t *owner;
} zombie_node_t;

/**
 * rcu list 的資料結構
 * @write_lock: 用來限制只能有一個 writer 的 mutex lock
 * @head, tail: 指向 list 頭尾的 pointer
 * @zombie_head: 指向 zombie list
 * @deleter: 為指向 delete function 的 function pointer
 */
struct rcu_list {
    pthread_mutex_t write_lock; /* exclusive lock acquired by writers */
    list_node_t *head, *tail;   /* head and tail of the "live" list */
    zombie_node_t *zombie_head; /* head of the zombie list */
    deleter_func_t deleter;
};

static list_node_t *make_node(void *data);
static zombie_node_t *make_zombie_node(void);
static void lock_for_write(rcu_list_t *list);
static void unlock_for_write(rcu_list_t *list);

新增一個初始化 delete function pointer 的 list:

static rcu_list_t *list_new_with_deleter(deleter_func_t deleter)
{
    if (!deleter)
        return NULL;

    rcu_list_t *list = malloc(sizeof(rcu_list_t));
    if (!list)
        return NULL;

    /* 初始化 mutex lock */
    if (pthread_mutex_init(&list->write_lock, NULL) != 0) {
        free(list);
        return NULL;
    }

    list->head = list->tail = NULL;
    list->zombie_head = NULL;
    /* 初始化 delete func ptr 指向 deleter */
    list->deleter = deleter;

    return list;
}

釋放 rcu list:

static void list_free(void *arg)
{
    rcu_list_t *list = arg;
    /* iterator 遍歷整格 list,一個個刪除 rcu 上的 node */
    for (list_node_t *iter = list->head; iter;) {
        list_node_t *tmp = iter->next;
        free(iter->data);
        free(iter);
        iter = tmp;
    }
    free(list);
}

新增一個以 list_free() 作為 delete function 的 list,因此在這個例子中 delete == free:

rcu_list_t *list_new(void)
{
    return list_new_with_deleter(list_free);
}

list delete wrapper:

void list_delete(rcu_list_t *list)
{
    /* 檢查 list 以及 list->deleter 存在 */
    if (!list || !list->deleter)
        return;
    list->deleter(list);
}

新增一筆資料到 list 中,而過程中會先為資料新增一個 node,並以 node 的形式加到 list 當中:

void list_push_front(rcu_list_t *list, void *data, write_handle_t *handle)
{
    if (!list)
        return;

    /* 為 data 新增一個 node */
    list_node_t *node = make_node(data);
    if (!node)
        return;

    /* 為 list 上鎖,確保一次只有一個 writer (updater) */
    /* 都用 lock 了,為什麼還需要 atomic ? */
    lock_for_write(list);

    list_node_t *old_head;
    __atomic_load(&list->head, &old_head, __ATOMIC_RELAXED);

    if (!old_head) {
        /* list is currently empty */
        /* empty list,頭尾直接指向新的 node */
        __atomic_store(&list->head, &node, __ATOMIC_SEQ_CST);
        __atomic_store(&list->tail, &node, __ATOMIC_SEQ_CST);
    } else {
        /* general case */
        /* 將 node 加到 list 的開頭 */
        __atomic_store(&node->next, &old_head, __ATOMIC_SEQ_CST);
        __atomic_store(&old_head->prev, &node, __ATOMIC_SEQ_CST);
        __atomic_store(&list->head, &node, __ATOMIC_SEQ_CST);
    }

    unlock_for_write(list);
}

list 找是否有某個 node maintain 特定的 data:

iterator_t list_find(rcu_list_t *list,
                     void *data,
                     finder_func_t finder,
                     read_handle_t *handle)
{
    iterator_t iter = {.entry = NULL}; /* initialize an invalid iterator */
    if (!list)
        return iter;

    list_node_t *current;
    __atomic_load(&list->head, &current, __ATOMIC_SEQ_CST);

    /* traverse 所有的 list node,並利用 finder 來檢測資料是否相同 */
    while (current) {
        if (finder(current->data, data)) {
            iter.entry = current;
            break;
        }

        __atomic_load(&current->next, &current, __ATOMIC_SEQ_CST);
    }
    return iter;
}

指向 list 開頭的 iterator:

/* handler 根本沒被執行 @_@ */
iterator_t list_begin(rcu_list_t *list, read_handle_t *handle)
{
    iterator_t iter = {.entry = NULL};
    if (!list)
        return iter;

    list_node_t *head;
    __atomic_load(&list->head, &head, __ATOMIC_SEQ_CST);

    iter.entry = head;
    return iter;
}

回傳 iterator 指向的 data:

void *iterator_get(iterator_t *iter)
{
    /* 基本檢測 iter 以及 iter->entry 是否存在 */
    return (iter && iter->entry) ? iter->entry->data : NULL;
}

為 list 註冊 reader / writer handler:

read_handle_t list_register_reader(rcu_list_t *list)
{
    read_handle_t handle = {.list = list, .zombie = NULL};
    return handle;
}

write_handle_t list_register_writer(rcu_list_t *list)
{
    write_handle_t handle = {.list = list, .zombie = NULL};
    return handle;
}

rcu_read_lock() 看起來是在新增一個 zombie node,猜測是用 zombie node 作為 reader 在過渡 update 的時候存取的值:

void rcu_read_lock(read_handle_t *handle)
{
    /* 新增一個 zombie node */
    zombie_node_t *z_node = make_zombie_node();

    /* 標記 zombie node 的主人 */
    z_node->owner = handle;
    /* 更新該 handler 指向的 zombie node */
    handle->zombie = z_node;

    rcu_list_t *list = handle->list;

    zombie_node_t *old_head;
    /* 取得當前的 zombie_head */
    __atomic_load(&list->zombie_head, &old_head, __ATOMIC_SEQ_CST);

    do {
        /**
         * 將新增的 node 的 next 指向當前的 zombie_head,
         * 並且嘗試將自己更新成 zombie head
         */
        __atomic_store(&z_node->next, &old_head, __ATOMIC_SEQ_CST);

    } while (!__atomic_compare_exchange(&list->zombie_head, &old_head, &z_node,
                                        true, __ATOMIC_SEQ_CST,
                                        __ATOMIC_SEQ_CST));
}

unlock 呢,既然 lock 是以 zombie node 作為記號, unlock 應該是將 zombie node 移除:

void rcu_read_unlock(read_handle_t *handle)
{
    /* 取得 handle maintain 的 zombie node */
    zombie_node_t *z_node = handle->zombie;

    zombie_node_t *cached_next;
    __atomic_load(&z_node->next, &cached_next, __ATOMIC_SEQ_CST);

    bool last = true;

    /* walk through the zombie list to determine if this is the last active
     * reader in the list.
     */
    zombie_node_t *n = cached_next;
    /**
     * 走訪 handle->zombie 之後的 node,看是否 handle->zombie
     * 是最後一個 active 的 node
     */
    while (n) {
        list_node_t *owner;
        __atomic_load(&n->owner, &owner, __ATOMIC_SEQ_CST);

        /* 後面還有 zombie 屬於某個 handler,代表不是最後一個 */
        if (owner) {
            last = false; /* this is not the last active reader */
            break;
        }

        __atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
    }

    n = cached_next;

    if (last) {
        /* 如果是最後一個,就能安全地將後面每個 zombie node delete */
        while (n) {
            list_node_t *dead_node = n->zombie;
            free(dead_node);

            zombie_node_t *old_node = n;
            __atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
            free(old_node);
        }

        __atomic_store(&z_node->next, &n, __ATOMIC_SEQ_CST);
    }

    /* 如果還有人在 read,則就不需要將後面的 node delete */
    void *null = NULL;
    __atomic_store(&z_node->owner, &null, __ATOMIC_SEQ_CST);
}

write lock / unlock wrapper,但其實還是執行 read lock/unlock:

void rcu_write_lock(write_handle_t *handle)
{
    rcu_read_lock(handle);
}

void rcu_write_unlock(write_handle_t *handle)
{
    rcu_read_unlock(handle);
}

新增一個 list node:

static list_node_t *make_node(void *data)
{
    list_node_t *node = malloc(sizeof(list_node_t));
    if (!node)
        return NULL;

    node->data = data;
    node->next = node->prev = NULL;
    node->deleted = false;

    return node;
}

新增一個 zombie node:

static zombie_node_t *make_zombie_node(void)
{
    zombie_node_t *z_node = malloc(sizeof(zombie_node_t));
    if (!z_node)
        return NULL;

    z_node->zombie = NULL;
    z_node->owner = NULL;
    z_node->next = NULL;

    return z_node;
}

mutex lock wrapper:

static void lock_for_write(rcu_list_t *list)
{
    pthread_mutex_lock(&list->write_lock);
}

static void unlock_for_write(rcu_list_t *list)
{
    pthread_mutex_unlock(&list->write_lock);
}

測試執行的部分,以 dummy struct 作為 data 的單位:

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct dummy {
    int a, b;
} dummy_t;

static dummy_t *make_dummy(int a, int b)
{
    dummy_t *dummy = malloc(sizeof(dummy_t));
    dummy->a = a, dummy->b = b;
    return dummy;
}

static bool finder(void *x, void *y)
{
    /* 比對 dummy 的 a b 值是否相同 */
    dummy_t *dx = x, *dy = y;
    return (dx->a == dy->a) && (dx->b == dy->b);
}

reader thread 要執行的 function:

static void *reader_thread(void *arg)
{
    rcu_list_t *list = arg;
    /* 註冊 reader handle,回傳一個 read_handle */
    read_handle_t reader = list_register_reader(list);

    rcu_read_lock(&reader);

    /* read from list here */
    /* 找有沒有 {1, 1} 的 dummy */
    iterator_t iter = list_find(list, &(dummy_t){1, 1}, finder, &reader);
    void *data = iterator_get(&iter);
    assert(data);

    /* 找有沒有 {2, 2} 的 dummy */
    iter = list_find(list, &(dummy_t){2, 2}, finder, &reader);
    data = iterator_get(&iter);
    assert(data);

    /* 取得第一個 list node */
    iterator_t first = list_begin(list, &reader);
    data = iterator_get(&first);
    assert(data);

    dummy_t *as_d2 = data;
    assert(as_d2->a == 2);
    assert(as_d2->b == 2);

    assert(iter.entry == first.entry);

    rcu_read_unlock(&reader);
    return NULL;
}

writer thread 要執行的 function:

static void *writer_thread(void *arg)
{
    dummy_t *d1 = make_dummy(1, 1);
    dummy_t *d2 = make_dummy(2, 2);

    rcu_list_t *list = arg;
    /* 為 list 註冊 writer handle,回傳一個 write_handle */
    write_handle_t writer = list_register_writer(list);

    rcu_write_lock(&writer);

    /* write to list here */
    /* writer 必須要 mutex,確保一次只會有一個 writer */
    list_push_front(list, d1, &writer);
    list_push_front(list, d2, &writer);

    rcu_write_unlock(&writer);
    return NULL;
}

main function:

#define N_READERS 10

int main(void)
{
    rcu_list_t *list = list_new();
    dummy_t *d0 = make_dummy(0, 0);
    list_push_front(list, d0, NULL);

    pthread_t t0, t_r[N_READERS];
    /* 一個 writer */
    pthread_create(&t0, NULL, writer_thread, list);
    /* N_READERS 個 reader */
    for (int i = 0; i < N_READERS; i++)
        pthread_create(&t_r[i], NULL, reader_thread, list);

    for (int i = 0; i < N_READERS; i++)
        pthread_join(t_r[i], NULL);
    pthread_join(t0, NULL);

    list_delete(list);

    return EXIT_SUCCESS;
}

TODO