Try   HackMD

2022q1 Homework5 (quiz6)

contributed by < freshLiver >

quiz6

第一題 - Multithreaded Read Write

Main

static mutex_t lock, rwlock;
static spin_t printlock;

static int readers = 0;
static int n_readers_in = 0, n_writers_in = 0;

int main()
{
    mutex_init(&lock);
    mutex_init(&rwlock);
    spin_init(&printlock);
    atomic_init(&n_readers_in, 0);
    atomic_init(&n_writers_in, 0);
    thread readers[5], writers[5];
    for (int i = 0; i < 5; i++) {
        thread_create(&readers[i], f1, NULL);
        thread_create(&writers[i], f2, NULL);
    }
    for (int i = 0; i < 5; i++) {
        thread_join(writers[i], NULL);
        thread_join(readers[i], NULL);
    }
    return 0;
}

首先,可以從 main 中知道這個程式會建立十個執行緒,一半作為 reader 執行函式 f1,另一半則作為 writer 執行函式 f2。除此之外還建立了 spin_lock 以及 mutex 等並行程式所需的物件。

Locks

https://www.kernel.org/doc/html/latest/locking/spinlocks.html

https://hackmd.io/@sysprog/multicore-locks?type=view

https://hackmd.io/@sysprog/linux-sync?type=view#QueuedMCS-Spinlock

Reader 與 Writer

static void f1() { mutex_acquire(&lock); readers += 1; if (readers == 1) mutex_acquire(&rwlock); mutex_release(&lock); safeprintf(&printlock, "Reader process in\n"); atomic_fetch_add(&n_readers_in, 1); mutex_acquire(&lock); readers -= 1; if (readers == 0) mutex_release(&rwlock); mutex_release(&lock); atomic_fetch_sub(&n_readers_in, 1); safeprintf(&printlock, "Reader process out\n"); }

在沒有 writer 寫入資料時,可以同時有多個 reader 讀取資料;但若是有 writer 正在寫入資料時,所有 reader 則必須等到寫入結束才能進行讀取,因此在 f1 讀取資料前,必須先確認 rwlock 沒有被 writer 持有

由於 writer 持有 rwlock 時不會有 reader 在讀資料,所以能夠確認 writer 是否有 rwlock 的執行緒只有第一個嘗試讀取資料的 reader,因此第 3 行到第 7 行的部份需要有 reader 持有 rwlock 後其他 reader 才能通過。

而所有 reader 都沒在讀取資料時,則不應有任何 reader 持有 rwlock,否則 writer 則無法進行寫入,因此最後一個讀取完資料的 reader 需要負責釋放 rwlock,即第 13 到 14 行的部份。

static void f2()
{
    mutex_acquire(&rwlock);
    atomic_fetch_add(&n_writers_in, 1);
    safeprintf(&printlock, "Writer process in\n");
    mutex_release(&rwlock);
    atomic_fetch_sub(&n_writers_in, 1);
    safeprintf(&printlock, "Writers process out\n");
}

而 writer 部份則比較單純,任何 writer 要寫入資料時都只需要檢查沒有其他 writer 或 reader 在讀取資料即可,因此在一開始就透過 mutex 檢查 rwlock 是否已經被其他執行緒持有。

Create New Thread (EXP1, EXP2)

建立 thread 的相關操作定義在 thread_create 函式中:

int thread_create(thread *t, void *routine, void *arg) { spin_acquire(&__globalLock); static int initState = 0; if (!t || !routine) { spin_release(&__globalLock); return EINVAL; } ...

一開始會進行防呆,若是沒有傳入 thread 物件或是要執行的工作的話,救回傳錯誤值。

防呆的部份應該可以在 spin lock 前進行確認

若是參數有正確傳入的話,就可以開始建立執行緒:

thread tid; void *thread_stack; if (initState == 0) { initState = 1; init(); }

根據 n1570 中 6.2.4 Storage durations of objects 關於 static 的說明:

  1. An object whose identifier is declared without the storage-class specifier _Thread_local, and either with external or internal linkage or with the storage-class specifier static, has static storage duration. Its lifetime is the entire execution of the program and its stored value is initialized only once, prior to program startup.

可以知道 static 變數會一直存在到程式結束,且只會在一開始進行初始化,即第 4 行的宣告只會做一次,因此第 12 到 14 行的部份只會在第一次呼叫 thread_create 時執行到,而 init() 函式的定義是:

static void init()
{
    spin_init(&__globalLock);
    INIT_SIGNALS
    singlyLLInit(&__tidList);
    node *insertedNode = singlyLLInsert(&__tidList, getpid());
    insertedNode->tidCpy = insertedNode->tid;
    insertedNode->fa = NULL;
    atexit(cleanup);
}

是負責初始化 __globalLock 以及用來儲存 TID 的 __tidList 串列,並在初始化 __tidList 串列後會將前的 PID 加入到串列中。而在程式結束後則會透過 cleanup 函式進行資源釋放。

TODO : Signal 相關處理

接著要為建立執行緒做準備:

node *insertedNode = singlyLLInsert(&__tidList, 0); if (!insertedNode) { printf("Thread address not found\n"); spin_release(&__globalLock); return -1; } funcargs *fa; fa = (funcargs *) malloc(sizeof(funcargs)); if (!fa) { printf("Malloc failed\n"); spin_release(&__globalLock); return -1; } fa->f = routine; fa->arg = arg;

要做的準備包含:

  • 為新的執行緒準備一個 __tidList 串列的節點 insertedNode 儲存資訊,並加入到 __tidList 串列中,但這時還沒正式建立執行緒,所以一開始的 TID 會被設為 0。
  • 準備一個 funcargs 結構體的物件 fa 儲存要給新執行緒的參數(要執行的函式的指標、該函式的參數)

若失敗的話就直接離開 thread_thread 函式,否則開始建立執行緒:

thread_stack = allocStack(STACK_SZ, GUARD_SZ); if (!thread_stack) { perror("thread create"); spin_release(&__globalLock); return errno; } fa->stack = thread_stack; thread_t tid = clone(wrap, (char *) thread_stack + STACK_SZ + GUARD_SZ, CLONE_FLAGS, fa, &(EXP1), NULL, &(EXP2));

首先在 allocStack 中嘗試用 mmap 分配一塊大小為 STACK_SZ + GUARD_SZ 的空間 thread_stack,而這個空間會作為 clone 的引數使用。

TODO : GUARD_SZ 相當於 getpagesize(),為什麼會需要 PAGE_SIZE

而根據 clone 的 prototype 以及 Man Page 的說明:

int clone(int (*fn)(void *), void *stack, int flags, void *arg, 
          ... 
          /* pid_t *parent_tid, * void *tls, * pid_t *child_tid */)

These system calls create a new ("child") process, in a manner similar to fork(2).

On success, the thread ID of the child process is returned in the caller's thread of execution. On failure, -1 is returned in the caller's context, no child process is created, and errno is set to indicate the error.

這兩點可以知道 clonefork 相似,能夠用來建立一個子行程,若是成功的話會回傳 child 的 TID,若是失敗的話,則會回傳 -1。

The stack argument specifies the location of the stack used by the child process. Since the child and calling process may share memory, it is not possible for the child process to execute in the same stack as the calling process. The calling process must therefore set up memory space for the child stack and pass a pointer to this space to clone(). Stacks grow downward on all processors that run Linux (except the HP PA processors), so stack usually points to the topmost address of the memory space set up for the child stack. Note that clone() does not provide a means whereby the caller can inform the kernel of the size of the stack area.

而這邊則可知道 thread_stack 會被當作新的行程的 stack 使用,但 stack 應該從高位址向低位址增長,因此要傳入的應該是該 stack 區塊的上界地址,即 thread_stack + STACK_SIZE

CLONE_FLAGS 中包含了:

#define CLONE_FLAGS                                                    \
    CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD | \
        CLONE_SYSVSEM | CLONE_PARENT_SETTID | CLONE_CHILD_CLEARTID

其中 parent_tid 以及 child_tid 相關的 FLAG 有:

CLONE_PARENT_SETTID
Store the child thread ID at the location pointed to by parent_tid (clone()) or cl_args.parent_tid (clone3()) in the parent's memory. (In Linux 2.5.32-2.5.48 there was a flag CLONE_SETTID that did this.) The store operation completes before the clone call returns control to user space.

CLONE_CHILD_CLEARTID
Clear (zero) the child thread ID at the location pointed to by child_tid (clone()) or cl_args.child_tid (clone3()) in child memory when the child exits, and do a wakeup on the futex at that address. The address involved may be changed by the set_tid_address(2) system call. This is used by threading libraries.

因此 child 的 TID 會儲存在 parent_tid 指向的物件中,而 child 結束時將 child_tid 指向的物件的值重設為 0。

所以在這個函式中,child 的 TID 將會被儲存在 EXP1 指向的物件,而當 child 結束時,將會清除 EXP2 指向的物件的值。而在這題的程式中,使用到 TID 相關的部份都是與結束執行緒相關的操作,包含 thread_killkillAllThreadsdeleteAllThreads 以及 getReturnValue

以傳送 TGKILL 給所有 active threads 的 killAllThreads 為例:

/** * @brief Send process wide signal dispositions to all active threads * @param ll Pointer to linked list * @param signum Signal number * @return On success 0, On failure errno */ int killAllThreads(singlyLL *ll, int signum) { node *tmp = ll->head; pid_t pid = getpid(); int ret; pid_t delpid[100]; int counter = 0; while (tmp) { if (tmp->tid == gettid()) { tmp = tmp->next; continue; } printf("Killed thread %ld\n", tmp->tid); ret = syscall(TGKILL, pid, tmp->tid, signum); if (ret == -1) { perror("tgkill"); return errno; } else { if (signum == SIGINT || signum == SIGKILL) delpid[counter++] = tmp->tid; } tmp = tmp->next; } if (signum == SIGINT || signum == SIGKILL) { for (int i = 0; i < counter; i++) singlyLLDelete(ll, delpid[i]); } return 0; }

其中串列 ll就是 __tidList,因此這個函式會依序對 clone 產生的執行緒(不包含 caller 的執行緒)呼叫 TGKILL 的系統呼叫,若是成功且 signumSIGINTSIGKILL 之一的話則會將目標執行緒的 TID 加入到 delpid 陣列中,並在檢查完 __tidList 後一併從串列中刪除。

delpid 是固定大小,雖然足夠處理這個程式的 10 個讀寫執行緒,但若是執行緒數量增加的話,則需要修改大小。

tgkill 的 Man Page 中可以找到相關說明:

巨集 TGKILL 定義的數值是 234,對應到要執行的系統呼叫的編號,而這個編號定義在 <asm/unistd.h> 中,從中可以找到 #define __NR_tgkill 234

int tgkill(pid_t tgid, pid_t tid, int sig);

tgkill() sends the signal sig to the thread with the thread ID tid in the thread group tgid. (By contrast, kill(2) can be used to send a signal only to a process (i.e., thread group) as a whole, and the signal will be delivered to an arbitrary thread within that process.)

因此在這個函式中的 syscall 會對 TGID 為 pid 的行程中 TID 為 tmp->tid 的執行緒傳送一個編號為 signum 的 signal,但在 thread_create 的第 16 行新增執行緒節點時賦予 tid 的值是 0,所以 tid 勢必有在其他地方被修改,而這個程式中只有 singlyLLInsert 有修改 tid,因此 EXP1 必須為 &insertedNode->tid,才能須透過 CLONE_PARENT_SETTID 這個 flag 將 parent_tid 指向的物件的值,也就是新節點的 tid,修改成新建立的執行緒的 TID。

TODO : 而在這個函式唯一的 caller function thread_kill 中則限制了 signum 會是 SIGINTSIGCONTSIGSTOP 之一,雖然 thread_kill 並沒有在這個程式中被呼叫,但若是被呼叫且傳送三個可能的 signal 的話會發生什麼事?

而在 cleanup 時呼叫的 deleteAllThreads 中:

void cleanup()
{
    deleteAllThreads(&__tidList);
    free(__tidList.head);
}
void deleteAllThreads(singlyLL *l)
{
    node *tmp = l->head;
    int *deleted = NULL;
    int numDeleted = 0;
    while (tmp) {
        if (tmp->tid == 0) {
            deleted = (int *) realloc(deleted, (++numDeleted) * sizeof(int));
            deleted[numDeleted - 1] = tmp->tidCpy;
        }
        tmp = tmp->next;
    }
    for (int i = 0; i < numDeleted; i++)
        singlyLLDelete(l, deleted[i]);
    free(deleted);
}

則可以知道 deleteAllThreads 會在串列 __tidList 中尋找 TID 為 0 的執行緒,但從前面的 EXP1 可知,只要執行緒有成功被建立,tid 就會從 0 被修改成真正的 TID,所以若要刪除「所有」執行緒的話,tid 勢必要在執行緒結束後被修改成 0,也就是對應到 clone 的 這個 FLAG 的作用,因此 EXP2 應指向 tid,即 &insertedNode->tid

CLONE_VM 以及 CLONE_THREAD 則是用來建立一個 child ,依序分別代表:

  • caller 與 child 會使用相同的 Memory Space,因此執行緒間能夠看到彼此記憶體操作的結果,包含 mmap 之類操作分配的記憶體內容,因此在此程式中, child 對 stack 的修改會影響到 caller 分配的空間中的內容。
  • 新 child 會被新增到與 caller 相同的 Thread Group 中,因此 caller 的 PID 與 Child 的 PID 會相同,而相同 PID 的執行緒會有相同的 parent(在這個程式中,該 parent 應為 caller)。但若是在任一執行緒中呼叫 execve 執行一程式的話,會造成 parent 以外的執行緒皆被終止,而該程式將會由 parent 來執行。若 flags 中包含 CLONE_THREAD 的話,CLONE_SIGHAND 也必須包含在 flags 中。

由於 caller 與 child 共用 Address Space,所以 clone 新增的 child 行程其實就是一個執行緒,與這個程式的目的(建立多個執行緒來進行讀寫)相同。

最後,CLONE_FSCLONE_FILESCLONE_SIGHANDCLONE_SYSVSEM依序分別代表新的執行緒與 caller 之間會共用以下資訊:

  • 檔案系統的相關狀態,包含 root、CWD 等資訊
  • File Descriptor Table。(CS:APP 10.8 有 FD Table 相關的資訊)
  • Signal Handler Table。。若 flags 中包含 CLONE_SIGHAND 的話,CLONE_VM 也必須包含在 flags 中。
  • System V semaphore adjustment (semadj) values

    TODO

最後則是檢查執行緒是否正確被建立:

insertedNode->tidCpy = tid; insertedNode->fa = fa; if (tid == -1) { perror("thread create"); free(thread_stack); spin_release(&__globalLock); return errno; } *t = tid; spin_release(&__globalLock); return 0;

若是失敗的話,新節點的 tidCpy 會被設為 -1,且需要清除前面分配的 stack 空間;若是成功被建立的話,就會 tidCpy 則會被設為 child 的 TID,且 *t 將會被設為 child 的 TID,才能夠在 thread_join 時等待正確的執行緒結束。

spin_release 以及 return 可以用 goto 與函式最後的合併

Join (Wait) Thread

而在 thread_create 完成後,主程式(caller)就要透過 thread_join 等待所有執行緒完成工作,而 thread_join 的函式定義為:

int thread_join(thread t, void **retLocation) { spin_acquire(&__globalLock); void *addr = returnCustomTidAddress(&__tidList, t); if (!addr) { spin_release(&__globalLock); return ESRCH; } if (*((pid_t *) addr) == 0) { spin_release(&__globalLock); return EINVAL; }

首先會呼叫 returnCustomTidAddress 函式,該函式定義為:

unsigned long int *returnCustomTidAddress(singlyLL *ll, unsigned long int tid)
{
    node *tmp = ll->head;
    while (tmp) {
        if (tmp->tidCpy == tid)
            return &(tmp->tid);
        tmp = tmp->next;
    }
    return NULL;
}

是用來從 __tidList 中尋找 TID 為 *t 的執行緒,若是有成功找到的話,就會回傳 tid 的地址;若目標執行緒不存在的話,則會回傳 NULL。需要注意的是,由於 tid 會在執行緒結束後被修改成 0,所以這個函式中使用的是 tidCpy

接著則會回到 thread_join 檢查回傳的執行緒的 tid,若是回傳 NULL 的話,就回傳 ESRCH,從 errno 的 Man Page 中可以知道它表示 "No such process";而若是有找到執行緒,但該執行緒的 TID 為 0 的話,則代表該執行緒已經結束或是透過 clone 建立執行緒時發生錯誤,此時會回傳 EINVAL 表示 "Invalid argument"

接著則是透過 while 等待執行緒結束(tid 從非零變成零):

int ret; while (*((pid_t *) addr) == t) { spin_release(&__globalLock); ret = syscall(SYS_futex, addr, FUTEX_WAIT, t, NULL, NULL, 0); spin_acquire(&__globalLock); } syscall(SYS_futex, addr, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); if (retLocation) { node *insertedNode = returnCustomNode(&__tidList, t); *retLocation = insertedNode->retVal; } spin_release(&__globalLock); return ret; }

而在檢查的過程中並非單純 busy waiting,而是會呼叫編號為 SYS_futex 的系統呼叫,

thread_createthread_join 都應該只有 caller 這個執行緒會執行到,且 __globalLock 也只有用在這兩個函式中,__globalLock 看起來是多餘的。

uThread

實作效能分析,比較 POSIX Thread (NPTL) 和自行建立的 N:1 Threading model 之間的效能落差


第二題 - Atomic Hash Table

這提要實作的部份是 hashmap.c 中的 hashmap_del 以及 hashmap_put 兩個函式,而這個雜湊表的資料結構定義在 hashmap.h 中:

/* links in the linked lists that each bucket uses */
typedef struct hashmap_keyval {
    struct hashmap_keyval *next;
    const void *key;
    void *value;
} hashmap_kv_t;

/* main hashmap struct with buckets of linked lists */
typedef struct {
    hashmap_kv_t **buckets;
    uint32_t n_buckets;

    uint32_t length; /* total count of entries */

    /* pointer to the hash and comparison functions */
    uint64_t (*hash)(const void *key);
    uint8_t (*cmp)(const void *x, const void *y);

    /* custom memory management of internal linked lists */
    void *opaque;
    hashmap_kv_t *(*create_node)(void *opaque, const void *key, void *data);
    void (*destroy_node)(void *opaque, hashmap_kv_t *node);
} hashmap_t;

是一個有 n_buckets 個欄位的雜湊表,每個欄位都是 hashmap_kv_t 組成的單向鏈結串列,由 buckets[i] 指向第 i 個欄位的首個節點。

hashmap_put 以及 hashmap_del 分別是對這個雜湊表新增以及刪除節點:

hashmap_put

while (true) {
    /* copy the head of the list before checking entries for equality */
    head = map->buckets[bucket_index];

    /* find any existing matches to this key */
    prev = NULL;
    if (head) {
        for (kv = head; kv; kv = kv->next) {
            if (map->cmp(key, kv->key) == 0)
                break;
            prev = kv;
        }
    }

    if (kv) {      /* if the key exists, update and return it */
        ...
    } else {       /* if the key does not exist, try adding it */
        ...
    }
}

不論是 if 還是 else 部份都是放在無限 while 迴圈中,因此 put 操作會執行到確實將新的 key 更新或是新增到雜湊表中之後才會結束,並分別回傳 truefalse 代表是更新相同 key 的節點的資料或是新增新的 key 的節點

而在這個迴圈中,由於新增或更新節點可能會失敗,所以每次迴圈都需要先尋找雜湊表中是否有相同 key 的節點存在,若有的話 kv 會指向該節點,否則 kv 則會是 NULL,接著就會根據結果決定要新增節點還是更新資料:

新增節點(kv == NULL

if (kv) { ... } else { /* if the key does not exist, try adding it */ if (!next) /* make the next key-value pair to append */ next = map->create_node(map->opaque, key, value); next->next = NULL; if (head) /* make sure the reference to existing nodes is kept */ next->next = head; /* prepend the kv-pair or lazy-make the bucket */ if (__atomic_compare_exchange(&map->buckets[bucket_index], &head, &next, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { __atomic_fetch_add(&map->length, 1, __ATOMIC_SEQ_CST); return false; } /* failure means another thead updated head before this. * track the CAS failure for tests -- non-atomic to minimize * thread contention */ hashmap_put_retries += 1; }

新增節點部份是在 else 中完成。由於新增節點可能會失敗,因此在第 4 行的 if 首先要檢查是否已經有新增過節點了,以免每次失敗都要將節點 free 掉並重新建立節點;若是還沒建立過的話則會建立一個新的節點,並將對應欄位的首位節點 map->buckets[bucket_index] 指向新的節點。

TODO : map->opaque 用途

而建立節點後則分成兩種情況:

  • 目前欄位中不存在節點:新節點的 next 指向 NULL
  • 目前欄位中有其他節點:新節點的 next 指向原本的 map->buckets[bucket_index]

當目前欄位中不存在節點的話,head 即為 NULL,因此可以直接 next->next = head,不需要進行額外判斷。

將新節點指向原 head 之後則要將新節點更新到雜湊表上,這部份由於是多執行緒的程式,所以需要透過 __atomic_fetch_add__atomic_compare_exchange 這兩個 GCC 內建的 atomic 操作完成:

bool __atomic_compare_exchange (
    type *ptr, 
    type *expected, 
    type *desired, 
    bool weak, 
    int success_memorder, 
    int failure_memorder
)

這個函式會將 ptr 指向物件的值(*ptr)與 expected 指向的物件的值(*expected)進行比較,如果兩者相同的話則會依照 read-modify-write 的方式以及 success_memorder 的限制將 *ptr 的值修改成 *desired 並回傳 true;若不同的話則會依照 failure_memorder 的限制(不可為 __ATOMIC_RELEASE__ATOMIC_ACQ_REL)將 *exprected 修改成實際讀取到的值並回傳 false

memorder 參數的部份則可以用來限制編譯器對指令排序的最佳化處理,分別對應到並行程式設計: Atomics 操作中介紹的 6 種類型。

因此第 12 行的 if 判斷是用來判斷一開始讀取到的 head 是否仍是同一個 head

  • 若不同的話代表需要改將新節點 next 的下個節點 next->next 指向新的 head

  • 若相同的話則會將 map->buckets[bucket_index] 指向新的節點 next,並透過 __atomic_fetch_add 修改雜湊表中的節點總數:

    ​​​​type __atomic_fetch_add (
    ​​​​    type *ptr, 
    ​​​​    type val, 
    ​​​​    int memorder    
    ​​​​)
    

    這個內建函式會取得 ptr 所指向物件的值(*ptr)並對其進行加法運算,最後再回傳加法運算前的值,相當於memorder 指定的方式進行以下操作:

    ​​​​{ tmp = *ptr; *ptr += val; return tmp; }
    

    比較需要注意的是,若 *ptr 是指標型態的話,則相當於對 uintptr_t 進行運算,而不是以 sizeof(*ptr) 作為單位進行加法運算。

更新資料

if (kv) { /* if the key exists, update and return it */ if (!next) /* lazy make the next key-value pair to append */ next = map->create_node(map->opaque, key, value); /* ensure the linked list's existing node chain persists */ next->next = kv->next; /* CAS-update the reference in the previous node */ if (prev) { ... } else { /* no previous link, update the head of the list */ ... } else { ... }

if 部份則是用來更新相同 key 的節點中的資料,會由擁有新 data 的節點 next原先相同 key 的節點 kv 替換掉。

由於更新節點是由新節點取代原先相同 key 的節點,所以新增節點的 next = map->create_node(map->opaque, key, value) 可以在 while 外先做。

而要替換掉 kv 的話,需要將 next 的下個節點指向 kv->next,並將 prev 的下個節點指向新的節點,但與單向鏈結串列的問題相同,要取代的節點可能是首位節點,而當 kv 是首位節點時 prev 會是 NULL,所以需要分成兩種狀況處理:

  1. 非首位節點

    ​​​​/* CAS-update the reference in the previous node */
    ​​​​if (prev) {
    ​​​​    /* replace this link, assuming it has not changed by another
    ​​​​     * thread
    ​​​​     */
    ​​​​    if (__atomic_compare_exchange(KKKK, 
    ​​​​                                  &kv, 
    ​​​​                                  &next, 
    ​​​​                                  false,
    ​​​​                                  __ATOMIC_SEQ_CST,
    ​​​​                                  __ATOMIC_SEQ_CST)) {
    ​​​​        /* this node, key and value are never again used by this */
    ​​​​        map->destroy_node(map->opaque, kv);
    ​​​​        return true;
    ​​​​    }
    ​​​​    hashmap_put_replace_fail += 1;
    ​​​​} else { /* no previous link, update the head of the list */
    ​​​​    ...
    ​​​​}
    

    kv 是中間節點時,只需要直接將 prev 指向的下個節點換成新節點即可,但與新增節點一樣要考慮到其他執行緒,所以要透過 __atomic_compare_exchange 進行 atomic 操作來取代原節點,因此 KKKK 應為 &prev->next。而成功取代後則需要將原本的節點 kv 刪除,並回傳 true 代表是取代原先的節點,而不是新增節點。

  2. 首位節點

    ​​​​/* CAS-update the reference in the previous node */
    ​​​​if (prev) {
    ​​​​    ...
    ​​​​} else { /* no previous link, update the head of the list */
    ​​​​    /* set the head of the list to be whatever this node points to
    ​​​​     * (NULL or other links)
    ​​​​     */
    ​​​​    if (__atomic_compare_exchange(QQQQ, 
    ​​​​                                  &kv,
    ​​​​                                  &next, 
    ​​​​                                  false, 
    ​​​​                                  __ATOMIC_SEQ_CST,
    ​​​​                                  __ATOMIC_SEQ_CST)) {
    ​​​​        map->destroy_node(map->opaque, kv);
    ​​​​        return true;
    ​​​​    }
    
    ​​​​    /* failure means at least one new entry was added, retry the
    ​​​​     * whole match/del process
    ​​​​     */
    ​​​​    hashmap_put_head_fail += 1;
    ​​​​}
    

    當沒有 prev 的時候,需要取代的節點則是 map->buckets[bucket_index] 指向的節點,因此 QQQQ 應為 &map->buckets[bucket_index],其他部份則與 kv 在中間的情況相同。

    將 prev 改成 indirect pointer 移除 branch。

為什麼 CAS 部份要用 __ATOMIC_SEQ_CST

TODO

hashmap_del

hashmap_del 函式則是負責從雜湊表中移除相同 key 的節點:

/* try to find a match, loop in case a delete attempt fails */
while (true) {
    hashmap_kv_t *match, *prev = NULL;
    for (match = map->buckets[bucket_index]; match; match = match->next) {
        if ((*map->cmp)(key, match->key) == 0)
            break;
        prev = match;
    }

    /* exit if no match was found */
    if (!match)
        return false;

    /* previous means this not the head but a link in the list */
    if (prev) { /* try the delete but fail if another thread did delete */
        if (__atomic_compare_exchange(ZZZZ, &match, &match->next,
                                      false, __ATOMIC_SEQ_CST,
                                      __ATOMIC_SEQ_CST)) {
            __atomic_fetch_sub(&map->length, 1, __ATOMIC_SEQ_CST);
            map->destroy_node(map->opaque, match);
            return true;
        }
        hashmap_del_fail += 1;
    } else { /* no previous link means this needs to leave empty bucket */
        ...
    }
}

因此首先要先從對應的欄位中檢查是否有相同 key 的節點存在。若不存在的話就回傳 false 代表目標 key 不存在;否則嘗試從雜湊表中移除目標節點,直到成功移除或是被其他執行緒移除。

而移除節點與更新節點相同,需要透過 prev 檢查目標節點是否為該欄位的首位節點,因此大致上與更新資料要做的事相同,首先要透過 __atomic_compare_exchange 修改目標節點前一節點的 prev 的下個節點,因此 ZZZZ 應為 &prev->next

而若是成功移除的話,除了釋放原節點所佔用的空間之外,還需要負責將雜湊表節點總數減 1,而這個操作與新增節點相同,需要考慮到其他執行緒,所以要使用 atomic 操作的 __atomic_fetch_submap->length 減 1。

indirect pointer

設計和實作的缺失

malloccalloc 沒有檢查回傳值

[Commit] 中指出的,malloc 以及 calloc 後應檢查是否成功分配記憶體空間,因此使用巨集 dma_wrap 包裝 malloc 以及 calloc

#define dma_wrap(dma, ...)                                            \
    ({                                                                \
        void *_p = dma(__VA_ARGS__);                                  \
        if (!_p) {                                                    \
            printf("%s failed in %s:%d\n", #dma, __func__, __LINE__); \
            exit(1);                                                  \
        }                                                             \
        _p;                                                           \
    })

hashmap_put_retries 以及其他四個統計錯誤次數的 volatile 變數

hashmap_put_retrieshashmap_put_replace_failhashmap_put_head_failhashmap_del_failhashmap_del_fail_new_head 五個 volatile 變數在 hashmap.c 的註解 中提到是為了減少 Thread Contention 所以未使用 atomic 運算,但考慮到程式正確性,使用了 #ifdef 搭配巨集在編譯時期根據巨集的定義與否來決定實作方式,實際修改則在 [Commit] 中。

#ifndef THREAD_CONTENTION_MIN
    __atomic_fetch_add(&hashmap_put_retries, 1, __ATOMIC_SEQ_CST);
#else
    /* failure means another thead updated head before this.
     * track the CAS failure for tests -- non-atomic to minimize
     * thread contention
     */
    hashmap_put_retries += 1;
#endif

Memory Leak

如同 hankluo6 的報告中提到的數個未釋放記憶體的問題:

  1. 雜湊表 map 未被釋放

    test-hashmap.c 中宣告的雜湊表物件 map 未在結束程式前釋放記憶體空間,因此在 hashmap.[ch] 中新增一個函式 hashmap_free_later,將雜湊表以及其中需要釋放的物件透過 free_later() 以及 map->destroy_node 加入到 free later 串列中,等待程式結束前一併釋放。

    ​​​​void hashmap_free_later(hashmap_t *map)
    ​​​​{
    ​​​​    // traverse each bucket and free their list later
    ​​​​    for (int i = 0; i < map->n_buckets; ++i)
    ​​​​        for (hashmap_kv_t *node = map->buckets[i]; node; node = node->next)
    ​​​​            map->destroy_node(map->opaque, node);
    
    ​​​​    free_later(map->buckets, free);
    ​​​​    free_later(map, free);
    ​​​​}
    
  2. free_later_run() 未正確釋放 free later 串列的節點

    ​​​​-    for (list_node_t *n = buffer_prev->head; n; n = n->next) {
    ​​​​-        free_later_t *v = n->val;
    ​​​​-        v->free(v->var);
    ​​​​-        free(n);
    ​​​​+    for (list_node_t *node = buffer_prev->head, *next; node; node = next) {
    ​​​​+        // keep next node's address
    ​​​​+        next = node->next;
    ​​​​+
    ​​​​+        // free node
    ​​​​+        free_later_t *var = node->val;
    ​​​​+        if (var->free)
    ​​​​+            var->free(var->var);
    ​​​​+        free(var);
    ​​​​+        free(node);
    

    在原先的實作中有兩個問題:

    1. n = n->next 會試圖存取已經被釋放掉的 n 使用的空間。
    2. free later 串列中節點 nval 在這個程式中是用來保存雜湊表的節點,而該節點是透過動態記憶體分配的空間,因此 v 也需要被釋放。
  3. free_later_stage() 中的條件判斷

    ​​​​    ...
    ​​​​    if (!buffer_prev || buffer_prev->length == 0) {
    ​​​​        release_lock(&lock);
    ​​​​        return;
    ​​​​    }
    
    ​​​​    /* swap the buffers */
    ​​​​    buffer_prev = buffer;
    ​​​​    buffer = list_new();
    
    ​​​​    release_lock(&lock);
    ​​​​}
    

    一開始的條件判斷會在 buffer_prevNULL 時直接結束函式,造成 buffer 的值無法存到 buffer_prev 中,因此首先需要將 buffer 為空以及尚未分配記憶體位置兩種情況分開處理:

    • 尚未分配 buffer 時應建立一個串列作給 buffer 使用
    • buffer 為空時則不須將當前 buffer 中的資料存到 buffer_prev,直接結束函式即可

    而當 buffer 中有資料時,則須分成 buffer_prev 是否存有資料兩種情況處理,但由於 free_later_run() 中會檢查 buffer_prev 因此兩種情況可直接合併,並在最後將 buffer 的資料存到 buffer_prev 中。

    ​​​​    ...
    ​​​​    // empty buffer, nothing to do
    ​​​​    if (!buffer)
    ​​​​        goto newlist;
    ​​​​    if (!buffer->length)
    ​​​​        goto finish;
    
    ​​​​    free_later_run();
    ​​​​    buffer_prev = buffer;
    
    ​​​​newlist:
    ​​​​    buffer = list_new();
    ​​​​finish:
    ​​​​    release_lock(&lock);
    ​​​​}
    

除此報告中指出的部份之外,還有其他部份也可能會造成 Memory Leak:

  1. test_del 中重新建立雜湊表:

    ​​​​while (!hashmap_del_fail || !hashmap_del_fail_new_head) {
    ​​​​    if (!map)
    ​​​​        map = hashmap_new(10, cmp_uint32, hash_uint32);
    
    ​​​​    /* multi-threaded add values */
    ​​​​    if (!mt_del_vals()) {
    ​​​​        printf("test_del() is failing. Can't complete mt_del_vals()");
    ​​​​        return false;
    ​​​​    }
    ​​​​...
    

    test-hashmap.c 中會直接重新建立一個雜湊表並覆蓋掉原本的 map、造成原本的雜湊表未被正確釋放,因此需要在重新建立前先檢查 map 是否為 NULL

  2. mt_add_vals 中分配的 offset 未被正確釋放

    mt_add_vals 函式中會建立 N_THREADS 個執行緒並透過動態記憶體配置空間給 offset 作為 add_vals 的參數,但 offset 這變數會被重複配置記憶體空間,因此需要在 add_vals 中釋放對應的參數 args

    ​​​​static void *add_vals(void *args)
    ​​​​{
    ​​​​    int *offset = args;
    ​​​​    for (int j = 0; j < N_LOOPS; j++) {
    ​​​​        int *val = dma_wrap(malloc, sizeof(int));
    ​​​​        *val = (*offset * N_LOOPS) + j;
    ​​​​        hashmap_put(map, val, val);
    ​​​​    }
    ​​​​    free(args);
    ​​​​    return NULL;
    ​​​​}
    

Invalid Free

mt_del_vals 中會先建立 N_THREADS 個執行緒執行 add_val 函式,而該函式會將 N_LOOPS(MAX_VAL_PLUS_ONE, MAX_VAL_PLUS_ONE) 的 Key Value Pair 新增到雜湊表中。

static uint32_t MAX_VAL_PLUS_ONE = N_THREADS * N_LOOPS + 1;

其中 MAX_VAL_PLUS_ONE 的定義如上,是宣告在 test-hashmap.c 中的靜態變數。然而在 mt_del_vals 的另外 N_THREADS 個執行緒執行 del_val 函式時,會將找到的 (MAX_VAL_PLUS_ONE, MAX_VAL_PLUS_ONE) 的 Key Value Pair 加入到 free later 串列中,並在程式結束前透過 free 釋放,造成 Invalid Free。

return false

研讀以下資料

  1. Lock-free wait-free hash table using C11 atomics 程式碼
  2. 論文 Lock-Free Linked Lists and Skip Lists

參照 atomic_hash 相關程式碼,以更大規模進行測試