linux2020
目的: 檢驗學員對 Linux 系統呼叫和 ring buffer 的認知
1
考慮以下 ring buffer 的使用:
static void *producer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
uint32_t counter = 0;
while (counter <= 1000000) {
void *ptr;
const size_t towrite = sizeof(uint32_t);
if ((ptr = ringbuf_write_request(ringbuf, towrite))) {
// write 'towrite' bytes to 'ptr'
*(uint32_t *) ptr = counter++;
ringbuf_write_advance(ringbuf, towrite);
}
}
return NULL;
}
static void *consumer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
while (1) {
const void *ptr;
size_t toread;
if ((ptr = ringbuf_read_request(ringbuf, &toread))) {
// read 'toread' bytes from 'ptr'
if (*(uint32_t *) ptr >= 1000000) break;
ringbuf_read_advance(ringbuf);
}
}
return NULL;
}
int main()
{
pthread_t producer, consumer;
ringbuf_t *ringbuf = ringbuf_new(8192, true);
if (!ringbuf) return -1;
pthread_create(&consumer, NULL, consumer_main, ringbuf);
pthread_create(&producer, NULL, producer_main, ringbuf);
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
ringbuf_free(ringbuf);
return 0;
}
在上述生產者—消費者之間,即以 ring buffer 進行通訊。考慮在 x86_64 GNU/Linux 環境,C11 實作和對應的測試程式碼如下:
#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>
#define RINGBUF_PAD(SIZE) XXX
typedef struct {
uint32_t size, gap;
} ringbuf_element_t;
typedef struct {
size_t size, mask, rsvd /* reserved */, gapd;
memory_order acquire, release;
atomic_size_t head, tail;
uint8_t buf[] __attribute__((aligned(sizeof(ringbuf_element_t))));
} ringbuf_t;
static inline size_t ringbuf_body_size(size_t minimum)
{
size_t size = 1;
while (size < minimum) size <<= 1; // assure size to be a power of 2
return size;
}
static inline void ringbuf_init(ringbuf_t *ringbuf,
size_t body_size,
bool release_and_acquire)
{
ringbuf->acquire =
release_and_acquire ? memory_order_acquire : memory_order_relaxed;
ringbuf->release =
release_and_acquire ? memory_order_release : memory_order_relaxed;
atomic_init(&ringbuf->head, 0);
atomic_init(&ringbuf->tail, 0);
ringbuf->size = body_size;
ringbuf->mask = ringbuf->size - 1;
}
static inline ringbuf_t *ringbuf_new(size_t minimum, bool release_and_acquire)
{
ringbuf_t *ringbuf = NULL;
const size_t body_size = ringbuf_body_size(minimum);
const size_t total_size = sizeof(ringbuf_t) + body_size;
posix_memalign((void **) &ringbuf, sizeof(ringbuf_element_t), total_size);
mlock(ringbuf, total_size); // prevent memory from being flushed to disk
if (ringbuf) ringbuf_init(ringbuf, body_size, release_and_acquire);
return ringbuf;
}
static inline void ringbuf_free(ringbuf_t *ringbuf)
{
if (!ringbuf) return;
munlock(ringbuf->buf, ringbuf->size);
free(ringbuf);
}
static inline void _ringbuf_write_advance_raw(ringbuf_t *ringbuf,
size_t head,
size_t written)
{
// only producer is allowed to advance write head
const size_t new_head = (head + written) & ringbuf->mask;
atomic_store_explicit(&ringbuf->head, new_head, ringbuf->release);
}
static inline void *ringbuf_write_request_max(ringbuf_t *ringbuf,
size_t minimum,
size_t *maximum)
{
assert(ringbuf);
size_t space; // size of writable buffer
size_t end; // virtual end of writable buffer
const size_t head = atomic_load_explicit(
&ringbuf->head, memory_order_relaxed); // read head
const size_t tail = atomic_load_explicit(
&ringbuf->tail,
ringbuf->acquire); // read tail (consumer modifies it any time)
const size_t padded = 2 * sizeof(ringbuf_element_t) + RINGBUF_PAD(minimum);
// calculate writable space
if (head > tail)
space = ((tail - head + ringbuf->size) & ringbuf->mask) - 1;
else if (head < tail)
space = (tail - head) - 1;
else // head == tail
space = ringbuf->size - 1;
end = head + space;
if (end > ringbuf->size) { // available region wraps over at end of buffer
// get first part of available buffer
uint8_t *buf1 = ringbuf->buf + head;
const size_t len1 = ringbuf->size - head;
if (len1 < padded) { // not enough space left on first part of buffer
// get second part of available buffer
uint8_t *buf2 = ringbuf->buf;
const size_t len2 = end OP1 ringbuf->mask;
if (len2 < padded) { // not enough space left on second buffer
ringbuf->rsvd = 0;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return NULL;
}
// enough space left on second buffer, use it!
ringbuf->rsvd = len2;
ringbuf->gapd = len1;
if (maximum) *maximum = ringbuf->rsvd;
return buf2 + sizeof(ringbuf_element_t);
}
// enough space left on first part of buffer, use it!
ringbuf->rsvd = len1;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return buf1 + sizeof(ringbuf_element_t);
}
// available region is contiguous
uint8_t *buf = ringbuf->buf + head;
if (space < padded) { // no space left on contiguous buffer
ringbuf->rsvd = 0;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return NULL;
}
// enough space left on contiguous buffer, use it!
ringbuf->rsvd = space;
ringbuf->gapd = 0;
if (maximum) *maximum = ringbuf->rsvd;
return buf + sizeof(ringbuf_element_t);
}
static inline void *ringbuf_write_request(ringbuf_t *ringbuf, size_t minimum)
{
return ringbuf_write_request_max(ringbuf, minimum, NULL);
}
static inline void ringbuf_write_advance(ringbuf_t *ringbuf, size_t written)
{
assert(ringbuf);
// fail miserably if stupid programmer tries to write more than rsvd
assert(written <= ringbuf->rsvd);
// write element header at head
const size_t head =
atomic_load_explicit(&ringbuf->head, memory_order_relaxed);
if (ringbuf->gapd > 0) {
// fill end of first buffer with gap
ringbuf_element_t *element =
(ringbuf_element_t *) (ringbuf->buf + head);
element->size = ringbuf->gapd - sizeof(ringbuf_element_t);
element->gap = 1;
// fill written element header
element = (void *) ringbuf->buf;
element->size = written;
element->gap = 0;
} else { // ringbuf->gapd == 0
// fill written element header
ringbuf_element_t *element =
(ringbuf_element_t *) (ringbuf->buf + head);
element->size = written;
element->gap = 0;
}
// advance write head
_ringbuf_write_advance_raw(
ringbuf, head,
ringbuf->gapd + sizeof(ringbuf_element_t) + RINGBUF_PAD(written));
}
static inline void _ringbuf_read_advance_raw(ringbuf_t *ringbuf,
size_t tail,
size_t read)
{
// only consumer is allowed to advance read tail
const size_t new_tail = (tail + read) OP2 ringbuf->mask;
atomic_store_explicit(&ringbuf->tail, new_tail, ringbuf->release);
}
static inline const void *ringbuf_read_request(ringbuf_t *ringbuf,
size_t *toread)
{
assert(ringbuf);
size_t space; // size of available buffer
const size_t tail = atomic_load_explicit(
&ringbuf->tail, memory_order_relaxed); // read tail
const size_t head = atomic_load_explicit(
&ringbuf->head,
ringbuf->acquire); // read head (producer modifies it any time)
// calculate readable space
if (head > tail)
space = head - tail;
else
space = (head - tail + ringbuf->size) & ringbuf->mask;
if (space > 0) { // there may be chunks available for reading
const size_t end = tail + space; // virtual end of available buffer
if (end > ringbuf->size) { // available buffer wraps around at end
// first part of available buffer
const uint8_t *buf1 = ringbuf->buf + tail;
const size_t len1 = ringbuf->size - tail;
const ringbuf_element_t *element = (const ringbuf_element_t *) buf1;
if (element->gap) { // gap element?
// skip gap
_ringbuf_read_advance_raw(ringbuf, tail, len1);
// second part of available buffer
const uint8_t *buf2 = ringbuf->buf;
// there will always be at least on element after a gap
element = (const ringbuf_element_t *) buf2;
*toread = element->size;
return buf2 + sizeof(ringbuf_element_t);
}
// valid chunk, use it!
*toread = element->size;
return buf1 + sizeof(ringbuf_element_t);
}
// available buffer is contiguous
const uint8_t *buf = ringbuf->buf + tail;
const ringbuf_element_t *element = (const ringbuf_element_t *) buf;
*toread = element->size;
return buf + sizeof(ringbuf_element_t);
}
// no chunks available. That is, empty buffer
*toread = 0;
return NULL;
}
static inline void ringbuf_read_advance(ringbuf_t *ringbuf)
{
assert(ringbuf);
// get element header from tail (for size)
const size_t tail =
atomic_load_explicit(&ringbuf->tail, memory_order_relaxed);
const ringbuf_element_t *element =
(const ringbuf_element_t *) (ringbuf->buf + tail);
// advance read tail
_ringbuf_read_advance_raw(
ringbuf, tail, sizeof(ringbuf_element_t) + RINGBUF_PAD(element->size));
}
/* Test program */
static const struct timespec req = {.tv_sec = 0, .tv_nsec = 1};
static uint64_t iterations = 10000;
#define THRESHOLD (RAND_MAX / 256)
static void *producer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
uint64_t cnt = 0;
while (cnt < iterations) {
if (rand() < THRESHOLD) nanosleep(&req, NULL);
size_t written = RINGBUF_PAD(rand() * 1024.f / RAND_MAX);
size_t maximum;
uint8_t *ptr;
if ((ptr = ringbuf_write_request_max(ringbuf, written, &maximum))) {
assert(maximum >= written);
const uint8_t *end = ptr + written;
for (uint8_t *src = ptr; src < end; src += sizeof(uint64_t)) {
*(uint64_t *) src = cnt;
assert(*(uint64_t *) src == cnt);
}
ringbuf_write_advance(ringbuf, written);
cnt++;
} /* else: buffer full */
}
return NULL;
}
static void *consumer_main(void *arg)
{
ringbuf_t *ringbuf = arg;
uint64_t cnt = 0;
while (cnt < iterations) {
if (rand() < THRESHOLD) nanosleep(&req, NULL);
const uint8_t *ptr;
size_t toread;
if ((ptr = ringbuf_read_request(ringbuf, &toread))) {
const uint8_t *end = ptr + toread;
for (const uint8_t *src = ptr; src < end; src += sizeof(uint64_t))
assert(*(const uint64_t *) src == cnt);
ringbuf_read_advance(ringbuf);
cnt++;
} /* else: buffer empty */
}
return NULL;
}
static void test_threaded()
{
pthread_t producer, consumer;
ringbuf_t *ringbuf = ringbuf_new(8192, true);
assert(ringbuf);
pthread_create(&consumer, NULL, consumer_main, ringbuf);
pthread_create(&producer, NULL, producer_main, ringbuf);
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
ringbuf_free(ringbuf);
}
typedef struct {
char *name;
int fd;
ringbuf_t *ringbuf;
} ringbuf_shm_t;
static int ringbuf_shm_init(ringbuf_shm_t *ringbuf_shm,
const char *name,
size_t minimum,
bool release_and_acquire)
{
const size_t body_size = ringbuf_body_size(minimum);
const size_t total_size = sizeof(ringbuf_t) + body_size;
ringbuf_shm->name = strdup(name);
if (!ringbuf_shm->name) return -1;
bool is_first = true;
ringbuf_shm->fd = shm_open(ringbuf_shm->name, O_RDWR | O_CREAT | O_EXCL,
S_IRUSR | S_IWUSR);
if (ringbuf_shm->fd == -1) {
is_first = false;
ringbuf_shm->fd =
shm_open(ringbuf_shm->name, O_RDWR, S_IRUSR | S_IWUSR);
}
if (ringbuf_shm->fd == -1) {
free(ringbuf_shm->name);
return -1;
}
if ((ftruncate(ringbuf_shm->fd, total_size) == -1) ||
((ringbuf_shm->ringbuf = mmap(NULL, total_size, PROT_READ | PROT_WRITE,
MAP_SHARED, ringbuf_shm->fd, 0)) ==
MAP_FAILED)) {
shm_unlink(ringbuf_shm->name);
close(ringbuf_shm->fd);
free(ringbuf_shm->name);
return -1;
}
if (is_first)
ringbuf_init(ringbuf_shm->ringbuf, body_size, release_and_acquire);
return 0;
}
static void ringbuf_shm_deinit(ringbuf_shm_t *ringbuf_shm)
{
const size_t total_size = sizeof(ringbuf_t) + ringbuf_shm->ringbuf->size;
munmap(ringbuf_shm->ringbuf, total_size);
shm_unlink(ringbuf_shm->name);
close(ringbuf_shm->fd);
free(ringbuf_shm->name);
}
static void test_shared()
{
pid_t pid = fork();
assert(pid != -1);
const char *name = "/ringbuf_shm_test";
if (pid == 0) { // child
ringbuf_shm_t ringbuf_shm;
assert(ringbuf_shm_init(&ringbuf_shm, name, 8192, true) == 0);
consumer_main(ringbuf_shm.ringbuf);
ringbuf_shm_deinit(&ringbuf_shm);
} else { // parent
ringbuf_shm_t ringbuf_shm;
assert(ringbuf_shm_init(&ringbuf_shm, name, 8192, true) == 0);
producer_main(ringbuf_shm.ringbuf);
ringbuf_shm_deinit(&ringbuf_shm);
}
}
int main()
{
srand(time(NULL));
test_threaded();
test_shared();
return 0;
}
請補完程式碼。
作答區
XXX = ?
(a)
(((size_t)(SIZE) + 7U) & (~7U))
(b)
(((size_t)(SIZE) + 6U) & (~6U))
(c)
(((size_t)(SIZE) + 5U) & (~5U))
(d)
(((size_t)(SIZE) + 4U) & (~4U))
(e)
(((size_t)(SIZE) + 3U) & (~3U))
(f)
(((size_t)(SIZE) + 2U) & (~2U))
(g)
(((size_t)(SIZE) + 1U) & (~1U))
OP1 = ?
(a)
^
(b)
|
(c)
&
OP2 = ?
(a)
^
(b)
|
(c)
&
延伸閱讀:
回歸第一手資料,透過反思 C 語言程式設計的細節,重新學習電腦原理
Jul 21, 2025自 Linux 核心原始程式碼編譯出 User-mode Linux 並運用工具來建構實驗所需的檔案系統
Jul 20, 2025系統的 throughput 是評斷排程器優劣的重要效能
Jul 19, 2025無論是作業系統核心、C 語言函式庫內部、程式開發框架,到應用程式,都不難見到 linked list 的身影,包含多種針對效能和安全議題所做的 linked list 變形,又還要考慮到應用程式的泛用性 (generic programming),是很好的進階題材。
Jul 17, 2025or
By clicking below, you agree to our terms of service.
New to HackMD? Sign up