# [2020q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 16 週測驗題
###### tags: `linux2020`
:::info
目的: 檢驗學員對 Linux 系統呼叫和 ring buffer 的認知
:::
==[作答表單](https://docs.google.com/forms/d/e/1FAIpQLSdnzDAQe7R8LRq64cOmjO2Y_ZjB3u8L2_2A7_TeEdNpay6ZxQ/viewform)==
---
## 測驗 `1`
考慮以下 ring buffer 的使用:
```c
static void *producer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
uint32_t counter = 0;
while (counter <= 1000000) {
void *ptr;
const size_t towrite = sizeof(uint32_t);
if ((ptr = ringbuf_write_request(ringbuf, towrite))) {
// write 'towrite' bytes to 'ptr'
*(uint32_t *) ptr = counter++;
ringbuf_write_advance(ringbuf, towrite);
}
}
return NULL;
}
static void *consumer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
while (1) {
const void *ptr;
size_t toread;
if ((ptr = ringbuf_read_request(ringbuf, &toread))) {
// read 'toread' bytes from 'ptr'
if (*(uint32_t *) ptr >= 1000000) break;
ringbuf_read_advance(ringbuf);
}
}
return NULL;
}
int main()
{
pthread_t producer, consumer;
ringbuf_t *ringbuf = ringbuf_new(8192, true);
if (!ringbuf) return -1;
pthread_create(&consumer, NULL, consumer_main, ringbuf);
pthread_create(&producer, NULL, producer_main, ringbuf);
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
ringbuf_free(ringbuf);
return 0;
}
```
在上述生產者—消費者之間,即以 ring buffer 進行通訊。考慮在 x86_64 GNU/Linux 環境,C11 實作和對應的測試程式碼如下:
```c
#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>
#define RINGBUF_PAD(SIZE) XXX
typedef struct {
uint32_t size, gap;
} ringbuf_element_t;
typedef struct {
size_t size, mask, rsvd /* reserved */, gapd;
memory_order acquire, release;
atomic_size_t head, tail;
uint8_t buf[] __attribute__((aligned(sizeof(ringbuf_element_t))));
} ringbuf_t;
static inline size_t ringbuf_body_size(size_t minimum)
{
size_t size = 1;
while (size < minimum) size <<= 1; // assure size to be a power of 2
return size;
}
static inline void ringbuf_init(ringbuf_t *ringbuf,
size_t body_size,
bool release_and_acquire)
{
ringbuf->acquire =
release_and_acquire ? memory_order_acquire : memory_order_relaxed;
ringbuf->release =
release_and_acquire ? memory_order_release : memory_order_relaxed;
atomic_init(&ringbuf->head, 0);
atomic_init(&ringbuf->tail, 0);
ringbuf->size = body_size;
ringbuf->mask = ringbuf->size - 1;
}
static inline ringbuf_t *ringbuf_new(size_t minimum, bool release_and_acquire)
{
ringbuf_t *ringbuf = NULL;
const size_t body_size = ringbuf_body_size(minimum);
const size_t total_size = sizeof(ringbuf_t) + body_size;
posix_memalign((void **) &ringbuf, sizeof(ringbuf_element_t), total_size);
mlock(ringbuf, total_size); // prevent memory from being flushed to disk
if (ringbuf) ringbuf_init(ringbuf, body_size, release_and_acquire);
return ringbuf;
}
static inline void ringbuf_free(ringbuf_t *ringbuf)
{
if (!ringbuf) return;
munlock(ringbuf->buf, ringbuf->size);
free(ringbuf);
}
static inline void _ringbuf_write_advance_raw(ringbuf_t *ringbuf,
size_t head,
size_t written)
{
// only producer is allowed to advance write head
const size_t new_head = (head + written) & ringbuf->mask;
atomic_store_explicit(&ringbuf->head, new_head, ringbuf->release);
}
static inline void *ringbuf_write_request_max(ringbuf_t *ringbuf,
size_t minimum,
size_t *maximum)
{
assert(ringbuf);
size_t space; // size of writable buffer
size_t end; // virtual end of writable buffer
const size_t head = atomic_load_explicit(
&ringbuf->head, memory_order_relaxed); // read head
const size_t tail = atomic_load_explicit(
&ringbuf->tail,
ringbuf->acquire); // read tail (consumer modifies it any time)
const size_t padded = 2 * sizeof(ringbuf_element_t) + RINGBUF_PAD(minimum);
// calculate writable space
if (head > tail)
space = ((tail - head + ringbuf->size) & ringbuf->mask) - 1;
else if (head < tail)
space = (tail - head) - 1;
else // head == tail
space = ringbuf->size - 1;
end = head + space;
if (end > ringbuf->size) { // available region wraps over at end of buffer
// get first part of available buffer
uint8_t *buf1 = ringbuf->buf + head;
const size_t len1 = ringbuf->size - head;
if (len1 < padded) { // not enough space left on first part of buffer
// get second part of available buffer
uint8_t *buf2 = ringbuf->buf;
const size_t len2 = end OP1 ringbuf->mask;
if (len2 < padded) { // not enough space left on second buffer
ringbuf->rsvd = 0;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return NULL;
}
// enough space left on second buffer, use it!
ringbuf->rsvd = len2;
ringbuf->gapd = len1;
if (maximum) *maximum = ringbuf->rsvd;
return buf2 + sizeof(ringbuf_element_t);
}
// enough space left on first part of buffer, use it!
ringbuf->rsvd = len1;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return buf1 + sizeof(ringbuf_element_t);
}
// available region is contiguous
uint8_t *buf = ringbuf->buf + head;
if (space < padded) { // no space left on contiguous buffer
ringbuf->rsvd = 0;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return NULL;
}
// enough space left on contiguous buffer, use it!
ringbuf->rsvd = space;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return buf + sizeof(ringbuf_element_t);
}
static inline void *ringbuf_write_request(ringbuf_t *ringbuf, size_t minimum)
{
return ringbuf_write_request_max(ringbuf, minimum, NULL);
}
static inline void ringbuf_write_advance(ringbuf_t *ringbuf, size_t written)
{
assert(ringbuf);
// fail miserably if stupid programmer tries to write more than rsvd
assert(written <= ringbuf->rsvd);
// write element header at head
const size_t head =
atomic_load_explicit(&ringbuf->head, memory_order_relaxed);
if (ringbuf->gapd > 0) {
// fill end of first buffer with gap
ringbuf_element_t *element =
(ringbuf_element_t *) (ringbuf->buf + head);
element->size = ringbuf->gapd - sizeof(ringbuf_element_t);
element->gap = 1;
// fill written element header
element = (void *) ringbuf->buf;
element->size = written;
element->gap = 0;
} else { // ringbuf->gapd == 0
// fill written element header
ringbuf_element_t *element =
(ringbuf_element_t *) (ringbuf->buf + head);
element->size = written;
element->gap = 0;
}
// advance write head
_ringbuf_write_advance_raw(
ringbuf, head,
ringbuf->gapd + sizeof(ringbuf_element_t) + RINGBUF_PAD(written));
}
static inline void _ringbuf_read_advance_raw(ringbuf_t *ringbuf,
size_t tail,
size_t read)
{
// only consumer is allowed to advance read tail
const size_t new_tail = (tail + read) OP2 ringbuf->mask;
atomic_store_explicit(&ringbuf->tail, new_tail, ringbuf->release);
}
static inline const void *ringbuf_read_request(ringbuf_t *ringbuf,
size_t *toread)
{
assert(ringbuf);
size_t space; // size of available buffer
const size_t tail = atomic_load_explicit(
&ringbuf->tail, memory_order_relaxed); // read tail
const size_t head = atomic_load_explicit(
&ringbuf->head,
ringbuf->acquire); // read head (producer modifies it any time)
// calculate readable space
if (head > tail)
space = head - tail;
else
space = (head - tail + ringbuf->size) & ringbuf->mask;
if (space > 0) { // there may be chunks available for reading
const size_t end = tail + space; // virtual end of available buffer
if (end > ringbuf->size) { // available buffer wraps around at end
// first part of available buffer
const uint8_t *buf1 = ringbuf->buf + tail;
const size_t len1 = ringbuf->size - tail;
const ringbuf_element_t *element = (const ringbuf_element_t *) buf1;
if (element->gap) { // gap element?
// skip gap
_ringbuf_read_advance_raw(ringbuf, tail, len1);
// second part of available buffer
const uint8_t *buf2 = ringbuf->buf;
// there will always be at least on element after a gap
element = (const ringbuf_element_t *) buf2;
*toread = element->size;
return buf2 + sizeof(ringbuf_element_t);
}
// valid chunk, use it!
*toread = element->size;
return buf1 + sizeof(ringbuf_element_t);
}
// available buffer is contiguous
const uint8_t *buf = ringbuf->buf + tail;
const ringbuf_element_t *element = (const ringbuf_element_t *) buf;
*toread = element->size;
return buf + sizeof(ringbuf_element_t);
}
// no chunks available. That is, empty buffer
*toread = 0;
return NULL;
}
static inline void ringbuf_read_advance(ringbuf_t *ringbuf)
{
assert(ringbuf);
// get element header from tail (for size)
const size_t tail =
atomic_load_explicit(&ringbuf->tail, memory_order_relaxed);
const ringbuf_element_t *element =
(const ringbuf_element_t *) (ringbuf->buf + tail);
// advance read tail
_ringbuf_read_advance_raw(
ringbuf, tail, sizeof(ringbuf_element_t) + RINGBUF_PAD(element->size));
}
/* Test program */
static const struct timespec req = {.tv_sec = 0, .tv_nsec = 1};
static uint64_t iterations = 10000;
#define THRESHOLD (RAND_MAX / 256)
static void *producer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
uint64_t cnt = 0;
while (cnt < iterations) {
if (rand() < THRESHOLD) nanosleep(&req, NULL);
size_t written = RINGBUF_PAD(rand() * 1024.f / RAND_MAX);
size_t maximum;
uint8_t *ptr;
if ((ptr = ringbuf_write_request_max(ringbuf, written, &maximum))) {
assert(maximum >= written);
const uint8_t *end = ptr + written;
for (uint8_t *src = ptr; src < end; src += sizeof(uint64_t)) {
*(uint64_t *) src = cnt;
assert(*(uint64_t *) src == cnt);
}
ringbuf_write_advance(ringbuf, written);
cnt++;
} /* else: buffer full */
}
return NULL;
}
static void *consumer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
uint64_t cnt = 0;
while (cnt < iterations) {
if (rand() < THRESHOLD) nanosleep(&req, NULL);
const uint8_t *ptr;
size_t toread;
if ((ptr = ringbuf_read_request(ringbuf, &toread))) {
const uint8_t *end = ptr + toread;
for (const uint8_t *src = ptr; src < end; src += sizeof(uint64_t))
assert(*(const uint64_t *) src == cnt);
ringbuf_read_advance(ringbuf);
cnt++;
} /* else: buffer empty */
}
return NULL;
}
static void test_threaded()
{
pthread_t producer, consumer;
ringbuf_t *ringbuf = ringbuf_new(8192, true);
assert(ringbuf);
pthread_create(&consumer, NULL, consumer_main, ringbuf);
pthread_create(&producer, NULL, producer_main, ringbuf);
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
ringbuf_free(ringbuf);
}
typedef struct {
char *name;
int fd;
ringbuf_t *ringbuf;
} ringbuf_shm_t;
static int ringbuf_shm_init(ringbuf_shm_t *ringbuf_shm,
const char *name,
size_t minimum,
bool release_and_acquire)
{
const size_t body_size = ringbuf_body_size(minimum);
const size_t total_size = sizeof(ringbuf_t) + body_size;
ringbuf_shm->name = strdup(name);
if (!ringbuf_shm->name) return -1;
bool is_first = true;
ringbuf_shm->fd = shm_open(ringbuf_shm->name, O_RDWR | O_CREAT | O_EXCL,
S_IRUSR | S_IWUSR);
if (ringbuf_shm->fd == -1) {
is_first = false;
ringbuf_shm->fd =
shm_open(ringbuf_shm->name, O_RDWR, S_IRUSR | S_IWUSR);
}
if (ringbuf_shm->fd == -1) {
free(ringbuf_shm->name);
return -1;
}
if ((ftruncate(ringbuf_shm->fd, total_size) == -1) ||
((ringbuf_shm->ringbuf = mmap(NULL, total_size, PROT_READ | PROT_WRITE,
MAP_SHARED, ringbuf_shm->fd, 0)) ==
MAP_FAILED)) {
shm_unlink(ringbuf_shm->name);
close(ringbuf_shm->fd);
free(ringbuf_shm->name);
return -1;
}
if (is_first)
ringbuf_init(ringbuf_shm->ringbuf, body_size, release_and_acquire);
return 0;
}
static void ringbuf_shm_deinit(ringbuf_shm_t *ringbuf_shm)
{
const size_t total_size = sizeof(ringbuf_t) + ringbuf_shm->ringbuf->size;
munmap(ringbuf_shm->ringbuf, total_size);
shm_unlink(ringbuf_shm->name);
close(ringbuf_shm->fd);
free(ringbuf_shm->name);
}
static void test_shared()
{
pid_t pid = fork();
assert(pid != -1);
const char *name = "/ringbuf_shm_test";
if (pid == 0) { // child
ringbuf_shm_t ringbuf_shm;
assert(ringbuf_shm_init(&ringbuf_shm, name, 8192, true) == 0);
consumer_main(ringbuf_shm.ringbuf);
ringbuf_shm_deinit(&ringbuf_shm);
} else { // parent
ringbuf_shm_t ringbuf_shm;
assert(ringbuf_shm_init(&ringbuf_shm, name, 8192, true) == 0);
producer_main(ringbuf_shm.ringbuf);
ringbuf_shm_deinit(&ringbuf_shm);
}
}
int main()
{
srand(time(NULL));
test_threaded();
test_shared();
return 0;
}
```
請補完程式碼。
==作答區==
XXX = ?
* `(a)` `(((size_t)(SIZE) + 7U) & (~7U))`
* `(b)` `(((size_t)(SIZE) + 6U) & (~6U))`
* `(c)` `(((size_t)(SIZE) + 5U) & (~5U))`
* `(d)` `(((size_t)(SIZE) + 4U) & (~4U))`
* `(e)` `(((size_t)(SIZE) + 3U) & (~3U))`
* `(f)` `(((size_t)(SIZE) + 2U) & (~2U))`
* `(g)` `(((size_t)(SIZE) + 1U) & (~1U))`
OP1 = ?
* `(a)` `^`
* `(b)` `|`
* `(c)` `&`
OP2 = ?
* `(a)` `^`
* `(b)` `|`
* `(c)` `&`
延伸閱讀:
* [簡介 C++11 atomic 和 memory order](https://medium.com/fcamels-notes/%E7%B0%A1%E4%BB%8B-c-11-memory-model-b3f4ed81fea6)
* [The C/C++11 memory model](https://people.mpi-sws.org/~viktor/wmc/c11.pdf)