Try   HackMD

2021q1 第 7 週測驗題: 測驗 3

tags: linux2021

測驗題目總覽

本題目檢驗學員對於 並行程式設計bitwise 操作 的認知

給定檔案 ringbuffer.c 是一個 Ring buffer 的實作,由一組指標表格所構成,設計時已考慮到 non-blocking 的行為表現,實作也將 CPU cache 列入考量,適用於 Producer–consumer problem 的 SPSC (single-producer and single-consumer) 的情境。

以下是 ringbuffer.c 程式碼列表:

/**
 * Ring buffer is a fixed-size queue, implemented as a table of
 * pointers. Head and tail pointers are modified atomically, allowing
 * concurrent access to it. It has the following features:
 * - FIFO (First In First Out)
 * - Maximum size is fixed; the pointers are stored in a table.
 * - Lockless implementation.
 *
 * The ring buffer implementation is not preemptible.
 */

#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* typically 64 bytes on x86/x64 CPUs */
#define CACHE_LINE_SIZE 64

#ifndef __compiler_barrier
#define __compiler_barrier()             \
    do {                                 \
        asm volatile("" : : : "memory"); \
    } while (0)
#endif

/**
 * The producer and the consumer have a head and a tail index. The particularity
 * of these index is that they are not between 0 and size(ring). These indexes
 * are between 0 and 2^32, and we mask their value when we access the ring[]
 * field. Thanks to this assumption, we can do subtractions between 2 index
 * values in a modulo-32bit base: that is why the overflow of the indexes is not
 * a problem.
 */
typedef struct {
    struct {                          /** Ring producer status. */
        uint32_t watermark;           /**< Maximum items before EDQUOT. */
        uint32_t size;                /**< Size of ring. */
        uint32_t mask;                /**< Mask (size - 1) of ring. */
        volatile uint32_t head, tail; /**< Producer head and tail. */
    } prod __attribute__((__aligned__(CACHE_LINE_SIZE)));

    struct {                          /** Ring consumer status. */
        uint32_t size;                /**< Size of the ring. */
        uint32_t mask;                /**< Mask (size - 1) of ring. */
        volatile uint32_t head, tail; /**< Consumer head and tail. */
    } cons __attribute__((__aligned__(CACHE_LINE_SIZE)));

    void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE)));
} ringbuf_t;

/* true if x is a power of 2 */
#define IS_POWEROF2(x) ((((x) -1) & (x)) == 0)
#define RING_SIZE_MASK (unsigned) (0x0fffffff) /**< Ring size mask */
#define ALIGN_FLOOR(val, align) XXXXX /* 提供你的實作 */

/**
 * Calculate the memory size needed for a ring buffer.
 *
 * This function returns the number of bytes needed for a ring buffer, given
 * the number of elements in it. This value is the sum of the size of the
 * structure ringbuf and the size of the memory needed by the objects pointers.
 * The value is aligned to a cache line size.
 *
 * @param count
 *   The number of elements in the ring (must be a power of 2).
 * @return
 *   - The memory size occupied by the ring on success.
 *   - -EINVAL if count is not a power of 2.
 */
ssize_t ringbuf_get_memsize(const unsigned count)
{
    /* Requested size is invalid, must be power of 2, and do not exceed the
     * size limit RING_SIZE_MASK.
     */
    if ((!IS_POWEROF2(count)) || (count > RING_SIZE_MASK))
        return -EINVAL;

    ssize_t sz = sizeof(ringbuf_t) + count * sizeof(void *);
    sz = ALIGN_FLOOR(sz, CACHE_LINE_SIZE);
    return sz;
}

/**
 * Initialize a ring buffer.
 *
 * The ring size is set to *count*, which must be a power of two. Water
 * marking is disabled by default. The real usable ring size is (count - 1)
 * instead of (count) to differentiate a free ring from an empty ring.
 *
 * @param r
 *   The pointer to the ring structure followed by the objects table.
 * @param count
 *   The number of elements in the ring (must be a power of 2).
 * @return
 *   0 on success, or a negative value on error.
 */
int ringbuf_init(ringbuf_t *r, const unsigned count)
{
    memset(r, 0, sizeof(*r));
    r->prod.watermark = count, r->prod.size = r->cons.size = count;
    r->prod.mask = r->cons.mask = count - 1;
    r->prod.head = r->cons.head = 0, r->prod.tail = r->cons.tail = 0;

    return 0;
}

/**
 * Create a ring buffer.
 *
 * The real usable ring size is (count - 1) instead of (count) to
 * differentiate a free ring from an empty ring.
 *
 * @param count
 *   The size of the ring (must be a power of 2).
 * @return
 *   On success, the pointer to the new allocated ring. NULL on error with
 *    errno set appropriately. Possible errno values include:
 *    - EINVAL - count provided is not a power of 2
 *    - ENOSPC - the maximum number of memzones has already been allocated
 *    - EEXIST - a memzone with the same name already exists
 *    - ENOMEM - no appropriate memory area found in which to create memzone
 */
ringbuf_t *ringbuf_create(const unsigned count)
{
    ssize_t ring_size = ringbuf_get_memsize(count);
    if (ring_size < 0)
        return NULL;

    ringbuf_t *r = malloc(ring_size);
    if (r)
        ringbuf_init(r, count);
    return r;
}

/**
 * Release all memory used by the ring.
 *
 * @param r
 *   Ring to free
 */
void ringbuf_free(ringbuf_t *r)
{
    free(r);
}

/* The actual enqueue of pointers on the ring.
 * Placed here since identical code needed in both single- and multi- producer
 * enqueue functions.
 */
#define ENQUEUE_PTRS()                                                     \
    do {                                                                   \
        const uint32_t size = r->prod.size;                                \
        uint32_t i, idx = prod_head & mask;                                \
        if (idx + n < size) {                                              \
            for (i = 0; i < (n & ((~(unsigned) 0x3))); i += 4, idx += 4) { \
                r->ring[idx] = obj_table[i];                               \
                r->ring[idx + 1] = obj_table[i + 1];                       \
                r->ring[idx + 2] = obj_table[i + 2];                       \
                r->ring[idx + 3] = obj_table[i + 3];                       \
            }                                                              \
            switch (n & 0x3) {                                             \
            case 3:                                                        \
                r->ring[idx++] = obj_table[i++];                           \
            case 2:                                                        \
                r->ring[idx++] = obj_table[i++];                           \
            case 1:                                                        \
                r->ring[idx++] = obj_table[i++];                           \
            }                                                              \
        } else {                                                           \
            for (i = 0; idx < size; i++, idx++)                            \
                r->ring[idx] = obj_table[i];                               \
            for (idx = 0; i < n; i++, idx++)                               \
                r->ring[idx] = obj_table[i];                               \
        }                                                                  \
    } while (0)

/* The actual copy of pointers on the ring to obj_table.
 * Placed here since identical code needed in both single- and multi- consumer
 * dequeue functions.
 */
#define DEQUEUE_PTRS()                                                   \
    do {                                                                 \
        uint32_t idx = cons_head & mask;                                 \
        uint32_t i, size = r->cons.size;                                 \
        if (idx + n < size) {                                            \
            for (i = 0; i < (n & (~(unsigned) 0x3)); i += 4, idx += 4) { \
                obj_table[i] = r->ring[idx];                             \
                obj_table[i + 1] = r->ring[idx + 1];                     \
                obj_table[i + 2] = r->ring[idx + 2];                     \
                obj_table[i + 3] = r->ring[idx + 3];                     \
            }                                                            \
            switch (n & 0x3) {                                           \
            case 3:                                                      \
                obj_table[i++] = r->ring[idx++];                         \
            case 2:                                                      \
                obj_table[i++] = r->ring[idx++];                         \
            case 1:                                                      \
                obj_table[i++] = r->ring[idx++];                         \
            }                                                            \
        } else {                                                         \
            for (i = 0; idx < size; i++, idx++)                          \
                obj_table[i] = r->ring[idx];                             \
            for (idx = 0; i < n; i++, idx++)                             \
                obj_table[i] = r->ring[idx];                             \
        }                                                                \
    } while (0)

/**
 * @internal Enqueue several objects on a ring buffer (NOT multi-producers
 * safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects).
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @return
 *   - 0: Success; objects enqueue.
 *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
 *     high water mark is exceeded.
 *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
 */
static inline int ringbuffer_sp_do_enqueue(ringbuf_t *r,
                                           void *const *obj_table,
                                           const unsigned n)
{
    uint32_t mask = r->prod.mask;
    uint32_t prod_head = r->prod.head, cons_tail = r->cons.tail;
    /* The subtraction is done between two unsigned 32-bits value (the result
     * is always modulo 32 bits even if we have prod_head > cons_tail). So
     * @free_entries is always between 0 and size(ring) - 1.
     */
    uint32_t free_entries = mask + cons_tail - prod_head;

    /* check that we have enough room in ring */
    if ((n > free_entries))
        return -ENOBUFS;

    uint32_t prod_next = prod_head + n;
    r->prod.head = prod_next;

    /* write entries in ring */
    ENQUEUE_PTRS();
    __compiler_barrier();

    r->prod.tail = prod_next;

    /* if we exceed the watermark */
    XXXXX /* 請提交你的實作 */;
}

/**
 * @internal Dequeue several objects from a ring buffer (NOT multi-consumers
 * safe). When the request objects are more than the available objects, only
 * dequeue the actual number of objects
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects) that will be filled.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @return
 *   - 0: Success; objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
 *     dequeued.
 */
static inline int ringbuffer_sc_do_dequeue(ringbuf_t *r,
                                           void **obj_table,
                                           const unsigned n)
{
    uint32_t mask = r->prod.mask;
    uint32_t cons_head = r->cons.head, prod_tail = r->prod.tail;
    /* The subtraction is done between two unsigned 32-bits value (the result
     * is always modulo 32 bits even if we have cons_head > prod_tail). So
     * @entries is always between 0 and size(ring) - 1.
     */
    uint32_t entries = prod_tail - cons_head;

    if (n > entries)
        return -ENOENT;

    uint32_t cons_next = cons_head + n;
    r->cons.head = cons_next;

    /* copy in table */
    DEQUEUE_PTRS();
    __compiler_barrier();

    r->cons.tail = cons_next;
    return 0;
}

/**
 * Enqueue one object on a ring buffer (NOT multi-producers safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj
 *   A pointer to the object to be added.
 * @return
 *   - 0: Success; objects enqueued.
 *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
 *     high water mark is exceeded.
 *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
 */
static inline int ringbuf_sp_enqueue(ringbuf_t *r, void *obj)
{
    return ringbuffer_sp_do_enqueue(r, &obj, 1);
}

/**
 * Dequeue one object from a ring buffer (NOT multi-consumers safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_p
 *   A pointer to a void * pointer (object) that will be filled.
 * @return
 *   - 0: Success; objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
 *     dequeued.
 */
static inline int ringbuf_sc_dequeue(ringbuf_t *r, void **obj_p)
{
    return ringbuffer_sc_do_dequeue(r, obj_p, 1);
}

/**
 * Test if a ring buffer is full.
 *
 * @param r
 *   A pointer to the ring structure.
 * @return
 *   - true: The ring is full.
 *   - false: The ring is not full.
 */
static inline bool ringbuf_is_full(const ringbuf_t *r)
{
    uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
    return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
}

/**
 * Test if a ring buffer is empty.
 *
 * @param r
 *   A pointer to the ring structure.
 * @return
 *   - true: The ring is empty.
 *   - false: The ring is not empty.
 */
static inline bool ringbuf_is_empty(const ringbuf_t *r)
{
    uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
    return cons_tail == prod_tail;
}

#include <assert.h>

int main(void)
{
    ringbuf_t *r = ringbuf_create((1 << 6));
    if (!r) {
        printf("Fail to create ring buffer.\n");
        return -1;
    }

    for (int i = 0; !ringbuf_is_full(r); i++)
        ringbuf_sp_enqueue(r, *(void **) &i);

    for (int i = 0; !ringbuf_is_empty(r); i++) {
        void *obj;
        ringbuf_sc_dequeue(r, &obj);
        assert(i == *(int *) &obj);
    }

    ringbuf_free(r);
    return 0;
}

請依據上述程式行為和註解,補完程式碼。作答時,請列出 ALIGN_FLOOR 巨集的完整定義,以及 ringbuffer_sp_do_enqueue 函式的完整內容,並回答以下問題:

  1. 上述程式碼的 watermark 有何作用?跟 lock-less 演算法有何關聯?
  2. 除了題目提到的 SPSC (single-producer and single-consumer),該如何擴展為 single producer + multiple consumer (SPMC) 和 single producer + multiple consumer (MPMC) 呢?請簡述你的手段,都針對 fixed-size queue。

延伸問題:

  1. 解釋上述程式碼運作原理,並解說 lock-less 如何落實
  2. 改寫原本的 SPSC,擴充為 MPSC,儘量重用程式碼,並設計效能分析程式,可參考 hungry-birds