---
tags: linux2024
---
# [2024q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 19 週測驗題
:::info
目的: 檢驗學員對 bitwise operation 的認知
:::
==[作答表單](https://docs.google.com/forms/d/e/1FAIpQLSd5rN6a2ljoNQNXAGJP1GOSzvJjMcRQNXkbQ3D9RY55tcZiQg/viewform)== (針對 Linux 核心「設計」課程)
### 測驗 `1`
考慮以下 SPSC (single-producer/single-consumer) 的實作: (部分遮蔽)
- [ ] `queue.h`
```c
#pragma once
#include <stdatomic.h>
/* Queue Description
 *
 * Queues operate in index space.
 *   - You provide the buffer.
 *   - Index is modded by the buffer size.
 *   - This allows for arbitrary queue sizes while preserving
 *     power of two optimizations.
 *   - Use queue sizes that are powers of two for optimal performance.
 *
 * In the diagram:
 *   'w' indicates entries being consumed.
 *   'x' indicates entries in the queue yet to be consumed.
 *   'p' indicates entries being pushed.
 *   ' ' indicates free space.
 *
 *         tail side
 *          \     /
 *  committed|   |pending
 *           |   |
 * | | | | | |w|w|x|x|x|x|x|x|p|p| | | | | |
 *                           |   |
 *                  committed|   |pending
 *                          /     \
 *                         head side
 *
 * The 'committed' and 'pending' values increment sequentially for
 * both the head and tail sides.
 *
 * The diagram describes an MPMC (Multiple Producer Multiple Consumer)
 * queue. If the queue is single producer, the 'committed' and 'pending'
 * of the head will always be equal. Similarly, if the queue is single
 * consumer, the same applies for the tail.
 *
 * Usage:
 *   Producer side:
 *     Use 'prepare_push' functions to get indices to work on.
 *     Use 'commit_push' functions to commit to the queue.
 *
 *   Consumer side:
 *     Use 'prepare_consume' functions to get indices to work on.
 *     Use 'commit_consume' functions to consume from the queue,
 *     freeing indices.
 */
/* Overlays an atomic view and a plain view on one unsigned counter, so the
 * same storage can be accessed through atomic_* operations or read as an
 * ordinary integer.
 * NOTE(review): presumably relies on atomic_uint and unsigned int having the
 * same size and representation -- confirm for the target platform.
 */
union MaybeAtomicU32 {
atomic_uint atomic_value; /* view used with atomic_* operations */
unsigned int value; /* plain, non-atomic view of the same storage */
};
/* Counters for one side (head or tail) of the queue.
 * This is a union, so 'pending' and 'committed' occupy the same storage:
 * an update made through one member is observed through the other.
 */
union QueueSingleSide {
union MaybeAtomicU32 pending, committed;
};
/* Bookkeeping for a single-producer/single-consumer queue.
 * Only counters are kept here; the element storage is supplied by the user
 * and indexed with the values returned by the prepare functions.
 */
typedef struct SPSCQueue {
union QueueSingleSide head; /* producer-side counters */
union QueueSingleSide tail; /* consumer-side counters */
unsigned int queue_size; /* Should always be less than INT32_MAX */
/* Anonymous union: tail_waiters and head_waiters share one counter. */
union {
atomic_uint tail_waiters, head_waiters;
};
} SPSCQueue;
/* Reset *queue to an all-zero state with the given capacity.
 *
 * queue: pointer to the queue object to initialise.
 * size:  number of slots; stored in the object's queue_size member.
 *
 * Every member other than queue_size is zero-initialised by the compound
 * literal. The body is wrapped in do { } while (0) so the macro behaves as
 * a single statement (e.g. inside an unbraced if/else), and __typeof__ is
 * used so it also builds when the compiler is in strict ISO C11 mode.
 */
#define queue_init(queue, size)                                      \
    do {                                                             \
        *(queue) = (__typeof__(*(queue))){.queue_size = (size)};     \
    } while (0)
/* Occupancy helpers.
 *
 * The *_explicit forms work on counter values the caller has already
 * loaded; the remaining forms read the counters straight from the queue
 * object.
 */

/* Free slots given a capacity and the head-pending/tail-committed counters;
 * the result is converted to int.
 */
#define queue_get_free_explicit(queue_size, head_pending, tail_committed) \
    ((int) ((queue_size) - ((head_pending) - (tail_committed))))

/* Entries that are committed but not yet claimed by the consumer side. */
#define queue_get_committed_explicit(head_committed, tail_pending) \
    ((head_committed) - (tail_pending))

/* Slots currently occupied: head.pending minus tail.committed. */
#define queue_get_used(queue)                  \
    ((queue)->head.pending.atomic_value -      \
     (queue)->tail.committed.atomic_value)

/* Free slots of the queue object, as an int. */
#define queue_get_free(queue)                                        \
    queue_get_free_explicit((queue)->queue_size,                     \
                            (queue)->head.pending.atomic_value,      \
                            (queue)->tail.committed.atomic_value)

/* Committed entries of the queue object: head.committed minus tail.pending. */
#define queue_get_committed(queue)             \
    ((queue)->head.committed.atomic_value -    \
     (queue)->tail.pending.atomic_value)
/* Implementation */
/* Local shorthand used by the implementation below; it is removed again by
 * the #undef at the end of this header.
 * NOTE(review): the macro is defined before the system #includes that
 * follow, so it would also rewrite any 'uint' identifier appearing inside
 * those headers -- confirm that none of them declares one.
 */
#define uint unsigned int
#include <linux/futex.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>
/* Issue a futex system call on the word at uaddr.
 *
 * uaddr:    address of the 32-bit futex word.
 * futex_op: operation code passed to the kernel (e.g. FUTEX_WAIT_PRIVATE).
 * val:      operation-specific value.
 *
 * The three trailing syscall arguments are always NULL, NULL and 0, and the
 * result of the system call is deliberately discarded.
 */
static inline void futex_call(_Atomic uint32_t *uaddr,
                              int futex_op,
                              uint32_t val)
{
    (void) syscall(SYS_futex, uaddr, futex_op, val, NULL, NULL, 0);
}
/* Issue FUTEX_WAKE_PRIVATE on the given futex word with a wake count of 1. */
static inline void atomic_wake_one(_Atomic uint32_t *futex)
{
    const uint32_t wake_count = 1;

    futex_call(futex, FUTEX_WAKE_PRIVATE, wake_count);
}
/* Issue FUTEX_WAIT_PRIVATE on the given futex word, passing expected_value
 * as the value the kernel compares the word against (see futex(2)).
 */
static inline void atomic_wait(_Atomic uint32_t *futex, uint32_t expected_value)
{
    const int wait_op = FUTEX_WAIT_PRIVATE;

    futex_call(futex, wait_op, expected_value);
}
/* Thin aliases over the futex helpers, so the queue code does not name the
 * wait/wake primitives directly.
 */
#define QUEUE_ATOMIC_WAIT(atomic_x, v) atomic_wait((atomic_x), (v))
#define QUEUE_ATOMIC_WAKE_ONE(atomic_x) atomic_wake_one(atomic_x)

/* Wait on atomic_x using v as the expected value, then reload atomic_x
 * into v.
 *
 * atomic_x: pointer to the atomic word to wait on.
 * v:        lvalue holding the last observed value; overwritten on return.
 *
 * Wrapped in do { } while (0) so that the macro followed by ';' forms
 * exactly one statement and can be used in an unbraced if/else.
 */
#define QUEUE_ATOMIC_WAIT_AND_READ(atomic_x, v)  \
    do {                                         \
        QUEUE_ATOMIC_WAIT((atomic_x), (v));      \
        (v) = atomic_load(atomic_x);             \
    } while (0)
/* Wake one thread waiting on the tail's committed counter. */
#define QUEUE_WAKE_TAIL_WAITER(tail_committed) QUEUE_ATOMIC_WAKE_ONE(tail_committed)

/* Wake one thread waiting on the head's committed counter. */
#define QUEUE_WAKE_HEAD_WAITER(head_committed) QUEUE_ATOMIC_WAKE_ONE(head_committed)
/* Wake every thread waiting on a futex word.
 * Defined here because nothing else in this header provides it; without a
 * definition QUEUE_WAKE_ALL_HEAD_WAITERS would expand to an undeclared
 * name. INT32_MAX is passed as the wake count so that no waiter is left
 * behind.
 */
#ifndef QUEUE_ATOMIC_WAKE_ALL
#define QUEUE_ATOMIC_WAKE_ALL(atomic_x) \
    futex_call((atomic_x), FUTEX_WAKE_PRIVATE, INT32_MAX)
#endif

/* Wake all threads waiting on the head's committed counter. */
#define QUEUE_WAKE_ALL_HEAD_WAITERS(head_committed) \
    QUEUE_ATOMIC_WAKE_ALL(head_committed)
/* Register as a tail waiter, wait on the tail's committed counter, then
 * unregister.
 *
 * tail_waiters:   pointer to the atomic waiter count; incremented before
 *                 the wait and decremented after it.
 * tail_committed: pointer to the atomic counter to wait on.
 * v:              lvalue holding the last observed counter value; it is
 *                 refreshed with the current value after the wait.
 *
 * Wrapped in do { } while (0) so that the macro followed by ';' forms
 * exactly one statement, and every argument is parenthesised.
 */
#define QUEUE_WAIT_FOR_TAIL(tail_waiters, tail_committed, v)   \
    do {                                                       \
        atomic_fetch_add((tail_waiters), 1);                   \
        QUEUE_ATOMIC_WAIT_AND_READ((tail_committed), v);       \
        atomic_fetch_sub((tail_waiters), 1);                   \
    } while (0)
/* Register as a head waiter, wait on the head's committed counter, then
 * unregister.
 *
 * head_waiters:   pointer to the atomic waiter count; incremented before
 *                 the wait and decremented after it.
 * head_committed: pointer to the atomic counter to wait on.
 * v:              lvalue holding the last observed counter value; it is
 *                 refreshed with the current value after the wait.
 *
 * The second parameter is spelled exactly as the body uses it, so the
 * macro operates on its argument rather than on whatever identifier named
 * 'head_committed' happens to be in scope at the call site. Wrapped in
 * do { } while (0) so that the macro followed by ';' forms one statement.
 */
#define QUEUE_WAIT_FOR_HEAD(head_waiters, head_committed, v)   \
    do {                                                       \
        atomic_fetch_add((head_waiters), 1);                   \
        QUEUE_ATOMIC_WAIT_AND_READ((head_committed), v);       \
        atomic_fetch_sub((head_waiters), 1);                   \
    } while (0)
/* Single producer implementation */
/* Single-producer push preparation.
 *
 * Loads *tail_committed into a local and, for as long as
 * queue_get_free_explicit() reports no free slot for the given
 * queue_size/head_pending/tail values, executes the loop body (AAAA, which
 * is intentionally masked in this listing).
 *
 * Returns head_pending, converted to int, as the index to push at. No path
 * in the visible code returns -1.
 * NOTE(review): tail_waiters is not referenced by the visible code;
 * presumably the masked expression uses it -- confirm.
 */
static inline int sp_prepare_push(uint queue_size,
uint head_pending,
atomic_uint *tail_committed,
atomic_uint *tail_waiters)
{
uint tail = atomic_load(tail_committed);
while (queue_get_free_explicit(queue_size, head_pending, tail) <= 0)
AAAA;
/* As this is an SP queue, the usage cannot increase from this point
* onwards, so we can safely return the current working index.
*/
return head_pending;
}
/* Single-producer commit: publishes one pushed entry.
 *
 * Atomically increments *head_committed by one, then, if *head_waiters is
 * non-zero, executes BBBB (intentionally masked in this listing).
 *
 * prepared_index is accepted but not used by the visible code.
 */
static inline void sp_commit_push(uint prepared_index,
atomic_uint *head_committed,
atomic_uint *head_waiters)
{
atomic_fetch_add(head_committed, 1);
if (atomic_load(head_waiters))
BBBB;
}
/* --- Single consumer implementation --- */
/* Single-consumer consume preparation.
 *
 * Loads *head_committed into a local and, for as long as
 * queue_get_committed_explicit() reports zero committed entries relative to
 * tail_pending, executes the loop body (CCCC, intentionally masked in this
 * listing).
 *
 * Returns tail_pending, converted to int, as the index to consume.
 * NOTE(review): head_waiters is not referenced by the visible code;
 * presumably the masked expression uses it -- confirm.
 */
static inline int sc_prepare_consume(atomic_uint *head_committed,
uint tail_pending,
atomic_uint *head_waiters)
{
uint head = atomic_load(head_committed);
while (queue_get_committed_explicit(head, tail_pending) == 0)
CCCC;
/* As this is an SC queue, the committed value cannot decrease from this
* point onwards, so we can consume safely.
*/
return tail_pending;
}
/* Single-consumer commit: releases one consumed entry.
 *
 * Atomically increments *tail_committed by one and, when *tail_waiters is
 * non-zero, wakes one waiter on tail_committed.
 *
 * prepared_index is accepted for symmetry with the prepare step but is not
 * used.
 */
static inline void sc_commit_consume(uint prepared_index,
                                     atomic_uint *tail_committed,
                                     atomic_uint *tail_waiters)
{
    (void) prepared_index;

    atomic_fetch_add(tail_committed, 1);

    /* Nobody is waiting: nothing to wake. */
    if (!atomic_load(tail_waiters))
        return;

    /* With a single consumer, whoever waits on the tail has to be the
     * producer.
     */
    QUEUE_WAKE_TAIL_WAITER(tail_committed);
}
/* SPSC implementation */
/* SPSC front end for sp_prepare_push().
 *
 * Passes the queue's capacity, the plain (non-atomic) view of head.pending,
 * the atomic view of tail.committed and the tail waiter count, and returns
 * whatever sp_prepare_push() returns.
 */
static inline int spsc_prepare_push(SPSCQueue *queue)
{
    atomic_uint *const consumed = &queue->tail.committed.atomic_value;
    atomic_uint *const waiters = &queue->tail_waiters;

    return sp_prepare_push(queue->queue_size, queue->head.pending.value,
                           consumed, waiters);
}
/* SPSC front end for sp_commit_push().
 *
 * Forwards prepared_index together with the atomic view of head.committed
 * and the head waiter count of the queue.
 */
static inline void spsc_commit_push(unsigned int prepared_index,
                                    SPSCQueue *queue)
{
    atomic_uint *const published = &queue->head.committed.atomic_value;
    atomic_uint *const waiters = &queue->head_waiters;

    sp_commit_push(prepared_index, published, waiters);
}
/* SPSC front end for sc_prepare_consume().
 *
 * Passes the atomic view of head.committed, the plain (non-atomic) view of
 * tail.pending and the head waiter count, and returns whatever
 * sc_prepare_consume() returns.
 */
static inline int spsc_prepare_consume(SPSCQueue *queue)
{
    atomic_uint *const published = &queue->head.committed.atomic_value;
    atomic_uint *const waiters = &queue->head_waiters;

    return sc_prepare_consume(published, queue->tail.pending.value, waiters);
}
/* SPSC front end for sc_commit_consume().
 *
 * Forwards prepared_index together with the atomic view of tail.committed
 * and the tail waiter count of the queue.
 */
static inline void spsc_commit_consume(unsigned int prepared_index,
                                       SPSCQueue *queue)
{
    atomic_uint *const consumed = &queue->tail.committed.atomic_value;
    atomic_uint *const waiters = &queue->tail_waiters;

    sc_commit_consume(prepared_index, consumed, waiters);
}
#undef uint
```
- [ ] `test.c`
```c
#include <assert.h>
#include <stdio.h>
#include <threads.h>
#include "queue.h"
/* Number of items transferred by the test, and the number of slots in the
 * queue's backing buffer.
 */
#define DATA_SIZE 1024
#define QUEUE_SIZE 1024
/* Values the producer pushes (filled in by main). */
static int input_buffer[DATA_SIZE];
/* Values the consumer thread pulled out of the queue. */
static int output_buffer[DATA_SIZE];
/* Element storage for the queue, indexed modulo QUEUE_SIZE. */
static int queue_buffer[QUEUE_SIZE];
int consumer(SPSCQueue *q)
{
for (int i = 0; i < DATA_SIZE; i++) {
int index = spsc_prepare_consume(q);
output_buffer[i] = queue_buffer[index % QUEUE_SIZE];
spsc_commit_consume(index, q);
}
return 0;
}
/* Test driver: pushes DATA_SIZE values through the queue to a consumer
 * thread and checks that they arrive unchanged and in order.
 *
 * Returns 0 on success and 1 if the consumer thread cannot be created or
 * joined; a mismatch between input and output triggers the assert.
 */
int main(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    for (int i = 0; i < DATA_SIZE; i++)
        input_buffer[i] = i + 100;

    SPSCQueue q;
    queue_init(&q, QUEUE_SIZE);

    /* Convert between function pointer types instead of going through
     * 'void *', and stop if the thread could not be started: joining a
     * thread that was never created is not valid.
     */
    thrd_t thread;
    if (thrd_create(&thread, (thrd_start_t) consumer, &q) != thrd_success) {
        fprintf(stderr, "failed to create consumer thread\n");
        return 1;
    }

    for (int i = 0; i < DATA_SIZE; i++) {
        int index = spsc_prepare_push(&q);
        queue_buffer[index % QUEUE_SIZE] = input_buffer[i];
        spsc_commit_push(index, &q);
    }

    if (thrd_join(thread, NULL) != thrd_success) {
        fprintf(stderr, "failed to join consumer thread\n");
        return 1;
    }

    for (int i = 0; i < DATA_SIZE; i++) {
        printf(" [%d]: in:%d out:%d\n", i, input_buffer[i], output_buffer[i]);
        assert(input_buffer[i] == output_buffer[i]);
    }
    return 0;
}
```
已知執行過程中不會觸發任何 assert,請補完程式碼,使其符合預期。作答規範:
* `AAAA`, `BBBB`, `CCCC` 皆為表示式,不包含任何分號 (`;`)
* `AAAA`, `BBBB`, `CCCC` 皆包含 `QUEUE_` 開頭的巨集
* 使用課程指定的程式碼風格,用最精簡的形式書寫