
2022q1 Homework6 (quiz6)

contributed by < chengingx >

tags: linux2022

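Quiz 1

〈UNIX 作業系統 fork/exec 系統呼叫的前世今生〉 introduces the clone system call. Read together with 〈Implementing a Thread Library on Linux〉, the code below attempts to build a user-level thread library. The source is available in readers.c.

Expected output (the order may vary):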

Reader process in
Reader process out
Reader process in
Reader process out
Writer process in
Reader process in
Writers process out
Reader process out
Writer process in
Writers process out
Writer process in
Writers process out
Reader process in
Reader process in
Reader process out
Reader process out
Writer process in
Writers process out
Writer process in
Writers process out

Observing main()

main() uses:

  • 2 mutex locks : lock, rwlock
  • 1 spin lock : print_lock
  • 2 atomics : n_readers_in, n_writers_in
  • 10 threads : readers[5], writers[5]
int main()
{
    mutex_init(&lock);
    mutex_init(&rwlock);
    spin_init(&print_lock);

    atomic_init(&n_readers_in, 0);
    atomic_init(&n_writers_in, 0);

    thread_t readers[5], writers[5];
    for (int i = 0; i < 5; i++) {
        thread_create(&readers[i], f1, NULL);
        thread_create(&writers[i], f2, NULL);
    }

    for (int i = 0; i < 5; i++) {
        thread_join(writers[i], NULL);
        thread_join(readers[i], NULL);
    }

    return 0;
}
  • lock and rwlock are objects of type mutex_t
  • print_lock is an object of type spin_t
typedef struct {
    volatile int lock;
    unsigned int locker;
} spin_t;

typedef struct {
    volatile int lock;
    unsigned int locker;
} mutex_t;

static mutex_t lock, rwlock;
static spin_t print_lock;

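The lock member is declared volatile. volatile suppresses compiler optimization of accesses to the variable, so the compiler must read the latest value from memory every time the variable is used. It does not, however, prevent the CPU from reordering memory accesses.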
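Because volatile alone gives no ordering guarantee, one common alternative is C11 atomics. The sketch below is an assumption and not part of readers.c: the names payload, ready, publish, and try_consume are hypothetical. It pairs a release store with an acquire load so that a reader that observes the flag also observes the data written before it.

```c
#include <stdatomic.h>
#include <stdbool.h>

static int payload;        /* plain data (hypothetical name) */
static atomic_int ready;   /* flag; static storage starts at 0 */

/* Writer side: store the data, then publish with release ordering so the
 * payload store cannot be reordered after the flag store. */
static void publish(int value)
{
    payload = value;
    atomic_store_explicit(&ready, 1, memory_order_release);
}

/* Reader side: the acquire load pairs with the release store above.
 * Returns true and fills *out only if the flag has been observed. */
static bool try_consume(int *out)
{
    if (atomic_load_explicit(&ready, memory_order_acquire) == 0)
        return false;
    *out = payload;
    return true;
}
```

With volatile int in place of atomic_int, the compiler would still emit both stores, but nothing would tell the compiler or the CPU to keep them in order.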

Next, look at how mutex_init and spin_init use the mutex_t and spin_t objects.

static inline int mutex_init(mutex_t *m)
{
    volatile int *lck = &(m->lock);
    int out;
    asm("movl $0x0,(%1);" : "=r"(out) : "r"(lck));
    m->locker = 0;
    return 0;
}

static inline int spin_init(spin_t *l)
{
    volatile int out;
    volatile int *lck = &(l->lock);
    asm("movl $0x0,(%1);" : "=r"(out) : "r"(lck));
    l->locker = 0;
    return 0;
}

What is the output operand volatile int out used for?
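One observation that may help with this question: the template in both init functions references only %1, so the instruction never writes out (operand %0). As far as the visible code shows, declaring it mainly shifts lck to operand %1. Below is a minimal sketch of the same store without the unused output, assuming GCC or Clang; sketch_lock_t and sketch_lock_init are hypothetical names, and the "memory" clobber is an addition that the original code does not have.

```c
/* Hypothetical stand-in for the lock object; not the mutex_t from readers.c. */
typedef struct {
    volatile int lock;
    unsigned int locker;
} sketch_lock_t;

static inline int sketch_lock_init(sketch_lock_t *l)
{
#if defined(__x86_64__) || defined(__i386__)
    /* No output operand, so the pointer is now %0. The "memory" clobber
     * tells the compiler that the asm writes memory it cannot see. */
    __asm__ volatile("movl $0x0, (%0)" : : "r"(&l->lock) : "memory");
#else
    l->lock = 0; /* portable fallback on non-x86 targets */
#endif
    l->locker = 0;
    return 0;
}
```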

GCC-Inline-Assembly-HOWTO describes how to read an asm statement:

asm ( assembler template 
   : output operands                  /* optional */
   : input operands                   /* optional */
   : list of clobbered registers      /* optional */
   );
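For example:
int a = 10, b;
asm("movl %1, %%eax;\n\t"   /* adjacent string literals; a raw newline inside one literal does not compile */
    "movl %%eax, %0;"
    : "=r"(b)  /* output */
    : "r"(a)   /* input */
    : "%eax"   /* clobbered register */
    );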
  1. "b" is the output operand, referred to by %0 and "a" is the input operand, referred to by %1.
  2. "r" is a constraint on the operands. "r" says to GCC to use any register for storing the operands. output operand constraint should have a constraint modifier "=". And this modifier says that it is the output operand and is write-only.
  3. There are two %’s prefixed to the register name. This helps GCC to distinguish between the operands and registers. operands have a single % as prefix.
  4. The clobbered register %eax after the third colon tells GCC that the value of %eax is to be modified inside "asm", so GCC won’t use this register to store any other value.

The Reader Writer Problem

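Next, look at f1 and f2. Linux kernel synchronization states the requirements as follows:

  1. While one reader is reading, other readers must also be able to read, and no writer may be active at that time
  2. A writer needs mutual exclusion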
static int n_readers = 0, n_readers_in = 0, n_writers_in = 0;
  • n_readers records how many readers are currently present
  • n_readers_in counts the readers that are currently reading
  • n_writers_in counts the writers that are currently writing
static void f1(void)
{
    mutex_acquire(&lock);
    if (++n_readers == 1)
        mutex_acquire(&rwlock);
    mutex_release(&lock);

    safe_printf(&print_lock, "Reader process in\n");
    atomic_fetch_add(&n_readers_in, 1);
	
    // perform reading here
	
    mutex_acquire(&lock);
    if (--n_readers == 0)
        mutex_release(&rwlock);
    mutex_release(&lock);

    atomic_fetch_sub(&n_readers_in, 1);
    safe_printf(&print_lock, "Reader process out\n");
}

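The first reader must acquire rwlock before it can read. In other words, once the first reader holds rwlock, the other readers can read as well, and the last reader to leave releases rwlock.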

static void f2(void)
{
    mutex_acquire(&rwlock);
    atomic_fetch_add(&n_writers_in, 1);
    safe_printf(&print_lock, "Writer process in\n");
	
    // perform writing here 
	
    mutex_release(&rwlock);
    atomic_fetch_sub(&n_writers_in, 1);
    safe_printf(&print_lock, "Writers process out\n");
}

A writer needs rwlock before it can write, so it obtains rwlock only when no reader is reading and no other writer is writing.
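The entry and exit rules of f1 and f2 can be modelled without any threads. The sketch below is a single-threaded state model and not a usable lock: rw_model_t and its functions are hypothetical names, and "try" variants return false where the real code would block. It only illustrates that the first reader takes rwlock on behalf of all readers and the last reader gives it back.

```c
#include <stdbool.h>

typedef struct {
    int n_readers;  /* readers currently inside */
    bool rw_held;   /* models rwlock: held by the reader group or one writer */
} rw_model_t;

/* Mirrors the entry section of f1: only the first reader takes rwlock. */
static bool reader_try_enter(rw_model_t *s)
{
    if (s->n_readers == 0) {
        if (s->rw_held)
            return false; /* a writer is inside; the real code would block */
        s->rw_held = true;
    }
    s->n_readers++;
    return true;
}

/* Mirrors the exit section of f1: only the last reader releases rwlock. */
static void reader_exit(rw_model_t *s)
{
    if (--s->n_readers == 0)
        s->rw_held = false;
}

/* Mirrors f2: a writer needs rwlock exclusively. */
static bool writer_try_enter(rw_model_t *s)
{
    if (s->rw_held)
        return false;
    s->rw_held = true;
    return true;
}

static void writer_exit(rw_model_t *s)
{
    s->rw_held = false;
}
```

The model also makes the starvation risk visible: as long as n_readers never drops to 0, writer_try_enter keeps failing.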

TODO

  • What if we have a constant stream of readers and a waiting writer ?
    The writer will starve
  • We may want to prioritize writers over readers
    For instance, when readers are polling for the write
    How to do this ? Use Seqlock ?

TCB block

The TCB (thread control block) consists of:

typedef struct {
    void (*f)(void *);
    void *arg;
    void *stack;
} funcargs_t;

typedef struct __node {
    unsigned long int tid, tid_copy;
    void *ret_val;
    struct __node *next;
    funcargs_t *fa;
} node_t;
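The notes call list_insert on a list of these nodes but do not show its body. The following is a guess at its shape, under stated assumptions: sketch_list_t and sketch_list_insert are hypothetical names, only the head member is known from the code in these notes, and pushing at the head is an arbitrary choice.

```c
#include <stdlib.h>

typedef struct {
    void (*f)(void *);
    void *arg;
    void *stack;
} funcargs_t;

typedef struct __node {
    unsigned long int tid, tid_copy;
    void *ret_val;
    struct __node *next;
    funcargs_t *fa;
} node_t;

/* Assumed layout: only the head member is visible in the notes. */
typedef struct {
    node_t *head;
} sketch_list_t;

/* Hypothetical insert: push a zero-filled node at the head and return it,
 * or return NULL on allocation failure. */
static node_t *sketch_list_insert(sketch_list_t *ll, unsigned long int tid)
{
    node_t *node = calloc(1, sizeof(*node));
    if (!node)
        return NULL;
    node->tid = tid;
    node->next = ll->head;
    ll->head = node;
    return node;
}
```

Returning the node matters for the design: thread_create later hands the address of node->tid to clone, so the node must stay at a stable address for the lifetime of the thread.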

Create threads

int thread_create(thread_t *t, void *routine, void *arg)
{
    spin_acquire(&global_lock);
    static bool init_state = false;
    if (!t || !routine) {
        spin_release(&global_lock);
        return EINVAL;
    }
    if (!init_state) {
        init_state = true;
        init();
    }
    ...

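thread_create first acquires global_lock. If t or routine is a null pointer, it releases global_lock and returns the error code EINVAL, which indicates an invalid argument. Otherwise, on the first call it runs the initialization routine init().

About init():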
static void init()
{
    spin_init(&global_lock);
    INIT_SIGNALS();
    list_init(&tid_list);	// initialize singly-linked list
    node_t *node = list_insert(&tid_list, getpid());
    node->tid_copy = node->tid;
    node->fa = NULL;
    atexit(cleanup);
}

The TGID is added to the linked list through list_insert. Section 1.3.1 of Demystifying the Linux CPU Scheduler explains the difference between getpid() and gettid():

  • getpid() : get the TGID (the group leader PID, identifying the whole process)
  • gettid() : get the PID (which identifies a single thread, not a group)
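The distinction can be checked directly on Linux. In this small sketch, sketch_gettid and is_group_leader are hypothetical helper names; the raw SYS_gettid system call is used so the code does not depend on a glibc gettid() wrapper being available.

```c
#define _GNU_SOURCE
#include <stdbool.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

/* Kernel-level thread ID (what the kernel calls the PID) of the caller. */
static pid_t sketch_gettid(void)
{
    return (pid_t) syscall(SYS_gettid);
}

/* The thread group leader is the thread whose TID equals the TGID. */
static bool is_group_leader(void)
{
    return sketch_gettid() == getpid();
}
```

Called from the initial thread of a process, is_group_leader() returns true, which is why init() can register the main thread with getpid(). In a thread created with CLONE_THREAD it returns false.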
    node_t *node = list_insert(&tid_list, 0);
    if (!node) {
        printf("Thread address not found\n");
        spin_release(&global_lock);
        return -1;
    }

    funcargs_t *fa = malloc(sizeof(funcargs_t));
    if (!fa) {
        printf("Malloc failed\n");
        spin_release(&global_lock);
        return -1;
    }

    fa->f = routine;
    fa->arg = arg;

    // #define STACK_SZ 65536, #define GUARD_SZ getpagesize()
    void *thread_stack = alloc_stack(STACK_SZ, GUARD_SZ);
    if (!thread_stack) {
        perror("thread create");
        spin_release(&global_lock);
        free(fa);
        return errno;
    }
    fa->stack = thread_stack;
About alloc_stack():
static void *alloc_stack(size_t size, size_t guard)
{
    /* Align the memory to a 64 bit compatible page size and associate a guard
     * area for the stack.
     */
    void *stack = mmap(NULL, size + guard, PROT_READ | PROT_WRITE,
                       MAP_PRIVATE | MAP_ANONYMOUS | MAP_STACK, -1, 0);
    if (stack == MAP_FAILED) {
        perror("Stack Allocation");
        return NULL;
    }

    if (mprotect(stack, guard, PROT_NONE)) {
        munmap(stack, size + guard);
        perror("Stack Allocation");
        return NULL;
    }
    return stack;
}

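mmap allocates a region of STACK_SZ + GUARD_SZ bytes, and mprotect then marks the first GUARD_SZ bytes (the lowest addresses) as PROT_NONE so that they act as a guard area.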
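To make the layout concrete, the sketch below repeats the allocation and adds a helper that computes the initial stack pointer. It assumes Linux; sketch_alloc_stack and sketch_stack_top are hypothetical names, and error reporting is left out for brevity.

```c
#define _GNU_SOURCE
#include <stddef.h>
#include <sys/mman.h>
#include <unistd.h>

/* Map guard + size bytes and make the lowest `guard` bytes inaccessible.
 * Returns the base of the mapping, or NULL on failure. */
static void *sketch_alloc_stack(size_t size, size_t guard)
{
    void *base = mmap(NULL, size + guard, PROT_READ | PROT_WRITE,
                      MAP_PRIVATE | MAP_ANONYMOUS | MAP_STACK, -1, 0);
    if (base == MAP_FAILED)
        return NULL;
    if (mprotect(base, guard, PROT_NONE)) {
        munmap(base, size + guard);
        return NULL;
    }
    return base;
}

/* The stack grows downward, so the initial stack pointer is the highest
 * address of the mapping: [base, base+guard) is the guard area and
 * [base+guard, base+guard+size) is usable. */
static char *sketch_stack_top(void *base, size_t size, size_t guard)
{
    return (char *) base + guard + size;
}
```

A stack overflow walks down from the top and eventually touches the PROT_NONE guard, which raises SIGSEGV instead of silently corrupting whatever is mapped below.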

    thread_t tid = clone(wrap, (char *) thread_stack + STACK_SZ + GUARD_SZ,
                         CLONE_FLAGS, fa, &(EXP1), NULL, &(EXP2));
    node->tid_copy = tid;
    node->fa = fa;

Now for the main event. Below is the function prototype of clone, which creates a new ("child") process:

int clone(int (*fn)(void *), void *stack, int flags, void *arg,
          ...
          /* pid_t *parent_tid, void *tls, pid_t *child_tid */);
  • stack : the address of the stack used by the child process. Because the stack grows from high to low addresses on Linux, it points to the high address thread_stack + STACK_SZ + GUARD_SZ
  • flags : CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD | CLONE_SYSVSEM | CLONE_PARENT_SETTID | CLONE_CHILD_CLEARTID
  • parent_tid : Where to store child TID, in parent's memory
  • child_tid : Where to store child TID, in child's memory
Explanation of the flags:
  • CLONE_VM : the calling process and the child process run in the same memory space
  • CLONE_FS : the caller and the child process share the same filesystem information
  • CLONE_FILES : the calling process and the child process share the same file descriptor table
  • CLONE_SIGHAND : the calling process and the child process share the same table of signal handlers
  • CLONE_THREAD : the child is placed in the same thread group as the calling process
  • CLONE_SYSVSEM : the child and the calling process share a single list of System V semaphore adjustment (semadj) values (see semop(2)).
  • CLONE_PARENT_SETTID : Store the child thread ID at the location pointed to by parent_tid (clone()) or cl_args.parent_tid (clone3()) in the parent's memory. The store operation completes before the clone call returns control to user space
  • CLONE_CHILD_CLEARTID : Clear (zero) the child thread ID at the location pointed to by child_tid (clone()) or cl_args.child_tid (clone3()) in child memory when the child exits, and do a wakeup on the futex at that address
  • EXP1 is pid_t *parent_tid : EXP1 = node->tid
  • EXP2 is pid_t *child_tid : EXP2 = node->tid

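According to 第 4, 5, 6 週課堂問答簡記, the flags passed to clone() include CLONE_PARENT_SETTID and CLONE_CHILD_CLEARTID.

  • When the flags include CLONE_PARENT_SETTID:
    when the thread is created, the value that the clone() argument parent_tid points to is set to the thread ID of the new thread
  • When the flags include CLONE_CHILD_CLEARTID:
    when the created thread exits, the value that the clone() argument child_tid points to is set to 0

Therefore, unless the address passed is that of the tid member of the node * object insertedNode added when the thread was created, thread_kill() and killAllThreads() cannot check tid to determine whether the thread has already exited.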
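The same cleared word can also serve as a join primitive, because CLONE_CHILD_CLEARTID performs a futex wake on it. The sketch below is an assumption about how a join could wait, not the library's thread_join: sketch_wait_tid_cleared is a hypothetical name, and it takes a 32-bit word because futexes operate on 32-bit values, whereas node_t in these notes stores tid as unsigned long.

```c
#define _GNU_SOURCE
#include <linux/futex.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Block until the kernel clears the TID word (CLONE_CHILD_CLEARTID) and
 * wakes the futex. Returns immediately if the word is already 0. */
static void sketch_wait_tid_cleared(uint32_t *tid_word)
{
    for (;;) {
        uint32_t seen = __atomic_load_n(tid_word, __ATOMIC_ACQUIRE);
        if (seen == 0)
            return;
        /* Sleeps only if *tid_word still equals `seen`; otherwise the call
         * fails with EAGAIN and the loop re-checks the word. */
        syscall(SYS_futex, tid_word, FUTEX_WAIT, seen, NULL, NULL, 0);
    }
}
```

The loop re-reads the word after every return from FUTEX_WAIT, so spurious wakeups and signal interruptions are harmless.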

static int kill_all_threads(list_t *ll, int signum)
{
    pid_t pid = getpid(), delpid[100];
    int counter = 0;
    for (node_t *tmp = ll->head; tmp; tmp = tmp->next) {
        if (tmp->tid == gettid()) {
            tmp = tmp->next;
            continue;
        }

        printf("Killed thread %lu\n", tmp->tid);
        int ret = syscall(TGKILL, pid, tmp->tid, signum);
        ...

One open question: why does building with compiler optimization (-O2) cause a segmentation fault?

Mutex functions

static __attribute__((noinline)) int mutex_acquire(mutex_t *m)
{
    volatile int out;
    volatile int *lck = &(m->lock);
    asm("mutexloop:"
        "mov $1, %%eax;"
        "xchg %%al, (%%rdi);"
        "test %%al,%%al;"
        "je end"
        : "=r"(out)
        : "r"(lck));
    syscall(SYS_futex, m, FUTEX_WAIT, 1, NULL, NULL, 0);
    asm("jmp mutexloop");
    asm("end:");
    return 0;
}

__attribute__((noinline))

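This attribute tells the compiler to treat the specified function as noinline (never inlined) or always_inline (always inlined) at compile time. More attributes are listed in Declaring Attributes of Functions.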

static  inline __attribute__((noinline)) int func();
static  inline __attribute__((always_inline)) int func();

asm

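Reference: Synchronization #2, Dave Eckhardt

  1. mov $1, %%eax : think of the value 1 in eax as the green circle
  2. xchg %%al, (%%rdi) : rdi points to the blue box (the lock word); the 0 stored there is about to be swapped with the green circle, and al is the green circle labeled 1
  3. The thread that wins the swap leaves the box labeled 1, and its al becomes the red circle labeled 0
  4. test %%al,%%al : the bitwise AND yields 0, so ZF is 1, and je end is taken
  5. For a thread that loses, al is still 1; test %%al,%%al yields 1 through the bitwise AND, so ZF is 0, the thread falls through to the FUTEX_WAIT call, and jmp mutexloop then retries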
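The five steps above amount to an atomic test-and-set. A portable way to express the same idea is C11 atomic_exchange, sketched below with hypothetical names (sketch_mutex_t, sketch_try_acquire, sketch_release). It is a non-blocking variant that leaves out the futex sleep.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical lock word: 0 = free, 1 = held. */
typedef struct {
    atomic_int lock;
} sketch_mutex_t;

/* Swap 1 into the lock word and inspect what came out, like
 * "mov $1, %eax; xchg %al, (lock); test %al, %al". */
static bool sketch_try_acquire(sketch_mutex_t *m)
{
    int old = atomic_exchange(&m->lock, 1);
    return old == 0; /* 0: we took a free lock; 1: it was already held */
}

static void sketch_release(sketch_mutex_t *m)
{
    atomic_store(&m->lock, 0);
}
```

Writing 1 into a lock that is already held changes nothing, which is why the loser can swap unconditionally and only needs to look at the old value.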

futex

syscall(SYS_futex, m, FUTEX_WAIT, 1, NULL, NULL, 0);

Excerpt from man futex:

  • Futex (fast user-space locking)
#include <linux/futex.h>      /* Definition of FUTEX_* constants */
#include <sys/syscall.h>      /* Definition of SYS_* constants */
#include <unistd.h>
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
             const struct timespec *timeout,   /* or: uint32_t val2 */
             uint32_t *uaddr2, uint32_t val3);
  • FUTEX_WAIT
    This operation tests that the value at the futex word pointed to by the address uaddr still contains the expected value val, and if so, then sleeps waiting for a FUTEX_WAKE operation on the futex word.
  • FUTEX_WAKE
    This operation wakes at most val of the waiters that are waiting (e.g., inside FUTEX_WAIT) on the futex word at the address uaddr. Most commonly, val is specified as either 1 (wake up a single waiter) or INT_MAX (wake up all waiters).
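The two operations can be wrapped as follows. This is a minimal sketch assuming Linux; sketch_futex_wait and sketch_futex_wake are hypothetical names, since glibc provides no futex() wrapper and the call goes through syscall().

```c
#define _GNU_SOURCE
#include <errno.h>
#include <linux/futex.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Sleep while *uaddr == expected. Returns 0 when woken, or -1 with errno
 * set (EAGAIN if *uaddr already differs from expected at call time). */
static long sketch_futex_wait(uint32_t *uaddr, uint32_t expected)
{
    return syscall(SYS_futex, uaddr, FUTEX_WAIT, expected, NULL, NULL, 0);
}

/* Wake at most n waiters; returns the number of waiters woken. */
static long sketch_futex_wake(uint32_t *uaddr, int n)
{
    return syscall(SYS_futex, uaddr, FUTEX_WAKE, n, NULL, NULL, 0);
}
```

The value check inside FUTEX_WAIT is what closes the race between testing the lock and going to sleep: if the holder releases the lock in between, the word no longer equals the expected value and the call returns at once. In mutex_acquire the pointer m is passed as uaddr, which works because lock is the first member of mutex_t.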

Spinlock functions

static inline int spin_acquire(spin_t *l)
{
    int out;
    volatile int *lck = &(l->lock);
    asm("whileloop:"
        "xchg %%al, (%1);"
        "test %%al,%%al;"	// set ZF to 1 if al == 0
        "jne whileloop;"	// jump if ZF == 0
        : "=r"(out)
        : "r"(lck));
    return 0;
}

According to TEST (x86 instruction), SF is set to the most significant bit of the result of the AND. If the result is 0, ZF is set to 1; otherwise it is set to 0.

al is the least significant byte of eax register, which is typically used to return values from function calls

  • xchg : atomic exchange
  • test : performs a bitwise AND; when lock is 1, ZF is 0, which means someone else holds the lock
  • jne : jump if not equal; while ZF is 0 the loop continues

TODO

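  • 並行程式的潛在問題 (二)
    Whether a thread acquires the spinlock has little to do with the order in which the threads started waiting. It depends on which processor core notices the change in the spinlock state first or, more precisely, on which core's cache synchronizes with main memory sooner.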
  • Linux kernel synchronization

    Why 2 loops ?
    • If many CPUs are waiting on this lock, the cache line will bounce between CPUs that are polling its value
    • The inner loop read-shares this cache line, allowing all polling in parallel
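The two-loop idea is usually called test-and-test-and-set. Here is a minimal sketch with C11 atomics, using hypothetical names (sketch_spin_t and friends); it is not the spin_acquire from readers.c, which has a single loop.

```c
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_int lock; /* 0 = free, 1 = held */
} sketch_spin_t;

static void sketch_spin_acquire(sketch_spin_t *l)
{
    /* Outer loop: the atomic exchange needs the cache line exclusively. */
    while (atomic_exchange_explicit(&l->lock, 1, memory_order_acquire)) {
        /* Inner loop: read-only polling lets every waiting CPU keep a
         * shared copy of the line instead of bouncing it around. */
        while (atomic_load_explicit(&l->lock, memory_order_relaxed))
            ;
    }
}

static bool sketch_spin_try_acquire(sketch_spin_t *l)
{
    return atomic_exchange_explicit(&l->lock, 1, memory_order_acquire) == 0;
}

static void sketch_spin_release(sketch_spin_t *l)
{
    atomic_store_explicit(&l->lock, 0, memory_order_release);
}
```

Waiters attempt the expensive exchange only after the inner loop has seen the lock become free, so the write traffic is limited to the moments when acquisition can actually succeed.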

Shortcomings of the design and implementation

If we have a constant stream of readers and a waiting writer, the writer will starve.

Original version

$ ./readers 
Reader process in
Reader process out
Reader process in
Reader process out
Reader process in
Reader process out
...
Reader process in
Reader process out
Reader process in
Reader process out
Reader process in
Reader process out
Writer process in
Writers process out

Rewritten version

$ ./readers 
...
Sequence = 0, Reader process in
Sequence = 0, Reader process in
################ Writer process in ################
################ Writers process out ################
Sequence = 2, Reader process in
Sequence = 2, Reader process in
Reader process out
Reader process out
...

Reader and Writer functions

static void f1(void *arg) {
    unsigned int seq;
    do {
        seq = read_seqbegin(&seq_lock);
        safe_printf(&print_lock, "Sequence = %d, Reader process in\n", seq);
    } while (read_seqretry(&seq_lock, seq));

    safe_printf(&print_lock, "Reader process out\n");
}

static void f2(void *arg) {
    write_seqlock(&seq_lock);
    safe_printf(&print_lock, "################ Writer process in ################\n");
    write_sequnlock(&seq_lock);
    safe_printf(&print_lock, "################ Writers process out ################\n");
}

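sequence acts like a version number. A reader obtains sequence through read_seqbegin. If seq_lock->sequence no longer equals that value, a writer has been writing a new version in the meantime, so the reader must read again to get the latest version before it can leave.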

void write_seqlock(spin_t *sl) {
    spin_acquire(&sl->lock);
    sl->sequence++;
}

void write_sequnlock(spin_t *sl) {
    sl->sequence++;
    spin_release(&sl->lock);
}

unsigned int read_seqbegin(const spin_t *sl) {
    unsigned int seq;
    do {
        seq = sl->sequence;
    } while (seq & 1 || seq != sl->sequence);

    return seq;
}

int read_seqretry(const spin_t *sl, unsigned start) {
    return (sl->sequence != start);
}

read_seqbegin shows that a reader may proceed only when sequence is even. An odd value means a writer is currently writing; sequence becomes even again once the writer finishes.
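The even/odd protocol can be exercised in isolation. The sketch below is a self-contained variant built on C11 atomics under stated assumptions: sketch_seqlock_t and the sketch_ functions are hypothetical names, the writer lock is a plain atomic word instead of the spin_t used above, and all operations use the default sequentially consistent ordering for simplicity.

```c
#include <stdatomic.h>

/* Hypothetical layout: a writer lock word plus the sequence counter. */
typedef struct {
    atomic_int lock;       /* 0 = free, 1 = held by a writer */
    atomic_uint sequence;  /* odd while a write is in progress */
} sketch_seqlock_t;

static void sketch_write_seqlock(sketch_seqlock_t *sl)
{
    while (atomic_exchange(&sl->lock, 1))
        ; /* spin until the writer lock is free */
    atomic_fetch_add(&sl->sequence, 1); /* becomes odd */
}

static void sketch_write_sequnlock(sketch_seqlock_t *sl)
{
    atomic_fetch_add(&sl->sequence, 1); /* becomes even again */
    atomic_store(&sl->lock, 0);
}

static unsigned sketch_read_seqbegin(sketch_seqlock_t *sl)
{
    unsigned seq;
    while ((seq = atomic_load(&sl->sequence)) & 1)
        ; /* a writer is active, wait for an even value */
    return seq;
}

/* Nonzero if a writer ran since `start` was sampled. */
static int sketch_read_seqretry(sketch_seqlock_t *sl, unsigned start)
{
    return atomic_load(&sl->sequence) != start;
}
```

Readers never take the lock, so a stream of readers cannot delay a writer. The cost moves to the readers, which may have to repeat their read section. This matches the rewritten output above, where readers report sequence 0 before the write and 2 after it.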