# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 17 週測驗題
###### tags: `linux2021`
:::info
目的: 檢驗學員對 ==[RCU 同步機制](https://hackmd.io/@sysprog/linux-rcu)== 的認知
:::
==[作答表單](https://docs.google.com/forms/d/e/1FAIpQLSdu4i51UPiBfgUZd11KNHjiLDujadWbEdGAHrFR8gqfHEyg0w/viewform)==
### 測驗 `1`
考慮藉由 C11 Atomics 來實作 [QSBR](https://lwn.net/Articles/573424/) (quiescent-state-based RCU),以管理並行的佇列。程式碼如下:
```cpp=
#define CACHE_LINE_SIZE 64
#define ALLOC_SIZE(...) __attribute__((__alloc_size__(__VA_ARGS__)))
#define ALLOC_ALIGN(n) __attribute__((__alloc_align__(n)))
#define LIKELY(exp) __builtin_expect((exp), 1)
#define UNLIKELY(exp) __builtin_expect((exp), 0)
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
typedef void *(*wrap_realloc_t)(void *ptr, size_t size);
extern wrap_realloc_t wrap_realloc_ptr;
ALLOC_SIZE(1)
static inline void *wrap_malloc(size_t size)
{
void *ptr = wrap_realloc_ptr(NULL, size);
/* The returned pointer should be at least aligned to sizeof(void *). */
assert((uintptr_t) ptr % sizeof(void *) == 0);
return ptr;
}
static inline void wrap_free(void *ptr)
{
if (ptr)
wrap_realloc_ptr(ptr, 0);
}
wrap_realloc_t wrap_realloc_ptr = &realloc;
#include <stdatomic.h>
#include <stdbool.h>
struct qsbr_entry {
_Atomic(struct qsbr_entry *) next;
};
struct qsbr_global {
/* Current global epoch */
_Atomic uint32_t epoch;
/* Number of threads that have not entered a quiescent section in the
* current epoch yet.
*/
_Atomic uint32_t n_remaining;
/* Entries that can be freed at current epoch + 1 */
_Atomic(struct qsbr_entry *) to_free1;
/* Entries that can be freed at current epoch + 2 */
_Atomic(struct qsbr_entry *) to_free2;
uint32_t n_threads;
};
struct qsbr_local {
uint32_t epoch;
};
static inline void qsbr_init_global(struct qsbr_global *global,
uint32_t n_threads)
{
atomic_init(&global->epoch, 0);
atomic_init(&global->n_remaining, 0);
atomic_init(&global->to_free1, NULL);
atomic_init(&global->to_free2, NULL);
global->n_threads = n_threads;
}
static inline void qsbr_init_local(struct qsbr_local *local)
{
local->epoch = 0;
}
static inline void qsbr_fini_global(struct qsbr_global *global,
struct qsbr_entry **to_free1,
struct qsbr_entry **to_free2)
{
*to_free1 = atomic_load_explicit(&global->to_free1, memory_order_relaxed);
*to_free2 = atomic_load_explicit(&global->to_free2, memory_order_relaxed);
}
static inline void qsbr_free(struct qsbr_global *global,
struct qsbr_local *local,
struct qsbr_entry *entry)
{
struct qsbr_entry *expected = NULL;
/* qsbr_free() is not supported when the number of threads is 1. */
assert(global->n_threads > 1);
atomic_store_explicit(&entry->next, NULL, memory_order_relaxed);
bool exchanged = atomic_compare_exchange_strong_explicit(
&global->to_free1, &expected, entry, memory_order_acq_rel,
memory_order_consume);
if (UNLIKELY(exchanged)) {
uint32_t epoch =
atomic_load_explicit(&global->epoch, memory_order_relaxed);
atomic_store_explicit(&global->n_remaining, global->n_threads - 1,
memory_order_relaxed);
atomic_store_explicit(&global->epoch, epoch + 1, memory_order_release);
local->epoch = epoch + 1;
} else {
struct qsbr_entry *next =
atomic_load_explicit(&global->to_free2, memory_order_relaxed);
atomic_store_explicit(&entry->next, next, memory_order_relaxed);
while (!atomic_compare_exchange_weak_explicit(
&global->to_free2, &entry->next, entry, memory_order_release,
memory_order_relaxed))
;
}
}
static inline struct qsbr_entry *qsbr_quiescent(struct qsbr_global *global,
struct qsbr_local *local)
{
uint32_t epoch = atomic_load_explicit(&global->epoch, memory_order_consume);
if (LIKELY(epoch == local->epoch))
return NULL;
struct qsbr_entry *to_free = NULL;
uint32_t n_remaining = atomic_fetch_sub_explicit(&global->n_remaining, 1,
memory_order_acq_rel);
assert(n_remaining >= 1);
if (UNLIKELY(COND)) {
to_free = atomic_load_explicit(&global->to_free1, memory_order_acquire);
assert(to_free);
struct qsbr_entry *new_to_free1 =
atomic_load_explicit(&global->to_free2, memory_order_consume);
if (new_to_free1) {
while (!atomic_compare_exchange_weak_explicit(
AAA, BBB, NULL, memory_order_acquire,
memory_order_relaxed))
;
assert(new_to_free1);
atomic_store_explicit(&global->to_free1, new_to_free1,
memory_order_relaxed);
atomic_store_explicit(&global->n_remaining, global->n_threads - 1,
memory_order_relaxed);
atomic_store_explicit(&global->epoch, epoch + 1,
memory_order_release);
epoch++;
} else {
atomic_store_explicit(&global->to_free1, NULL,
memory_order_relaxed);
}
}
local->epoch = epoch;
return to_free;
}
#include <stdalign.h>
struct qsbr_queue_node {
struct qsbr_entry qsbr_entry;
void *value;
_Atomic(struct qsbr_queue_node *) next;
};
struct qsbr_queue {
alignas(CACHE_LINE_SIZE) _Atomic(struct qsbr_queue_node *) head;
alignas(CACHE_LINE_SIZE) _Atomic(struct qsbr_queue_node *) tail;
};
static inline void qsbr_queue_init(struct qsbr_queue *queue,
struct qsbr_queue_node *init_node)
{
atomic_init(&init_node->next, NULL);
atomic_init(&queue->head, init_node);
atomic_init(&queue->tail, init_node);
}
static inline void qsbr_queue_fini(struct qsbr_queue *queue,
struct qsbr_queue_node **released_node)
{
*released_node = atomic_load_explicit(&queue->head, memory_order_relaxed);
}
static inline void qsbr_queue_push(struct qsbr_queue *queue,
struct qsbr_queue_node *node,
void *value)
{
node->value = value;
atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
struct qsbr_queue_node *tail =
atomic_load_explicit(&queue->tail, memory_order_relaxed);
for (;;) {
struct qsbr_queue_node *next = NULL;
if (atomic_compare_exchange_weak_explicit(&tail->next, &next, node,
memory_order_release,
memory_order_relaxed))
break;
atomic_compare_exchange_weak_explicit(&queue->tail, &tail, next,
memory_order_relaxed,
memory_order_relaxed);
}
atomic_compare_exchange_strong(&queue->tail, &tail, node);
}
static inline bool qsbr_queue_pop(struct qsbr_queue *queue,
struct qsbr_queue_node **released_node,
void **value_ptr)
{
struct qsbr_queue_node *next;
struct qsbr_queue_node *head =
atomic_load_explicit(&queue->head, memory_order_consume);
for (;;) {
next = atomic_load_explicit(&head->next, memory_order_acquire);
if (!next)
return false;
if (atomic_compare_exchange_weak_explicit(&queue->head, &head, next,
memory_order_acquire,
memory_order_consume))
break;
}
*value_ptr = next->value;
*released_node = head;
return true;
}
#include <errno.h>
#include <pthread.h>
struct thread {
pthread_t handle;
};
static inline int thread_create(struct thread *thr,
void *(*start_routine)(void *),
void *arg)
{
int ret = pthread_create(&thr->handle, NULL, start_routine, arg);
assert(ret != EINVAL);
return -ret;
}
static inline void thread_join(struct thread *thr, void **ret_val_ptr)
{
int ret = pthread_join(thr->handle, ret_val_ptr);
(void) ret;
assert(ret == 0);
}
/* Test program starts here */
#define container_of(ptr, type, member) \
((type *) ((uint8_t *) (ptr) -offsetof(type, member)))
/* Lehmer RNG */
#define RANDOM_MULTIPLIER UINT64_C(48271)
#define RANDOM_MODULUS UINT64_C(2147483647)
#define RANDOM_MAX (RANDOM_MODULUS - 1)
#define RANDOM_NEXT(r) \
((uint32_t)((uint64_t)(r) *RANDOM_MULTIPLIER % RANDOM_MODULUS))
#define FATAL(fmt, ...) \
do { \
fprintf(stderr, fmt "\n", ##__VA_ARGS__); \
abort(); \
} while (0)
#define CHECK(expr, fmt, ...) \
do { \
if (!(expr)) \
FATAL("Asseration \"%s\" failed in %s (%s:%u): " fmt, #expr, \
__func__, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (0)
#include <inttypes.h>
#include <stddef.h>
#include <stdio.h>
static uint32_t n_workers, n_tries;
static struct qsbr_queue queue;
static atomic_uint barrier;
static _Atomic uint64_t total_sum;
static struct qsbr_global qsbr_global;
static struct qsbr_queue_node *alloc_node(void)
{
struct qsbr_queue_node *node = wrap_malloc(sizeof(*node));
CHECK(node, "Allocating node failed");
CHECK((uintptr_t) node % 8 == 0, "Bad alignment");
return node;
}
static void free_node(struct qsbr_queue_node *node)
{
wrap_free(node);
}
static void free_qsbr_nodes(struct qsbr_entry *head)
{
while (head) {
struct qsbr_entry *next =
atomic_load_explicit(&head->next, memory_order_relaxed);
free_node(container_of(head, struct qsbr_queue_node, qsbr_entry));
head = next;
}
}
static void *worker(void *arg)
{
uint64_t sum = 0;
uint32_t n_enqueue = 0, n_dequeue = 0;
uint32_t r = (uint32_t)(uintptr_t) arg;
struct qsbr_local qsbr_local;
qsbr_init_local(&qsbr_local);
/* Wait until all threads are created. */
atomic_fetch_sub(&barrier, 1);
while (atomic_load_explicit(&barrier, memory_order_acquire) > 0)
;
while (n_enqueue < n_tries || n_dequeue < n_tries) {
r = RANDOM_NEXT(r);
uint32_t iters = r % 1024;
while (iters-- > 0 && n_enqueue < n_tries) {
struct qsbr_queue_node *node = alloc_node();
qsbr_queue_push(&queue, node, (void *) (uintptr_t)(n_enqueue + 1));
n_enqueue++;
}
r = RANDOM_NEXT(r);
iters = r % 1024;
while (iters-- > 0 && n_dequeue < n_tries) {
struct qsbr_queue_node *node;
void *value;
bool popped = qsbr_queue_pop(&queue, &node, &value);
if (!popped)
continue;
if (n_workers == 1)
free_node(node);
else
qsbr_free(&qsbr_global, &qsbr_local, &node->qsbr_entry);
sum += (uint32_t)(uintptr_t) value;
n_dequeue++;
}
struct qsbr_entry *to_free = qsbr_quiescent(&qsbr_global, &qsbr_local);
free_qsbr_nodes(to_free);
}
atomic_fetch_add(&total_sum, sum);
return NULL;
}
#include <unistd.h>
int main(void)
{
uint32_t seed = getpid() ^ (uintptr_t) &main;
n_workers = 32;
n_tries = 500;
struct thread *threads =
wrap_malloc((size_t) n_workers * sizeof(struct thread));
if (!threads) {
fputs("Allocating memory for threads failed\n", stderr);
return 1;
}
qsbr_init_global(&qsbr_global, n_workers);
struct qsbr_queue_node *node = alloc_node();
qsbr_queue_init(&queue, node);
atomic_store(&barrier, n_workers);
uint32_t r = seed;
for (uint32_t i = 0; i < n_workers; i++) {
r = RANDOM_NEXT(r);
int err = thread_create(&threads[i], worker, (void *) (uintptr_t) r);
if (err != 0) {
fprintf(stderr, "Creating thread failed, err=%d\n", err);
return 1;
}
}
for (uint32_t i = 0; i < n_workers; i++)
thread_join(&threads[i], NULL);
wrap_free(threads);
qsbr_queue_fini(&queue, &node);
free_node(node);
struct qsbr_entry *to_free1, *to_free2;
qsbr_fini_global(&qsbr_global, &to_free1, &to_free2);
free_qsbr_nodes(to_free1);
free_qsbr_nodes(to_free2);
uint64_t expected_sum =
((uint64_t) n_tries * ((uint64_t) n_tries + 1) / 2) * n_workers;
printf("sum: %" PRIu64 ", expected: %" PRIu64 "\n", atomic_load(&total_sum),
expected_sum);
return total_sum != expected_sum;
}
```
參考執行輸出:
```
sum: 4008000, expected: 4008000
```
預期不會遇到任何 `assert` 失敗。
請補完程式碼,使程式碼運作符合預期。
==作答區==
COND = ? (第 132 行)
AAA = ? (第 140 行)
BBB = ? (第 140 行)