# 2023 Homework2 (quiz1) contributed by < `Hao-yu-lin` > [題目連結](https://hackmd.io/@sysprog/linux2023-summer-quiz1) ###### [github](https://github.com/Hao-yu-lin/linux2023-summer) --- 測驗解答 AAAA:MUTEX_SLEEPING BBBB:1 CCCC:1 DDDD:1 EEEE:futex_wake FFFF:futex_wake --- ## Overview futex [reference:Basics of Futexes](https://eli.thegreenplace.net/2018/basics-of-futexes/) futex("Fast userspace mutex") 為 userspace code 提供一種執行緒之間的同步(synchronize)同時伴隨最小程度的 linux kernel 參與。 ### Motivaiton 再引入 futexes 前,使用 syscall 來鎖定共享資源(ex:semop),隨著 programs 的 concurrent 需求增加,lock 行為在整體的執行時間佔了很大一部分,然而 lock 並沒有做任何實際的工作,它只有保證共享資源是安全的。在執行緒間很少競爭共享資源(low contention rate)的情況下,沒有其他執行緒在 wait queue 中等待 lock,但還是要從使用者層級切換到核心模式去檢查,這樣頻繁的 CPU 模式切換會對效能有負面影響,futex 是該問題解法之一。 在大部分的情形下,locks 不存在競爭關係的,如果一個執行緒遇到一個 free lock,代表在同一時間沒有其他的執行緒嘗試鎖定,可以在不使用 syscall 下調用完成,嘗試使用執行成本更低的 atomic operatoins。 然而,當另一個執行緒在同一個時間點,嘗試獲取 lock ,atomic operations 可能會失敗,此時有兩種選項。 1. busy wait : **busy-loop** using the atomic until the lock is cleared busy-loop 的方法,這完全在 userspace 中進行,但仍然會造成浪費,因為會佔用核心,並且鎖定狀態有可能保持非常一段時間。 2. syscall : **sleep** until the lock is free (or at least there's a high chance that it's free) sleep 可將 cpu 的資源讓出,但需要透過 kernel 協助,使用 futexes 來完成。 futex:wait queue(kernl space) + atomic integer(user sapce),透過 atomic integer,可以知道是否有執行緒在 wait queue 中等待。 - 無競爭(contention):不需要進行 CPU mode 切換,進到 kernel 喚醒其他執行緒或是到 wait queue 等待 - 有競爭(contention):利用 futex syscall 搭配 `FUTEX_WAIT` 和 `FUTEX_WAKE`,來喚醒其他執行緒或是到 wait queue 等待。 futex 的操作幾乎都在 userspace 完成,只有當操作結果不一致需要仲裁時,才會需要進入 kernel space 執行。這讓以 futex 為基礎的 lock 可以高效進行。 futex 在核心中藉由特製的佇列來管理執行緒或行程,可要求某個行程/執行緒 suspend 直到某個條件成立,或 signal 某個條件,來喚醒行程/執行緒。 #### futex syscall ```cpp= long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3); ``` - `uint32_t *uaddr`:futex 中使用者層級 atomic integer 所存放的地址 - `int futex_op`:futex operator 1. `FUTEX_WAIT` 2. `FUTEX_WAKE` - `uint32_t val`: 1. 在 `FUTEX_WAIT` 代表預期使用者層級 atomic integer 的值 2. 在 `FUTEX_WAKE` 代表喚醒的執行緒數量 ### Simple futex use:waiting and waking [簡單小範例](https://github.com/Hao-yu-lin/linux2023-summer/blob/main/hw2/futex_simple_example/futex_example.c) `main` function 設定共享內存的相關設定後,建立一個 child process,行為如下: ##### child process - 等待 0xA 被寫入 shared_data 中 - 將 0xB 寫入 shared_data 中 ##### parent process - 將 0xA 寫入 shared_data 中 - 等待 0xB 被寫入 shared data 中 ```cpp= int main(int argc, char** argv) { int shm_id = shmget(IPC_PRIVATE, 4096, IPC_CREAT | 0666); if (shm_id < 0) { perror("shmget"); exit(1); } int* shared_data = shmat(shm_id, NULL, 0); *shared_data = 0; int forkstatus = fork(); if (forkstatus < 0) { perror("fork"); exit(1); } if (forkstatus == 0) { // Child process // // 2. printf("child waiting for A\n"); wait_on_futex_value(shared_data, 0xA); // 4. printf("child writing B\n"); // Write 0xB to the shared data and wake up parent. *shared_data = 0xB; wake_futex_blocking(shared_data); } else { // Parent process. // 1. printf("parent writing A\n"); // Write 0xA to the shared data and wake up child. *shared_data = 0xA; wake_futex_blocking(shared_data); // 3. printf("parent waiting for B\n"); wait_on_futex_value(shared_data, 0xB); // Wait for the child to terminate. wait(NULL); shmdt(shared_data); } return 0; } ``` 執行結果 ``` parent writing A child waiting for A parent waiting for B child writing B ``` ##### wait_on_futex_value 使用 futex 等待 futex_addr 有 val,只有當 futex_addr 有值時,會去檢查 futex_addr 的值是否與 val 相等,如果相等 return,否則就 abort() ```cpp= void wait_on_futex_value(int* futex_addr, int val) { while (1) { int futex_rc = futex(futex_addr, FUTEX_WAIT, val, NULL, NULL, 0); if (futex_rc == -1) { if (errno != EAGAIN) { perror("futex"); exit(1); } } else if (futex_rc == 0) { if (*futex_addr == val) { // This is a real wakeup. return; } } else { abort(); } } } ``` ##### wake_futex_blocking 一個用於喚醒的 wrapper,只有被喚醒時,才會返回。 ```cpp= void wake_futex_blocking(int* futex_addr) { while (1) { int futex_rc = futex(futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0); if (futex_rc == -1) { perror("futex wake"); exit(1); } else if (futex_rc > 0) { return; } } } ``` ## 程式碼運作原理 ### futex.h `futex.h` 檔案中有三種的 futex 操作,`futex_wait`、`futex_wake` 、`futex_requeue` 使用 syscall 機制存取 Linux Kernel 內的 futex。 在 [/kernel/futex/syscalls.c](https://elixir.bootlin.com/linux/latest/source/kernel/futex/syscalls.c) 會根據不同的 `FUTEX_[xxx]_PRIVATE` 透過 `do_futex` 執行對應的 futex。 ```cpp= long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout, u32 __user *uaddr2, u32 val2, u32 val3) { int cmd = op & FUTEX_CMD_MASK; unsigned int flags = 0; if (!(op & FUTEX_PRIVATE_FLAG)) flags |= FLAGS_SHARED; if (op & FUTEX_CLOCK_REALTIME) { flags |= FLAGS_CLOCKRT; if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI && cmd != FUTEX_LOCK_PI2) return -ENOSYS; } switch (cmd) { case FUTEX_WAIT: val3 = FUTEX_BITSET_MATCH_ANY; fallthrough; case FUTEX_WAIT_BITSET: return futex_wait(uaddr, flags, val, timeout, val3); case FUTEX_WAKE: val3 = FUTEX_BITSET_MATCH_ANY; fallthrough; case FUTEX_WAKE_BITSET: return futex_wake(uaddr, flags, val, val3); case FUTEX_REQUEUE: return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0); ... } return -ENOSYS; } ``` ##### futex_wait -> [kernel/futex/waitwake.c:futex_wait](https://elixir.bootlin.com/linux/latest/source/kernel/futex/waitwake.c#L632) 在 futex.h 中的 futex_wait 實作部分,透過 syscall 方式呼叫 Linux Kernel 中的 `futex_wait` ```cpp /* futex.h */ /* Atomically check if '*futex == value', and if so, go to sleep */ static inline void futex_wait(atomic int *futex, int value) { syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL); } ``` 此為 Linux Kernel 中的 `futex_wait` 實作,透過 `futex_wait_setup` 檢查 uaddr 中的值(futex 值) 是否與 val 相等: - 不相同:ret < 1,執行 out 然後 return - 相同:ret = 0,代表 uaddr 有此 val 並且處於 locked狀態,放入 `futex_wait_queu` 中等待被喚醒。 ```cpp= /*kernel/futex/waitwake.c:futex_wait*/ /* futex_wait_setup: - 0 - uaddr contains val and hb has been locked; - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked */ int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset) { ... retry: /* * Prepare to wait on uaddr. On success, it holds hb->lock and q * is initialized. */ ret = futex_wait_setup(uaddr, val, flags, &q, &hb); if (ret) goto out; /* futex_queue and wait for wakeup, timeout, or a signal. */ futex_wait_queue(hb, &q, to); /* If we were woken (and unqueued), we succeeded, whatever. */ ret = 0; if (!futex_unqueue(&q)) goto out; ret = -ETIMEDOUT; ... out: if (to) { hrtimer_cancel(&to->timer); destroy_hrtimer_on_stack(&to->timer); } return ret; } ``` ##### futex_wake -> [kernel/futex/waitwake.c:futex_wake](https://elixir.bootlin.com/linux/latest/source/kernel/futex/waitwake.c#L143) 在 futex.h 中的 futex_wake 實作部分,透過 syscall 方式呼叫 Linux Kernel 中的 `futex_wake` 喚醒 limit 個等待者。 ```cpp /* futex.h */ /* Wake up 'limit' threads currently waiting on 'futex' */ static inline void futex_wake(atomic int *futex, int limit) { syscall(SYS_futex, futex, FUTEX_WAKE_PRIVATE, limit); } ``` futex.h 中的 limit 對應 Linux kernel 中的 nr_wake,透過 `plist_for_each_entry_safe` 來決定要喚醒哪些等待者,在執行 `plist_for_each_entry_safe` 前,使用 spin_lock 方式,進行 lock 避免喚醒的等待者順序錯誤,將欲喚醒的等待者,放入 `wake_q` 中,最後透過 wake_up_q 來進行喚醒。 ```cpp= /*kernel/futex/waitwake.c:futex_wake*/ int futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset) { struct futex_hash_bucket *hb; struct futex_q *this, *next; union futex_key key = FUTEX_KEY_INIT; int ret; DEFINE_WAKE_Q(wake_q); if (!bitset) return -EINVAL; ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ); if (unlikely(ret != 0)) return ret; hb = futex_hash(&key); /* Make sure we really have tasks to wakeup */ if (!futex_hb_waiters_pending(hb)) return ret; spin_lock(&hb->lock); plist_for_each_entry_safe(this, next, &hb->chain, list) { if (futex_match (&this->key, &key)) { if (this->pi_state || this->rt_waiter) { ret = -EINVAL; break; } /* Check if one of the bits is set in both bitsets */ if (!(this->bitset & bitset)) continue; futex_wake_mark(&wake_q, this); if (++ret >= nr_wake) break; } } spin_unlock(&hb->lock); wake_up_q(&wake_q); return ret; } ``` ##### futex_requeue -> [kernel/futex/requeue.c:futex_requeue](https://elixir.bootlin.com/linux/latest/source/kernel/futex/requeue.c#L364) 在 futex.h 中的 futex_wake 實作部分,會最多喚醒 limit 等待者,和 wait 不同的是,如果 futex 中的等待者超過 limit 個, 則超過的部分,會在對應的 wait queue 中等待,在 syscall 中的 INT_MAX 代表可以從 futex 中轉移到 wait queue 的最大數量。 ```cpp /* futex.h */ /* Wake up 'limit' waiters, and re-queue the rest onto a different futex */ static inline void futex_requeue(atomic int *futex, int limit, atomic int *other) { syscall(SYS_futex, futex, FUTEX_REQUEUE_PRIVATE, limit, INT_MAX, other); } ``` 根據 [futex(2)](https://man7.org/linux/man-pages/man2/futex.2.html) 的敘述,futex 會一次喚醒 uaddr1 中的所有等待者,如果超過 nr_wake 個等待者時,會將剩餘的且數量不超過 nr_requeue 等待者放入 uaddr2 的 wait queue 中,這樣的做法是避免 thundering herd(驚群效應)。 固定設定的參數 nr_wake = 1, nr_requeue = INT_MAX,代表只會喚醒一個執行緒去競爭 uaddr2 對應的 futex,剩餘的放入 uaddr2 對應的 wait queue ### mutex.h mutex.h 中有三種 lock 操作,`mutex_trylock`、`mutex_lock`、`mutex_unlock` ##### mutex_trylock 先用 `atomic_load` 取得 mutex 狀態 - 如果 mutex 目前為 lock 則返回 false。 - 否則,透過 `fetch_or` 對 mutex 進行上鎖。 對 mutex 進行 `atomic_fetch_or_explicit` ,此時 mutex->state 與 MUTEX_LOCKED 進行 or,並返回 mutex->state 的舊值。 透過這樣的方式,在上鎖時經由下一行(8)同步確認是否已經被其他人給上鎖,如果已經被上鎖,則此次行為是失敗的回傳 false。 當上鎖成功後,透過 `atomic_thread_fence` 進行 mutex 狀態的同步。 ```cpp= static bool mutex_trylock(mutex_t *mutex) { int state = load(&mutex->state, relaxed); if (state & MUTEX_LOCKED) return false; state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed); if (state & MUTEX_LOCKED) return false; thread_fence(&mutex->state, acquire); return true; } ``` ##### mutex_lock `mutex_lock` 在最一開始的 for loop,不斷地透過 `mutex_trylock` 來持有鎖。 如果一直得不到鎖,則透過 `atomic_exchange_explicit` 將 state 狀態改成 `MUTEX_LOCKED | MUTEX_SLEEPING`,並回傳先前的 state 值。 ```cpp= static inline void mutex_lock(mutex_t *mutex) { #define MUTEX_SPINS 128 for (int i = 0; i < MUTEX_SPINS; ++i) { if (mutex_trylock(mutex)) return; spin_hint(); } int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); while (state & MUTEX_LOCKED) { futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING); state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); } thread_fence(&mutex->state, acquire); } ``` ##### mutex_unlock 將 mutex->state 設定為 0,如果上鎖時有等待者, 則透過 futex_wake 來喚醒等待者。 ```cpp= static inline void mutex_unlock(mutex_t *mutex) { int state = exchange(&mutex->state, 0, release); if (state & MUTEX_SLEEPING) futex_wake(&mutex->state, 1); // FFFF } ``` ### cond.h cond.h 有三種操作,`cond_wait`、`cond_signal`、`cond_broadcast` ##### cond_wait 透過 load 方式獲取 cond->seq,透過 mutex 方式,避免 futex 系統呼叫的可能性,在有限次數上,不斷地監看 cond->seq 的值改變。 - 若 cond->seq 發生改變,則進行上鎖。 - 若嘗試 `COND_SPINS` 次後,仍然失敗,則回到 `futex_wait` 進行等待。 隨後進入 while loop 每次會檢查狀態是否上鎖,如果上鎖,會透過 futex_wait 放入 wait queue 中等待。 >參考 RinHizakura 同學的 >在 fetch_or 地方,對於 mutex->state 添加 `MUTEX_SLEEPING` ,是為了在之後 `mutex_unlock` 時都能確保呼叫 `futex_wake` > ```cpp= static inline void cond_wait(cond_t *cond, mutex_t *mutex) { int seq = load(&cond->seq, relaxed); mutex_unlock(mutex); #define COND_SPINS 128 for (int i = 0; i < COND_SPINS; ++i) { if (load(&cond->seq, relaxed) != seq) { mutex_lock(mutex); return; } spin_hint(); } futex_wait(&cond->seq, seq); mutex_lock(mutex); fetch_or(&mutex->state, MUTEX_SLEEPING, relaxed); // AAAAA } ``` ##### cond_signal 將 cond->seq 進行 + 1後,將對應的 cond->seq 給喚醒 ```cpp= static inline void cond_signal(cond_t *cond, mutex_t *mutex) { fetch_add(&cond->seq, 1, relaxed); // BBBB futex_wake(&cond->seq, 1); } ``` ##### cond_broadcast 對 cond->seq 進行 + 1 後,透過 futex_requee 喚醒等待者,進行查看 ```cpp= static inline void cond_broadcast(cond_t *cond, mutex_t *mutex) { fetch_add(&cond->seq, 1, relaxed); // CCCC futex_requeue(&cond->seq, 1, &mutex->state); // DDDD } ``` ### main.c 有兩個 struct node 與 clock - clock:內部紀錄 ticks,作為時間點的判斷,每一個 thread 接會根據 ticks 的值而作相應的行爲 - node:為工作節點,建立後轉交給 thread 進行操作,一個 thread 負責一個 node 皆需要 conditional variable 和 mutex 機制,保護 clock->ticks 與 node->ready ```cpp= struct clock { mutex_t mutex; cond_t cond; int ticks; }; /* A node in a computation graph */ struct node { struct clock *clock; struct node *parent; mutex_t mutex; cond_t cond; bool ready; }; ``` #### clock clock 有三個操作 `clock_wait`、`clock_tick`、`clock_stop` ##### clock_wait 使用 mutex 進行上鎖,藉此檢查 clock->ticks 的值,如果 clock->ticks 小於 ticks 時,則進入 `cond_wait` 進行監看 ticks 是否改變 ```cpp= static bool clock_wait(struct clock *clock, int ticks) { mutex_lock(&clock->mutex); while (clock->ticks >= 0 && clock->ticks < ticks) cond_wait(&clock->cond, &clock->mutex); bool ret = clock->ticks >= ticks; mutex_unlock(&clock->mutex); return ret; } ``` ##### clock_tick 使用 mutex 進行上鎖,對於 ticks 進行 + 1,之後透過 cond_broadcast 進行廣播告知 clock->ticks 已經更新 ```cpp= static void clock_tick(struct clock *clock) { mutex_lock(&clock->mutex); if (clock->ticks >= 0) ++clock->ticks; mutex_unlock(&clock->mutex); cond_broadcast(&clock->cond, &clock->mutex); } ``` ##### clock_stop 使用 mutex 進行上鎖,對於 ticks 設定為 - 1,使得 clock->ticks 的累加停止,之後透過 cond_broadcast 進行廣播告知 clock->ticks 已經更新 ```cpp= static void clock_stop(struct clock *clock) { mutex_lock(&clock->mutex); clock->ticks = -1; mutex_unlock(&clock->mutex); cond_broadcast(&clock->cond, &clock->mutex); } ``` #### node clock 有二個操作 `node_wait`、`node_signal` ,該 node 有幾個特性: 1. 共享同一個 clock->ticks 2. $n_i$ 的 parent 為 $n_{i-1}$ 3. 有 condtion variable + mutex 機制 ##### node_wait 檢查 node 的 ready 狀態,若為 false,則進入 cond_wait 進行等待 ```cpp= static void node_wait(struct node *node) { mutex_lock(&node->mutex); while (!node->ready) cond_wait(&node->cond, &node->mutex); node->ready = false; mutex_unlock(&node->mutex); } ``` ##### node_signal 將 ready 設定為 true 後,透過 cond_signal 喚醒等待者。 ```cpp= static void node_signal(struct node *node) { mutex_lock(&node->mutex); node->ready = true; mutex_unlock(&node->mutex); cond_signal(&node->cond, &node->mutex); } ``` #### main main function 如下,建立一個 clock,並新增 16 個 nodes ($n_0 ~ n_{15}$) 對於每一個 node 均建立一個 thread,並透過 `thread_func` 進行測試。 ```cpp= int main(void) { struct clock clock; clock_init(&clock); #define N_NODES 16 struct node nodes[N_NODES]; node_init(&clock, NULL, &nodes[0]); for (int i = 1; i < N_NODES; ++i) node_init(&clock, &nodes[i - 1], &nodes[i]); pthread_t threads[N_NODES]; for (int i = 0; i < N_NODES; ++i) { if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0) return EXIT_FAILURE; } clock_tick(&clock); clock_wait(&clock, 1 << N_NODES); clock_stop(&clock); for (int i = 0; i < N_NODES; ++i) { if (pthread_join(threads[i], NULL) != 0) return EXIT_FAILURE; } return EXIT_SUCCESS; } ``` 在建立 thread 後,透過 `clock_tick`、`clock_wait`、`clock_stop` 進行操作。 透過呼叫第一個 `clock_tick` 使得 ticks變為 1,藉此讓其他的 thread 可以開始根據 tick 逐步進行。 `clock_wait` 會一直等待 tick 變成 1 << N_NODES 後再透過 `clock_stop` 來讓 node 的 thread 結束 ```cpp= clock_tick(&clock); clock_wait(&clock, 1 << N_NODES); clock_stop(&clock); ``` ##### thread_func thread 在 clock->tick 小於 i 時,會進行等待,開始執行後,透過 `node_wait` 檢查該 node 的 parent 是否 ready,接著若 bit 是 false 則執行 `clock_tick` 進行 tick ++,若為 true,則透過 node_signal 喚醒下一個 thread。 ```cpp= static void *thread_func(void *ptr) { struct node *self = ptr; bool bit = false; for (int i = 1; clock_wait(self->clock, i); ++i) { if (self->parent) node_wait(self->parent); if (bit) { node_signal(self); } else { clock_tick(self->clock); } bit = !bit; } node_signal(self); return NULL; } ``` ## 延伸問題 - 2 修改第 1 次作業的測驗 γ 提供的並行版本快速排序法實作,使其得以搭配上述 futex 程式碼運作 [修改部分](https://github.com/Hao-yu-lin/linux2023-summer/blob/main/hw2/qsort/qsort_futex.c) 修改方法並沒有很複雜 #### struct 部分 將 `pthread_mutex_t` 以及 `pthread_cond_t` 修改成 `mutex_t` 與 `cond_t` ```cpp= /* Variant part passed to qsort invocations. */ struct qsort { enum thread_state st; /* For coordinating work. */ struct common *common; /* Common shared elements. */ void *a; /* Array base. */ size_t n; /* Number of elements. */ pthread_t id; /* Thread id. */ // pthread_mutex_t mtx_st; /* For signalling state change. */ // pthread_cond_t cond_st; /* For signalling state change. */ mutex_t fmutex_st; cond_t fcond_st; }; /* Invariant common part, shared across invocations. */ struct common { int swaptype; /* Code to use for swapping */ size_t es; /* Element size. */ void *thunk; /* Thunk for qsort_r */ cmp_t *cmp; /* Comparison function */ int nthreads; /* Total number of pool threads. */ int idlethreads; /* Number of idle threads in pool. */ int forkelem; /* Minimum number of elements for a new thread. */ struct qsort *pool; /* Fixed pool of threads. */ // pthread_mutex_t mtx_al; /* For allocating threads in the pool. */ mutex_t fmutex_al; }; ``` 在後續的程式部分,更換如下 - `pthread_mutex_init` -> `mutex_init` - `pthread_mutex_lock` -> `mutex_lock` - `pthread_mutex_unlock` -> `mutex_unlock` - `pthread_cond_init` -> `cond_init` - `pthread_cond_signal` -> `cond_signal` - `pthread_cond_wait` -> `cond_wait` 下圖為 第 1 次作業的測驗 γ (mt 橘色)與修改成 futex 版本(ft 藍色),x 軸為 thread 數量,y 軸為執行時間。 每次新增 thread 會重複執行 100 次取平均時間,最後結果如圖,由圖可知換成 futex 後整體執行時間增加很多,且隨著 thread 增加執行時間有越長的趨勢。 ![qsort](https://hackmd.io/_uploads/S1H2LlMCh.png) 後來發現是我編譯時,多加了-fsanitize=thread。 正確的 ``` gcc -Wall -DUSE_LINUX -o qsort_futex qsort_futex.c -lpthread ``` 經過修正後如圖 ![](https://hackmd.io/_uploads/HkWIgkDAn.png)