# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 9 週測驗題 ###### tags: `linux2021` :::info 目的: 檢驗學員對 ==[Linux 核心設計: 不僅是個執行單元的 Process](https://hackmd.io/@sysprog/linux-process)== 和 ==[並行程式設計](https://hackmd.io/@sysprog/concurrency)== 的認知 ::: ==[作答表單](https://docs.google.com/forms/d/e/1FAIpQLScFl109nDCoatMgaFsc_eQR2yCfPPSUZKtoKst8GPC7JKZztg/viewform)== ### 測驗 `1` 延伸[第 8 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz8)提到的 single-producer/multiple-consumer (SPMC) queue,以下實作提出 multiple-producer/multiple-consumer (MPMC),使用 GCC [Built-in Functions for Memory Model Aware Atomic Operations](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html) 和 [futex](https://en.wikipedia.org/wiki/Futex): ![](https://i.imgur.com/CwpYmcI.png) > [futex](https://en.wikipedia.org/wiki/Futex) 全名為 fast user-space mutex locking,是 Linux 核心一種機制,主要提供使用者層級中有效與多執行緒的同步方式,並降低 Linux 核心的介入。可參考 [Basics of Futexes](https://eli.thegreenplace.net/2018/basics-of-futexes/)。futex 主要有 wait 和 wake 等二個操作,其定義如下: > ```cpp > /* uaddr 指向一個地址,val 代表這個地址期待的值, 當 *uaddr == val 時,才會進行 wait */ > int futex_wait(int *uaddr, int val); > > /* 喚醒 n 個 uaddr 指向的 lock 變數,對應到等待中的行程或執行緒 */ > int futex_wake(int *uaddr, int n); > ``` ```cpp #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <malloc.h> #include <pthread.h> #include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #define PAGE_SIZE 4096 /* FIXME: avoid hard-coded */ #define CACHE_LINE_SIZE 64 /* FIXME: make it configurable */ #define CACHE_ALIGNED __attribute__((aligned(CACHE_LINE_SIZE))) #define DOUBLE_CACHE_ALIGNED __attribute__((aligned(2 * CACHE_LINE_SIZE))) static inline void *align_malloc(size_t align, size_t size) { void *ptr; int ret = posix_memalign(&ptr, align, size); if (ret != 0) { perror(strerror(ret)); abort(); } return ptr; } #define N (1 << 12) /* node size */ #define NBITS (N - 1) typedef struct __node { struct __node *volatile next DOUBLE_CACHE_ALIGNED; long id DOUBLE_CACHE_ALIGNED; void *volatile cells[N] DOUBLE_CACHE_ALIGNED; } node_t; #define HANDLES 128 /* support 127 threads */ typedef struct { node_t *spare; node_t *volatile put_node CACHE_ALIGNED; node_t *volatile pop_node CACHE_ALIGNED; } handle_t; typedef struct { node_t *init_node; volatile long init_id DOUBLE_CACHE_ALIGNED; volatile long put_index DOUBLE_CACHE_ALIGNED; volatile long pop_index DOUBLE_CACHE_ALIGNED; handle_t *volatile enq_handles[HANDLES]; handle_t *volatile deq_handles[HANDLES]; int threshold; pthread_barrier_t enq_barrier, deq_barrier; } mpmc_t; static inline node_t *mpmc_new_node() { node_t *n = align_malloc(PAGE_SIZE, sizeof(node_t)); memset(n, 0, sizeof(node_t)); return n; } enum queue_ops { ENQ = 1 << 1, DEQ = 1 << 0, }; /* regiseter the enqueuers first, dequeuers second. */ void mpmc_queue_register(mpmc_t *q, handle_t *th, int flag) { th->spare = mpmc_new_node(); th->put_node = th->pop_node = q->init_node; if (flag & ENQ) { handle_t **tail = q->enq_handles; for (int i = 0;; ++i) { handle_t *init = NULL; if (!tail[i] && __atomic_compare_exchange_n(tail + i, &init, th, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) break; } /* wait for the other enqueuers to register */ pthread_barrier_wait(&q->enq_barrier); } if (flag & DEQ) { handle_t **tail = q->deq_handles; for (int i = 0;; ++i) { handle_t *init = NULL; if (!tail[i] && __atomic_compare_exchange_n(tail + i, &init, th, 0, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) break; } /* wait for the other dequeuers to register */ pthread_barrier_wait(&q->deq_barrier); } } void mpmc_init_queue(mpmc_t *q, int enqs, int deqs, int threshold) { q->init_node = mpmc_new_node(); q->threshold = threshold; q->put_index = q->pop_index = q->init_id = 0; pthread_barrier_init(&q->enq_barrier, NULL, enqs); /* enqueuers */ pthread_barrier_init(&q->deq_barrier, NULL, deqs); /* dequeuers */ } /* locate the offset on the nodes and nodes needed. */ static void *mpmc_find_cell(node_t *volatile *ptr, long i, handle_t *th) { node_t *curr = *ptr; /* get current node */ /* j is thread's local node'id (put node or pop node), (i / N) is the cell * needed node'id. and we shoud take it, By filling the nodes between the j * and (i / N) through 'next' field */ for (long j = curr->id; j < i / N; ++j) { node_t *next = curr->next; if (!next) { /* start filling */ /* use thread's standby node */ node_t *tmp = th->spare; if (!tmp) { tmp = mpmc_new_node(); th->spare = tmp; } tmp->id = NNN; /* next node's id */ /* if true, then use this thread's node, else then thread has have * done this. */ /* __atomic_compare_exchange_n(ptr, cmp, val, 0, __ATOMIC_RELEASE, * __ATOMIC_ACQUIRE) is an atomic compare-and-swap that ensures * release semantic when succeed or acquire semantic when failed. */ if (__atomic_compare_exchange_n(&curr->next, &next, tmp, 0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) { next = tmp; th->spare = NULL; /* now thread there is no standby node */ } } curr = next; /* take the next node */ } *ptr = curr; /* update our node to the present node */ /* Orders processor execution, so other thread can see the '*ptr = curr' */ __asm("sfence" ::: "cc", "memory"); /* FIXME: x86-only */ /* now we get the needed cell, its node is curr */ return &curr->cells[CCC]; } #include <linux/futex.h> #include <sys/syscall.h> #include <unistd.h> #ifndef SYS_futex #define SYS_futex __NR_futex #endif static inline int mpmc_futex_wake(void *addr, int val) { return syscall(SYS_futex, addr, FUTEX_WAKE, val, NULL, NULL, 0); } static inline int mpmc_futex_wait(void *addr, int val) { return syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0); } void mpmc_enqueue(mpmc_t *q, handle_t *th, void *v) { /* return the needed index */ void *volatile *c = mpmc_find_cell( &th->put_node, __atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST), th); /* __atomic_fetch_add(ptr, val) is an atomic fetch-and-add that also * ensures sequential consistency */ /* now c is the nedded cell */ void *cv; /* if XCHG (ATOMIC: Exchange Register/Memory with Register) return NULL, * so our value has put into the cell, just return. */ if ((cv = __atomic_exchange_n(c, v, __ATOMIC_ACQ_REL)) == NULL) return; /* else the couterpart pop thread has wait this cell, so we just change the * wating value and wake it */ *((int *) cv) = VVV; mpmc_futex_wake(cv, 1); } void *mpmc_dequeue(mpmc_t *q, handle_t *th) { void *cv; int futex_addr = 1; /* the needed pop_index */ long index = __atomic_fetch_add(&q->pop_index, DDD, __ATOMIC_SEQ_CST); /* locate the needed cell */ void *volatile *c = mpmc_find_cell(&th->pop_node, index, th); /* because the queue is a blocking queue, so we just use more spin. */ int times = (1 << 20); do { cv = *c; if (cv) goto over; __asm__("pause"); /* FIXME: x86-only */ } while (times-- > 0); /* XCHG, if return NULL so this cell is NULL, we just wait and observe the * futex_addr's value to 0. */ if ((cv = __atomic_exchange_n(c, &futex_addr, __ATOMIC_ACQ_REL)) == NULL) { /* call wait before compare futex_addr to prevent use-after-free of * futex_addr at mpmc_enqueue(call wake) */ do { mpmc_futex_wait(&futex_addr, 1); } while (futex_addr == 1); /* the couterpart put thread has change futex_addr's value to 0. and the * data has into cell(c). */ cv = *c; } over: /* if the index is the node's last cell: (NBITS == 4095), it Try to reclaim * the memory. so we just take the smallest ID node that is not * reclaimed(init_node), and At the same time, by traversing the local data * of other threads, we get a larger ID node(min_node). So it is safe to * recycle the memory [init_node, min_node). */ if ((index & NBITS) == NBITS) { /* __atomic_load_n(ptr, __ATOMIC_ACQUIRE) is a load with a following * acquire fence to ensure no following load and stores can start before * the current load completes. */ long init_index = __atomic_load_n(&q->init_id, __ATOMIC_ACQUIRE); /* __atomic_compare_exchange_n(ptr, cmp, val, 0, __ATOMIC_ACQUIRE, * __ATOMIC_RELAXED) is an atomic compare-and-swap that ensures acquire * semantic when succeed or relaxed semantic when failed. */ if ((th->pop_node->id - init_index) >= q->threshold && init_index >= 0 && __atomic_compare_exchange_n(&q->init_id, &init_index, -1, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { node_t *init_node = q->init_node; th = q->deq_handles[0]; node_t *min_node = th->pop_node; int i; handle_t *next = q->deq_handles[i = 1]; while (next) { node_t *next_min = next->pop_node; if (next_min->id < min_node->id) min_node = next_min; if (min_node->id <= init_index) break; next = q->deq_handles[++i]; } next = q->enq_handles[i = 0]; while (next) { node_t *next_min = next->put_node; if (next_min->id < min_node->id) min_node = next_min; if (min_node->id <= init_index) break; next = q->enq_handles[++i]; } long new_id = min_node->id; if (new_id <= init_index) /* __atomic_store_n(ptr, val, __ATOMIC_RELEASE) is a store with * a preceding release fence to ensure all previous load and * stores completes before the current store is visiable. */ __atomic_store_n(&q->init_id, init_index, __ATOMIC_RELEASE); else { q->init_node = min_node; __atomic_store_n(&q->init_id, new_id, __ATOMIC_RELEASE); do { node_t *tmp = init_node->next; free(init_node); init_node = tmp; } while (init_node != min_node); } } } return cv; } #include <sys/time.h> static long COUNTS_PER_THREAD = 2500000; static int threshold = 8; static mpmc_t mpmc; static pthread_barrier_t prod_barrier, cons_barrier; static void *producer(void *index) { mpmc_t *q = &mpmc; handle_t *th = malloc(sizeof(handle_t)); memset(th, 0, sizeof(handle_t)); mpmc_queue_register(q, th, ENQ); for (;;) { pthread_barrier_wait(&prod_barrier); for (int i = 0; i < COUNTS_PER_THREAD; ++i) mpmc_enqueue(q, th, 1 + i + ((int) index) * COUNTS_PER_THREAD); pthread_barrier_wait(&prod_barrier); } return NULL; } #define THREAD_NUM 4 static bool *array; static void *consumer(void *index) { mpmc_t *q = &mpmc; handle_t *th = malloc(sizeof(handle_t)); memset(th, 0, sizeof(handle_t)); mpmc_queue_register(q, th, DEQ); for (;;) { pthread_barrier_wait(&cons_barrier); for (long i = 0; i < COUNTS_PER_THREAD; ++i) { int value; if (!(value = mpmc_dequeue(q, th))) return NULL; array[value] = true; } pthread_barrier_wait(&cons_barrier); } fflush(stdout); return NULL; } int main(int argc, char *argv[]) { pthread_barrier_init(&prod_barrier, NULL, THREAD_NUM + 1); pthread_barrier_init(&cons_barrier, NULL, THREAD_NUM + 1); if (argc >= 3) { COUNTS_PER_THREAD = atol(argv[1]); threshold = atoi(argv[2]); } printf("Amount: %ld\n", THREAD_NUM * COUNTS_PER_THREAD); fflush(stdout); array = malloc((1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(bool)); memset(array, 0, (1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(bool)); mpmc_init_queue(&mpmc, THREAD_NUM, THREAD_NUM, threshold); pthread_t pids[THREAD_NUM]; for (int i = 0; i < THREAD_NUM; ++i) { if (-1 == pthread_create(&pids[i], NULL, producer, i) || -1 == pthread_create(&pids[i], NULL, consumer, i)) { printf("error create thread\n"); exit(1); } } for (int i = 0; i < 8; i++) { printf("\n#%d\n", i); pthread_barrier_wait(&cons_barrier); sleep(1); struct timeval start, prod_end; gettimeofday(&start, NULL); pthread_barrier_wait(&prod_barrier); pthread_barrier_wait(&prod_barrier); pthread_barrier_wait(&cons_barrier); gettimeofday(&prod_end, NULL); bool verify = true; for (int j = 1; j <= THREAD_NUM * COUNTS_PER_THREAD; ++j) { if (!array[j]) { printf("Error: ints[%d]\n", j); verify = false; break; } } if (verify) printf("ints[1-%ld] have been verified through\n", THREAD_NUM * COUNTS_PER_THREAD); float cost_time = (prod_end.tv_sec - start.tv_sec) + (prod_end.tv_usec - start.tv_usec) / 1000000.0; printf("elapsed time: %f seconds\n", cost_time); printf("DONE #%d\n", i); fflush(stdout); memset(array, 0, (1 + THREAD_NUM * COUNTS_PER_THREAD) * sizeof(bool)); } return 0; } ``` 編譯方式: ```shell $ gcc -Wall -std=c11 -o mpmc mpmc.c -lpthread ``` 參考執行輸出如下: ``` Amount: 10000000 #0 ints[1-10000000] have been verified through elapsed time: 0.837920 seconds DONE #0 #1 ints[1-10000000] have been verified through elapsed time: 1.278960 seconds DONE #1 ... #7 ints[1-10000000] have been verified through elapsed time: 1.745488 seconds DONE #7 ``` ==作答區== VVV = ? NNN = ? DDD = ? CCC = ? :::success 延伸問題: 1. 解釋上述程式碼運作原理,指出實作上的限制 2. 研讀 [logger-thread](https://github.com/ledav-net/logger-thread),此為 MPSC 的應用,請說明原理,並分析其效能改進的機會 3. 研讀 Linux 核心 [Concurrency Managed Workqueue](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 的文件和實作,指出其設計和實作考量 :::