# 2023 Homework2 (quiz1)
contributed by < `Hao-yu-lin` >
[題目連結](https://hackmd.io/@sysprog/linux2023-summer-quiz1)
###### [github](https://github.com/Hao-yu-lin/linux2023-summer)
---
測驗解答
AAAA:MUTEX_SLEEPING
BBBB:1
CCCC:1
DDDD:1
EEEE:futex_wake
FFFF:futex_wake
---
## Overview futex
[reference:Basics of Futexes](https://eli.thegreenplace.net/2018/basics-of-futexes/)
futex("Fast userspace mutex") 為 userspace code 提供一種執行緒之間的同步(synchronize)同時伴隨最小程度的 linux kernel 參與。
### Motivaiton
再引入 futexes 前,使用 syscall 來鎖定共享資源(ex:semop),隨著 programs 的 concurrent 需求增加,lock 行為在整體的執行時間佔了很大一部分,然而 lock 並沒有做任何實際的工作,它只有保證共享資源是安全的。在執行緒間很少競爭共享資源(low contention rate)的情況下,沒有其他執行緒在 wait queue 中等待 lock,但還是要從使用者層級切換到核心模式去檢查,這樣頻繁的 CPU 模式切換會對效能有負面影響,futex 是該問題解法之一。
在大部分的情形下,locks 不存在競爭關係的,如果一個執行緒遇到一個 free lock,代表在同一時間沒有其他的執行緒嘗試鎖定,可以在不使用 syscall 下調用完成,嘗試使用執行成本更低的 atomic operatoins。
然而,當另一個執行緒在同一個時間點,嘗試獲取 lock ,atomic operations 可能會失敗,此時有兩種選項。
1. busy wait : **busy-loop** using the atomic until the lock is cleared
busy-loop 的方法,這完全在 userspace 中進行,但仍然會造成浪費,因為會佔用核心,並且鎖定狀態有可能保持非常一段時間。
2. syscall : **sleep** until the lock is free (or at least there's a high chance that it's free)
sleep 可將 cpu 的資源讓出,但需要透過 kernel 協助,使用 futexes 來完成。
futex:wait queue(kernl space) + atomic integer(user sapce),透過 atomic integer,可以知道是否有執行緒在 wait queue 中等待。
- 無競爭(contention):不需要進行 CPU mode 切換,進到 kernel 喚醒其他執行緒或是到 wait queue 等待
- 有競爭(contention):利用 futex syscall 搭配 `FUTEX_WAIT` 和 `FUTEX_WAKE`,來喚醒其他執行緒或是到 wait queue 等待。
futex 的操作幾乎都在 userspace 完成,只有當操作結果不一致需要仲裁時,才會需要進入 kernel space 執行。這讓以 futex 為基礎的 lock 可以高效進行。
futex 在核心中藉由特製的佇列來管理執行緒或行程,可要求某個行程/執行緒 suspend 直到某個條件成立,或 signal 某個條件,來喚醒行程/執行緒。
#### futex syscall
```cpp=
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout,
uint32_t *uaddr2, uint32_t val3);
```
- `uint32_t *uaddr`:futex 中使用者層級 atomic integer 所存放的地址
- `int futex_op`:futex operator
1. `FUTEX_WAIT`
2. `FUTEX_WAKE`
- `uint32_t val`:
1. 在 `FUTEX_WAIT` 代表預期使用者層級 atomic integer 的值
2. 在 `FUTEX_WAKE` 代表喚醒的執行緒數量
### Simple futex use:waiting and waking
[簡單小範例](https://github.com/Hao-yu-lin/linux2023-summer/blob/main/hw2/futex_simple_example/futex_example.c)
`main` function 設定共享內存的相關設定後,建立一個 child process,行為如下:
##### child process
- 等待 0xA 被寫入 shared_data 中
- 將 0xB 寫入 shared_data 中
##### parent process
- 將 0xA 寫入 shared_data 中
- 等待 0xB 被寫入 shared data 中
```cpp=
int main(int argc, char** argv) {
int shm_id = shmget(IPC_PRIVATE, 4096, IPC_CREAT | 0666);
if (shm_id < 0) {
perror("shmget");
exit(1);
}
int* shared_data = shmat(shm_id, NULL, 0);
*shared_data = 0;
int forkstatus = fork();
if (forkstatus < 0) {
perror("fork");
exit(1);
}
if (forkstatus == 0) {
// Child process
//
// 2.
printf("child waiting for A\n");
wait_on_futex_value(shared_data, 0xA);
// 4.
printf("child writing B\n");
// Write 0xB to the shared data and wake up parent.
*shared_data = 0xB;
wake_futex_blocking(shared_data);
} else {
// Parent process.
// 1.
printf("parent writing A\n");
// Write 0xA to the shared data and wake up child.
*shared_data = 0xA;
wake_futex_blocking(shared_data);
// 3.
printf("parent waiting for B\n");
wait_on_futex_value(shared_data, 0xB);
// Wait for the child to terminate.
wait(NULL);
shmdt(shared_data);
}
return 0;
}
```
執行結果
```
parent writing A
child waiting for A
parent waiting for B
child writing B
```
##### wait_on_futex_value
使用 futex 等待 futex_addr 有 val,只有當 futex_addr 有值時,會去檢查 futex_addr 的值是否與 val 相等,如果相等 return,否則就 abort()
```cpp=
void wait_on_futex_value(int* futex_addr, int val) {
while (1) {
int futex_rc = futex(futex_addr, FUTEX_WAIT, val, NULL, NULL, 0);
if (futex_rc == -1) {
if (errno != EAGAIN) {
perror("futex");
exit(1);
}
} else if (futex_rc == 0) {
if (*futex_addr == val) {
// This is a real wakeup.
return;
}
} else {
abort();
}
}
}
```
##### wake_futex_blocking
一個用於喚醒的 wrapper,只有被喚醒時,才會返回。
```cpp=
void wake_futex_blocking(int* futex_addr) {
while (1) {
int futex_rc = futex(futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0);
if (futex_rc == -1) {
perror("futex wake");
exit(1);
} else if (futex_rc > 0) {
return;
}
}
}
```
## 程式碼運作原理
### futex.h
`futex.h` 檔案中有三種的 futex 操作,`futex_wait`、`futex_wake` 、`futex_requeue` 使用 syscall 機制存取 Linux Kernel 內的 futex。
在 [/kernel/futex/syscalls.c](https://elixir.bootlin.com/linux/latest/source/kernel/futex/syscalls.c) 會根據不同的 `FUTEX_[xxx]_PRIVATE` 透過 `do_futex` 執行對應的 futex。
```cpp=
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
u32 __user *uaddr2, u32 val2, u32 val3)
{
int cmd = op & FUTEX_CMD_MASK;
unsigned int flags = 0;
if (!(op & FUTEX_PRIVATE_FLAG))
flags |= FLAGS_SHARED;
if (op & FUTEX_CLOCK_REALTIME) {
flags |= FLAGS_CLOCKRT;
if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI &&
cmd != FUTEX_LOCK_PI2)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_WAIT:
val3 = FUTEX_BITSET_MATCH_ANY;
fallthrough;
case FUTEX_WAIT_BITSET:
return futex_wait(uaddr, flags, val, timeout, val3);
case FUTEX_WAKE:
val3 = FUTEX_BITSET_MATCH_ANY;
fallthrough;
case FUTEX_WAKE_BITSET:
return futex_wake(uaddr, flags, val, val3);
case FUTEX_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
...
}
return -ENOSYS;
}
```
##### futex_wait -> [kernel/futex/waitwake.c:futex_wait](https://elixir.bootlin.com/linux/latest/source/kernel/futex/waitwake.c#L632)
在 futex.h 中的 futex_wait 實作部分,透過 syscall 方式呼叫 Linux Kernel 中的 `futex_wait`
```cpp
/* futex.h */
/* Atomically check if '*futex == value', and if so, go to sleep */
static inline void futex_wait(atomic int *futex, int value)
{
syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL);
}
```
此為 Linux Kernel 中的 `futex_wait` 實作,透過 `futex_wait_setup` 檢查 uaddr 中的值(futex 值) 是否與 val 相等:
- 不相同:ret < 1,執行 out 然後 return
- 相同:ret = 0,代表 uaddr 有此 val 並且處於 locked狀態,放入 `futex_wait_queu` 中等待被喚醒。
```cpp=
/*kernel/futex/waitwake.c:futex_wait*/
/*
futex_wait_setup:
- 0 - uaddr contains val and hb has been locked;
- <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset)
{
...
retry:
/*
* Prepare to wait on uaddr. On success, it holds hb->lock and q
* is initialized.
*/
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;
/* futex_queue and wait for wakeup, timeout, or a signal. */
futex_wait_queue(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
if (!futex_unqueue(&q))
goto out;
ret = -ETIMEDOUT;
...
out:
if (to) {
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
```
##### futex_wake -> [kernel/futex/waitwake.c:futex_wake](https://elixir.bootlin.com/linux/latest/source/kernel/futex/waitwake.c#L143)
在 futex.h 中的 futex_wake 實作部分,透過 syscall 方式呼叫 Linux Kernel 中的 `futex_wake` 喚醒 limit 個等待者。
```cpp
/* futex.h */
/* Wake up 'limit' threads currently waiting on 'futex' */
static inline void futex_wake(atomic int *futex, int limit)
{
syscall(SYS_futex, futex, FUTEX_WAKE_PRIVATE, limit);
}
```
futex.h 中的 limit 對應 Linux kernel 中的 nr_wake,透過 `plist_for_each_entry_safe` 來決定要喚醒哪些等待者,在執行 `plist_for_each_entry_safe` 前,使用 spin_lock 方式,進行 lock 避免喚醒的等待者順序錯誤,將欲喚醒的等待者,放入 `wake_q` 中,最後透過 wake_up_q 來進行喚醒。
```cpp=
/*kernel/futex/waitwake.c:futex_wake*/
int futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
DEFINE_WAKE_Q(wake_q);
if (!bitset)
return -EINVAL;
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);
if (unlikely(ret != 0))
return ret;
hb = futex_hash(&key);
/* Make sure we really have tasks to wakeup */
if (!futex_hb_waiters_pending(hb))
return ret;
spin_lock(&hb->lock);
plist_for_each_entry_safe(this, next, &hb->chain, list) {
if (futex_match (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
futex_wake_mark(&wake_q, this);
if (++ret >= nr_wake)
break;
}
}
spin_unlock(&hb->lock);
wake_up_q(&wake_q);
return ret;
}
```
##### futex_requeue -> [kernel/futex/requeue.c:futex_requeue](https://elixir.bootlin.com/linux/latest/source/kernel/futex/requeue.c#L364)
在 futex.h 中的 futex_wake 實作部分,會最多喚醒 limit 等待者,和 wait 不同的是,如果 futex 中的等待者超過 limit 個,
則超過的部分,會在對應的 wait queue 中等待,在 syscall 中的 INT_MAX 代表可以從 futex 中轉移到 wait queue 的最大數量。
```cpp
/* futex.h */
/* Wake up 'limit' waiters, and re-queue the rest onto a different futex */
static inline void futex_requeue(atomic int *futex,
int limit,
atomic int *other)
{
syscall(SYS_futex, futex, FUTEX_REQUEUE_PRIVATE, limit, INT_MAX, other);
}
```
根據 [futex(2)](https://man7.org/linux/man-pages/man2/futex.2.html) 的敘述,futex 會一次喚醒 uaddr1 中的所有等待者,如果超過 nr_wake 個等待者時,會將剩餘的且數量不超過 nr_requeue 等待者放入 uaddr2 的 wait queue 中,這樣的做法是避免 thundering herd(驚群效應)。
固定設定的參數 nr_wake = 1, nr_requeue = INT_MAX,代表只會喚醒一個執行緒去競爭 uaddr2 對應的 futex,剩餘的放入 uaddr2 對應的 wait queue
### mutex.h
mutex.h 中有三種 lock 操作,`mutex_trylock`、`mutex_lock`、`mutex_unlock`
##### mutex_trylock
先用 `atomic_load` 取得 mutex 狀態
- 如果 mutex 目前為 lock 則返回 false。
- 否則,透過 `fetch_or` 對 mutex 進行上鎖。
對 mutex 進行 `atomic_fetch_or_explicit` ,此時 mutex->state 與 MUTEX_LOCKED 進行 or,並返回 mutex->state 的舊值。
透過這樣的方式,在上鎖時經由下一行(8)同步確認是否已經被其他人給上鎖,如果已經被上鎖,則此次行為是失敗的回傳 false。
當上鎖成功後,透過 `atomic_thread_fence` 進行 mutex 狀態的同步。
```cpp=
static bool mutex_trylock(mutex_t *mutex)
{
int state = load(&mutex->state, relaxed);
if (state & MUTEX_LOCKED)
return false;
state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed);
if (state & MUTEX_LOCKED)
return false;
thread_fence(&mutex->state, acquire);
return true;
}
```
##### mutex_lock
`mutex_lock` 在最一開始的 for loop,不斷地透過 `mutex_trylock` 來持有鎖。
如果一直得不到鎖,則透過 `atomic_exchange_explicit` 將 state 狀態改成 `MUTEX_LOCKED | MUTEX_SLEEPING`,並回傳先前的 state 值。
```cpp=
static inline void mutex_lock(mutex_t *mutex)
{
#define MUTEX_SPINS 128
for (int i = 0; i < MUTEX_SPINS; ++i) {
if (mutex_trylock(mutex))
return;
spin_hint();
}
int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
while (state & MUTEX_LOCKED) {
futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING);
state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
}
thread_fence(&mutex->state, acquire);
}
```
##### mutex_unlock
將 mutex->state 設定為 0,如果上鎖時有等待者,
則透過 futex_wake 來喚醒等待者。
```cpp=
static inline void mutex_unlock(mutex_t *mutex)
{
int state = exchange(&mutex->state, 0, release);
if (state & MUTEX_SLEEPING)
futex_wake(&mutex->state, 1); // FFFF
}
```
### cond.h
cond.h 有三種操作,`cond_wait`、`cond_signal`、`cond_broadcast`
##### cond_wait
透過 load 方式獲取 cond->seq,透過 mutex 方式,避免 futex 系統呼叫的可能性,在有限次數上,不斷地監看 cond->seq 的值改變。
- 若 cond->seq 發生改變,則進行上鎖。
- 若嘗試 `COND_SPINS` 次後,仍然失敗,則回到 `futex_wait` 進行等待。
隨後進入 while loop 每次會檢查狀態是否上鎖,如果上鎖,會透過 futex_wait 放入 wait queue 中等待。
>參考 RinHizakura 同學的
>在 fetch_or 地方,對於 mutex->state 添加 `MUTEX_SLEEPING` ,是為了在之後 `mutex_unlock` 時都能確保呼叫 `futex_wake`
>
```cpp=
static inline void cond_wait(cond_t *cond, mutex_t *mutex)
{
int seq = load(&cond->seq, relaxed);
mutex_unlock(mutex);
#define COND_SPINS 128
for (int i = 0; i < COND_SPINS; ++i) {
if (load(&cond->seq, relaxed) != seq) {
mutex_lock(mutex);
return;
}
spin_hint();
}
futex_wait(&cond->seq, seq);
mutex_lock(mutex);
fetch_or(&mutex->state, MUTEX_SLEEPING, relaxed); // AAAAA
}
```
##### cond_signal
將 cond->seq 進行 + 1後,將對應的 cond->seq 給喚醒
```cpp=
static inline void cond_signal(cond_t *cond, mutex_t *mutex)
{
fetch_add(&cond->seq, 1, relaxed); // BBBB
futex_wake(&cond->seq, 1);
}
```
##### cond_broadcast
對 cond->seq 進行 + 1 後,透過 futex_requee 喚醒等待者,進行查看
```cpp=
static inline void cond_broadcast(cond_t *cond, mutex_t *mutex)
{
fetch_add(&cond->seq, 1, relaxed); // CCCC
futex_requeue(&cond->seq, 1, &mutex->state); // DDDD
}
```
### main.c
有兩個 struct node 與 clock
- clock:內部紀錄 ticks,作為時間點的判斷,每一個 thread 接會根據 ticks 的值而作相應的行爲
- node:為工作節點,建立後轉交給 thread 進行操作,一個 thread 負責一個 node
皆需要 conditional variable 和 mutex 機制,保護 clock->ticks 與 node->ready
```cpp=
struct clock {
mutex_t mutex;
cond_t cond;
int ticks;
};
/* A node in a computation graph */
struct node {
struct clock *clock;
struct node *parent;
mutex_t mutex;
cond_t cond;
bool ready;
};
```
#### clock
clock 有三個操作 `clock_wait`、`clock_tick`、`clock_stop`
##### clock_wait
使用 mutex 進行上鎖,藉此檢查 clock->ticks 的值,如果 clock->ticks 小於 ticks 時,則進入 `cond_wait` 進行監看 ticks 是否改變
```cpp=
static bool clock_wait(struct clock *clock, int ticks)
{
mutex_lock(&clock->mutex);
while (clock->ticks >= 0 && clock->ticks < ticks)
cond_wait(&clock->cond, &clock->mutex);
bool ret = clock->ticks >= ticks;
mutex_unlock(&clock->mutex);
return ret;
}
```
##### clock_tick
使用 mutex 進行上鎖,對於 ticks 進行 + 1,之後透過 cond_broadcast 進行廣播告知 clock->ticks 已經更新
```cpp=
static void clock_tick(struct clock *clock)
{
mutex_lock(&clock->mutex);
if (clock->ticks >= 0)
++clock->ticks;
mutex_unlock(&clock->mutex);
cond_broadcast(&clock->cond, &clock->mutex);
}
```
##### clock_stop
使用 mutex 進行上鎖,對於 ticks 設定為 - 1,使得 clock->ticks 的累加停止,之後透過 cond_broadcast 進行廣播告知 clock->ticks 已經更新
```cpp=
static void clock_stop(struct clock *clock)
{
mutex_lock(&clock->mutex);
clock->ticks = -1;
mutex_unlock(&clock->mutex);
cond_broadcast(&clock->cond, &clock->mutex);
}
```
#### node
clock 有二個操作 `node_wait`、`node_signal` ,該 node 有幾個特性:
1. 共享同一個 clock->ticks
2. $n_i$ 的 parent 為 $n_{i-1}$
3. 有 condtion variable + mutex 機制
##### node_wait
檢查 node 的 ready 狀態,若為 false,則進入 cond_wait 進行等待
```cpp=
static void node_wait(struct node *node)
{
mutex_lock(&node->mutex);
while (!node->ready)
cond_wait(&node->cond, &node->mutex);
node->ready = false;
mutex_unlock(&node->mutex);
}
```
##### node_signal
將 ready 設定為 true 後,透過 cond_signal 喚醒等待者。
```cpp=
static void node_signal(struct node *node)
{
mutex_lock(&node->mutex);
node->ready = true;
mutex_unlock(&node->mutex);
cond_signal(&node->cond, &node->mutex);
}
```
#### main
main function 如下,建立一個 clock,並新增 16 個 nodes ($n_0 ~ n_{15}$)
對於每一個 node 均建立一個 thread,並透過 `thread_func` 進行測試。
```cpp=
int main(void)
{
struct clock clock;
clock_init(&clock);
#define N_NODES 16
struct node nodes[N_NODES];
node_init(&clock, NULL, &nodes[0]);
for (int i = 1; i < N_NODES; ++i)
node_init(&clock, &nodes[i - 1], &nodes[i]);
pthread_t threads[N_NODES];
for (int i = 0; i < N_NODES; ++i) {
if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0)
return EXIT_FAILURE;
}
clock_tick(&clock);
clock_wait(&clock, 1 << N_NODES);
clock_stop(&clock);
for (int i = 0; i < N_NODES; ++i) {
if (pthread_join(threads[i], NULL) != 0)
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
```
在建立 thread 後,透過 `clock_tick`、`clock_wait`、`clock_stop` 進行操作。
透過呼叫第一個 `clock_tick` 使得 ticks變為 1,藉此讓其他的 thread 可以開始根據 tick 逐步進行。
`clock_wait` 會一直等待 tick 變成 1 << N_NODES 後再透過 `clock_stop` 來讓 node 的 thread 結束
```cpp=
clock_tick(&clock);
clock_wait(&clock, 1 << N_NODES);
clock_stop(&clock);
```
##### thread_func
thread 在 clock->tick 小於 i 時,會進行等待,開始執行後,透過 `node_wait` 檢查該 node 的 parent 是否 ready,接著若 bit 是 false 則執行 `clock_tick` 進行 tick ++,若為 true,則透過 node_signal 喚醒下一個 thread。
```cpp=
static void *thread_func(void *ptr)
{
struct node *self = ptr;
bool bit = false;
for (int i = 1; clock_wait(self->clock, i); ++i) {
if (self->parent)
node_wait(self->parent);
if (bit) {
node_signal(self);
} else {
clock_tick(self->clock);
}
bit = !bit;
}
node_signal(self);
return NULL;
}
```
## 延伸問題 - 2
修改第 1 次作業的測驗 γ 提供的並行版本快速排序法實作,使其得以搭配上述 futex 程式碼運作
[修改部分](https://github.com/Hao-yu-lin/linux2023-summer/blob/main/hw2/qsort/qsort_futex.c)
修改方法並沒有很複雜
#### struct 部分
將 `pthread_mutex_t` 以及 `pthread_cond_t` 修改成 `mutex_t` 與 `cond_t`
```cpp=
/* Variant part passed to qsort invocations. */
struct qsort {
enum thread_state st; /* For coordinating work. */
struct common *common; /* Common shared elements. */
void *a; /* Array base. */
size_t n; /* Number of elements. */
pthread_t id; /* Thread id. */
// pthread_mutex_t mtx_st; /* For signalling state change. */
// pthread_cond_t cond_st; /* For signalling state change. */
mutex_t fmutex_st;
cond_t fcond_st;
};
/* Invariant common part, shared across invocations. */
struct common {
int swaptype; /* Code to use for swapping */
size_t es; /* Element size. */
void *thunk; /* Thunk for qsort_r */
cmp_t *cmp; /* Comparison function */
int nthreads; /* Total number of pool threads. */
int idlethreads; /* Number of idle threads in pool. */
int forkelem; /* Minimum number of elements for a new thread. */
struct qsort *pool; /* Fixed pool of threads. */
// pthread_mutex_t mtx_al; /* For allocating threads in the pool. */
mutex_t fmutex_al;
};
```
在後續的程式部分,更換如下
- `pthread_mutex_init` -> `mutex_init`
- `pthread_mutex_lock` -> `mutex_lock`
- `pthread_mutex_unlock` -> `mutex_unlock`
- `pthread_cond_init` -> `cond_init`
- `pthread_cond_signal` -> `cond_signal`
- `pthread_cond_wait` -> `cond_wait`
下圖為 第 1 次作業的測驗 γ (mt 橘色)與修改成 futex 版本(ft 藍色),x 軸為 thread 數量,y 軸為執行時間。
每次新增 thread 會重複執行 100 次取平均時間,最後結果如圖,由圖可知換成 futex 後整體執行時間增加很多,且隨著 thread 增加執行時間有越長的趨勢。

後來發現是我編譯時,多加了-fsanitize=thread。
正確的
```
gcc -Wall -DUSE_LINUX -o qsort_futex qsort_futex.c -lpthread
```
經過修正後如圖
