Try   HackMD

2024q1 第 19 週測驗題

目的: 檢驗學員對 bitwise operation 的認知

作答表單 (針對 Linux 核心「設計」課程)

測驗 1

考慮以下 SPSC (single-producer/single-consumer) 的實作: (部分遮蔽)

  • queue.h
#pragma once

#include <stdatomic.h>

/*  Queue Description
 *
 * Queues operate in index space.
 *  - You provide the buffer.
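 *  - Indices handed out by the 'prepare' functions are not wrapped; the
 *    caller reduces them modulo the buffer size before touching the buffer
 *    (e.g. buffer[index % size]).
 *    - This allows for arbitrary queue sizes while preserving
 *      power of two optimizations.
 *    - Use queue sizes that are powers of two for optimal performance.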
 *
 * In the diagram:
 *  'w' indicates entries being consumed.
 *  'x' indicates entries in the queue yet to be consumed.
 *  'p' indicates entries being pushed.
 *  ' ' indicates free space.
 *
 *                tail side
 *                 \     /
 *         committed|   |pending
 *                  |   |
 *        | | | | | |w|w|x|x|x|x|x|x|p|p| | | | | |
 *                                  |   |
 *                         committed|   |pending
 *                                  /   \
 *                                head side
 *
 * The 'committed' and 'pending' values increment sequentially for
 * both the head and tail sides.
 *
 * The diagram describes an MPMC (Multiple Producer Multiple Consumer)
 * queue. If the queue is single producer, the 'committed' and 'pending'
 * of the head will always be equal. Similarly, if the queue is single
 * consumer, the same applies for the tail.
 *
 * Usage:
 *   Producer side:
 *     Use 'prepare_push' functions to get indices to work on.
 *     Use 'commit_push' functions to commit to the queue.
 *
 *   Consumer side:
 *     Use 'prepare_consume' functions to get indices to work on.
 *     Use 'commit_consume' functions to consume from the queue,
 *     freeing indices.
 */

/* An unsigned counter exposed through two views of the same storage: one for
 * C11 atomic operations and one for plain reads.
 */
union MaybeAtomicU32 {
    /* View passed to atomic_load() / atomic_fetch_add() in this file. */
    atomic_uint atomic_value;
    /* Plain, non-atomic view of the same word. */
    unsigned int value;
};

/* Cursor pair for one end of the queue. This is a union, so 'pending' and
 * 'committed' overlay the same storage and always read back the same value.
 */
union QueueSingleSide {
    union MaybeAtomicU32 pending;
    union MaybeAtomicU32 committed;
};

/* Bookkeeping for a single-producer/single-consumer queue. Only indices and
 * waiter counts live here; the element buffer is supplied by the user.
 */
typedef struct SPSCQueue {
    /* Cursors advanced by the producer. */
    union QueueSingleSide head;
    /* Cursors advanced by the consumer. */
    union QueueSingleSide tail;
    /* Capacity in elements; should always be less than INT32_MAX. */
    unsigned int queue_size;
    /* Anonymous union: both waiter counters overlay a single word. */
    union {
        atomic_uint tail_waiters;
        atomic_uint head_waiters;
    };
} SPSCQueue;

/* Reset *queue to all zeroes and record its capacity.
 *
 * queue: pointer to a structure that has a 'queue_size' member
 * size:  capacity to store in queue_size
 *
 * Every member other than queue_size is zeroed by the compound literal.
 * __typeof__ is used rather than typeof so the macro also builds in strict
 * ISO C modes (e.g. -std=c11), where typeof is not a keyword. The
 * do/while(0) wrapper makes the macro behave as one statement, so it is safe
 * in an unbraced if/else.
 */
#define queue_init(queue, size)                                    \
    do {                                                           \
        *(queue) = (__typeof__(*(queue))){.queue_size = (size)};   \
    } while (0)

/* Distance between the producer's pending cursor and the consumer's
 * committed cursor. The subtraction is unsigned, so it stays correct when
 * the cursors wrap around.
 */
#define queue_get_used(queue) \
    ((queue)->head.pending.atomic_value - (queue)->tail.committed.atomic_value)

/* Free slots in the queue, as a signed int. */
#define queue_get_free(queue) \
    ((int) ((queue)->queue_size - queue_get_used(queue)))

/* Same as queue_get_free(), for callers that already hold the three values. */
#define queue_get_free_explicit(queue_size, head_pending, tail_committed) \
    ((int) ((queue_size) - ((head_pending) - (tail_committed))))

/* Distance between the producer's committed cursor and the consumer's
 * pending cursor, i.e. entries that are ready to be consumed.
 */
#define queue_get_committed(queue) \
    ((queue)->head.committed.atomic_value - (queue)->tail.pending.atomic_value)

/* Same as queue_get_committed(), for callers that already hold both values. */
#define queue_get_committed_explicit(head_committed, tail_pending) \
    ((head_committed) - (tail_pending))

/* Implementation */

#include <linux/futex.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Local shorthand for the implementation below. It is defined only after the
 * system headers have been included so that the macro cannot rewrite any
 * 'uint' token that appears inside those headers.
 */
#define uint unsigned int

/* Thin wrapper around the raw futex system call.
 *
 * uaddr:    futex word to operate on
 * futex_op: operation code (this file passes FUTEX_WAIT_PRIVATE and
 *           FUTEX_WAKE_PRIVATE)
 * val:      operation-specific value
 *
 * The remaining three syscall arguments are always NULL, NULL and 0, and the
 * result of the syscall is discarded.
 */
static inline void futex_call(_Atomic uint32_t *uaddr,
                              int futex_op,
                              uint32_t val)
{
    (void) syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
}

/* Issue FUTEX_WAKE_PRIVATE on 'futex' with a wake count of one. */
static inline void atomic_wake_one(_Atomic uint32_t *futex)
{
    const uint32_t wake_count = 1;

    futex_call(futex, FUTEX_WAKE_PRIVATE, wake_count);
}

/* Issue FUTEX_WAIT_PRIVATE on 'futex', passing expected_value as the value
 * the futex word is compared against. No timeout is supplied.
 */
static inline void atomic_wait(_Atomic uint32_t *futex, uint32_t expected_value)
{
    const int wait_op = FUTEX_WAIT_PRIVATE;

    futex_call(futex, wait_op, expected_value);
}

/* Futex-backed wait/wake primitives used by the queue implementation. */
#define QUEUE_ATOMIC_WAIT(atomic_x, v) atomic_wait((atomic_x), (v))
#define QUEUE_ATOMIC_WAKE_ONE(atomic_x) atomic_wake_one(atomic_x)
/* FUTEX_WAKE takes the maximum number of waiters to wake; the largest count
 * wakes all of them. Without this definition QUEUE_WAKE_ALL_HEAD_WAITERS
 * would expand to an undefined name.
 */
#define QUEUE_ATOMIC_WAKE_ALL(atomic_x) \
    futex_call((atomic_x), FUTEX_WAKE_PRIVATE, INT32_MAX)

/* Wait on atomic_x with v as the expected value, then reload v from
 * atomic_x. 'v' must be an lvalue. All multi-statement macros below use
 * do/while(0) so each behaves as a single statement (safe in if/else).
 */
#define QUEUE_ATOMIC_WAIT_AND_READ(atomic_x, v) \
    do {                                        \
        QUEUE_ATOMIC_WAIT((atomic_x), (v));     \
        (v) = atomic_load(atomic_x);            \
    } while (0)

#define QUEUE_WAKE_TAIL_WAITER(tail_committed) \
    QUEUE_ATOMIC_WAKE_ONE(tail_committed)
#define QUEUE_WAKE_HEAD_WAITER(head_committed) \
    QUEUE_ATOMIC_WAKE_ONE(head_committed)
#define QUEUE_WAKE_ALL_HEAD_WAITERS(head_committed) \
    QUEUE_ATOMIC_WAKE_ALL(head_committed)

/* Register as a tail waiter, wait for *tail_committed to move away from v,
 * refresh v, then unregister.
 */
#define QUEUE_WAIT_FOR_TAIL(tail_waiters, tail_committed, v) \
    do {                                                     \
        atomic_fetch_add((tail_waiters), 1);                 \
        QUEUE_ATOMIC_WAIT_AND_READ((tail_committed), v);     \
        atomic_fetch_sub((tail_waiters), 1);                 \
    } while (0)

/* Register as a head waiter, wait for *head_committed to move away from v,
 * refresh v, then unregister. The parameter was previously misspelled
 * 'head_comitted', so the body silently picked up whatever variable named
 * 'head_committed' existed at the call site.
 */
#define QUEUE_WAIT_FOR_HEAD(head_waiters, head_committed, v) \
    do {                                                     \
        atomic_fetch_add((head_waiters), 1);                 \
        QUEUE_ATOMIC_WAIT_AND_READ((head_committed), v);     \
        atomic_fetch_sub((head_waiters), 1);                 \
    } while (0)

/* Single producer implementation */

/* Producer side: loops while the queue has no free slot, then returns the
 * index to push to.
 *
 * queue_size:     capacity of the queue in elements
 * head_pending:   the producer's current head index
 * tail_committed: consumer-side committed counter, loaded atomically
 * tail_waiters:   not referenced by the visible code; presumably used by the
 *                 masked AAAA expression
 *
 * Returns head_pending converted to int.
 *
 * NOTE(review): this comment used to promise -1 when the queue is full, but
 * no visible path returns -1; the loop simply keeps running while there is
 * no free space.
 */
static inline int sp_prepare_push(uint queue_size,
                                  uint head_pending,
                                  atomic_uint *tail_committed,
                                  atomic_uint *tail_waiters)
{
    uint tail = atomic_load(tail_committed);

    /* AAAA is a deliberately masked quiz blank. Only 'tail' can change the
     * loop condition, so whatever AAAA expands to has to refresh it.
     */
    while (queue_get_free_explicit(queue_size, head_pending, tail) <= 0)
        AAAA;

    /* As this is an SP queue, the usage cannot increase from this point
     * onwards, so we can safely return the current working index.
     */
    return head_pending;
}

/* Producer side: publishes one pushed entry by atomically incrementing
 * *head_committed.
 *
 * prepared_index: index obtained from the prepare step; not used by the
 *                 visible code
 * head_committed: producer-side committed counter
 * head_waiters:   counter checked to decide whether BBBB runs
 */
static inline void sp_commit_push(uint prepared_index,
                                  atomic_uint *head_committed,
                                  atomic_uint *head_waiters)
{
    atomic_fetch_add(head_committed, 1);

    /* BBBB is a deliberately masked quiz blank; it runs only when
     * *head_waiters is non-zero.
     */
    if (atomic_load(head_waiters))
        BBBB;
}

/* Single consumer implementation */

/* Consumer side: loops while no committed entry is available, then returns
 * the index to consume.
 *
 * head_committed: producer-side committed counter, loaded atomically
 * tail_pending:   the consumer's current tail index
 * head_waiters:   not referenced by the visible code; presumably used by the
 *                 masked CCCC expression
 *
 * Returns tail_pending converted to int.
 */
static inline int sc_prepare_consume(atomic_uint *head_committed,
                                     uint tail_pending,
                                     atomic_uint *head_waiters)
{
    uint head = atomic_load(head_committed);

    /* CCCC is a deliberately masked quiz blank. Only 'head' can change the
     * loop condition, so whatever CCCC expands to has to refresh it.
     */
    while (queue_get_committed_explicit(head, tail_pending) == 0)
        CCCC;

    /* As this is an SC queue, the committed value cannot decrease from this
     * point onwards, so we can consume safely.
     */
    return tail_pending;
}

/* Consumer side: releases one consumed entry by atomically incrementing
 * *tail_committed, then wakes a tail waiter if any is registered.
 *
 * prepared_index: index obtained from the prepare step; unused here
 * tail_committed: consumer-side committed counter
 * tail_waiters:   count of threads registered as waiting on the tail
 */
static inline void sc_commit_consume(uint prepared_index,
                                     atomic_uint *tail_committed,
                                     atomic_uint *tail_waiters)
{
    (void) prepared_index;

    atomic_fetch_add(tail_committed, 1);

    /* With a single consumer, the only possible waiter on the tail is the
     * producer.
     */
    if (atomic_load(tail_waiters) != 0) {
        QUEUE_WAKE_TAIL_WAITER(tail_committed);
    }
}

/* SPSC implementation */

/* Producer entry point for an SPSCQueue: forwards the queue's capacity, its
 * head index and its tail-side counters to sp_prepare_push() and returns
 * that function's result.
 */
static inline int spsc_prepare_push(SPSCQueue *queue)
{
    atomic_uint *tail_committed = &queue->tail.committed.atomic_value;
    atomic_uint *tail_waiters = &queue->tail_waiters;

    return sp_prepare_push(queue->queue_size, queue->head.pending.value,
                           tail_committed, tail_waiters);
}

/* Producer entry point for an SPSCQueue: commits the entry at prepared_index
 * by forwarding the queue's head-side counters to sp_commit_push().
 */
static inline void spsc_commit_push(unsigned int prepared_index,
                                    SPSCQueue *queue)
{
    atomic_uint *head_committed = &queue->head.committed.atomic_value;
    atomic_uint *head_waiters = &queue->head_waiters;

    sp_commit_push(prepared_index, head_committed, head_waiters);
}

/* Consumer entry point for an SPSCQueue: forwards the queue's head-side
 * counters and its tail index to sc_prepare_consume() and returns that
 * function's result.
 */
static inline int spsc_prepare_consume(SPSCQueue *queue)
{
    atomic_uint *head_committed = &queue->head.committed.atomic_value;
    atomic_uint *head_waiters = &queue->head_waiters;

    return sc_prepare_consume(head_committed, queue->tail.pending.value,
                              head_waiters);
}

/* Consumer entry point for an SPSCQueue: releases the entry at
 * prepared_index by forwarding the queue's tail-side counters to
 * sc_commit_consume().
 */
static inline void spsc_commit_consume(unsigned int prepared_index,
                                       SPSCQueue *queue)
{
    atomic_uint *tail_committed = &queue->tail.committed.atomic_value;
    atomic_uint *tail_waiters = &queue->tail_waiters;

    sc_commit_consume(prepared_index, tail_committed, tail_waiters);
}

/* Remove the local 'uint' shorthand so the macro does not leak into files
 * that include this header.
 */
#undef uint
  • test.c
#include <assert.h>
#include <stdio.h>
#include <threads.h>
#include "queue.h"

/* Number of integers the test moves through the queue. */
#define DATA_SIZE 1024
/* Capacity of the queue's backing buffer. It is not smaller than DATA_SIZE,
 * so the producer in this test never finds the queue full.
 */
#define QUEUE_SIZE 1024

/* Values the producer pushes, in order. */
static int input_buffer[DATA_SIZE];
/* Values the consumer pops, in the order it received them. */
static int output_buffer[DATA_SIZE];
/* Backing storage for queue entries, indexed modulo QUEUE_SIZE. */
static int queue_buffer[QUEUE_SIZE];

int consumer(SPSCQueue *q)
{
    for (int i = 0; i < DATA_SIZE; i++) {
        int index = spsc_prepare_consume(q);
        output_buffer[i] = queue_buffer[index % QUEUE_SIZE];
        spsc_commit_consume(index, q);
    }
    return 0;
}

/* Test driver: pushes DATA_SIZE integers through the queue from this thread
 * while a second thread runs consumer(), then prints every input/output pair
 * and asserts that they match.
 *
 * Returns 0 on success and 1 if the consumer thread cannot be created or
 * joined.
 */
int main(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    for (int i = 0; i < DATA_SIZE; i++)
        input_buffer[i] = i + 100;

    SPSCQueue q;
    queue_init(&q, QUEUE_SIZE);

    /* Cast to the thread-start function type rather than through void *,
     * which is not a portable conversion for function pointers.
     */
    thrd_t thread;
    if (thrd_create(&thread, (thrd_start_t) consumer, &q) != thrd_success) {
        fprintf(stderr, "failed to create consumer thread\n");
        return 1;
    }

    for (int i = 0; i < DATA_SIZE; i++) {
        int index = spsc_prepare_push(&q);
        /* Wrap as unsigned: a signed remainder would turn negative once the
         * running index exceeds INT_MAX.
         */
        queue_buffer[(unsigned int) index % QUEUE_SIZE] = input_buffer[i];
        spsc_commit_push(index, &q);
    }

    /* Wait for the consumer before output_buffer is read. */
    if (thrd_join(thread, NULL) != thrd_success) {
        fprintf(stderr, "failed to join consumer thread\n");
        return 1;
    }

    for (int i = 0; i < DATA_SIZE; i++) {
        printf(" [%d]: in:%d  out:%d\n", i, input_buffer[i], output_buffer[i]);
        assert(input_buffer[i] == output_buffer[i]);
    }

    return 0;
}

已知執行過程中不會觸發任何 assert,請補完程式碼,使其符合預期。作答規範:

  • AAAA, BBBB, CCCC 皆為表示式,不包含任何分號 (;)
  • AAAA, BBBB, CCCC 皆包含 QUEUE_ 開頭的巨集
  • 使用課程指定的程式碼風格,用最精簡的形式書寫