# 2023 Homework2 (quiz1)
contributed by < `valley-ant` >

## How the code works

The code can be divided into three parts:
1. the part unrelated to the sharing mechanism (called "lock-independent" below);
2. the part related to the sharing mechanism (called "lock-dependent" below), focusing on the design of the mutex, condition variable, and related pieces;
3. the memory reordering mechanism.

To understand part 1, a certain understanding of part 2 is required; and part 2 cannot be understood while skipping part 3 entirely, because some locking mechanisms cannot be used correctly without understanding reordering. Let me define "correctly" a little: the most obvious misuse is unintentionally introducing a data race; the other is performance loss caused by an insufficient understanding of these mechanisms. So before discussing the lock-independent part, I want to start by describing my understanding of memory reordering.

### memory reordering mechanism

#### references
1. [並行程式設計](https://hackmd.io/@sysprog/concurrency-ordering)
2. Other sites were consulted along the way; left as a placeholder to be filled in later.

#### Intention
Reduce situations where the CPU has to wait, such as cache coherence stalls (for example, reordering can let several loads be issued at once).

#### Description
* Ordering refers to the order in which instructions are actually executed. That order may be adjusted by the compiler (which can be prevented with a memory barrier), or by the CPU's out-of-order execution; how far the latter may reorder is bounded by the weakness of the chosen memory order. Below is a short summary of what each level guarantees in C and what it is for.
  * Relaxed
    * Only guarantees the atomicity of the load and store of a 32-bit (64-bit) integer.
    * The only atomicity guaranteed here is what a single CPU load/store instruction can guarantee; it provides no synchronizes-with relation between threads (i.e. no cross-thread happens-before) once instructions are reordered. Put differently, thread A's acquisition or modification of a mutex is, after reordering, not guaranteed to be visible from thread B's point of view.
  * Release-Acquire ordering
    * All load/store instructions "before" the atomic store cannot be executed "after" the atomic store.
    * All load/store instructions "after" the atomic load cannot be executed "before" the atomic load.
### lock-dependent code

#### futex
* In this implementation, every function in the futex family issues a system call. The role of each function:

```c
/* Atomically check if '*futex == value', and if so, go to sleep */
static inline void futex_wait(atomic int *futex, int value)
{
    syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL);
}

/* Wake up 'limit' threads currently waiting on 'futex' */
static inline void futex_wake(atomic int *futex, int limit)
{
    syscall(SYS_futex, futex, FUTEX_WAKE_PRIVATE, limit);
}

/* Wake up 'limit' waiters, and re-queue the rest onto a different futex */
static inline void futex_requeue(atomic int *futex, int limit, atomic int *other)
{
    syscall(SYS_futex, futex, FUTEX_REQUEUE_PRIVATE, limit, INT_MAX, other);
}
```

* futex_wait: pass a pointer and an expected value; block this thread if *pointer equals the expected value.
* futex_wake: wake up to "limit" threads on the futex's waiter list.
* futex_requeue: wake up to "limit" threads and requeue the rest onto another futex.
* These functions are called when a thread must sleep because what it waits for has not happened, or when sleeping threads need to be woken up.

#### mutex
* There are three main operations:

```c
static inline void mutex_init(mutex_t *mutex)
{
    atomic_init(&mutex->state, 0);
}

static bool mutex_trylock(mutex_t *mutex)
{
    int state = load(&mutex->state, relaxed);
    if (state & MUTEX_LOCKED)
        return false;
    state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed);
    if (state & MUTEX_LOCKED)
        return false;
    thread_fence(&mutex->state, acquire);
    return true;
}

static inline void mutex_lock(mutex_t *mutex)
{
#define MUTEX_SPINS 128
    for (int i = 0; i < MUTEX_SPINS; ++i) {
        // optimization: try lock first to avoid heavy
        // operations like SYSCALL.
        if (mutex_trylock(mutex))
            return;
        spin_hint();
    }

    int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
    while (state & MUTEX_LOCKED) {
        futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING);
        state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
    }
    thread_fence(&mutex->state, acquire);
}

static inline void mutex_unlock(mutex_t *mutex)
{
    // clear lock status
    int state = exchange(&mutex->state, 0, release);
    // wake one thread that is waiting on the mutex;
    // every woken thread marks the state LOCKED and SLEEPING again.
    if (state & MUTEX_SLEEPING)
        futex_wake(&mutex->state, 1);
}
```

* mutex_trylock: try to take the mutex; on failure, return false immediately (meaning the lock was not acquired).
* mutex_lock: acquire and lock the mutex; if it cannot be taken, futex_wait decides whether the thread needs to sleep. Of the flags MUTEX_LOCKED | MUTEX_SLEEPING, the former marks the mutex as locked, and the latter marks that some thread is waiting on the mutex's wait list. SLEEPING is written here as well so that the exchange does not accidentally clear an existing SLEEPING flag.
* mutex_unlock: reset mutex->state and wake one thread on the mutex waiter list. The woken thread will mark LOCKED and SLEEPING again on its next exchange.

#### conditional variable (cond)
* There are three main operations:

```c
static inline void cond_wait(cond_t *cond, mutex_t *mutex)
{
    int seq = load(&cond->seq, relaxed);

    // The owner releases the mutex. Any thread that is woken and
    // signaled can then take the mutex.
    mutex_unlock(mutex);

#define COND_SPINS 128
    for (int i = 0; i < COND_SPINS; ++i) {
        // optimization for avoiding SYSCALL
        if (load(&cond->seq, relaxed) != seq) {
            mutex_lock(mutex);
            return;
        }
        spin_hint();
    }

    // if no one signals, go to sleep.
    futex_wait(&cond->seq, seq);

    // woken up or able to take the mutex. Lock it.
    mutex_lock(mutex);
    fetch_or(&mutex->state, MUTEX_SLEEPING, relaxed);
}

static inline void cond_signal(cond_t *cond, mutex_t *mutex)
{
    fetch_add(&cond->seq, 1, relaxed);
    futex_wake(&cond->seq, 1);
}

static inline void cond_broadcast(cond_t *cond, mutex_t *mutex)
{
    fetch_add(&cond->seq, 1, relaxed);
    futex_requeue(&cond->seq, 1, &mutex->state);
}
```

* cond_wait: wait for some condition to occur before proceeding; if it has not occurred, futex_wait decides whether the thread should sleep. The function calls mutex_unlock first so that other threads using the same mutex can make progress. If no other thread calls cond_signal or cond_broadcast, futex_wait puts this thread to sleep. After waking, it locks the mutex and marks it SLEEPING so that a future mutex_unlock can wake it.
* cond_signal: increment the condition variable's sequence and wake one thread.
* cond_broadcast: increment the sequence and requeue the wait queue with futex_requeue. cond_broadcast moves the condition variable's wait queue onto the mutex, because the threads put to sleep by futex_wait sit on the condition variable's wait list while what they actually wait for is the mutex protecting clock->ticks. Inserting the condition variable's queue into the mutex's queue ensures that subsequent mutex_unlock calls can wake these queued threads.

### lock independent

#### struct clock
* struct clock is the shared resource; it mainly provides these functions:

```c
static void clock_init(struct clock *clock)
{
    mutex_init(&clock->mutex);
    cond_init(&clock->cond);
    clock->ticks = 0;
}

static bool clock_wait(struct clock *clock, int ticks)
{
    mutex_lock(&clock->mutex);
    while (clock->ticks >= 0 && clock->ticks < ticks)
        cond_wait(&clock->cond, &clock->mutex);
    bool ret = clock->ticks >= ticks;
    mutex_unlock(&clock->mutex);
    return ret;
}

static void clock_tick(struct clock *clock)
{
    mutex_lock(&clock->mutex);
    if (clock->ticks >= 0)
        ++clock->ticks;
    mutex_unlock(&clock->mutex);
    cond_broadcast(&clock->cond, &clock->mutex);
}

static void clock_stop(struct clock *clock)
{
    mutex_lock(&clock->mutex);
    clock->ticks = -1;
    mutex_unlock(&clock->mutex);
    cond_broadcast(&clock->cond, &clock->mutex);
}
```

* clock_wait: wait on the shared clock while clock->ticks is still smaller than ticks; the return value tells the caller whether the tick count was reached (false means the clock was stopped).
* clock_tick: increment clock->ticks and wake the waiting threads.
* clock_stop: stop the clock (-1 indicates the stop condition in clock_wait) and wake the waiting threads.

#### struct node
* Each node has its own parent and its own status. Each thread must wait until its parent node is ready before it can proceed.
* Two main functions are available:

```c
static void node_wait(struct node *node)
{
    mutex_lock(&node->mutex);
    while (!node->ready)
        cond_wait(&node->cond, &node->mutex);
    node->ready = false;
    mutex_unlock(&node->mutex);
}

static void node_signal(struct node *node)
{
    mutex_lock(&node->mutex);
    node->ready = true;
    mutex_unlock(&node->mutex);
    cond_signal(&node->cond, &node->mutex);
}
```

* node_wait: decide from the given node whether to wait; in this assignment it checks whether the parent node is ready. If not, wait (from the caller's point of view there is no need to know whether the thread sleeps or busy-waits).
* node_signal: set the node to ready and signal the waiting node. cond_signal is sufficient here because the only thread waiting on node->mutex is the node's own child.

#### thread_func
* thread_func uses clock_wait to know whether the next loop iteration is required.
* Every node must wait for its parent to be ready before executing further.
* clock_wait fails once the main thread has waited 1 << N_NODES ticks.

## Qsort - futex vs pthread
[code] https://github.com/valley-ant/linux_kernel_2023
* USE_LINUX (test_linux): futex version
* USE_PTHREADS (test_pthread): pthread version

| ITEM | TIME (K) |
| -------- | -------- |
| USE_LINUX | 0.012~0.044 |
| USE_PTHREADS | 0.024~0.056 |

* Over 10 runs with 1000000 elements, the pthread version spends more time in kernel space, which also makes its average execution time slightly longer.
* TODO: use a larger sample size to spawn more threads, and compare the behavior under higher thread counts.

## Priority inheritance mutex

### Reference
[RinHizakura](https://hackmd.io/@RinHizakura/linux2023-summer-hw2)
[futex man page](https://man7.org/linux/man-pages/man2/futex.2.html)
[並行程式設計:建立相容於 POSIX Thread 的實作](https://hackmd.io/@sysprog/concurrency-thread-package)

### Implementation (ongoing...)
[src](https://github.com/valley-ant/lk_summer_2023)

```c
/* futex(2): uses atomic compare-and-exchange.
 * Is the strong version required by the memory model?
 */
#pragma once

#if USE_PTHREADS
#include <pthread.h>
#define mutex_t pthread_mutex_t
#define muattr_t pthread_mutexattr_t
#define mutex_init(m, t) pthread_mutex_init(m, t)
#define MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
#define mutex_trylock(m) (!pthread_mutex_trylock(m))
#define mutex_lock pthread_mutex_lock
#define mutex_unlock pthread_mutex_unlock
// for pthread compatibility
#define muattr_setprotocol pthread_mutexattr_setprotocol
#define PRIO_NONE PTHREAD_PRIO_NONE
#define PRIO_INHERIT PTHREAD_PRIO_INHERIT
#else
#include <stdbool.h>
#include <stdint.h>

#include "atomic.h"
#include "futex.h"
#include "spinlock.h"

typedef int mutype_t;

#define gettid() syscall(SYS_gettid)

/* futex(2): uses atomic compare-and-exchange.
 * Is the strong version required by the memory model? */
#define cmpxchg(obj, expect, desired) \
    compare_exchange_strong(obj, expect, desired, relaxed, relaxed)

#define MUTEX_SPINS 128
#define FUTEX_IDLE_ID 0

enum { PRIO_NONE = 0, PRIO_INHERIT };

typedef struct {
    int protocol;
} muattr_t;

typedef struct {
    atomic int state;
    mutype_t protocol;
    pid_t owner;
} mutex_t;

enum {
    MUTEX_LOCKED = 1 << 0,
    MUTEX_SLEEPING = 1 << 1,
};

#define MUTEX_INITIALIZER          \
    {                              \
        .state = 0, .protocol = 0, \
    }

static inline void muattr_setprotocol(muattr_t *muattr, int protocol)
{
    muattr->protocol = protocol;
}

static inline void mutex_init(mutex_t *mutex, const muattr_t *attr)
{
    atomic_init(&mutex->state, 0);
    mutex->protocol = attr->protocol;
}

static bool mutex_trylock_pi(mutex_t *mutex)
{
    pid_t expect = FUTEX_IDLE_ID;
    pid_t tid;

    /* TODO: a non-syscall way to get the TID, as on x86-64 */
    tid = gettid();

    /* 0: mutex is idle; exchange it and return true.
     * If state == expect, set state to tid (see the futex(2) man page). */
    if (cmpxchg(&mutex->state, &expect, tid))
        return true;

    /* lock is taken, return false.
 */
    thread_fence(&mutex->state, acquire);
    return false;
}

static inline void mutex_lock_pi(mutex_t *mutex)
{
    pid_t self;

    for (int i = 0; i < MUTEX_SPINS; ++i) {
        // try to take the lock first
        if (mutex_trylock_pi(mutex))
            return;
        spin_hint();
    }

    self = gettid();

    /* cannot take the lock: FUTEX_LOCK_PI to sleep */
    futex_lock_pi(&mutex->state, 0, NULL);

    /* lock acquired; set owner to self */
    mutex->owner = self;
    thread_fence(&mutex->state, acquire);
}

static inline void mutex_unlock_pi(mutex_t *mutex)
{
    pid_t tid = gettid();

    /* If the mutex is held uncontended, state holds the owner's TID:
     * release it in user space; otherwise let the kernel hand it over. */
    if (cmpxchg(&mutex->state, &tid, FUTEX_IDLE_ID))
        return;
    futex_unlock_pi(&mutex->state);
}

static bool mutex_trylock_default(mutex_t *mutex)
{
    int state = load(&mutex->state, relaxed);
    if (state & MUTEX_LOCKED)
        return false;
    state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed);
    if (state & MUTEX_LOCKED)
        return false;
    thread_fence(&mutex->state, acquire);
    return true;
}

static inline void mutex_lock_default(mutex_t *mutex)
{
    for (int i = 0; i < MUTEX_SPINS; ++i) {
        if (mutex_trylock_default(mutex))
            return;
        spin_hint();
    }
    int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
    while (state & MUTEX_LOCKED) {
        futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING);
        state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
    }
    thread_fence(&mutex->state, acquire);
}

static inline void mutex_unlock_default(mutex_t *mutex)
{
    int state = exchange(&mutex->state, 0, release);
    if (state & MUTEX_SLEEPING)
        futex_wake(&mutex->state, 1);
}

/* TODO: stub; should dispatch on mutex->protocol like mutex_lock() */
static bool mutex_trylock(mutex_t *mutex)
{
    return true;
}

static inline void mutex_lock(mutex_t *mutex)
{
#if 1
    switch (mutex->protocol) {
    case PRIO_NONE:
        mutex_lock_default(mutex);
        break;
    case PRIO_INHERIT:
        mutex_lock_pi(mutex);
        break;
    default:
        break;
    }
#endif
}

static inline void mutex_unlock(mutex_t *mutex)
{
#if 1
    switch (mutex->protocol) {
    case PRIO_NONE:
        mutex_unlock_default(mutex);
        break;
    case PRIO_INHERIT:
        mutex_unlock_pi(mutex);
        break;
    default:
        break;
    }
#endif
}

#endif

// TODO: futex_requeue_pi
```

### Testing
SCHED_FIFO ran into the RTPRIO limit, so the test has to be run with `sudo`.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "cond.h"
#include "futex.h"
#include "mutex.h"

#define N_THREADS 3

mutex_t lock;

void task_high()
{
    printf("high (before lock)\n");
    mutex_lock(&lock);
    printf("high\n");
    mutex_unlock(&lock);
    printf("end of high\n");
}

void task_med()
{
    printf("med\n");
    sleep(2);
    printf("end of med\n");
}

void task_low()
{
    printf("low\n");
    mutex_lock(&lock);
    sleep(1);
    printf("low goes on\n");
    mutex_unlock(&lock);
    printf("end of low\n");
}

static void mutex_init_wrap(mutex_t *mu)
{
    muattr_t muattr;
    muattr_setprotocol(&muattr, PRIO_INHERIT);
    mutex_init(mu, &muattr);
}

void (*tasks[])() = {task_high, task_med, task_low};

int main(void)
{
    printf("start\n");
    int state;
    struct sched_param param;
    pthread_attr_t attr;
    pthread_t ids[N_THREADS];

    pthread_attr_init(&attr);
    pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
    mutex_init_wrap(&lock);

    for (int i = N_THREADS - 1; i >= 0; --i) {
        printf("%d\n", i);
        param.sched_priority = (N_THREADS - i) * 10;
        pthread_attr_setschedparam(&attr, &param);
        state = pthread_create(&ids[i], &attr, (void *) tasks[i], (void *) NULL);
        if (state)
            printf("thread %d create failed. Error: %d\n", i, state);
    }

    for (int i = 0; i < N_THREADS; ++i)
        pthread_join(ids[i], NULL);

    return EXIT_SUCCESS;
}
```

* Building a priority inversion scenario:
  * Let L be the low-priority thread, M the medium-priority thread, and H the high-priority thread.
  * The scenario is as follows:
    1. L takes mutex A.
    2. M wakes (is created) and preempts L.
    3. H wakes (is created) and preempts M.
    4. H tries to acquire A.
    5. H fails to acquire A, and goes to sleep / is blocked by the CPU scheduler.
    6. M runs, since the highest-priority thread in the ready queue is now M.
    7. M finishes; L then runs until A is released.
    8.
       H wakes since it can now acquire A.
    9. H finishes.
    10. L finishes.

### test result
```
Running test_linux ...
start
2
low
1
med
0
high (before lock)
low goes on    // inherits the high priority to exit the CS first
end of low
high
end of high
end of med
[OK]
```