# 2023 Homework2 (quiz1)
contributed by < `valley-ant` >
## How the code works
The code can be divided into three parts:
1. the part independent of the sharing mechanism (hereafter "lock-independent");
2. the part tied to the sharing mechanism (hereafter "lock-dependent"), focusing mainly on the design of the mutex, condition variable, and related primitives;
3. the memory reordering mechanism.
Understanding part 1 requires some grasp of part 2, and part 2 cannot be understood while skipping part 3 entirely, since some locking mechanisms cannot be used correctly without understanding reordering. To define "correctly" here: the most obvious misuse is unintentionally introducing a data race; subtler ones are performance losses caused by a poor understanding of these mechanisms. Therefore, before discussing the lock-independent part, I will start by describing my understanding of memory reordering.
### memory reordering mechanism
#### references
1. [並行程式設計](https://hackmd.io/@sysprog/concurrency-ordering)
2. Other sources were also consulted along the way; leaving a placeholder here to fill in later.
#### Intention
Reduce situations where the CPU must wait, such as cache-coherence stalls (for example, reordering can batch several loads so they are issued together).
#### Description
* Ordering refers to the order in which instructions actually execute. That order may be adjusted by the compiler (preventable with a memory barrier) or by the CPU's own out-of-order execution; the latter is governed by how weak a memory order is chosen, which determines how much reordering is allowed. Below is a brief summary of what each level of weakness guarantees, and intends, in C.
* Relaxed
* Only guarantees the atomicity of the load and store of a 32-bit (64-bit) integer.
* In theory, the only atomicity guaranteed here is the load/store that the CPU instruction itself guarantees; it does not establish any synchronizes-with relation across threads (i.e., inter-thread happens-before) once instructions are reordered. Put differently, thread A's acquisition or modification of a mutex is not guaranteed to be visible from thread B's point of view after reordering.
* Release-Acquire ordering
* All load/store instructions "before" atomic store, cannot be executed "after" the atomic store.
* All load/store instructions "after" atomic load, cannot be executed "before" the atomic load.
### lock-dependent code
#### futex
* In this assignment's implementation, the futex-family functions all go through a system call (SYSCALL). The role of each function is described below:
```c
/* Atomically check if '*futex == value', and if so, go to sleep */
static inline void futex_wait(atomic int *futex, int value)
{
syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL);
}
/* Wake up 'limit' threads currently waiting on 'futex' */
static inline void futex_wake(atomic int *futex, int limit)
{
syscall(SYS_futex, futex, FUTEX_WAKE_PRIVATE, limit);
}
/* Wake up 'limit' waiters, and re-queue the rest onto a different futex */
static inline void futex_requeue(atomic int *futex,
int limit,
atomic int *other)
{
syscall(SYS_futex, futex, FUTEX_REQUEUE_PRIVATE, limit, INT_MAX, other);
}
```
* futex_wait: pass a pointer and an expected value; block this thread if `*pointer` equals the expected value.
* futex_wake: wake up to "limit" threads on the futex's waiter list.
* futex_requeue: wake "limit" threads and requeue the rest onto another futex.
* These functions are called when a thread cannot make progress and must sleep, or when sleeping threads need to be woken.
#### mutex
* Three main operations:
```c
static inline void mutex_init(mutex_t *mutex)
{
atomic_init(&mutex->state, 0);
}
static bool mutex_trylock(mutex_t *mutex)
{
int state = load(&mutex->state, relaxed);
if (state & MUTEX_LOCKED)
return false;
state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed);
if (state & MUTEX_LOCKED)
return false;
thread_fence(&mutex->state, acquire);
return true;
}
static inline void mutex_lock(mutex_t *mutex)
{
#define MUTEX_SPINS 128
for (int i = 0; i < MUTEX_SPINS; ++i) {
// optimization: try lock first to avoid heavy
// operations like SYSCALL.
if (mutex_trylock(mutex))
return;
spin_hint();
}
int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
while (state & MUTEX_LOCKED) {
futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING);
state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
}
thread_fence(&mutex->state, acquire);
}
static inline void mutex_unlock(mutex_t *mutex)
{
// clear lock status
int state = exchange(&mutex->state, 0, release);
// wake one thread that is waiting on the mutex.
// every woken thread will set the state back to LOCKED | SLEEPING.
if (state & MUTEX_SLEEPING)
futex_wake(&mutex->state, 1);
}
```
* mutex_trylock: try to take the mutex; on failure, return false immediately (meaning the lock was not acquired).
* mutex_lock: acquire and lock the mutex; if it cannot be taken, futex_wait decides whether the thread needs to sleep. Of the flags written here, MUTEX_LOCKED marks the mutex as locked, and MUTEX_SLEEPING marks that some thread is waiting on the mutex's wait list. Writing SLEEPING on every exchange also prevents an existing SLEEPING bit from being accidentally erased.
* mutex_unlock: reset mutex->state and wake one thread on the mutex's waiter list; the woken thread's next exchange will again set LOCKED and SLEEPING.
#### condition variable (cond)
* Three main operations:
```c
static inline void cond_wait(cond_t *cond, mutex_t *mutex)
{
int seq = load(&cond->seq, relaxed);
// the owner releases the mutex; any thread that wakes up after a signal can then take it.
mutex_unlock(mutex);
#define COND_SPINS 128
for (int i = 0; i < COND_SPINS; ++i) {
// optimization for avoiding SYSCALL
if (load(&cond->seq, relaxed) != seq) {
mutex_lock(mutex);
return;
}
spin_hint();
}
// if no one has signaled, go to sleep.
futex_wait(&cond->seq, seq);
// woken up (or able to take the mutex): lock it.
mutex_lock(mutex);
fetch_or(&mutex->state, MUTEX_SLEEPING, relaxed);
}
static inline void cond_signal(cond_t *cond, mutex_t *mutex)
{
fetch_add(&cond->seq, 1, relaxed);
futex_wake(&cond->seq, 1);
}
static inline void cond_broadcast(cond_t *cond, mutex_t *mutex)
{
fetch_add(&cond->seq, 1, relaxed);
futex_requeue(&cond->seq, 1, &mutex->state);
}
```
* cond_wait: wait for a condition to occur before proceeding; if it has not occurred, futex_wait decides whether to sleep. The function calls mutex_unlock first so that other threads using the same mutex can make progress. If no other thread calls cond_signal or cond_broadcast, futex_wait puts this thread to sleep. After waking, it re-locks the mutex and marks it SLEEPING so that it can be woken again at a future unlock.
* cond_signal: increment the condition variable's sequence number and wake one thread.
* cond_broadcast: increment the sequence number and, via futex_requeue, wake one thread while moving the rest of the wait queue over to the mutex. This is because the threads put to sleep by futex_wait sit on the condition variable's wait list, yet what they are really waiting for is the mutex (here clock->mutex). Moving the queue onto the mutex ensures that later calls to mutex_unlock will wake these queued threads, avoiding a thundering herd all contending at once.
### lock-independent code
#### struct clock
* struct clock is the shared resource. Besides clock_init, it provides three main functions:
```c
static void clock_init(struct clock *clock)
{
mutex_init(&clock->mutex);
cond_init(&clock->cond);
clock->ticks = 0;
}
static bool clock_wait(struct clock *clock, int ticks)
{
mutex_lock(&clock->mutex);
while (clock->ticks >= 0 && clock->ticks < ticks)
cond_wait(&clock->cond, &clock->mutex);
bool ret = clock->ticks >= ticks;
mutex_unlock(&clock->mutex);
return ret;
}
static void clock_tick(struct clock *clock)
{
mutex_lock(&clock->mutex);
if (clock->ticks >= 0)
++clock->ticks;
mutex_unlock(&clock->mutex);
cond_broadcast(&clock->cond, &clock->mutex);
}
static void clock_stop(struct clock *clock)
{
mutex_lock(&clock->mutex);
clock->ticks = -1;
mutex_unlock(&clock->mutex);
cond_broadcast(&clock->cond, &clock->mutex);
}
```
* clock_wait: wait on the shared clock until clock->ticks reaches ticks (or the clock stops); the return value tells the caller whether the target tick count was reached.
* clock_tick: increment clock->ticks and wake the waiting threads.
* clock_stop: stop the clock (ticks = -1 marks the stop condition checked in clock_wait) and wake the waiting threads.
#### struct node
* Each node has its own parent and its own status. A thread must wait until its parent node is ready before it can continue.
* Two main functions are available:
```c
static void node_wait(struct node *node)
{
mutex_lock(&node->mutex);
while (!node->ready)
cond_wait(&node->cond, &node->mutex);
node->ready = false;
mutex_unlock(&node->mutex);
}
static void node_signal(struct node *node)
{
mutex_lock(&node->mutex);
node->ready = true;
mutex_unlock(&node->mutex);
cond_signal(&node->cond, &node->mutex);
}
```
* node_wait: decide, from the node passed in, whether to wait; in this assignment it checks whether the parent node is ready. If not yet ready, wait (from the caller's point of view it does not matter whether the thread sleeps or busy-waits).
* node_signal: mark the node ready and signal the waiter. cond_signal suffices here because the only thread that can wait on node->mutex is the node's own child.
#### thread_func
* thread_func uses clock_wait to decide whether another loop iteration is required.
* Every node must wait for its parent to become ready before executing further.
* clock_wait fails once the main thread has waited 1 << N_NODES ticks.
## Qsort: futex vs. pthread
[code](https://github.com/valley-ant/linux_kernel_2023)
* USE_LINUX (test_linux): futex version
* USE_PTHREADS (test_pthread): pthread version
| ITEM | TIME (K) |
| -------- | -------- |
| USE_LINUX | 0.012~0.044 |
| USE_PTHREADS| 0.024~0.056|
* Ten runs on inputs of 1000000 elements: the PTHREADS version spends more time in kernel space, making its average execution time slightly longer.
* TODO: generate more threads with larger inputs to compare behavior at higher thread counts.
## Priority inheritance mutex
### Reference
[RinHizakura](https://hackmd.io/@RinHizakura/linux2023-summer-hw2)
[futex man page](https://man7.org/linux/man-pages/man2/futex.2.html)
[並行程式設計:建立相容於 POSIX Thread 的實作](https://hackmd.io/@sysprog/concurrency-thread-package)
### Implementation (ongoing...)
[src](https://github.com/valley-ant/lk_summer_2023)
```c
#pragma once
#if USE_PTHREADS
#include <pthread.h>
#define mutex_t pthread_mutex_t
#define muattr_t pthread_mutexattr_t
#define mutex_init(m, t) pthread_mutex_init(m, t)
#define MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
#define mutex_trylock(m) (!pthread_mutex_trylock(m))
#define mutex_lock pthread_mutex_lock
#define mutex_unlock pthread_mutex_unlock
// for pthread compatibility
#define muattr_setprotocol pthread_mutexattr_setprotocol
#define PRIO_NONE PTHREAD_PRIO_NONE
#define PRIO_INHERIT PTHREAD_PRIO_INHERIT
#else
#include <stdbool.h>
#include <stdint.h>
#include "atomic.h"
#include "futex.h"
#include "spinlock.h"
typedef int mutype_t;
#define gettid() syscall(SYS_gettid)
/* futex(2): use atomic compare-and-exchange.
 * TODO: is relaxed ordering sufficient here, or is a stronger order required?
 */
#define cmpxchg(obj, expect, desired) \
compare_exchange_strong(obj, expect, desired, relaxed, relaxed)
#define MUTEX_SPINS 128
#define FUTEX_IDLE_ID 0
enum {
PRIO_NONE = 0,
PRIO_INHERIT
};
typedef struct {
int protocol;
} muattr_t;
typedef struct {
atomic int state;
mutype_t protocol;
pid_t owner;
} mutex_t;
enum {
MUTEX_LOCKED = 1 << 0,
MUTEX_SLEEPING = 1 << 1,
};
#define MUTEX_INITIALIZER \
{ \
.state = 0, .protocol = 0, \
}
static inline void muattr_setprotocol(muattr_t *muattr, int protocol)
{
muattr->protocol = protocol;
}
static inline void mutex_init(mutex_t *mutex, const muattr_t *attr)
{
atomic_init(&mutex->state, 0);
mutex->protocol = attr->protocol;
}
static bool mutex_trylock_pi(mutex_t *mutex)
{
pid_t expect = FUTEX_IDLE_ID;
pid_t tid;
/* TODO: A non-syscall version to gettid like x86-64 */
tid = gettid();
/* 0: mutex is idle; exchange it and return true.
 * If state == expect, set state to tid (see the futex(2) man page). */
if (cmpxchg(&mutex->state, &expect, tid))
return true;
/* lock is taken, return false. */
thread_fence(&mutex->state, acquire);
return false;
}
static inline void mutex_lock_pi(mutex_t *mutex)
{
pid_t self = 0;
for (int i = 0; i < MUTEX_SPINS; ++i) {
// Try to take
if (mutex_trylock_pi(mutex))
return;
spin_hint();
}
self = gettid();
/* can not take the lock... */
/* FUTEX_LOCK_PI to sleep */
futex_lock_pi(&mutex->state, 0, NULL);
/* lock acquired: set owner to self. */
mutex->owner = self;
thread_fence(&mutex->state, acquire);
}
static inline void mutex_unlock_pi(mutex_t *mutex)
{
pid_t tid = gettid();
/* If the mutex is acquired, state holds the owner thread's tid */
if (cmpxchg(&mutex->state, &tid, FUTEX_IDLE_ID))
return;
futex_unlock_pi(&mutex->state);
}
static bool mutex_trylock_default(mutex_t *mutex)
{
int state = load(&mutex->state, relaxed);
if (state & MUTEX_LOCKED)
return false;
state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed);
if (state & MUTEX_LOCKED)
return false;
thread_fence(&mutex->state, acquire);
return true;
}
static inline void mutex_lock_default(mutex_t *mutex)
{
for (int i = 0; i < MUTEX_SPINS; ++i) {
if (mutex_trylock_default(mutex))
return;
spin_hint();
}
int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
while (state & MUTEX_LOCKED) {
futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING);
state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
}
thread_fence(&mutex->state, acquire);
}
static inline void mutex_unlock_default(mutex_t *mutex)
{
int state = exchange(&mutex->state, 0, release);
if (state & MUTEX_SLEEPING)
futex_wake(&mutex->state, 1);
}
/* TODO: dispatch on protocol like mutex_lock(); stub for now. */
static bool mutex_trylock(mutex_t *mutex)
{
return true;
}
static inline void mutex_lock(mutex_t *mutex)
{
switch (mutex->protocol) {
case PRIO_NONE:
mutex_lock_default(mutex);
break;
case PRIO_INHERIT:
mutex_lock_pi(mutex);
break;
default:
break;
}
}
static inline void mutex_unlock(mutex_t *mutex)
{
switch (mutex->protocol) {
case PRIO_NONE:
mutex_unlock_default(mutex);
break;
case PRIO_INHERIT:
mutex_unlock_pi(mutex);
break;
default:
break;
}
}
#endif
// TODO: futex_requeue_pi
```
### Testing: SCHED_FIFO runs into the RTPRIO limit (run with sudo)
```c
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "cond.h"
#include "futex.h"
#include "mutex.h"
#define N_THREADS 3
mutex_t lock;
void task_high() {
printf("high (before lock)\n");
mutex_lock(&lock);
printf("high\n");
mutex_unlock(&lock);
printf("end of high\n");
}
void task_med() {
printf("med\n");
sleep(2);
printf("end of med\n");
}
void task_low() {
printf("low\n");
mutex_lock(&lock);
sleep(1);
printf("low goes on\n");
mutex_unlock(&lock);
printf("end of low\n");
}
static void mutex_init_wrap(mutex_t *mu) {
muattr_t muattr;
muattr_setprotocol(&muattr, PRIO_INHERIT);
mutex_init(mu, &muattr);
}
void (*tasks[])() = {
task_high,
task_med,
task_low
};
int main(void)
{
printf("start\n");
int state;
struct sched_param param;
pthread_attr_t attr;
pthread_t ids[N_THREADS];
pthread_attr_init(&attr);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
mutex_init_wrap(&lock);
for (int i = N_THREADS - 1; i >= 0; --i) {
printf("%d\n", i);
param.sched_priority = (N_THREADS - i) * 10;
pthread_attr_setschedparam(&attr, &param);
state = pthread_create(&ids[i], &attr, (void *)tasks[i], (void *)NULL);
if (state)
printf("thread %d create failed. Error: %d\n", i, state);
}
for (int i = 0; i < N_THREADS; ++i) {
pthread_join(ids[i], NULL);
}
return EXIT_SUCCESS;
}
```
* Building a priority-inversion scenario:
* Let L be the low-priority thread, M the medium-priority thread, and H the high-priority thread.
* The scenario is as follows:
1. L takes mutex A.
2. M wakes (is created) and preempts L.
3. H wakes (is created) and preempts M.
4. H tries to acquire A.
5. H fails to acquire A and goes to sleep (blocked by the scheduler).
6. M runs, since the highest-priority runnable thread is now M.
7. M finishes; L runs again until it releases A.
8. H wakes, since it can now acquire A.
9. H finishes.
10. L finishes.
### test result
```
Running test_linux ... start
2
low
1
med
0
high (before lock)
low goes on // L inherited H's priority, so it exits the critical section first
end of low
high
end of high
end of med
[OK]
```