Try   HackMD

2023 Homework2 (quiz1)

contributed by < Hao-yu-lin >

題目連結

github

測驗解答
AAAA:MUTEX_SLEEPING
BBBB:1
CCCC:1
DDDD:1
EEEE:futex_wake
FFFF:futex_wake


Overview futex

reference:Basics of Futexes
futex("Fast userspace mutex") 為 userspace code 提供一種執行緒之間的同步(synchronize)同時伴隨最小程度的 linux kernel 參與。

Motivaiton

再引入 futexes 前,使用 syscall 來鎖定共享資源(ex:semop),隨著 programs 的 concurrent 需求增加,lock 行為在整體的執行時間佔了很大一部分,然而 lock 並沒有做任何實際的工作,它只有保證共享資源是安全的。在執行緒間很少競爭共享資源(low contention rate)的情況下,沒有其他執行緒在 wait queue 中等待 lock,但還是要從使用者層級切換到核心模式去檢查,這樣頻繁的 CPU 模式切換會對效能有負面影響,futex 是該問題解法之一。

在大部分的情形下,locks 不存在競爭關係的,如果一個執行緒遇到一個 free lock,代表在同一時間沒有其他的執行緒嘗試鎖定,可以在不使用 syscall 下調用完成,嘗試使用執行成本更低的 atomic operatoins。

然而,當另一個執行緒在同一個時間點,嘗試獲取 lock ,atomic operations 可能會失敗,此時有兩種選項。

  1. busy wait : busy-loop using the atomic until the lock is cleared
    busy-loop 的方法,這完全在 userspace 中進行,但仍然會造成浪費,因為會佔用核心,並且鎖定狀態有可能保持非常一段時間。
  2. syscall : sleep until the lock is free (or at least there's a high chance that it's free)
    sleep 可將 cpu 的資源讓出,但需要透過 kernel 協助,使用 futexes 來完成。

futex:wait queue(kernl space) + atomic integer(user sapce),透過 atomic integer,可以知道是否有執行緒在 wait queue 中等待。

  • 無競爭(contention):不需要進行 CPU mode 切換,進到 kernel 喚醒其他執行緒或是到 wait queue 等待
  • 有競爭(contention):利用 futex syscall 搭配 FUTEX_WAITFUTEX_WAKE,來喚醒其他執行緒或是到 wait queue 等待。

futex 的操作幾乎都在 userspace 完成,只有當操作結果不一致需要仲裁時,才會需要進入 kernel space 執行。這讓以 futex 為基礎的 lock 可以高效進行。

futex 在核心中藉由特製的佇列來管理執行緒或行程,可要求某個行程/執行緒 suspend 直到某個條件成立,或 signal 某個條件,來喚醒行程/執行緒。

futex syscall

long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3);
  • uint32_t *uaddr:futex 中使用者層級 atomic integer 所存放的地址
  • int futex_op:futex operator
    1. FUTEX_WAIT
    2. FUTEX_WAKE
  • uint32_t val
    1. FUTEX_WAIT 代表預期使用者層級 atomic integer 的值
    2. FUTEX_WAKE 代表喚醒的執行緒數量

Simple futex use:waiting and waking

簡單小範例

main function 設定共享內存的相關設定後,建立一個 child process,行為如下:

child process
  • 等待 0xA 被寫入 shared_data 中
  • 將 0xB 寫入 shared_data 中
parent process
  • 將 0xA 寫入 shared_data 中
  • 等待 0xB 被寫入 shared data 中
int main(int argc, char** argv) { int shm_id = shmget(IPC_PRIVATE, 4096, IPC_CREAT | 0666); if (shm_id < 0) { perror("shmget"); exit(1); } int* shared_data = shmat(shm_id, NULL, 0); *shared_data = 0; int forkstatus = fork(); if (forkstatus < 0) { perror("fork"); exit(1); } if (forkstatus == 0) { // Child process // // 2. printf("child waiting for A\n"); wait_on_futex_value(shared_data, 0xA); // 4. printf("child writing B\n"); // Write 0xB to the shared data and wake up parent. *shared_data = 0xB; wake_futex_blocking(shared_data); } else { // Parent process. // 1. printf("parent writing A\n"); // Write 0xA to the shared data and wake up child. *shared_data = 0xA; wake_futex_blocking(shared_data); // 3. printf("parent waiting for B\n"); wait_on_futex_value(shared_data, 0xB); // Wait for the child to terminate. wait(NULL); shmdt(shared_data); } return 0; }

執行結果

parent writing A
child waiting for A
parent waiting for B
child writing B
wait_on_futex_value

使用 futex 等待 futex_addr 有 val,只有當 futex_addr 有值時,會去檢查 futex_addr 的值是否與 val 相等,如果相等 return,否則就 abort()

void wait_on_futex_value(int* futex_addr, int val) { while (1) { int futex_rc = futex(futex_addr, FUTEX_WAIT, val, NULL, NULL, 0); if (futex_rc == -1) { if (errno != EAGAIN) { perror("futex"); exit(1); } } else if (futex_rc == 0) { if (*futex_addr == val) { // This is a real wakeup. return; } } else { abort(); } } }
wake_futex_blocking

一個用於喚醒的 wrapper,只有被喚醒時,才會返回。

void wake_futex_blocking(int* futex_addr) { while (1) { int futex_rc = futex(futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0); if (futex_rc == -1) { perror("futex wake"); exit(1); } else if (futex_rc > 0) { return; } } }

程式碼運作原理

futex.h

futex.h 檔案中有三種的 futex 操作,futex_waitfutex_wakefutex_requeue 使用 syscall 機制存取 Linux Kernel 內的 futex。

/kernel/futex/syscalls.c 會根據不同的 FUTEX_[xxx]_PRIVATE 透過 do_futex 執行對應的 futex。

long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout, u32 __user *uaddr2, u32 val2, u32 val3) { int cmd = op & FUTEX_CMD_MASK; unsigned int flags = 0; if (!(op & FUTEX_PRIVATE_FLAG)) flags |= FLAGS_SHARED; if (op & FUTEX_CLOCK_REALTIME) { flags |= FLAGS_CLOCKRT; if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI && cmd != FUTEX_LOCK_PI2) return -ENOSYS; } switch (cmd) { case FUTEX_WAIT: val3 = FUTEX_BITSET_MATCH_ANY; fallthrough; case FUTEX_WAIT_BITSET: return futex_wait(uaddr, flags, val, timeout, val3); case FUTEX_WAKE: val3 = FUTEX_BITSET_MATCH_ANY; fallthrough; case FUTEX_WAKE_BITSET: return futex_wake(uaddr, flags, val, val3); case FUTEX_REQUEUE: return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0); ... } return -ENOSYS; }
futex_wait -> kernel/futex/waitwake.c:futex_wait

在 futex.h 中的 futex_wait 實作部分,透過 syscall 方式呼叫 Linux Kernel 中的 futex_wait

/* futex.h */
/* Atomically check if '*futex == value', and if so, go to sleep */
static inline void futex_wait(atomic int *futex, int value)
{
    syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL);
}

此為 Linux Kernel 中的 futex_wait 實作,透過 futex_wait_setup 檢查 uaddr 中的值(futex 值) 是否與 val 相等:

  • 不相同:ret < 1,執行 out 然後 return
  • 相同:ret = 0,代表 uaddr 有此 val 並且處於 locked狀態,放入 futex_wait_queu 中等待被喚醒。
/*kernel/futex/waitwake.c:futex_wait*/ /* futex_wait_setup: - 0 - uaddr contains val and hb has been locked; - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked */ int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset) { ... retry: /* * Prepare to wait on uaddr. On success, it holds hb->lock and q * is initialized. */ ret = futex_wait_setup(uaddr, val, flags, &q, &hb); if (ret) goto out; /* futex_queue and wait for wakeup, timeout, or a signal. */ futex_wait_queue(hb, &q, to); /* If we were woken (and unqueued), we succeeded, whatever. */ ret = 0; if (!futex_unqueue(&q)) goto out; ret = -ETIMEDOUT; ... out: if (to) { hrtimer_cancel(&to->timer); destroy_hrtimer_on_stack(&to->timer); } return ret; }
futex_wake -> kernel/futex/waitwake.c:futex_wake

在 futex.h 中的 futex_wake 實作部分,透過 syscall 方式呼叫 Linux Kernel 中的 futex_wake 喚醒 limit 個等待者。

/* futex.h */
/* Wake up 'limit' threads currently waiting on 'futex' */
static inline void futex_wake(atomic int *futex, int limit)
{
    syscall(SYS_futex, futex, FUTEX_WAKE_PRIVATE, limit);
}

futex.h 中的 limit 對應 Linux kernel 中的 nr_wake,透過 plist_for_each_entry_safe 來決定要喚醒哪些等待者,在執行 plist_for_each_entry_safe 前,使用 spin_lock 方式,進行 lock 避免喚醒的等待者順序錯誤,將欲喚醒的等待者,放入 wake_q 中,最後透過 wake_up_q 來進行喚醒。

/*kernel/futex/waitwake.c:futex_wake*/ int futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset) { struct futex_hash_bucket *hb; struct futex_q *this, *next; union futex_key key = FUTEX_KEY_INIT; int ret; DEFINE_WAKE_Q(wake_q); if (!bitset) return -EINVAL; ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ); if (unlikely(ret != 0)) return ret; hb = futex_hash(&key); /* Make sure we really have tasks to wakeup */ if (!futex_hb_waiters_pending(hb)) return ret; spin_lock(&hb->lock); plist_for_each_entry_safe(this, next, &hb->chain, list) { if (futex_match (&this->key, &key)) { if (this->pi_state || this->rt_waiter) { ret = -EINVAL; break; } /* Check if one of the bits is set in both bitsets */ if (!(this->bitset & bitset)) continue; futex_wake_mark(&wake_q, this); if (++ret >= nr_wake) break; } } spin_unlock(&hb->lock); wake_up_q(&wake_q); return ret; }
futex_requeue -> kernel/futex/requeue.c:futex_requeue

在 futex.h 中的 futex_wake 實作部分,會最多喚醒 limit 等待者,和 wait 不同的是,如果 futex 中的等待者超過 limit 個,
則超過的部分,會在對應的 wait queue 中等待,在 syscall 中的 INT_MAX 代表可以從 futex 中轉移到 wait queue 的最大數量。

/* futex.h */
/* Wake up 'limit' waiters, and re-queue the rest onto a different futex */
static inline void futex_requeue(atomic int *futex,
                                 int limit,
                                 atomic int *other)
{
    syscall(SYS_futex, futex, FUTEX_REQUEUE_PRIVATE, limit, INT_MAX, other);
}

根據 futex(2) 的敘述,futex 會一次喚醒 uaddr1 中的所有等待者,如果超過 nr_wake 個等待者時,會將剩餘的且數量不超過 nr_requeue 等待者放入 uaddr2 的 wait queue 中,這樣的做法是避免 thundering herd(驚群效應)。

固定設定的參數 nr_wake = 1, nr_requeue = INT_MAX,代表只會喚醒一個執行緒去競爭 uaddr2 對應的 futex,剩餘的放入 uaddr2 對應的 wait queue

mutex.h

mutex.h 中有三種 lock 操作,mutex_trylockmutex_lockmutex_unlock

mutex_trylock

先用 atomic_load 取得 mutex 狀態

  • 如果 mutex 目前為 lock 則返回 false。
  • 否則,透過 fetch_or 對 mutex 進行上鎖。
    對 mutex 進行 atomic_fetch_or_explicit ,此時 mutex->state 與 MUTEX_LOCKED 進行 or,並返回 mutex->state 的舊值。

透過這樣的方式,在上鎖時經由下一行(8)同步確認是否已經被其他人給上鎖,如果已經被上鎖,則此次行為是失敗的回傳 false。

當上鎖成功後,透過 atomic_thread_fence 進行 mutex 狀態的同步。

static bool mutex_trylock(mutex_t *mutex) { int state = load(&mutex->state, relaxed); if (state & MUTEX_LOCKED) return false; state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed); if (state & MUTEX_LOCKED) return false; thread_fence(&mutex->state, acquire); return true; }
mutex_lock

mutex_lock 在最一開始的 for loop,不斷地透過 mutex_trylock 來持有鎖。

如果一直得不到鎖,則透過 atomic_exchange_explicit 將 state 狀態改成 MUTEX_LOCKED | MUTEX_SLEEPING,並回傳先前的 state 值。

static inline void mutex_lock(mutex_t *mutex) { #define MUTEX_SPINS 128 for (int i = 0; i < MUTEX_SPINS; ++i) { if (mutex_trylock(mutex)) return; spin_hint(); } int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); while (state & MUTEX_LOCKED) { futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING); state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); } thread_fence(&mutex->state, acquire); }
mutex_unlock

將 mutex->state 設定為 0,如果上鎖時有等待者,
則透過 futex_wake 來喚醒等待者。

static inline void mutex_unlock(mutex_t *mutex) { int state = exchange(&mutex->state, 0, release); if (state & MUTEX_SLEEPING) futex_wake(&mutex->state, 1); // FFFF }

cond.h

cond.h 有三種操作,cond_waitcond_signalcond_broadcast

cond_wait

透過 load 方式獲取 cond->seq,透過 mutex 方式,避免 futex 系統呼叫的可能性,在有限次數上,不斷地監看 cond->seq 的值改變。

  • 若 cond->seq 發生改變,則進行上鎖。
  • 若嘗試 COND_SPINS 次後,仍然失敗,則回到 futex_wait 進行等待。

隨後進入 while loop 每次會檢查狀態是否上鎖,如果上鎖,會透過 futex_wait 放入 wait queue 中等待。

參考 RinHizakura 同學的
在 fetch_or 地方,對於 mutex->state 添加 MUTEX_SLEEPING ,是為了在之後 mutex_unlock 時都能確保呼叫 futex_wake

static inline void cond_wait(cond_t *cond, mutex_t *mutex) { int seq = load(&cond->seq, relaxed); mutex_unlock(mutex); #define COND_SPINS 128 for (int i = 0; i < COND_SPINS; ++i) { if (load(&cond->seq, relaxed) != seq) { mutex_lock(mutex); return; } spin_hint(); } futex_wait(&cond->seq, seq); mutex_lock(mutex); fetch_or(&mutex->state, MUTEX_SLEEPING, relaxed); // AAAAA }
cond_signal

將 cond->seq 進行 + 1後,將對應的 cond->seq 給喚醒

static inline void cond_signal(cond_t *cond, mutex_t *mutex) { fetch_add(&cond->seq, 1, relaxed); // BBBB futex_wake(&cond->seq, 1); }
cond_broadcast

對 cond->seq 進行 + 1 後,透過 futex_requee 喚醒等待者,進行查看

static inline void cond_broadcast(cond_t *cond, mutex_t *mutex) { fetch_add(&cond->seq, 1, relaxed); // CCCC futex_requeue(&cond->seq, 1, &mutex->state); // DDDD }

main.c

有兩個 struct node 與 clock

  • clock:內部紀錄 ticks,作為時間點的判斷,每一個 thread 接會根據 ticks 的值而作相應的行爲

  • node:為工作節點,建立後轉交給 thread 進行操作,一個 thread 負責一個 node

皆需要 conditional variable 和 mutex 機制,保護 clock->ticks 與 node->ready

struct clock { mutex_t mutex; cond_t cond; int ticks; }; /* A node in a computation graph */ struct node { struct clock *clock; struct node *parent; mutex_t mutex; cond_t cond; bool ready; };

clock

clock 有三個操作 clock_waitclock_tickclock_stop

clock_wait

使用 mutex 進行上鎖,藉此檢查 clock->ticks 的值,如果 clock->ticks 小於 ticks 時,則進入 cond_wait 進行監看 ticks 是否改變

static bool clock_wait(struct clock *clock, int ticks) { mutex_lock(&clock->mutex); while (clock->ticks >= 0 && clock->ticks < ticks) cond_wait(&clock->cond, &clock->mutex); bool ret = clock->ticks >= ticks; mutex_unlock(&clock->mutex); return ret; }
clock_tick

使用 mutex 進行上鎖,對於 ticks 進行 + 1,之後透過 cond_broadcast 進行廣播告知 clock->ticks 已經更新

static void clock_tick(struct clock *clock) { mutex_lock(&clock->mutex); if (clock->ticks >= 0) ++clock->ticks; mutex_unlock(&clock->mutex); cond_broadcast(&clock->cond, &clock->mutex); }
clock_stop

使用 mutex 進行上鎖,對於 ticks 設定為 - 1,使得 clock->ticks 的累加停止,之後透過 cond_broadcast 進行廣播告知 clock->ticks 已經更新

static void clock_stop(struct clock *clock) { mutex_lock(&clock->mutex); clock->ticks = -1; mutex_unlock(&clock->mutex); cond_broadcast(&clock->cond, &clock->mutex); }

node

clock 有二個操作 node_waitnode_signal ,該 node 有幾個特性:

  1. 共享同一個 clock->ticks
  2. ni
    的 parent 為
    ni1
  3. 有 condtion variable + mutex 機制
node_wait

檢查 node 的 ready 狀態,若為 false,則進入 cond_wait 進行等待

static void node_wait(struct node *node) { mutex_lock(&node->mutex); while (!node->ready) cond_wait(&node->cond, &node->mutex); node->ready = false; mutex_unlock(&node->mutex); }
node_signal

將 ready 設定為 true 後,透過 cond_signal 喚醒等待者。

static void node_signal(struct node *node) { mutex_lock(&node->mutex); node->ready = true; mutex_unlock(&node->mutex); cond_signal(&node->cond, &node->mutex); }

main

main function 如下,建立一個 clock,並新增 16 個 nodes (

n0 n15)
對於每一個 node 均建立一個 thread,並透過 thread_func 進行測試。

int main(void) { struct clock clock; clock_init(&clock); #define N_NODES 16 struct node nodes[N_NODES]; node_init(&clock, NULL, &nodes[0]); for (int i = 1; i < N_NODES; ++i) node_init(&clock, &nodes[i - 1], &nodes[i]); pthread_t threads[N_NODES]; for (int i = 0; i < N_NODES; ++i) { if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0) return EXIT_FAILURE; } clock_tick(&clock); clock_wait(&clock, 1 << N_NODES); clock_stop(&clock); for (int i = 0; i < N_NODES; ++i) { if (pthread_join(threads[i], NULL) != 0) return EXIT_FAILURE; } return EXIT_SUCCESS; }

在建立 thread 後,透過 clock_tickclock_waitclock_stop 進行操作。

透過呼叫第一個 clock_tick 使得 ticks變為 1,藉此讓其他的 thread 可以開始根據 tick 逐步進行。

clock_wait 會一直等待 tick 變成 1 << N_NODES 後再透過 clock_stop 來讓 node 的 thread 結束

clock_tick(&clock); clock_wait(&clock, 1 << N_NODES); clock_stop(&clock);
thread_func

thread 在 clock->tick 小於 i 時,會進行等待,開始執行後,透過 node_wait 檢查該 node 的 parent 是否 ready,接著若 bit 是 false 則執行 clock_tick 進行 tick ++,若為 true,則透過 node_signal 喚醒下一個 thread。

static void *thread_func(void *ptr) { struct node *self = ptr; bool bit = false; for (int i = 1; clock_wait(self->clock, i); ++i) { if (self->parent) node_wait(self->parent); if (bit) { node_signal(self); } else { clock_tick(self->clock); } bit = !bit; } node_signal(self); return NULL; }

延伸問題 - 2

修改第 1 次作業的測驗 γ 提供的並行版本快速排序法實作,使其得以搭配上述 futex 程式碼運作

修改部分

修改方法並沒有很複雜

struct 部分

pthread_mutex_t 以及 pthread_cond_t 修改成 mutex_tcond_t

/* Variant part passed to qsort invocations. */ struct qsort { enum thread_state st; /* For coordinating work. */ struct common *common; /* Common shared elements. */ void *a; /* Array base. */ size_t n; /* Number of elements. */ pthread_t id; /* Thread id. */ // pthread_mutex_t mtx_st; /* For signalling state change. */ // pthread_cond_t cond_st; /* For signalling state change. */ mutex_t fmutex_st; cond_t fcond_st; }; /* Invariant common part, shared across invocations. */ struct common { int swaptype; /* Code to use for swapping */ size_t es; /* Element size. */ void *thunk; /* Thunk for qsort_r */ cmp_t *cmp; /* Comparison function */ int nthreads; /* Total number of pool threads. */ int idlethreads; /* Number of idle threads in pool. */ int forkelem; /* Minimum number of elements for a new thread. */ struct qsort *pool; /* Fixed pool of threads. */ // pthread_mutex_t mtx_al; /* For allocating threads in the pool. */ mutex_t fmutex_al; };

在後續的程式部分,更換如下

  • pthread_mutex_init -> mutex_init
  • pthread_mutex_lock -> mutex_lock
  • pthread_mutex_unlock -> mutex_unlock
  • pthread_cond_init -> cond_init
  • pthread_cond_signal -> cond_signal
  • pthread_cond_wait -> cond_wait

下圖為 第 1 次作業的測驗 γ (mt 橘色)與修改成 futex 版本(ft 藍色),x 軸為 thread 數量,y 軸為執行時間。
每次新增 thread 會重複執行 100 次取平均時間,最後結果如圖,由圖可知換成 futex 後整體執行時間增加很多,且隨著 thread 增加執行時間有越長的趨勢。

qsort

後來發現是我編譯時,多加了-fsanitize=thread。
正確的

gcc -Wall -DUSE_LINUX -o qsort_futex qsort_futex.c -lpthread

經過修正後如圖