---
tags: linux2024
---

# [2024q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 19 週測驗題

:::info
目的: 檢驗學員對 bitwise operation 的認知
:::

==[作答表單](https://docs.google.com/forms/d/e/1FAIpQLSd5rN6a2ljoNQNXAGJP1GOSzvJjMcRQNXkbQ3D9RY55tcZiQg/viewform)== (針對 Linux 核心「設計」課程)

### 測驗 `1`

考慮以下 SPSC (single-producer/single-consumer) 的實作: (部分遮蔽)

- [ ] `queue.h`
```c
#pragma once

#include <stdatomic.h>

/* Queue Description
 *
 * Queues operate in index space.
 * - You provide the buffer.
 * - Index is modded by the buffer size.
 * - This allows for arbitrary queue sizes while preserving
 *   power of two optimizations.
 * - Use queue sizes that are powers of two for optimal performance.
 *
 * In the diagram:
 * 'w' indicates entries being consumed.
 * 'x' indicates entries in the queue yet to be consumed.
 * 'p' indicates entries being pushed.
 * ' ' indicates free space.
 *
 *           tail side
 *            \     /
 *    committed|   |pending
 *             |   |
 *   | | | | | |w|w|x|x|x|x|x|x|p|p| | | | | |
 *                             |   |
 *                    committed|   |pending
 *                            /     \
 *                           head side
 *
 * The 'committed' and 'pending' values increment sequentially for
 * both the head and tail sides.
 *
 * The diagram describes an MPMC (Multiple Producer Multiple Consumer)
 * queue. If the queue is single producer, the 'committed' and 'pending'
 * of the head will always be equal. Similarly, if the queue is single
 * consumer, the same applies for the tail.
 *
 * Usage:
 *   Producer side:
 *     Use 'prepare_push' functions to get indices to work on.
 *     Use 'commit_push' functions to commit to the queue.
 *
 *   Consumer side:
 *     Use 'prepare_consume' functions to get indices to work on.
 *     Use 'commit_consume' functions to consume from the queue,
 *     freeing indices.
*/

/* An unsigned value that can be accessed either through the atomic member
 * or as a plain unsigned int. Both members of the union occupy the same
 * storage.
 */
union MaybeAtomicU32 {
    atomic_uint atomic_value;
    unsigned int value;
};

/* One side (head or tail) of the queue. Because this is a union, 'pending'
 * and 'committed' name the same storage: an update made through one member
 * is observed through the other.
 */
union QueueSingleSide {
    union MaybeAtomicU32 pending, committed;
};

/* Queue bookkeeping only; no element buffer is stored here (the header
 * comment states that the caller provides the buffer).
 */
typedef struct SPSCQueue {
    union QueueSingleSide head;
    union QueueSingleSide tail;
    unsigned int queue_size;
    /* Should always be less than INT32_MAX */
    /* Anonymous union: tail_waiters and head_waiters name the same counter. */
    union {
        atomic_uint tail_waiters, head_waiters;
    };
} SPSCQueue;

/* Overwrite *queue with a compound literal in which only queue_size is
 * given, so every other member is zero-initialised. Relies on typeof.
 */
#define queue_init(queue, size)                          \
    {                                                    \
        *(queue) = (typeof(*queue)){.queue_size = size}; \
    }

/* head.pending minus tail.committed. */
#define queue_get_used(queue) \
    ((queue)->head.pending.atomic_value - (queue)->tail.committed.atomic_value)

/* queue_size minus queue_get_used(), converted to int. */
#define queue_get_free(queue) \
    ((int) ((queue)->queue_size - queue_get_used(queue)))

/* Same computation as queue_get_free(), on values the caller already holds. */
#define queue_get_free_explicit(queue_size, head_pending, tail_committed) \
    ((int) ((queue_size) - ((head_pending) - (tail_committed))))

/* head.committed minus tail.pending. */
#define queue_get_committed(queue) \
    ((queue)->head.committed.atomic_value - (queue)->tail.pending.atomic_value)

/* Same computation as queue_get_committed(), on caller-supplied values. */
#define queue_get_committed_explicit(head_committed, tail_pending) \
    ((head_committed) - (tail_pending))

/* Implementation */

/* Local shorthand implemented as a macro (not a typedef); it is removed
 * again by the '#undef uint' at the end of the header.
 */
#define uint unsigned int

#include <linux/futex.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Thin wrapper around the raw futex system call. Only uaddr, the operation
 * and val are supplied; the remaining three arguments are passed as NULL,
 * NULL and 0. The return value of syscall() is discarded.
 */
static inline void futex_call(_Atomic uint32_t *uaddr,
                              int futex_op,
                              uint32_t val)
{
    syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
}

/* Issue FUTEX_WAKE_PRIVATE on 'futex' with a count of 1. */
static inline void atomic_wake_one(_Atomic uint32_t *futex)
{
    futex_call(futex, FUTEX_WAKE_PRIVATE, 1);
}

/* Issue FUTEX_WAIT_PRIVATE on 'futex' with expected_value as the comparison
 * value. Since futex_call() drops the result, the caller cannot tell why the
 * call returned; the callers in this header re-test their condition in a
 * loop.
 */
static inline void atomic_wait(_Atomic uint32_t *futex, uint32_t expected_value)
{
    futex_call(futex, FUTEX_WAIT_PRIVATE, expected_value);
}

/* Indirection layer over the futex helpers above. */
#define QUEUE_ATOMIC_WAIT(atomic_x, v) atomic_wait(atomic_x, v)
#define QUEUE_ATOMIC_WAKE_ONE(atomic_x) atomic_wake_one(atomic_x)

/* Wait on atomic_x with v as the expected value, then reload atomic_x into
 * v. v is assigned to, so it must be an lvalue. Expands to a brace block.
 */
#define QUEUE_ATOMIC_WAIT_AND_READ(atomic_x, v) \
    {                                           \
        QUEUE_ATOMIC_WAIT((atomic_x), (v));     \
        (v) = atomic_load(atomic_x);            \
    }

/* Wake via the futex at tail_committed (count of 1). */
#define QUEUE_WAKE_TAIL_WAITER(tail_committed) \
    QUEUE_ATOMIC_WAKE_ONE(tail_committed)

/* Wake via the futex at head_committed (count of 1). */
#define QUEUE_WAKE_HEAD_WAITER(head_committed) \
    QUEUE_ATOMIC_WAKE_ONE(head_committed)

#define 
QUEUE_WAKE_ALL_HEAD_WAITERS(head_committed) \ QUEUE_ATOMIC_WAKE_ALL(head_committed) #define QUEUE_WAIT_FOR_TAIL(tail_waiters, tail_committed, v) \ { \ atomic_fetch_add(tail_waiters, 1); \ QUEUE_ATOMIC_WAIT_AND_READ(tail_committed, v); \ atomic_fetch_sub(tail_waiters, 1); \ } #define QUEUE_WAIT_FOR_HEAD(head_waiters, head_comitted, v) \ { \ atomic_fetch_add(head_waiters, 1); \ QUEUE_ATOMIC_WAIT_AND_READ(head_committed, v); \ atomic_fetch_sub(head_waiters, 1); \ } /* Single producer implementation */ /* Returns an index to push or -1 on failure (Queue is full) */ static inline int sp_prepare_push(uint queue_size, uint head_pending, atomic_uint *tail_committed, atomic_uint *tail_waiters) { uint tail = atomic_load(tail_committed); while (queue_get_free_explicit(queue_size, head_pending, tail) <= 0) AAAA; /* As this is an SP queue, the usage cannot increase from this point * onwards, so we can safely return the current working index. */ return head_pending; } static inline void sp_commit_push(uint prepared_index, atomic_uint *head_committed, atomic_uint *head_waiters) { atomic_fetch_add(head_committed, 1); if (atomic_load(head_waiters)) BBBB; } /* --- Single consumer implementation --- */ /* Returns an index to consume */ static inline int sc_prepare_consume(atomic_uint *head_committed, uint tail_pending, atomic_uint *head_waiters) { uint head = atomic_load(head_committed); while (queue_get_committed_explicit(head, tail_pending) == 0) CCCC; /* As this is an SC queue, the committed value cannot decrease from this * point onwards, so we can consume safely. */ return tail_pending; } static inline void sc_commit_consume(uint prepared_index, atomic_uint *tail_committed, atomic_uint *tail_waiters) { atomic_fetch_add(tail_committed, 1); /* As this is an SC queue, any waiters would have to be the producer in * this case. 
*/ if (atomic_load(tail_waiters)) QUEUE_WAKE_TAIL_WAITER(tail_committed); } /* SPSC implementation */ /* Returns an index to push or -1 on failure (Queue is full) */ static inline int spsc_prepare_push(SPSCQueue *queue) { return sp_prepare_push(queue->queue_size, queue->head.pending.value, &queue->tail.committed.atomic_value, &queue->tail_waiters); } static inline void spsc_commit_push(unsigned int prepared_index, SPSCQueue *queue) { sp_commit_push(prepared_index, &queue->head.committed.atomic_value, &queue->head_waiters); } /* Returns an index to consume */ static inline int spsc_prepare_consume(SPSCQueue *queue) { return sc_prepare_consume(&queue->head.committed.atomic_value, queue->tail.pending.value, &queue->head_waiters); } static inline void spsc_commit_consume(unsigned int prepared_index, SPSCQueue *queue) { sc_commit_consume(prepared_index, &queue->tail.committed.atomic_value, &queue->tail_waiters); } #undef uint ``` - [ ] `test.c` ```c #include <assert.h> #include <stdio.h> #include <threads.h> #include "queue.h" #define DATA_SIZE 1024 #define QUEUE_SIZE 1024 static int input_buffer[DATA_SIZE]; static int output_buffer[DATA_SIZE]; static int queue_buffer[QUEUE_SIZE]; int consumer(SPSCQueue *q) { for (int i = 0; i < DATA_SIZE; i++) { int index = spsc_prepare_consume(q); output_buffer[i] = queue_buffer[index % QUEUE_SIZE]; spsc_commit_consume(index, q); } return 0; } int main(int argc, char *argv[]) { for (int i = 0; i < DATA_SIZE; i++) input_buffer[i] = i + 100; SPSCQueue q; queue_init(&q, QUEUE_SIZE); thrd_t thread; thrd_create(&thread, (void *) consumer, &q); for (int i = 0; i < DATA_SIZE; i++) { int index = spsc_prepare_push(&q); queue_buffer[index % QUEUE_SIZE] = input_buffer[i]; spsc_commit_push(index, &q); } thrd_join(thread, NULL); for (int i = 0; i < DATA_SIZE; i++) { printf(" [%d]: in:%d out:%d\n", i, input_buffer[i], output_buffer[i]); assert(input_buffer[i] == output_buffer[i]); } return 0; } ``` 已知執行過程中不會觸發任何 assert,請補完程式碼,使其符合預期。作答規範: * `AAAA`, 
`BBBB`, `CCCC` 皆為表示式,不包含任何分號 (`;`)
* `AAAA`, `BBBB`, `CCCC` 皆包含 `QUEUE_` 開頭的巨集
* 使用課程指定的程式碼風格,用最精簡的形式書寫