# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 7 週測驗題: 測驗 `3` ###### tags: `linux2021` > [測驗題目總覽](https://hackmd.io/@sysprog/linux2021-quiz7) :::info 本題目檢驗學員對於 ==[並行程式設計](https://hackmd.io/@sysprog/concurrency)== 和 ==[bitwise 操作](https://hackmd.io/@sysprog/c-bitwise)== 的認知 ::: 給定檔案 `ringbuffer.c` 是一個 [Ring buffer](https://en.wikipedia.org/wiki/Circular_buffer) 的實作,由一組指標表格所構成,設計時已考慮到 [non-blocking](https://en.wikipedia.org/wiki/Non-blocking_algorithm) 的行為表現,實作也將 [CPU cache](https://en.wikipedia.org/wiki/CPU_cache) 列入考量,適用於 [Producer–consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem) 的 SPSC (single-producer and single-consumer) 的情境。 以下是 `ringbuffer.c` 程式碼列表: ```cpp /** * Ring buffer is a fixed-size queue, implemented as a table of * pointers. Head and tail pointers are modified atomically, allowing * concurrent access to it. It has the following features: * - FIFO (First In First Out) * - Maximum size is fixed; the pointers are stored in a table. * - Lockless implementation. * * The ring buffer implementation is not preemptible. */ #include <errno.h> #include <stdbool.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <string.h> /* typically 64 bytes on x86/x64 CPUs */ #define CACHE_LINE_SIZE 64 #ifndef __compiler_barrier #define __compiler_barrier() \ do { \ asm volatile("" : : : "memory"); \ } while (0) #endif /** * The producer and the consumer have a head and a tail index. The particularity * of these index is that they are not between 0 and size(ring). These indexes * are between 0 and 2^32, and we mask their value when we access the ring[] * field. Thanks to this assumption, we can do subtractions between 2 index * values in a modulo-32bit base: that is why the overflow of the indexes is not * a problem. */ typedef struct { struct { /** Ring producer status. */ uint32_t watermark; /**< Maximum items before EDQUOT. */ uint32_t size; /**< Size of ring. */ uint32_t mask; /**< Mask (size - 1) of ring. */ volatile uint32_t head, tail; /**< Producer head and tail. */ } prod __attribute__((__aligned__(CACHE_LINE_SIZE))); struct { /** Ring consumer status. */ uint32_t size; /**< Size of the ring. */ uint32_t mask; /**< Mask (size - 1) of ring. */ volatile uint32_t head, tail; /**< Consumer head and tail. */ } cons __attribute__((__aligned__(CACHE_LINE_SIZE))); void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE))); } ringbuf_t; /* true if x is a power of 2 */ #define IS_POWEROF2(x) ((((x) -1) & (x)) == 0) #define RING_SIZE_MASK (unsigned) (0x0fffffff) /**< Ring size mask */ #define ALIGN_FLOOR(val, align) XXXXX /* 提供你的實作 */ /** * Calculate the memory size needed for a ring buffer. * * This function returns the number of bytes needed for a ring buffer, given * the number of elements in it. This value is the sum of the size of the * structure ringbuf and the size of the memory needed by the objects pointers. * The value is aligned to a cache line size. * * @param count * The number of elements in the ring (must be a power of 2). * @return * - The memory size occupied by the ring on success. * - -EINVAL if count is not a power of 2. */ ssize_t ringbuf_get_memsize(const unsigned count) { /* Requested size is invalid, must be power of 2, and do not exceed the * size limit RING_SIZE_MASK. */ if ((!IS_POWEROF2(count)) || (count > RING_SIZE_MASK)) return -EINVAL; ssize_t sz = sizeof(ringbuf_t) + count * sizeof(void *); sz = ALIGN_FLOOR(sz, CACHE_LINE_SIZE); return sz; } /** * Initialize a ring buffer. * * The ring size is set to *count*, which must be a power of two. Water * marking is disabled by default. The real usable ring size is (count - 1) * instead of (count) to differentiate a free ring from an empty ring. * * @param r * The pointer to the ring structure followed by the objects table. * @param count * The number of elements in the ring (must be a power of 2). * @return * 0 on success, or a negative value on error. */ int ringbuf_init(ringbuf_t *r, const unsigned count) { memset(r, 0, sizeof(*r)); r->prod.watermark = count, r->prod.size = r->cons.size = count; r->prod.mask = r->cons.mask = count - 1; r->prod.head = r->cons.head = 0, r->prod.tail = r->cons.tail = 0; return 0; } /** * Create a ring buffer. * * The real usable ring size is (count - 1) instead of (count) to * differentiate a free ring from an empty ring. * * @param count * The size of the ring (must be a power of 2). * @return * On success, the pointer to the new allocated ring. NULL on error with * errno set appropriately. Possible errno values include: * - EINVAL - count provided is not a power of 2 * - ENOSPC - the maximum number of memzones has already been allocated * - EEXIST - a memzone with the same name already exists * - ENOMEM - no appropriate memory area found in which to create memzone */ ringbuf_t *ringbuf_create(const unsigned count) { ssize_t ring_size = ringbuf_get_memsize(count); if (ring_size < 0) return NULL; ringbuf_t *r = malloc(ring_size); if (r) ringbuf_init(r, count); return r; } /** * Release all memory used by the ring. * * @param r * Ring to free */ void ringbuf_free(ringbuf_t *r) { free(r); } /* The actual enqueue of pointers on the ring. * Placed here since identical code needed in both single- and multi- producer * enqueue functions. */ #define ENQUEUE_PTRS() \ do { \ const uint32_t size = r->prod.size; \ uint32_t i, idx = prod_head & mask; \ if (idx + n < size) { \ for (i = 0; i < (n & ((~(unsigned) 0x3))); i += 4, idx += 4) { \ r->ring[idx] = obj_table[i]; \ r->ring[idx + 1] = obj_table[i + 1]; \ r->ring[idx + 2] = obj_table[i + 2]; \ r->ring[idx + 3] = obj_table[i + 3]; \ } \ switch (n & 0x3) { \ case 3: \ r->ring[idx++] = obj_table[i++]; \ case 2: \ r->ring[idx++] = obj_table[i++]; \ case 1: \ r->ring[idx++] = obj_table[i++]; \ } \ } else { \ for (i = 0; idx < size; i++, idx++) \ r->ring[idx] = obj_table[i]; \ for (idx = 0; i < n; i++, idx++) \ r->ring[idx] = obj_table[i]; \ } \ } while (0) /* The actual copy of pointers on the ring to obj_table. * Placed here since identical code needed in both single- and multi- consumer * dequeue functions. */ #define DEQUEUE_PTRS() \ do { \ uint32_t idx = cons_head & mask; \ uint32_t i, size = r->cons.size; \ if (idx + n < size) { \ for (i = 0; i < (n & (~(unsigned) 0x3)); i += 4, idx += 4) { \ obj_table[i] = r->ring[idx]; \ obj_table[i + 1] = r->ring[idx + 1]; \ obj_table[i + 2] = r->ring[idx + 2]; \ obj_table[i + 3] = r->ring[idx + 3]; \ } \ switch (n & 0x3) { \ case 3: \ obj_table[i++] = r->ring[idx++]; \ case 2: \ obj_table[i++] = r->ring[idx++]; \ case 1: \ obj_table[i++] = r->ring[idx++]; \ } \ } else { \ for (i = 0; idx < size; i++, idx++) \ obj_table[i] = r->ring[idx]; \ for (idx = 0; i < n; i++, idx++) \ obj_table[i] = r->ring[idx]; \ } \ } while (0) /** * @internal Enqueue several objects on a ring buffer (NOT multi-producers * safe). * * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n * The number of objects to add in the ring from the obj_table. * @return * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. */ static inline int ringbuffer_sp_do_enqueue(ringbuf_t *r, void *const *obj_table, const unsigned n) { uint32_t mask = r->prod.mask; uint32_t prod_head = r->prod.head, cons_tail = r->cons.tail; /* The subtraction is done between two unsigned 32-bits value (the result * is always modulo 32 bits even if we have prod_head > cons_tail). So * @free_entries is always between 0 and size(ring) - 1. */ uint32_t free_entries = mask + cons_tail - prod_head; /* check that we have enough room in ring */ if ((n > free_entries)) return -ENOBUFS; uint32_t prod_next = prod_head + n; r->prod.head = prod_next; /* write entries in ring */ ENQUEUE_PTRS(); __compiler_barrier(); r->prod.tail = prod_next; /* if we exceed the watermark */ XXXXX /* 請提交你的實作 */; } /** * @internal Dequeue several objects from a ring buffer (NOT multi-consumers * safe). When the request objects are more than the available objects, only * dequeue the actual number of objects * * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects) that will be filled. * @param n * The number of objects to dequeue from the ring to the obj_table. * @return * - 0: Success; objects dequeued. * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ static inline int ringbuffer_sc_do_dequeue(ringbuf_t *r, void **obj_table, const unsigned n) { uint32_t mask = r->prod.mask; uint32_t cons_head = r->cons.head, prod_tail = r->prod.tail; /* The subtraction is done between two unsigned 32-bits value (the result * is always modulo 32 bits even if we have cons_head > prod_tail). So * @entries is always between 0 and size(ring) - 1. */ uint32_t entries = prod_tail - cons_head; if (n > entries) return -ENOENT; uint32_t cons_next = cons_head + n; r->cons.head = cons_next; /* copy in table */ DEQUEUE_PTRS(); __compiler_barrier(); r->cons.tail = cons_next; return 0; } /** * Enqueue one object on a ring buffer (NOT multi-producers safe). * * @param r * A pointer to the ring structure. * @param obj * A pointer to the object to be added. * @return * - 0: Success; objects enqueued. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ static inline int ringbuf_sp_enqueue(ringbuf_t *r, void *obj) { return ringbuffer_sp_do_enqueue(r, &obj, 1); } /** * Dequeue one object from a ring buffer (NOT multi-consumers safe). * * @param r * A pointer to the ring structure. * @param obj_p * A pointer to a void * pointer (object) that will be filled. * @return * - 0: Success; objects dequeued. * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ static inline int ringbuf_sc_dequeue(ringbuf_t *r, void **obj_p) { return ringbuffer_sc_do_dequeue(r, obj_p, 1); } /** * Test if a ring buffer is full. * * @param r * A pointer to the ring structure. * @return * - true: The ring is full. * - false: The ring is not full. */ static inline bool ringbuf_is_full(const ringbuf_t *r) { uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail; return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0; } /** * Test if a ring buffer is empty. * * @param r * A pointer to the ring structure. * @return * - true: The ring is empty. * - false: The ring is not empty. */ static inline bool ringbuf_is_empty(const ringbuf_t *r) { uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail; return cons_tail == prod_tail; } #include <assert.h> int main(void) { ringbuf_t *r = ringbuf_create((1 << 6)); if (!r) { printf("Fail to create ring buffer.\n"); return -1; } for (int i = 0; !ringbuf_is_full(r); i++) ringbuf_sp_enqueue(r, *(void **) &i); for (int i = 0; !ringbuf_is_empty(r); i++) { void *obj; ringbuf_sc_dequeue(r, &obj); assert(i == *(int *) &obj); } ringbuf_free(r); return 0; } ``` 請依據上述程式行為和註解,補完程式碼。作答時,請列出 `ALIGN_FLOOR` 巨集的完整定義,以及 `ringbuffer_sp_do_enqueue` 函式的完整內容,並回答以下問題: 1. 上述程式碼的 `watermark` 有何作用?跟 [lock-less](https://en.wikipedia.org/wiki/Non-blocking_algorithm) 演算法有何關聯? 2. 除了題目提到的 SPSC (single-producer and single-consumer),該如何擴展為 single producer + multiple consumer (SPMC) 和 single producer + multiple consumer (MPMC) 呢?請簡述你的手段,都針對 fixed-size queue。 :::success 延伸問題: 1. 解釋上述程式碼運作原理,並解說 [lock-less](https://en.wikipedia.org/wiki/Non-blocking_algorithm) 如何落實 2. 改寫原本的 SPSC,擴充為 MPSC,儘量重用程式碼,並設計效能分析程式,可參考 [hungry-birds](https://github.com/jserv/hungry-birds) :::