# 2023 Homework2 (quiz1) contributed by < chunplusplus > [題目](https://hackmd.io/@sysprog/linux2023-summer-quiz1) ## 解釋上述程式碼運作原理,應該涵蓋測試方法、futex 使用方式、Linux 核心開發者針對 POSIX Threads 強化哪些相關 futex 實作機制等等 futex 是 Linux 提供一個底層的同步功能, 可以用來實作上層同步工具。 程式碼中, 展示了用 atomic operators 實作 spinlock, 然後用 spinlock 和 futex 實作 mutex 和 condition variable。 測試方法中, 用 condition variable 實作出 clock_wait 和 node_wait。 由 thread_func 中的 for loop 看出, 每個 thread 的每個 run 需要等待 clock 和 parent node 兩個 condition; 每個 run 會 signal 自己或是 tick clock 來更新 condition 使其 child thread 有進展。 其中 thread0 沒有 parent, 所以進展得最快; thread1 的進展會比 thread0 慢; thread2 的進展會比 thread1 慢; 如此類推。 GNU C 的 Native POSIX Thread Library (NPTL) 也是用 futex 實作。如 glibc source code 裡的 nptl/lowlevellock.h: ```c /* low level locking for pthread library. Generic futex-using version. Copyright (C) 2003-2023 Free Software Foundation, Inc. This file is part of the GNU C Library. The GNU C Library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. The GNU C Library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with the GNU C Library; if not, see <https://www.gnu.org/licenses/>. */ #include <sysdep.h> #include <futex-internal.h> #include <atomic.h> #include <stap-probe.h> void __lll_lock_wait_private (int *futex) { if (atomic_load_relaxed (futex) == 2) goto futex; while (atomic_exchange_acquire (futex, 2) != 0) { futex: LIBC_PROBE (lll_lock_wait_private, 1, futex); futex_wait ((unsigned int *) futex, 2, LLL_PRIVATE); /* Wait if *futex == 2. */ } } libc_hidden_def (__lll_lock_wait_private) void __lll_lock_wait (int *futex, int private) { if (atomic_load_relaxed (futex) == 2) goto futex; while (atomic_exchange_acquire (futex, 2) != 0) { futex: LIBC_PROBE (lll_lock_wait, 1, futex); futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2. */ } } libc_hidden_def (__lll_lock_wait) void __lll_lock_wake_private (int *futex) { lll_futex_wake (futex, 1, LLL_PRIVATE); } libc_hidden_def (__lll_lock_wake_private) void __lll_lock_wake (int *futex, int private) { lll_futex_wake (futex, 1, private); } libc_hidden_def (__lll_lock_wake) #if ENABLE_ELISION_SUPPORT int __pthread_force_elision; libc_hidden_data_def (__pthread_force_elision) #endif ``` POSIX Threads 的 mutex 的 priority inheritance (PI) 也是由 futex 實作, glibc source code 的 nptl/pthread_mutex_lock.c ```c=364 /* The PI support requires the Linux futex system call. If that's not available, pthread_mutex_init should never have allowed the type to be set. So it will get the default case for an invalid type. */ #ifdef __NR_futex case PTHREAD_MUTEX_PI_RECURSIVE_NP: case PTHREAD_MUTEX_PI_ERRORCHECK_NP: case PTHREAD_MUTEX_PI_NORMAL_NP: case PTHREAD_MUTEX_PI_ADAPTIVE_NP: case PTHREAD_MUTEX_PI_ROBUST_RECURSIVE_NP: case PTHREAD_MUTEX_PI_ROBUST_ERRORCHECK_NP: case PTHREAD_MUTEX_PI_ROBUST_NORMAL_NP: case PTHREAD_MUTEX_PI_ROBUST_ADAPTIVE_NP: ``` ## 修改第 1 次作業的測驗 γ 提供的並行版本快速排序法實作,使其得以搭配上述 futex 程式碼運作 複製本測驗 lock 實作程式碼到測驗 γ ```shell $ cp ../futex/atomic.h ../futex/cond.h ../futex/futex.h ../futex/mutex.h ../futex/spinlock.h . ``` 建立 Makefile ``` CFLAGS := -std=c11 -Wall -g -O2 -D_GNU_SOURCE -fsanitize=thread LDFLAGS := -lpthread ALL := test_pthread test_linux all: $(ALL) .PHONY: all test_%: qsort-mt.c atomic.h cond.h futex.h mutex.h spinlock.h $(CC) $(CFLAGS) qsort-mt.c -o $@ $(LDFLAGS) test_pthread: CFLAGS += -DUSE_PTHREADS test_linux: CFLAGS += -DUSE_LINUX clean: $(RM) $(ALL) test: ./test_pthread -vt ./test_linux -vt .PHONY: clean ``` include lock 的 header file, 和把 `typeof` 改為 `__typeof__` ```c #include <stdlib.h> #include <string.h> +#include "cond.h" +#include "futex.h" +#include "mutex.h" + #define verify(x) \ do { \ int e; \ @@ -22,8 +26,8 @@ static inline void swapfunc(char *, char *, int, int); #define min(a, b) \ ({ \ - typeof(a) _a = (a); \ - typeof(b) _b = (b); \ + __typeof__(a) _a = (a); \ + __typeof__(b) _b = (b); \ _a < _b ? _a : _b; \ }) ``` ## 研讀〈並行程式設計: 建立相容於 POSIX Thread 的實作〉,在上述程式碼的基礎之上,實作 priority inheritance mutex 並確認與 glibc 實作行為相同,應有對應的 PI 測試程式碼 修改 futex.h ```c /* Atomically check if '*futex == value', and if so, go to sleep */ static inline void futex_wait(atomic int *futex, int value) { - syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL); + syscall(SYS_futex, futex, FUTEX_LOCK_PI, value, NULL); } /* Wake up 'limit' threads currently waiting on 'futex' */ static inline void futex_wake(atomic int *futex, int limit) { - syscall(SYS_futex, futex, FUTEX_WAKE_PRIVATE, limit); + syscall(SYS_futex, futex, FUTEX_UNLOCK_PI, limit); } /* Wake up 'limit' waiters, and re-queue the rest onto a different futex */ static inline void futex_requeue(atomic int *futex, int limit, - atomic int *other) + atomic int *other, int val3) { - syscall(SYS_futex, futex, FUTEX_REQUEUE_PRIVATE, limit, INT_MAX, other); + syscall(SYS_futex, futex, FUTEX_CMP_REQUEUE_PI, limit, INT_MAX, other, val3); } ``` 修改 mutex.h ```c @@ -32,6 +32,8 @@ enum { .state = 0 \ } +int unlock = 0; + static inline void mutex_init(mutex_t *mutex) { atomic_init(&mutex->state, 0); @@ -40,14 +42,12 @@ static inline void mutex_init(mutex_t *mutex) static bool mutex_trylock(mutex_t *mutex) { int state = load(&mutex->state, relaxed); - if (state & MUTEX_LOCKED) + if (state != 0) return false; - state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed); - if (state & MUTEX_LOCKED) + if (!compare_exchange_strong(&mutex->state, &state, (int) gettid(), seq_cst, relaxed)) return false; - thread_fence(&mutex->state, acquire); return true; } @@ -60,21 +60,21 @@ static inline void mutex_lock(mutex_t *mutex) spin_hint(); } - int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); + int tid = (int) gettid(); + bool state = compare_exchange_strong(&mutex->state, &unlock, tid, seq_cst, seq_cst); - while (state & MUTEX_LOCKED) { - futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING); - state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); + while (!state) { + futex_wait(&mutex->state, 0); + state = compare_exchange_strong(&mutex->state, &unlock, tid, seq_cst, seq_cst); } - - thread_fence(&mutex->state, acquire); } static inline void mutex_unlock(mutex_t *mutex) { - int state = exchange(&mutex->state, 0, release); - if (state & MUTEX_SLEEPING) - futex_wake(&mutex->state, 1); + int tid = (int) gettid(); + bool state = compare_exchange_strong(&mutex->state, &tid, 0, seq_cst, seq_cst); + if (!state) + futex_wake(&mutex->state, 0); } #endif ``` 修改 cond.h ```c static inline void cond_broadcast(cond_t *cond, mutex_t *mutex) { fetch_add(&cond->seq, 1, relaxed); - futex_requeue(&cond->seq, 1, &mutex->state); + futex_requeue(&cond->seq, 1, &mutex->state, mutex->state); } ``` 修改 Makefile ```clike clean: - $(RM) $(ALL) + $(RM) $(ALL) $(PI) test: ./test_pthread -vt ./test_linux -vt .PHONY: clean + +PI := pi_pthread pi_linux + +pi: $(PI) + +.PHONY: pi + +pi_%: priority-inversion.c atomic.h cond.h futex.h mutex.h spinlock.h + $(CC) $(CFLAGS) priority-inversion.c -o $@ $(LDFLAGS) + + +pi_pthread: CFLAGS += -DUSE_PTHREADS +pi_linux : CFLAGS += -DUSE_LINUX + +test_pi: + sudo taskset -c 3 sudo ./pi_pthread + sudo taskset -c 3 sudo ./pi_linux ``` 建立測試程式 priority-inversion.c ``` #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sched.h> #include <pthread.h> #include <stdio.h> #include <errno.h> #include "cond.h" #include "futex.h" #include "mutex.h" #define THREADCOUNT 3 /* sleep */ void musleep(int secs) { struct timespec ts = {.tv_sec = secs, .tv_nsec = 0}, rem = {.tv_sec = 0, .tv_nsec = 0}; /* Sleep would be interrupted by signal handler, so we should * record and resume it */ while (nanosleep(&ts, &rem) == -EINTR) { ts.tv_sec = rem.tv_sec; ts.tv_nsec = rem.tv_nsec; } } static mutex_t mutex, mutex_2; void TASK1() { mutex_lock(&mutex); printf("1\n"); mutex_unlock(&mutex); } void TASK2() { mutex_lock(&mutex_2); musleep(1); printf("2\n"); mutex_unlock(&mutex_2); } void TASK3() { mutex_lock(&mutex); musleep(1); printf("3\n"); mutex_unlock(&mutex); } static void (*TASKS[])() = { TASK1, TASK2, TASK3, }; int main() { int st; pthread_t th[THREADCOUNT]; pthread_attr_t attr; pthread_attr_init(&attr); struct sched_param param; mutex_init(&mutex); mutex_init(&mutex_2); pthread_attr_setschedpolicy(&attr, SCHED_FIFO); pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); for (int i = THREADCOUNT - 1; i >= 0; i--) { param.sched_priority = (THREADCOUNT - i) * 10; pthread_attr_setschedparam(&attr, &param); st = pthread_create(&th[i], &attr, (void *)TASKS[i], NULL); if (st != 0) { fprintf(stderr, "Failed to spawn thread %d: %s\n", i, strerror(-st)); return 1; } } for (int i = THREADCOUNT - 1; i >= 0; i--) pthread_join(th[i], NULL); return 0; } ``` 確認與 glibc 實作行為相同 ```shell 12:06 $ make cc -std=c11 -Wall -g -O2 -D_GNU_SOURCE -fsanitize=thread -DUSE_PTHREADS qsort-mt.c -o test_pthread -lpthread cc -std=c11 -Wall -g -O2 -D_GNU_SOURCE -fsanitize=thread -DUSE_LINUX qsort-mt.c -o test_linux -lpthread 12:07 $ make test ./test_pthread -vt 29 56.1 0.13 ./test_linux -vt 28.9 55.9 0.12 ``` PI 測試 ``` 12:08 $ make pi cc -std=c11 -Wall -g -O2 -D_GNU_SOURCE -fsanitize=thread -DUSE_PTHREADS priority-inversion.c -o pi_pthread -lpthread cc -std=c11 -Wall -g -O2 -D_GNU_SOURCE -fsanitize=thread -DUSE_LINUX priority-inversion.c -o pi_linux -lpthread 12:10 $ make test_pi sudo taskset -c 3 sudo ./pi_pthread 3 2 1 sudo taskset -c 3 sudo ./pi_linux 3 1 2 ``` 由測試結果可見, 采用 priority inheritance 的 futex 實作的 mutex, 使 priority 最高的 TASK1 比 TASK2 先完成。