# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) Week 8 Quiz: Problem `1`
###### tags: `linux2021`
> [Quiz overview](https://hackmd.io/@sysprog/linux2021-quiz8)
:::info
This problem tests your understanding of ==[concurrent programming](https://hackmd.io/@sysprog/concurrency)== and the ==[Week 7 quiz](https://hackmd.io/@sysprog/linux2021-quiz7)==.
:::
Part 3 of the [Week 7 quiz](https://hackmd.io/@sysprog/linux2021-quiz7) implements the SPSC (single-producer, single-consumer) model. The following is a concurrent implementation of a single-producer, multiple-consumer (SPMC) queue using [C11 Atomics](https://lwn.net/Articles/691128/) (`spmc.c`):
```cpp
/* A concurrent single-producer, multiple-consumer (SPMC) queue using C11
 * Atomics. It is lock-free and atomic, allowing one enqueue caller (producer)
 * and an arbitrary number of dequeue callers (consumers).
 *
 * Known issue: with multiple consumers, some of them may be swapped off the
 * CPU after grabbing curr_dequeue and then dequeue an element from a
 * different node, if that node ends up having free space.
 */

#include <assert.h>
#include <limits.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct __spmc_node {
    size_t cap; /* Number of slots in buf; always a power of two */
    _Atomic size_t front, back;
    struct __spmc_node *_Atomic next;
    uintptr_t buf[];
} spmc_node_t;

typedef void (*spmc_destructor_t)(uintptr_t);

struct spmc_base {
    /* current node which enqueues/dequeues */
    spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue;
    uint8_t last_power;
    spmc_destructor_t destructor;
};
typedef struct spmc_base *spmc_ref_t;

#define DEFAULT_INITIAL_POWER 6 /* Initial capacity: 64, as a power of two */
#define SIZE_FROM_CAP(cap, offset) ((cap) * sizeof(uintptr_t) + (offset))
#define MODULO(lhs, rhs) ((lhs) & (rhs - 1)) /* Requires rhs is power of 2 */
#define INDEX_OF(idx, node) (MODULO((idx), (node)->cap))
#define IS_READABLE(idx, node) ((node)->back - (idx) != 0)
#define IS_WRITABLE(idx, node) ((idx) - (node)->front < (node)->cap)

/* The head of the spmc resides contiguously after the spmc_base struct itself.
 * Here, two objects are stored in the same block of memory, but are accessed
 * separately.
 */
#define HEAD_OF(spmc) ((spmc_node_t *) (void *) ((spmc_ref_t)(spmc) + 1))

static void init_node(spmc_node_t *node, spmc_node_t *next, size_t cap)
{
    node->cap = cap;
    atomic_init(&node->front, 0), atomic_init(&node->back, 0);
    atomic_init(&node->next, next);
}

/* If initial_cap is 0, the spmc selects a default capacity.
 * Capacities are given as powers of two, i.e., an initial_cap argument of 4
 * results in an allocation of ~16 machine words.
 */
spmc_ref_t spmc_new(size_t initial_cap, spmc_destructor_t destructor)
{
    assert(initial_cap < sizeof(size_t) * CHAR_BIT);
    const uint8_t power = initial_cap ? initial_cap : DEFAULT_INITIAL_POWER;
    /* Shift in size_t: a plain int `1 << power` overflows for power >= 31 */
    const size_t cap = (size_t) 1 << power;
    /* Allocate spmc_base and head spmc_node in the same underlying buffer */
    spmc_ref_t spmc = malloc(
        SIZE_FROM_CAP(cap, sizeof(struct spmc_base) + sizeof(spmc_node_t)));
    spmc_node_t *const head = HEAD_OF(spmc);
    init_node(head, head, cap);
    atomic_init(&spmc->curr_enqueue, head);
    atomic_init(&spmc->curr_dequeue, head);
    spmc->destructor = destructor;
    spmc->last_power = power;
    return spmc;
}

/* Destroy the SPMC, freeing all nodes/elements now associated with it.
 * Assume all users of the channel are done with it.
 */
void spmc_delete(spmc_ref_t spmc)
{
    const spmc_node_t *const head = HEAD_OF(spmc);
    spmc_node_t *prev;
    if (spmc->destructor) {
        for (spmc_node_t *node = head->next; node != head;
             prev = node, node = node->next, free(prev))
            for (size_t i = node->front; IS_READABLE(i, node); ++i)
                /* front/back grow without bound, so map them to a slot */
                spmc->destructor(node->buf[INDEX_OF(i, node)]);
    } else {
        for (spmc_node_t *node = head->next; node != head;
             prev = node, node = node->next, free(prev))
            ;
    }
    /* Also frees the head; it resides in the same buffer. */
    free(spmc);
}

/* Send (enqueue) an item onto the SPMC */
bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element)
{
    spmc_node_t *node =
        atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed);
    size_t idx;

retry:
    idx = atomic_load_explicit(AAA, memory_order_consume);
    if (!IS_WRITABLE(idx, node)) {
        spmc_node_t *const next =
            atomic_load_explicit(&node->next, memory_order_relaxed);
        /* Never move to write on top of the node that is currently being
         * read; in that case, items would be read out of the order in which
         * they were enqueued.
         */
        if (next !=
            atomic_load_explicit(&spmc->curr_dequeue, memory_order_relaxed)) {
            node = next;
            goto retry;
        }

        const uint8_t power = ++spmc->last_power;
        assert(power < sizeof(size_t) * CHAR_BIT);
        const size_t cap = (size_t) 1 << power; /* shift in size_t, not int */
        spmc_node_t *new_node = malloc(SIZE_FROM_CAP(cap, sizeof(spmc_node_t)));
        if (!new_node)
            return false;

        init_node(new_node, next, cap);
        atomic_store_explicit(&node->next, new_node, memory_order_release);
        idx = 0;
        node = new_node;
    }
    node->buf[INDEX_OF(idx, node)] = element;
    atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed);
    atomic_fetch_add_explicit(BBB, CCC, memory_order_release);
    return true;
}

/* Receive (dequeue) an item from the SPMC */
bool spmc_dequeue(spmc_ref_t spmc, uintptr_t *slot)
{
    spmc_node_t *node =
        atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume);
    size_t idx;

no_increment:
    do {
        idx = atomic_load_explicit(DDD, memory_order_consume);
        if (!IS_READABLE(idx, node)) {
            if (node != spmc->curr_enqueue)
                atomic_compare_exchange_strong(
                    &spmc->curr_dequeue, &node,
                    atomic_load_explicit(&node->next, memory_order_relaxed));
            goto no_increment;
        } else
            *slot = node->buf[INDEX_OF(idx, node)];
    } while (
        !atomic_compare_exchange_weak(EEE, &(size_t){idx}, idx + 1));
    return true;
}

#include <pthread.h>
#include <stdio.h>
#include <string.h>

#define N_ITEMS (1024UL * 8)

static void *producer_thread(void *arg)
{
    spmc_ref_t spmc = arg;
    for (uintptr_t i = 0; i < N_ITEMS; ++i) {
        if (!spmc_enqueue(spmc, i))
            fprintf(stderr, "Failed to enqueue on %zu.\n", (size_t) i);
    }
    return NULL;
}

#define N_MC_ITEMS (1024UL * 8)
#define N_MC_THREADS 16

/* Each consumer re-enqueues the termination sentinel incremented by one, so
 * values up to N_MC_ITEMS + N_MC_THREADS - 2 can be observed. The array is
 * sized accordingly to keep observed_count[element] in bounds.
 */
static _Atomic size_t observed_count[N_MC_ITEMS + N_MC_THREADS];

static void *mc_thread(void *arg)
{
    spmc_ref_t spmc = arg;
    uintptr_t element = 0, greatest = 0;
    for (;;) {
        greatest = (greatest > element) ? greatest : element;
        if (!spmc_dequeue(spmc, &element))
            fprintf(stderr, "Failed to dequeue in mc_thread.\n");
        else if (observed_count[element]++)
            fprintf(stderr, "Consumed twice!\n");
        else if (element < greatest)
            fprintf(stderr, "%zu after %zu; bad order!\n", (size_t) element,
                    (size_t) greatest);
        printf("Observed %zu.\n", (size_t) element);

        /* Test for sentinel signalling termination */
        if (element >= (N_MC_ITEMS - 1)) {
            spmc_enqueue(spmc, element + 1); /* notify other threads */
            break;
        }
    }
    return NULL;
}

int main(void)
{
    spmc_ref_t spmc = spmc_new(0, NULL);
    pthread_t mc[N_MC_THREADS], producer;
    pthread_create(&producer, NULL, producer_thread, spmc);
    for (int i = 0; i < N_MC_THREADS; i++)
        pthread_create(&mc[i], NULL, mc_thread, spmc);
    pthread_join(producer, NULL);
    for (int i = 0; i < N_MC_THREADS; i++)
        pthread_join(mc[i], NULL);
    for (size_t i = 0; i < N_MC_ITEMS; ++i) {
        if (observed_count[i] == 1)
            continue;
        fprintf(stderr, "An item seen %zu times: %zu.\n", observed_count[i], i);
    }
    spmc_delete(spmc);
    return 0;
}
```
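The index macros in the listing (`MODULO`, `INDEX_OF`, `IS_WRITABLE`) rely on two properties: the capacity is a power of two, so masking with `cap - 1` is equivalent to `% cap`, and `front`/`back` are unsigned counters that only grow, so their difference stays meaningful even after the counters wrap around. The following is a minimal standalone sketch of that arithmetic. The helper names `ring_slot` and `ring_has_room` are hypothetical and are not part of `spmc.c`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: map a monotonically growing index to a buffer slot.
 * Requires cap to be a non-zero power of two, like the MODULO macro.
 */
static size_t ring_slot(size_t idx, size_t cap)
{
    assert(cap && (cap & (cap - 1)) == 0);
    return idx & (cap - 1);
}

/* Hypothetical helper: true while fewer than cap items are outstanding.
 * Unsigned subtraction is performed modulo 2^N, so the result is still
 * correct after back has wrapped around past SIZE_MAX.
 */
static bool ring_has_room(size_t back, size_t front, size_t cap)
{
    return back - front < cap;
}
```

For instance, with a capacity of 64, index 65 maps to slot 1, and a queue whose `front` is `SIZE_MAX - 1` and whose `back` has wrapped to 1 holds three items.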
To compile:
```shell
$ gcc -Wall -o spmc spmc.c -lpthread
```
Sample output (only one of many possible results):
```
Observed 0.
Observed 2.
Observed 3.
Observed 4.
...
Observed 8205.
Observed 8190.
Observed 8206.
```
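Note that the numbers before `Observed 8206` may not appear in increasing order, but every line has the form `Observed` followed by a number, and no other error messages are printed.
Based on the problem description and the code comments, fill in the blanks so that the program behaves as expected.
References: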
* [Toward a Better Use of C11 Atomics – Part 1](https://developers.redhat.com/blog/2016/01/14/toward-a-better-use-of-c11-atomics-part-1/)
* [Toward a Better Use of C11 Atomics – Part 2](https://developers.redhat.com/blog/2016/01/19/toward-a-better-use-of-c11-atomics-part-2/)
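As a companion to the references above, here is a minimal sketch of the release/acquire publication pattern that C11 atomics make possible: a writer fills in plain data and then publishes it with a release store, and a reader that sees the flag through an acquire load is guaranteed to see the data as well. All names (`payload`, `ready`, `writer`, `reader`, `run_publish_demo`) are hypothetical. The sketch uses `memory_order_acquire` on the reader side instead of the `memory_order_consume` found in `spmc.c`, because acquire is the stronger ordering and is simpler to reason about.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static int payload;       /* plain, non-atomic data */
static atomic_bool ready; /* publication flag, initially false */

static void *writer(void *arg)
{
    (void) arg;
    payload = 42; /* 1. write the data */
    /* 2. publish: the write above cannot be reordered past this store */
    atomic_store_explicit(&ready, true, memory_order_release);
    return NULL;
}

static void *reader(void *arg)
{
    int *out = arg;
    /* Spin until the flag is visible; acquire pairs with the release store */
    while (!atomic_load_explicit(&ready, memory_order_acquire))
        ;
    *out = payload; /* ordered after the writer's store to payload */
    return NULL;
}

/* Hypothetical driver: returns the value the reader saw, or -1 on error. */
static int run_publish_demo(void)
{
    pthread_t w, r;
    int seen = 0;
    payload = 0;
    atomic_store(&ready, false);
    if (pthread_create(&w, NULL, writer, NULL))
        return -1;
    if (pthread_create(&r, NULL, reader, &seen)) {
        pthread_join(w, NULL);
        return -1;
    }
    pthread_join(w, NULL);
    pthread_join(r, NULL);
    assert(atomic_load(&ready)); /* the writer has finished, so it is set */
    return seen;
}
```

Calling `run_publish_demo()` from a `main` function and linking with the pthread library should return 42. If the flag were written with a plain store instead, the accesses to `payload` would no longer be ordered and would constitute a data race.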
==Answer area==
AAA = ?
BBB = ?
CCC = ?
DDD = ?
EEE = ?
:::success
Follow-up questions:
1. Explain how the code above works. With reference to Part 3 of the [Week 7 quiz](https://hackmd.io/@sysprog/linux2021-quiz7), describe what changed in going from SPSC to SPMC.
2. When [ThreadSanitizer](https://github.com/google/sanitizers) (TSan) is enabled, `spmc.c` triggers the following [data race](https://en.wikipedia.org/wiki/Race_condition#Data_race) warning. Try to eliminate it:
```
WARNING: ThreadSanitizer: data race (pid=1037062)
Atomic read of size 8 at 0x7b7400000008 by thread T2:
#0 __tsan_atomic64_load <null> (libtsan.so.0+0x7cec7)
#1 spmc_dequeue spmc.c:147 (spmc+0x1880)
#2 mc_thread spmc.c:186 (spmc+0x1a9f)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Previous write of size 8 at 0x7b7400000008 by thread T1:
[failed to restore the stack]
Location is heap block of size 2080 at 0x7b7400000000 allocated by thread T1:
#0 malloc <null> (libtsan.so.0+0x30343)
#1 spmc_enqueue spmc.c:124 (spmc+0x16d4)
#2 producer_thread spmc.c:170 (spmc+0x17ef)
#3 <null> <null> (libtsan.so.0+0x2d1af)
Thread T2 (tid=1037065, running) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x5ea99)
#1 main spmc.c:212 (spmc+0x1be8)
Thread T1 (tid=1037064, finished) created by main thread at:
#0 pthread_create <null> (libtsan.so.0+0x5ea99)
#1 main spmc.c:210 (spmc+0x1bc5)
SUMMARY: ThreadSanitizer: data race (/usr/lib/x86_64-linux-gnu/libtsan.so.0+0x7cec7) in __tsan_atomic64_load
```
:::
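One ingredient relevant to follow-up question 1 is how several consumers can share a single read index. With one consumer, the index can be advanced with an ordinary atomic store. With many consumers, two threads could read the same value and both take the same slot, so the advance has to be a compare-and-swap (CAS) that succeeds for exactly one of them. The sketch below isolates that idea: several threads claim "tickets" from a shared counter, and each ticket ends up claimed exactly once. It is not the quiz answer and leaves out node switching entirely. All names (`next_ticket`, `claim_ticket`, `claimer`, `run_claim_demo`) and the constants are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define N_TICKETS 4096
#define N_CLAIMERS 4

static _Atomic size_t next_ticket;          /* shared claim index */
static _Atomic unsigned claimed[N_TICKETS]; /* times each ticket was taken */

/* Try to claim one ticket; returns false once all tickets are gone. */
static bool claim_ticket(size_t *out)
{
    size_t cur = atomic_load_explicit(&next_ticket, memory_order_relaxed);
    do {
        if (cur >= N_TICKETS)
            return false;
        /* On CAS failure, cur is refreshed with the current value */
    } while (!atomic_compare_exchange_weak_explicit(
        &next_ticket, &cur, cur + 1, memory_order_acq_rel,
        memory_order_relaxed));
    assert(cur < N_TICKETS);
    *out = cur; /* only the thread whose CAS succeeded owns this ticket */
    return true;
}

static void *claimer(void *arg)
{
    (void) arg;
    size_t t;
    while (claim_ticket(&t))
        atomic_fetch_add_explicit(&claimed[t], 1, memory_order_relaxed);
    return NULL;
}

/* Returns how many tickets were claimed exactly once, or -1 on error. */
static long run_claim_demo(void)
{
    pthread_t th[N_CLAIMERS];
    int started = 0;
    atomic_store(&next_ticket, 0);
    for (size_t i = 0; i < N_TICKETS; i++)
        atomic_store(&claimed[i], 0);
    for (; started < N_CLAIMERS; started++)
        if (pthread_create(&th[started], NULL, claimer, NULL))
            break;
    for (int i = 0; i < started; i++)
        pthread_join(th[i], NULL);
    if (started == 0)
        return -1;
    long once = 0;
    for (size_t i = 0; i < N_TICKETS; i++)
        once += (atomic_load(&claimed[i]) == 1);
    return once;
}
```

When called from a `main` function, `run_claim_demo()` should return `N_TICKETS`, meaning no ticket was skipped or handed out twice. The `do { ... } while (!atomic_compare_exchange_weak(...))` loop in `spmc_dequeue` has the same overall shape, with the extra step of copying the element out before attempting the CAS.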