---
tags: linux2022, quiz
---
# 2022q1 Homework5 (quiz6)
contributed by < `freshLiver` >
> [quiz6](https://hackmd.io/@sysprog/linux2022-quiz6)
## [第一題](https://hackmd.io/@sysprog/linux2022-quiz6#%E6%B8%AC%E9%A9%97-1) - [Multithreaded Read Write](https://gist.github.com/jserv/f1c55b97ae6cb0ae8f4945203c2b12c9)
### Main
```c
static mutex_t lock, rwlock;
static spin_t printlock;
static int readers = 0;
static int n_readers_in = 0, n_writers_in = 0;
int main()
{
mutex_init(&lock);
mutex_init(&rwlock);
spin_init(&printlock);
atomic_init(&n_readers_in, 0);
atomic_init(&n_writers_in, 0);
thread readers[5], writers[5];
for (int i = 0; i < 5; i++) {
thread_create(&readers[i], f1, NULL);
thread_create(&writers[i], f2, NULL);
}
for (int i = 0; i < 5; i++) {
thread_join(writers[i], NULL);
thread_join(readers[i], NULL);
}
return 0;
}
```
首先,可以從 main 中知道這個程式會建立十個執行緒,一半作為 `reader` 執行函式 `f1`,另一半則作為 `writer` 執行函式 `f2`。除此之外還建立了 `spin_lock` 以及 `mutex` 等並行程式所需的物件。
### Locks
https://www.kernel.org/doc/html/latest/locking/spinlocks.html
https://hackmd.io/@sysprog/multicore-locks?type=view
https://hackmd.io/@sysprog/linux-sync?type=view#QueuedMCS-Spinlock
### Reader 與 Writer
```c=
static void f1()
{
mutex_acquire(&lock);
readers += 1;
if (readers == 1)
mutex_acquire(&rwlock);
mutex_release(&lock);
safeprintf(&printlock, "Reader process in\n");
atomic_fetch_add(&n_readers_in, 1);
mutex_acquire(&lock);
readers -= 1;
if (readers == 0)
mutex_release(&rwlock);
mutex_release(&lock);
atomic_fetch_sub(&n_readers_in, 1);
safeprintf(&printlock, "Reader process out\n");
}
```
在沒有 writer 寫入資料時,可以同時有多個 reader 讀取資料;但若是有 writer 正在寫入資料時,所有 reader 則必須等到寫入結束才能進行讀取,因此在 `f1` 讀取資料前,必須先確認 `rwlock` **沒有被 writer 持有**。
由於 writer 持有 `rwlock` 時不會有 reader 在讀資料,所以能夠確認 writer 是否有 `rwlock` 的執行緒只有**第一個嘗試讀取資料的 reader**,因此第 3 行到第 7 行的部份需要有 reader 持有 `rwlock` 後其他 reader 才能通過。
而所有 reader 都沒在讀取資料時,則不應有任何 reader 持有 `rwlock`,否則 writer 則無法進行寫入,因此**最後一個讀取完資料的 reader** 需要負責釋放 `rwlock`,即第 13 到 14 行的部份。
```c
static void f2()
{
mutex_acquire(&rwlock);
atomic_fetch_add(&n_writers_in, 1);
safeprintf(&printlock, "Writer process in\n");
mutex_release(&rwlock);
atomic_fetch_sub(&n_writers_in, 1);
safeprintf(&printlock, "Writers process out\n");
}
```
而 writer 部份則比較單純,任何 writer 要寫入資料時都只需要檢查沒有其他 writer 或 reader 在讀取資料即可,因此在一開始就透過 mutex 檢查 `rwlock` 是否已經被其他執行緒持有。
### Create New Thread (EXP1, EXP2)
建立 thread 的相關操作定義在 `thread_create` 函式中:
```c=
int thread_create(thread *t, void *routine, void *arg)
{
spin_acquire(&__globalLock);
static int initState = 0;
if (!t || !routine) {
spin_release(&__globalLock);
return EINVAL;
}
...
```
一開始會進行防呆,若是沒有傳入 `thread` 物件或是要執行的工作的話,救回傳錯誤值。
:::warning
防呆的部份應該可以在 spin lock 前進行確認
:::
若是參數有正確傳入的話,就可以開始建立執行緒:
```c=10
thread tid;
void *thread_stack;
if (initState == 0) {
initState = 1;
init();
}
```
根據 n1570 中 [6.2.4 Storage durations of objects](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1570.pdf#%5B%7B%22num%22%3A114%2C%22gen%22%3A0%7D%2C%7B%22name%22%3A%22XYZ%22%7D%2C-27%2C816%2Cnull%5D) 關於 static 的說明:
> 3. An object whose identifier is declared without the storage-class specifier _Thread_local, and either with external or internal linkage or with the storage-class specifier static, has **static storage duration. Its lifetime is the entire execution of the program and its stored value is initialized only once, prior to program startup**.
可以知道 `static` 變數會一直存在到程式結束,且只會在一開始進行初始化,即第 4 行的宣告只會做一次,因此第 12 到 14 行的部份只會在第一次呼叫 `thread_create` 時執行到,而 `init()` 函式的定義是:
```c
static void init()
{
spin_init(&__globalLock);
INIT_SIGNALS
singlyLLInit(&__tidList);
node *insertedNode = singlyLLInsert(&__tidList, getpid());
insertedNode->tidCpy = insertedNode->tid;
insertedNode->fa = NULL;
atexit(cleanup);
}
```
是負責初始化 `__globalLock` 以及用來儲存 TID 的 `__tidList` 串列,並在初始化 `__tidList` 串列後會將前的 PID 加入到串列中。而在程式結束後則會透過 `cleanup` 函式進行資源釋放。
:::warning
TODO : Signal 相關處理
:::
接著要為建立執行緒做準備:
```c=16
node *insertedNode = singlyLLInsert(&__tidList, 0);
if (!insertedNode) {
printf("Thread address not found\n");
spin_release(&__globalLock);
return -1;
}
funcargs *fa;
fa = (funcargs *) malloc(sizeof(funcargs));
if (!fa) {
printf("Malloc failed\n");
spin_release(&__globalLock);
return -1;
}
fa->f = routine;
fa->arg = arg;
```
要做的準備包含:
- 為新的執行緒準備一個 `__tidList` 串列的節點 `insertedNode` 儲存資訊,並加入到 `__tidList` 串列中,但這時還沒正式建立執行緒,所以一開始的 TID 會被設為 0。
- 準備一個 `funcargs` 結構體的物件 `fa` 儲存要給新執行緒的參數(要執行的函式的指標、該函式的參數)
若失敗的話就直接離開 `thread_thread` 函式,否則開始建立執行緒:
```c=31
thread_stack = allocStack(STACK_SZ, GUARD_SZ);
if (!thread_stack) {
perror("thread create");
spin_release(&__globalLock);
return errno;
}
fa->stack = thread_stack;
thread_t tid = clone(wrap,
(char *) thread_stack + STACK_SZ + GUARD_SZ,
CLONE_FLAGS,
fa,
&(EXP1),
NULL,
&(EXP2));
```
首先在 `allocStack` 中嘗試用 `mmap` 分配一塊大小為 `STACK_SZ + GUARD_SZ` 的空間 `thread_stack`,而這個空間會作為 `clone` 的引數使用。
:::warning
TODO : `GUARD_SZ` 相當於 `getpagesize()`,為什麼會需要 PAGE_SIZE
:::
而根據 clone 的 prototype 以及 [Man Page](https://man7.org/linux/man-pages/man2/clone.2.html) 的說明:
```c
int clone(int (*fn)(void *), void *stack, int flags, void *arg,
...
/* pid_t *parent_tid, * void *tls, * pid_t *child_tid */)
```
> These system calls create a new ("child") process, in a manner similar to fork(2).
> On success, the thread ID of the child process is returned in the caller's thread of execution. On failure, -1 is returned in the caller's context, no child process is created, and errno is set to indicate the error.
這兩點可以知道 `clone` 與 [`fork`](https://man7.org/linux/man-pages/man2/fork.2.html) 相似,能夠用來建立一個子行程,若是成功的話會回傳 child 的 TID,若是失敗的話,則會回傳 -1。
> The **stack** argument specifies the location of the stack used by the child process. Since the child and calling process may share memory, it is not possible for the child process to execute in the same stack as the calling process. The calling process must therefore set up memory space for the child stack and pass a pointer to this space to clone(). Stacks grow downward on all processors that run Linux (except the HP PA processors), so stack usually points to the topmost address of the memory space set up for the child stack. Note that clone() does not provide a means whereby the caller can inform the kernel of the size of the stack area.
而這邊則可知道 `thread_stack` 會被當作新的行程的 stack 使用,但 stack 應該從高位址向低位址增長,因此要傳入的應該是該 stack 區塊的上界地址,即 `thread_stack + STACK_SIZE`。
而 `CLONE_FLAGS` 中包含了:
```c
#define CLONE_FLAGS \
CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD | \
CLONE_SYSVSEM | CLONE_PARENT_SETTID | CLONE_CHILD_CLEARTID
```
其中 `parent_tid` 以及 `child_tid` 相關的 FLAG 有:
> **CLONE_PARENT_SETTID**
> Store the child thread ID at the location pointed to by parent_tid (clone()) or cl_args.parent_tid (clone3()) in the parent's memory. (In Linux 2.5.32-2.5.48 there was a flag CLONE_SETTID that did this.) The store operation completes before the clone call returns control to user space.
> **CLONE_CHILD_CLEARTID**
> Clear (zero) the child thread ID at the location pointed to by child_tid (clone()) or cl_args.child_tid (clone3()) in child memory when the child exits, and do a wakeup on the futex at that address. The address involved may be changed by the set_tid_address(2) system call. This is used by threading libraries.
因此 child 的 TID 會儲存在 `parent_tid` 指向的物件中,而 child 結束時將 `child_tid` 指向的物件的值重設為 0。
所以在這個函式中,child 的 TID 將會被儲存在 **EXP1** 指向的物件,而當 child 結束時,將會清除 **EXP2** 指向的物件的值。而在這題的程式中,使用到 TID 相關的部份都是與結束執行緒相關的操作,包含 `thread_kill`、`killAllThreads`、`deleteAllThreads` 以及 `getReturnValue`。
以傳送 `TGKILL` 給所有 active threads 的 `killAllThreads` 為例:
```c=
/**
* @brief Send process wide signal dispositions to all active threads
* @param ll Pointer to linked list
* @param signum Signal number
* @return On success 0, On failure errno
*/
int killAllThreads(singlyLL *ll, int signum)
{
node *tmp = ll->head;
pid_t pid = getpid();
int ret;
pid_t delpid[100];
int counter = 0;
while (tmp) {
if (tmp->tid == gettid()) {
tmp = tmp->next;
continue;
}
printf("Killed thread %ld\n", tmp->tid);
ret = syscall(TGKILL, pid, tmp->tid, signum);
if (ret == -1) {
perror("tgkill");
return errno;
} else {
if (signum == SIGINT || signum == SIGKILL)
delpid[counter++] = tmp->tid;
}
tmp = tmp->next;
}
if (signum == SIGINT || signum == SIGKILL) {
for (int i = 0; i < counter; i++)
singlyLLDelete(ll, delpid[i]);
}
return 0;
}
```
其中串列 `ll`就是 `__tidList`,因此這個函式會依序對 clone 產生的執行緒(不包含 caller 的執行緒)呼叫 `TGKILL` 的系統呼叫,若是成功且 `signum` 為 `SIGINT`、`SIGKILL` 之一的話則會將目標執行緒的 TID 加入到 `delpid` 陣列中,並在檢查完 `__tidList` 後一併從串列中刪除。
:::warning
`delpid` 是固定大小,雖然足夠處理這個程式的 10 個讀寫執行緒,但若是執行緒數量增加的話,則需要修改大小。
:::
而 [`tgkill` 的 Man Page](https://man7.org/linux/man-pages/man2/tgkill.2.html) 中可以找到相關說明:
:::info
巨集 `TGKILL` 定義的數值是 234,對應到要執行的系統呼叫的編號,而這個編號定義在 `<asm/unistd.h>` 中,從中可以找到 `#define __NR_tgkill 234`。
:::
```c
int tgkill(pid_t tgid, pid_t tid, int sig);
```
> tgkill() sends the signal sig to the thread with the thread ID tid in the thread group tgid. (By contrast, kill(2) can be used to send a signal only to a process (i.e., thread group) as a whole, and the signal will be delivered to an arbitrary thread within that process.)
因此在這個函式中的 `syscall` 會對 TGID 為 `pid` 的行程中 TID 為 `tmp->tid` 的執行緒傳送一個編號為 `signum` 的 signal,但在 `thread_create` 的第 16 行新增執行緒節點時賦予 `tid` 的值是 `0`,所以 `tid` 勢必有在其他地方被修改,而這個程式中只有 `singlyLLInsert` 有修改 `tid`,因此 ==EXP1== 必須為 `&insertedNode->tid`,才能須透過 `CLONE_PARENT_SETTID` 這個 flag 將 `parent_tid` 指向的物件的值,也就是新節點的 `tid`,修改成新建立的執行緒的 TID。
:::warning
TODO : 而在這個函式唯一的 caller function `thread_kill` 中則限制了 `signum` 會是 `SIGINT`、`SIGCONT`、`SIGSTOP` 之一,雖然 `thread_kill` 並沒有在這個程式中被呼叫,但若是被呼叫且傳送三個可能的 signal 的話會發生什麼事?
:::
而在 `cleanup` 時呼叫的 `deleteAllThreads` 中:
```c
void cleanup()
{
deleteAllThreads(&__tidList);
free(__tidList.head);
}
```
```c
void deleteAllThreads(singlyLL *l)
{
node *tmp = l->head;
int *deleted = NULL;
int numDeleted = 0;
while (tmp) {
if (tmp->tid == 0) {
deleted = (int *) realloc(deleted, (++numDeleted) * sizeof(int));
deleted[numDeleted - 1] = tmp->tidCpy;
}
tmp = tmp->next;
}
for (int i = 0; i < numDeleted; i++)
singlyLLDelete(l, deleted[i]);
free(deleted);
}
```
則可以知道 `deleteAllThreads` 會在串列 `__tidList` 中尋找 **TID 為 0 的執行緒**,但從前面的 EXP1 可知,只要執行緒有成功被建立,`tid` 就會從 0 被修改成真正的 TID,所以若要刪除「所有」執行緒的話,`tid` 勢必要在執行緒結束後被修改成 0,也就是對應到 clone 的 這個 FLAG 的作用,因此 ==EXP2== 應指向 `tid`,即 `&insertedNode->tid`。
而 `CLONE_VM` 以及 `CLONE_THREAD` 則是用來建立一個 child ,**依序**分別代表:
- caller 與 child 會使用相同的 Memory Space,因此執行緒間能夠看到彼此記憶體操作的結果,包含 `mmap` 之類操作分配的記憶體內容,因此在此程式中, child 對 stack 的修改會影響到 caller 分配的空間中的內容。
- 新 child 會被新增到與 caller 相同的 Thread Group 中,因此 caller 的 PID 與 Child 的 PID 會相同,而相同 PID 的執行緒會有相同的 parent(在這個程式中,該 parent 應為 caller)。但若是在任一執行緒中呼叫 [execve](https://man7.org/linux/man-pages/man2/execve.2.html) 執行一程式的話,會造成 parent 以外的執行緒皆被終止,而該程式將會由 parent 來執行。若 `flags` 中包含 `CLONE_THREAD` 的話,`CLONE_SIGHAND` 也必須包含在 `flags` 中。
由於 caller 與 child 共用 Address Space,所以 `clone` 新增的 child 行程其實就是一個執行緒,與這個程式的目的(建立多個執行緒來進行讀寫)相同。
最後,`CLONE_FS`、`CLONE_FILES`、`CLONE_SIGHAND`、`CLONE_SYSVSEM` 則**依序**分別代表新的執行緒與 caller 之間會**共用**以下資訊:
- **檔案系統的相關狀態**,包含 root、CWD 等資訊
- **File Descriptor Table**。(CS:APP 10.8 有 FD Table 相關的資訊)
- **Signal Handler Table**。。若 `flags` 中包含 `CLONE_SIGHAND` 的話,`CLONE_VM` 也必須包含在 `flags` 中。
- **System V semaphore adjustment (semadj) values**
:::warning
TODO
:::
最後則是檢查執行緒是否正確被建立:
```c=45
insertedNode->tidCpy = tid;
insertedNode->fa = fa;
if (tid == -1) {
perror("thread create");
free(thread_stack);
spin_release(&__globalLock);
return errno;
}
*t = tid;
spin_release(&__globalLock);
return 0;
```
若是失敗的話,新節點的 `tidCpy` 會被設為 -1,且需要清除前面分配的 stack 空間;若是成功被建立的話,就會 `tidCpy` 則會被設為 child 的 TID,且 `*t` 將會被設為 child 的 TID,才能夠在 `thread_join` 時等待正確的執行緒結束。
:::warning
spin_release 以及 return 可以用 goto 與函式最後的合併
:::
### Join (Wait) Thread
而在 `thread_create` 完成後,主程式(caller)就要透過 `thread_join` 等待所有執行緒完成工作,而 `thread_join` 的函式定義為:
```c=
int thread_join(thread t, void **retLocation)
{
spin_acquire(&__globalLock);
void *addr = returnCustomTidAddress(&__tidList, t);
if (!addr) {
spin_release(&__globalLock);
return ESRCH;
}
if (*((pid_t *) addr) == 0) {
spin_release(&__globalLock);
return EINVAL;
}
```
首先會呼叫 `returnCustomTidAddress` 函式,該函式定義為:
```c
unsigned long int *returnCustomTidAddress(singlyLL *ll, unsigned long int tid)
{
node *tmp = ll->head;
while (tmp) {
if (tmp->tidCpy == tid)
return &(tmp->tid);
tmp = tmp->next;
}
return NULL;
}
```
是用來從 `__tidList` 中尋找 TID 為 `*t` 的執行緒,若是有成功找到的話,就會回傳 `tid` **的地址**;若目標執行緒不存在的話,則會回傳 NULL。需要注意的是,由於 `tid` 會在執行緒結束後被修改成 0,所以這個函式中使用的是 `tidCpy`。
接著則會回到 `thread_join` 檢查回傳的執行緒的 `tid`,若是回傳 `NULL` 的話,就回傳 `ESRCH`,從 [errno 的 Man Page](https://man7.org/linux/man-pages/man3/errno.3.html) 中可以知道它表示 **"No such process"**;而若是有找到執行緒,但該執行緒的 TID 為 0 的話,則代表該執行緒已經結束或是透過 clone 建立執行緒時發生錯誤,此時會回傳 `EINVAL` 表示 **"Invalid argument"**。
接著則是透過 `while` 等待執行緒結束(`tid` 從非零變成零):
```c=13
int ret;
while (*((pid_t *) addr) == t) {
spin_release(&__globalLock);
ret = syscall(SYS_futex, addr, FUTEX_WAIT, t, NULL, NULL, 0);
spin_acquire(&__globalLock);
}
syscall(SYS_futex, addr, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
if (retLocation) {
node *insertedNode = returnCustomNode(&__tidList, t);
*retLocation = insertedNode->retVal;
}
spin_release(&__globalLock);
return ret;
}
```
而在檢查的過程中並非單純 busy waiting,而是會呼叫編號為 `SYS_futex` 的系統呼叫,
:::warning
TODO :
[futex Man Page](https://man7.org/linux/man-pages/man2/futex.2.html)
[futex2](https://www.kernel.org/doc/html/latest/userspace-api/futex2.html)
:::
:::warning
`thread_create` 跟 `thread_join` 都應該只有 caller 這個執行緒會執行到,且 `__globalLock` 也只有用在這兩個函式中,`__globalLock` 看起來是多餘的。
:::
### [uThread](https://github.com/samanbarghi/uThreads)
實作效能分析,比較 POSIX Thread (NPTL) 和自行建立的 N:1 Threading model 之間的效能落差
---
## [第二題](https://hackmd.io/@sysprog/linux2022-quiz6#%E6%B8%AC%E9%A9%97-2) - [Atomic Hash Table](https://gist.github.com/jserv/c3823ea893e08607b432827a11ec4b69)
這提要實作的部份是 `hashmap.c` 中的 `hashmap_del` 以及 `hashmap_put` 兩個函式,而這個雜湊表的資料結構定義在 `hashmap.h` 中:
```c
/* links in the linked lists that each bucket uses */
typedef struct hashmap_keyval {
struct hashmap_keyval *next;
const void *key;
void *value;
} hashmap_kv_t;
/* main hashmap struct with buckets of linked lists */
typedef struct {
hashmap_kv_t **buckets;
uint32_t n_buckets;
uint32_t length; /* total count of entries */
/* pointer to the hash and comparison functions */
uint64_t (*hash)(const void *key);
uint8_t (*cmp)(const void *x, const void *y);
/* custom memory management of internal linked lists */
void *opaque;
hashmap_kv_t *(*create_node)(void *opaque, const void *key, void *data);
void (*destroy_node)(void *opaque, hashmap_kv_t *node);
} hashmap_t;
```
是一個有 `n_buckets` 個欄位的雜湊表,每個欄位都是 `hashmap_kv_t` 組成的單向鏈結串列,由 `buckets[i]` 指向第 `i` 個欄位的首個節點。
而 `hashmap_put` 以及 `hashmap_del` 分別是對這個雜湊表新增以及刪除節點:
### hashmap_put
```c
while (true) {
/* copy the head of the list before checking entries for equality */
head = map->buckets[bucket_index];
/* find any existing matches to this key */
prev = NULL;
if (head) {
for (kv = head; kv; kv = kv->next) {
if (map->cmp(key, kv->key) == 0)
break;
prev = kv;
}
}
if (kv) { /* if the key exists, update and return it */
...
} else { /* if the key does not exist, try adding it */
...
}
}
```
不論是 `if` 還是 `else` 部份都是放在無限 `while` 迴圈中,因此 `put` 操作會執行到確實將新的 `key` 更新或是新增到雜湊表中之後才會結束,並分別回傳 `true` 或 `false` 代表是**更新相同 `key` 的節點的資料**或是**新增新的 `key` 的節點**。
而在這個迴圈中,由於新增或更新節點可能會失敗,所以每次迴圈都需要先尋找雜湊表中是否有相同 `key` 的節點存在,若有的話 `kv` 會指向該節點,否則 `kv` 則會是 `NULL`,接著就會根據結果決定要新增節點還是更新資料:
#### 新增節點(`kv == NULL`)
```c=
if (kv) {
...
} else { /* if the key does not exist, try adding it */
if (!next) /* make the next key-value pair to append */
next = map->create_node(map->opaque, key, value);
next->next = NULL;
if (head) /* make sure the reference to existing nodes is kept */
next->next = head;
/* prepend the kv-pair or lazy-make the bucket */
if (__atomic_compare_exchange(&map->buckets[bucket_index],
&head,
&next,
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST)) {
__atomic_fetch_add(&map->length, 1, __ATOMIC_SEQ_CST);
return false;
}
/* failure means another thead updated head before this.
* track the CAS failure for tests -- non-atomic to minimize
* thread contention
*/
hashmap_put_retries += 1;
}
```
新增節點部份是在 `else` 中完成。由於新增節點可能會失敗,因此在第 4 行的 `if` 首先要檢查是否已經有新增過節點了,以免每次失敗都要將節點 `free` 掉並重新建立節點;若是還沒建立過的話則會建立一個新的節點,並將對應欄位的首位節點 `map->buckets[bucket_index]` 指向新的節點。
:::warning
TODO : map->opaque 用途
:::
而建立節點後則分成兩種情況:
- 目前欄位中不存在節點:新節點的 `next` 指向 `NULL`
- 目前欄位中有其他節點:新節點的 `next` 指向原本的 `map->buckets[bucket_index]`
:::warning
當目前欄位中不存在節點的話,`head` 即為 `NULL`,因此可以直接 `next->next = head`,不需要進行額外判斷。
:::
將新節點指向原 `head` 之後則要將新節點更新到雜湊表上,這部份由於是多執行緒的程式,所以需要透過 [`__atomic_fetch_add` 及 `__atomic_compare_exchange`](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html) 這兩個 GCC 內建的 atomic 操作完成:
```c
bool __atomic_compare_exchange (
type *ptr,
type *expected,
type *desired,
bool weak,
int success_memorder,
int failure_memorder
)
```
這個函式會將 `ptr` 指向物件的值(`*ptr`)與 `expected` 指向的物件的值(`*expected`)進行比較,如果兩者相同的話則會依照 [read-modify-write](https://en.wikipedia.org/wiki/Read%E2%80%93modify%E2%80%93write) 的方式以及 `success_memorder` 的限制將 `*ptr` 的值修改成 `*desired` 並回傳 `true`;若不同的話則會依照 `failure_memorder` 的限制(不可為 `__ATOMIC_RELEASE` 或 `__ATOMIC_ACQ_REL`)將 `*exprected` 修改成實際讀取到的值並回傳 `false`。
:::success
`memorder` 參數的部份則可以用來限制編譯器對指令排序的最佳化處理,分別對應到[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#%E8%BB%9F%E9%AB%94%E8%A7%80%E9%BB%9E)中介紹的 6 種類型。
:::
因此第 12 行的 `if` 判斷是用來判斷一開始讀取到的 `head` 是否仍是同一個 `head`:
- 若不同的話代表需要改將新節點 `next` 的下個節點 `next->next` 指向新的 `head`
- 若相同的話則會將 `map->buckets[bucket_index]` 指向新的節點 `next`,並透過 `__atomic_fetch_add` 修改雜湊表中的節點總數:
```c
type __atomic_fetch_add (
type *ptr,
type val,
int memorder
)
```
這個內建函式會取得 `ptr` 所指向物件的值(`*ptr`)並對其進行加法運算,最後再**回傳加法運算前的值**,相當於**以 `memorder` 指定的方式**進行以下操作:
```c
{ tmp = *ptr; *ptr += val; return tmp; }
```
比較需要注意的是,若 `*ptr` 是指標型態的話,則相當於對 `uintptr_t` 進行運算,而不是以 `sizeof(*ptr)` 作為單位進行加法運算。
#### 更新資料
```c=
if (kv) { /* if the key exists, update and return it */
if (!next) /* lazy make the next key-value pair to append */
next = map->create_node(map->opaque, key, value);
/* ensure the linked list's existing node chain persists */
next->next = kv->next;
/* CAS-update the reference in the previous node */
if (prev) {
...
} else { /* no previous link, update the head of the list */
...
}
else {
...
}
```
`if` 部份則是用來更新相同 `key` 的節點中的資料,會由**擁有新 `data` 的節點 `next`** 將**原先相同 `key` 的節點 `kv`** 替換掉。
:::warning
由於更新節點是由新節點取代原先相同 `key` 的節點,所以新增節點的 `next = map->create_node(map->opaque, key, value)` 可以在 `while` 外先做。
:::
而要替換掉 `kv` 的話,需要將 `next` 的下個節點指向 `kv->next`,並將 `prev` 的下個節點指向新的節點,但與單向鏈結串列的問題相同,要取代的節點可能是首位節點,而當 `kv` 是首位節點時 `prev` 會是 `NULL`,所以需要分成兩種狀況處理:
1. 非首位節點
```c
/* CAS-update the reference in the previous node */
if (prev) {
/* replace this link, assuming it has not changed by another
* thread
*/
if (__atomic_compare_exchange(KKKK,
&kv,
&next,
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST)) {
/* this node, key and value are never again used by this */
map->destroy_node(map->opaque, kv);
return true;
}
hashmap_put_replace_fail += 1;
} else { /* no previous link, update the head of the list */
...
}
```
當 `kv` 是中間節點時,只需要直接將 `prev` 指向的下個節點換成新節點即可,但與新增節點一樣要考慮到其他執行緒,所以要透過 `__atomic_compare_exchange` 進行 atomic 操作來取代原節點,因此 ==KKKK== 應為 `&prev->next`。而成功取代後則需要將原本的節點 `kv` 刪除,並回傳 `true` 代表是取代原先的節點,而不是新增節點。
2. 首位節點
```c
/* CAS-update the reference in the previous node */
if (prev) {
...
} else { /* no previous link, update the head of the list */
/* set the head of the list to be whatever this node points to
* (NULL or other links)
*/
if (__atomic_compare_exchange(QQQQ,
&kv,
&next,
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST)) {
map->destroy_node(map->opaque, kv);
return true;
}
/* failure means at least one new entry was added, retry the
* whole match/del process
*/
hashmap_put_head_fail += 1;
}
```
當沒有 `prev` 的時候,需要取代的節點則是 `map->buckets[bucket_index]` 指向的節點,因此 ==QQQQ== 應為 `&map->buckets[bucket_index]`,其他部份則與 `kv` 在中間的情況相同。
:::warning
將 prev 改成 indirect pointer 移除 branch。
:::
#### 為什麼 CAS 部份要用 `__ATOMIC_SEQ_CST`
:::warning
TODO
:::
### hashmap_del
`hashmap_del` 函式則是負責從雜湊表中移除相同 `key` 的節點:
```c
/* try to find a match, loop in case a delete attempt fails */
while (true) {
hashmap_kv_t *match, *prev = NULL;
for (match = map->buckets[bucket_index]; match; match = match->next) {
if ((*map->cmp)(key, match->key) == 0)
break;
prev = match;
}
/* exit if no match was found */
if (!match)
return false;
/* previous means this not the head but a link in the list */
if (prev) { /* try the delete but fail if another thread did delete */
if (__atomic_compare_exchange(ZZZZ, &match, &match->next,
false, __ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST)) {
__atomic_fetch_sub(&map->length, 1, __ATOMIC_SEQ_CST);
map->destroy_node(map->opaque, match);
return true;
}
hashmap_del_fail += 1;
} else { /* no previous link means this needs to leave empty bucket */
...
}
}
```
因此首先要先從對應的欄位中檢查是否有相同 `key` 的節點存在。若不存在的話就回傳 `false` 代表目標 `key` 不存在;否則嘗試從雜湊表中移除目標節點,直到成功移除或是被其他執行緒移除。
而移除節點與更新節點相同,需要透過 `prev` 檢查目標節點是否為該欄位的首位節點,因此大致上與[更新資料](#更新資料)要做的事相同,首先要透過 `__atomic_compare_exchange` 修改目標節點前一節點的 `prev` 的下個節點,因此 ==ZZZZ== 應為 `&prev->next`。
而若是成功移除的話,除了釋放原節點所佔用的空間之外,還需要負責將雜湊表節點總數減 1,而這個操作與新增節點相同,需要考慮到其他執行緒,所以要使用 atomic 操作的 `__atomic_fetch_sub` 對 `map->length` 減 1。
:::warning
indirect pointer
:::
### 設計和實作的缺失
#### `malloc`、`calloc` 沒有檢查回傳值
如 [\[Commit\]](https://github.com/freshLiver/2022q1-jserv-linux-course/commit/870750a1f040fe18ca058eabb3c2d92b3d8ca340) 中指出的,`malloc` 以及 `calloc` 後應檢查是否成功分配記憶體空間,因此使用巨集 `dma_wrap` 包裝 `malloc` 以及 `calloc`。
```c
#define dma_wrap(dma, ...) \
({ \
void *_p = dma(__VA_ARGS__); \
if (!_p) { \
printf("%s failed in %s:%d\n", #dma, __func__, __LINE__); \
exit(1); \
} \
_p; \
})
```
#### `hashmap_put_retries` 以及其他四個統計錯誤次數的 `volatile` 變數
`hashmap_put_retries`、`hashmap_put_replace_fail`、`hashmap_put_head_fail`、`hashmap_del_fail`、`hashmap_del_fail_new_head` 五個 `volatile` 變數在 [ `hashmap.c` 的註解](https://github.com/freshLiver/2022q1-jserv-linux-course/blob/2c771a2372904d73458664818a4534d9ce3a117c/quizs/2022q1/w6/q2/hashmap.c#L140) 中提到是為了減少 Thread Contention 所以未使用 atomic 運算,但考慮到程式正確性,使用了 `#ifdef` 搭配巨集在編譯時期根據巨集的定義與否來決定實作方式,實際修改則在 [\[Commit\]](https://github.com/freshLiver/2022q1-jserv-linux-course/commit/86c4e5b2b96fc6ab31cb9dcd5f7c38b210819295) 中。
```c
#ifndef THREAD_CONTENTION_MIN
__atomic_fetch_add(&hashmap_put_retries, 1, __ATOMIC_SEQ_CST);
#else
/* failure means another thead updated head before this.
* track the CAS failure for tests -- non-atomic to minimize
* thread contention
*/
hashmap_put_retries += 1;
#endif
```
#### Memory Leak
如同 [`hankluo6` 的報告中](https://hackmd.io/@hankluo6/2022q1quiz6#%E5%AF%A6%E4%BD%9C%E7%BC%BA%E5%A4%B1)提到的數個未釋放記憶體的問題:
1. 雜湊表 `map` 未被釋放
在 `test-hashmap.c` 中宣告的雜湊表物件 `map` 未在結束程式前釋放記憶體空間,因此在 `hashmap.[ch]` 中新增一個函式 `hashmap_free_later`,將雜湊表以及其中需要釋放的物件透過 `free_later()` 以及 `map->destroy_node` 加入到 free later 串列中,等待程式結束前一併釋放。
```c
void hashmap_free_later(hashmap_t *map)
{
// traverse each bucket and free their list later
for (int i = 0; i < map->n_buckets; ++i)
for (hashmap_kv_t *node = map->buckets[i]; node; node = node->next)
map->destroy_node(map->opaque, node);
free_later(map->buckets, free);
free_later(map, free);
}
```
2. `free_later_run()` 未正確釋放 free later 串列的節點
```dif
- for (list_node_t *n = buffer_prev->head; n; n = n->next) {
- free_later_t *v = n->val;
- v->free(v->var);
- free(n);
+ for (list_node_t *node = buffer_prev->head, *next; node; node = next) {
+ // keep next node's address
+ next = node->next;
+
+ // free node
+ free_later_t *var = node->val;
+ if (var->free)
+ var->free(var->var);
+ free(var);
+ free(node);
```
在原先的實作中有兩個問題:
1. `n = n->next` 會試圖存取已經被釋放掉的 `n` 使用的空間。
2. free later 串列中節點 `n` 的 `val` 在這個程式中是用來保存雜湊表的節點,而該節點是透過動態記憶體分配的空間,因此 `v` 也需要被釋放。
3. `free_later_stage()` 中的條件判斷
```c
...
if (!buffer_prev || buffer_prev->length == 0) {
release_lock(&lock);
return;
}
/* swap the buffers */
buffer_prev = buffer;
buffer = list_new();
release_lock(&lock);
}
```
一開始的條件判斷會在 `buffer_prev` 為 `NULL` 時直接結束函式,造成 `buffer` 的值無法存到 `buffer_prev` 中,因此首先需要將 `buffer` 為空以及尚未分配記憶體位置兩種情況分開處理:
- 尚未分配 `buffer` 時應建立一個串列作給 `buffer` 使用
- `buffer` 為空時則不須將當前 `buffer` 中的資料存到 `buffer_prev`,直接結束函式即可
而當 `buffer` 中有資料時,則須分成 `buffer_prev` 是否存有資料兩種情況處理,但由於 `free_later_run()` 中會檢查 `buffer_prev` 因此兩種情況可直接合併,並在最後將 `buffer` 的資料存到 `buffer_prev` 中。
```c
...
// empty buffer, nothing to do
if (!buffer)
goto newlist;
if (!buffer->length)
goto finish;
free_later_run();
buffer_prev = buffer;
newlist:
buffer = list_new();
finish:
release_lock(&lock);
}
```
除此報告中指出的部份之外,還有其他部份也可能會造成 Memory Leak:
1. `test_del` 中重新建立雜湊表:
```c
while (!hashmap_del_fail || !hashmap_del_fail_new_head) {
if (!map)
map = hashmap_new(10, cmp_uint32, hash_uint32);
/* multi-threaded add values */
if (!mt_del_vals()) {
printf("test_del() is failing. Can't complete mt_del_vals()");
return false;
}
...
```
在 `test-hashmap.c` 中會直接重新建立一個雜湊表並覆蓋掉原本的 `map`、造成原本的雜湊表未被正確釋放,因此需要在重新建立前先檢查 `map` 是否為 `NULL`。
2. `mt_add_vals` 中分配的 `offset` 未被正確釋放
在 `mt_add_vals` 函式中會建立 `N_THREADS` 個執行緒並透過動態記憶體配置空間給 `offset` 作為 `add_vals` 的參數,但 `offset` 這變數會被重複配置記憶體空間,因此需要在 `add_vals` 中釋放對應的參數 `args`:
```c
static void *add_vals(void *args)
{
int *offset = args;
for (int j = 0; j < N_LOOPS; j++) {
int *val = dma_wrap(malloc, sizeof(int));
*val = (*offset * N_LOOPS) + j;
hashmap_put(map, val, val);
}
free(args);
return NULL;
}
```
#### Invalid Free
在 `mt_del_vals` 中會先建立 `N_THREADS` 個執行緒執行 `add_val` 函式,而該函式會將 `N_LOOPS` 個 `(MAX_VAL_PLUS_ONE, MAX_VAL_PLUS_ONE)` 的 Key Value Pair 新增到雜湊表中。
```c
static uint32_t MAX_VAL_PLUS_ONE = N_THREADS * N_LOOPS + 1;
```
其中 `MAX_VAL_PLUS_ONE` 的定義如上,是宣告在 `test-hashmap.c` 中的靜態變數。然而在 `mt_del_vals` 的另外 `N_THREADS` 個執行緒執行 `del_val` 函式時,會將找到的 `(MAX_VAL_PLUS_ONE, MAX_VAL_PLUS_ONE)` 的 Key Value Pair 加入到 free later 串列中,並在程式結束前透過 `free` 釋放,造成 Invalid Free。
#### return false
### 研讀以下資料
1. [Lock-free wait-free hash table using C11 atomics 程式碼](https://github.com/ksandstr/lfht)
2. 論文 [Lock-Free Linked Lists and Skip Lists](http://www.cse.yorku.ca/~ruppert/papers/lfll.pdf)
### 參照 atomic_hash 相關程式碼,以更大規模進行測試