Try   HackMD

2020q1 第 13 週測驗題

tags: linux2020

目的: 檢驗學員對 mmap 及 memfd 系統呼叫的認知

作答表單


測驗 1

考慮一個特別的 circular buffer 實作,嘗試透過 mmap 系統呼叫以化簡緩衝區邊界處理的議題。

A circular-buffer implementation may be optimized by mapping the underlying buffer to two contiguous regions of virtual memory. (Naturally, the underlying buffer's length must then equal some multiple of the system's page size.) Reading from and writing to the circular buffer may then be carried out with greater efficiency by means of direct memory access; those accesses which fall beyond the end of the first virtual-memory region will automatically wrap around to the beginning of the underlying buffer. When the read offset is advanced into the second virtual-memory region, both offsets—read and write—are decremented by the length of the underlying buffer.

接下來 circular buffer 初始化時會呼叫三次 mmap,為什麼呢?

  • 第一次呼叫
    • 向作業系統請求配置一塊足以容納兩倍 circular buffer 空間的記憶體
  • 第二次呼叫:
    • 將稍早用 memfd_create 建立的匿名檔案 (anonymous file) 映射到第一次呼叫 mmap 所取得的記憶體中,映射大小為 buffer 的大小
  • 第三次呼叫:
    • 以第二次要求映射的記憶體位置 (即第一次得到的位置) 加上 buffer 大小的記憶體位置作為要求配置的位置,映射大小同樣為 buffer 的大小,並也是映射到同樣的 fd 上(注意,兩次呼叫傳入的 offset 均為0)
    • 如此一來,第二次與第三次映射的記憶體位置即指向同一塊記憶體

mmap is used to mirror the buffer

the "mirrored" buffer is then placed beside the buffer. When the user polls the item it doesn't matter if the item crosses the buffer's boundary.

有了這個特性後,當我們使用 memcpy 在寫入/讀取 buffer 時即可不用考慮到邊界問題,進而改善存取效率,否則,我們必須考慮到目前 index 是否會超出邊界等等,這將對效能造成衝擊。

測試程式碼如下: (test.c)

#include <stdio.h>                         
#include <unistd.h>

#include "queue.h"

#define BUFFER_SIZE (getpagesize())
#define NUM_THREADS (8)
#define MESSAGES_PER_THREAD (getpagesize() * 2)

/* Consumer thread body: take MESSAGES_PER_THREAD messages off the queue
 * passed in *arg* and return how many were taken, packed into the thread's
 * void * result.
 */
static void *consumer_loop(void *arg)
{
    queue_t *queue = arg;
    size_t received = 0;

    while (received < MESSAGES_PER_THREAD) {
        // The payload itself is discarded; only the number of messages matters
        size_t payload;
        queue_get(queue, (uint8_t *) &payload, sizeof payload);
        received++;
    }

    return (void *) received;
}

/* Publisher thread body: put NUM_THREADS * MESSAGES_PER_THREAD messages on
 * the queue passed in *arg*. Each message carries the running counter as its
 * payload. The number of messages sent is returned, packed into the thread's
 * void * result.
 */
static void *publisher_loop(void *arg)
{
    queue_t *queue = arg;
    size_t sent = 0;

    while (sent < NUM_THREADS * MESSAGES_PER_THREAD) {
        queue_put(queue, (uint8_t *) &sent, sizeof sent);
        sent++;
    }

    return (void *) sent;
}

/* Test driver: one publisher and NUM_THREADS consumers share a single queue
 * of BUFFER_SIZE bytes. Prints how many messages each thread handled.
 *
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    queue_t q;
    queue_init(&q, BUFFER_SIZE);

    pthread_t publisher;
    pthread_t consumers[NUM_THREADS];

    pthread_attr_t attr;
    pthread_attr_init(&attr);

    // Joining a thread that was never created is undefined, so stop early if
    // any creation fails.
    if (pthread_create(&publisher, &attr, &publisher_loop, &q) != 0) {
        fprintf(stderr, "could not create publisher thread\n");
        return 1;
    }

    for (intptr_t i = 0; i < NUM_THREADS; i++) {
        if (pthread_create(&consumers[i], &attr, &consumer_loop, &q) != 0) {
            fprintf(stderr, "could not create consumer thread %ld\n",
                    (long) i);
            return 1;
        }
    }

    // pthread_join stores a void *, so receive it in a real void * object and
    // convert afterwards instead of aliasing an integer through a void ** cast.
    void *result = NULL;
    pthread_join(publisher, &result);
    printf("publisher sent %ld messages\n", (long) (intptr_t) result);

    for (intptr_t i = 0; i < NUM_THREADS; i++) {
        result = NULL;
        pthread_join(consumers[i], &result);
        // intptr_t is not necessarily long, so cast to match %ld
        printf("consumer %ld received %ld messages\n", (long) i,
               (long) (intptr_t) result);
    }

    pthread_attr_destroy(&attr);

    queue_destroy(&q);

    return 0;
}

編譯上述 test.c:

$ cc -Wall -std=c11 -D_GNU_SOURCE -o test test.c -lpthread

參考執行輸出:

publisher sent 65536 messages
consumer 0 received 8192 messages
consumer 1 received 8192 messages
consumer 2 received 8192 messages
consumer 3 received 8192 messages
consumer 4 received 8192 messages
consumer 5 received 8192 messages
consumer 6 received 8192 messages
consumer 7 received 8192 messages

其中 queue.h 程式碼列表如下:

#ifndef queue_h_
#define queue_h_

#include <pthread.h>
#include <stdint.h>

/* State of one blocking message queue. */
typedef struct {
    // Start of the mapped backing buffer
    uint8_t *buffer;
    // Size of the backing buffer in bytes
    size_t size;

    // File descriptor of the memory-backed file behind the buffer
    int fd;

    // Byte offset into the buffer of the next message to read
    size_t head;
    // Byte offset into the buffer where the next message is written
    size_t tail;

    // Sequence number of the message currently at the front of the queue
    size_t head_seq;

    // Sequence number that will be stamped on the next written message
    size_t tail_seq;

    // Signalled after a message has been written
    pthread_cond_t readable;
    // Signalled after a message has been consumed
    pthread_cond_t writeable;
    // Mutex the queue functions hold while touching the members above
    pthread_mutex_t lock;
} queue_t;

#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/mman.h>
#include <sys/types.h>

/* Header stored in the buffer immediately in front of each message body */
typedef struct {
    // Length of the message body in bytes
    size_t len;
    // Sequence number assigned to the message when it was written
    size_t seq;
} message_t;

/* Convenience wrappers for erroring out */
/* Print "queue error: ", the printf-style message built from *fmt* and its
 * arguments, and a newline to stderr, then abort the process. Never returns.
 */
static inline void queue_error(const char *fmt, ...)
{
    va_list ap;

    fputs("queue error: ", stderr);

    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);

    fputs("\n", stderr);
    abort();
}
/* Like queue_error(), but appends " (errno N)" where N is the value errno
 * held when this function was entered. Aborts the process; never returns.
 */
static inline void queue_error_errno(const char *fmt, ...)
{
    // Capture errno before any stdio call: those calls may modify it, which
    // would make the report show the wrong error number.
    const int saved_errno = errno;

    va_list args;
    va_start(args, fmt);
    fprintf(stderr, "queue error: ");
    vfprintf(stderr, fmt, args);
    fprintf(stderr, " (errno %d)\n", saved_errno);
    va_end(args);
    abort();
}

/** Initialize a blocking queue *q* of size *s*
 *
 * The backing buffer is a memory-backed anonymous file of *s* bytes that is
 * mapped twice, back to back, into one 2 * s byte window of virtual address
 * space. A byte at offset i is therefore also visible at offset i + s, so a
 * message that runs past the end of the buffer can still be copied with a
 * single memcpy.
 *
 * *s* must be a multiple of the page size. Every member of *q* is
 * overwritten. Any failure is reported through queue_error() or
 * queue_error_errno(), both of which abort the process.
 */
void queue_init(queue_t *q, size_t s)
{
    // The mappings below are made at page granularity, so the size has to be
    // a whole number of pages.
    if (s % getpagesize() != 0) {
        queue_error(
            "Requested size (%zu) is not a multiple of the page size (%d)", s,
            getpagesize());
    }

    // Create an anonymous file backed by memory
    if ((q->fd = memfd_create("queue_region", 0)) == -1)
        queue_error_errno("Could not obtain anonymous file");

    // Set buffer size
    if (ftruncate(q->fd, s) != 0)
        queue_error_errno("Could not set size of anonymous file");

    // Reserve 2 * s bytes of contiguous address space. The reservation is
    // inaccessible (PROT_NONE); it only pins down an address range that the
    // two fixed mappings below replace.
    if ((q->buffer = mmap(NULL, 2 * s, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS,
                          -1, 0)) == MAP_FAILED)
        queue_error_errno("Could not allocate virtual memory");

    // Map the file over the first half of the reservation
    if (mmap(q->buffer, s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED,
             q->fd, 0) == MAP_FAILED)
        queue_error_errno("Could not map buffer into virtual memory");

    // Map the same file, again from offset 0, over the second half
    if (mmap(q->buffer + s, s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED,
             q->fd, 0) == MAP_FAILED)
        queue_error_errno("Could not map buffer into virtual memory");

    // Initialize synchronization primitives. The pthread functions return the
    // error number instead of setting errno, so copy it into errno for
    // queue_error_errno() to report.
    int rc;
    if ((rc = pthread_mutex_init(&q->lock, NULL)) != 0) {
        errno = rc;
        queue_error_errno("Could not initialize mutex");
    }
    if ((rc = pthread_cond_init(&q->readable, NULL)) != 0) {
        errno = rc;
        queue_error_errno("Could not initialize condition variable");
    }
    if ((rc = pthread_cond_init(&q->writeable, NULL)) != 0) {
        errno = rc;
        queue_error_errno("Could not initialize condition variable");
    }

    // Initialize remaining members: the queue starts out empty
    q->size = s;
    q->head = q->tail = 0;
    q->head_seq = q->tail_seq = 0;
}

/** Destroy the blocking queue *q*
 *
 * Unmaps both halves of the buffer window, closes the backing file
 * descriptor and destroys the mutex and condition variables. Any failure is
 * reported through queue_error_errno(), which aborts the process.
 */
void queue_destroy(queue_t *q)
{
    // Second half of the window first, then the first half
    if (munmap(q->buffer + q->size, q->size) != 0)
        queue_error_errno("Could not unmap buffer");

    if (munmap(q->buffer, q->size) != 0)
        queue_error_errno("Could not unmap buffer");

    if (close(q->fd) != 0)
        queue_error_errno("Could not close anonymous file");

    // The pthread functions return the error number instead of setting errno,
    // so copy it into errno for queue_error_errno() to report.
    int rc;
    if ((rc = pthread_mutex_destroy(&q->lock)) != 0) {
        errno = rc;
        queue_error_errno("Could not destroy mutex");
    }

    if ((rc = pthread_cond_destroy(&q->readable)) != 0) {
        errno = rc;
        queue_error_errno("Could not destroy condition variable");
    }

    if ((rc = pthread_cond_destroy(&q->writeable)) != 0) {
        errno = rc;
        queue_error_errno("Could not destroy condition variable");
    }
}

/** Insert into queue *q* a message of *size* bytes from *buffer*
 *
 * The message is stored as a message_t header (body length and sequence
 * number) followed by the body bytes.
 *
 * Blocks until sufficient space is available in the queue, i.e. until the
 * free space is at least *size* + sizeof(message_t) bytes.
 *
 * NOTE(review): if *size* + sizeof(message_t) exceeds q->size the wait
 * condition below can never become false, so the call would block forever.
 *
 * AAA and BBB are blanks of the quiz this listing belongs to; they are
 * intentionally left unfilled here.
 */
void queue_put(queue_t *q, uint8_t *buffer, size_t size)
{
    pthread_mutex_lock(&q->lock);

    // Wait for space to become available: (tail - head) is the number of
    // bytes currently in use, so size minus that is the free space
    while ((q->size - (q->tail - q->head)) < (size + sizeof(message_t)))
        pthread_cond_wait(&q->writeable, &q->lock);

    // Construct header; the message takes the current tail_seq, which is then
    // advanced for the next message
    message_t m = {.len = size, .seq = q->tail_seq++};

    // Write message: header at the write index, then the body. No wrap-around
    // handling appears here; queue_init maps the buffer twice back to back so
    // a copy may run past the end of the first mapping.
    memcpy(&q->buffer[q->tail], &m, sizeof(message_t));
    memcpy(&q->buffer[AAA], buffer, size);

    // Increment write index
    q->tail += BBB;

    // Wake one thread waiting for a message
    pthread_cond_signal(&q->readable);
    pthread_mutex_unlock(&q->lock);
}

/** Retrieves a message of at most *max* bytes from queue *q* and writes
 * it to *buffer*.
 *
 * Blocks until a message of no more than *max* bytes is available.
 *
 * Returns the number of bytes in the written message.
 *
 * CCC is a blank of the quiz this listing belongs to; it is intentionally
 * left unfilled here.
 */
size_t queue_get(queue_t *q, uint8_t *buffer, size_t max)
{
    pthread_mutex_lock(&q->lock);

    // Wait for a message that we can successfully consume to reach the front of
    // the queue
    message_t m;
    for (;;) {
        // Wait for a message to arrive (head == tail means the queue is empty)
        while ((q->tail - q->head) == 0)
            pthread_cond_wait(&q->readable, &q->lock);

        // Read message header
        memcpy(&m, &q->buffer[q->head], sizeof(message_t));

        // Message too long, wait for someone else to consume it. head_seq
        // changes once the front message has been consumed.
        // NOTE(review): this wait shares `writeable` with queue_put, and
        // `writeable` is signalled rather than broadcast below; confirm a
        // wakeup cannot be consumed by the wrong kind of waiter.
        if (m.len > max) {
            while (q->head_seq == m.seq)
                pthread_cond_wait(&q->writeable, &q->lock);
            continue;
        }

        // We successfully consumed the header of a suitable message, so proceed
        break;
    }

    // Read message body, which starts right after the header
    memcpy(buffer, &q->buffer[q->head + sizeof(message_t)], m.len);

    // Consume the message by incrementing the read pointer
    q->head += m.len + sizeof(message_t);
    q->head_seq++;

    // When read buffer moves into 2nd memory region, we can reset to the 1st
    // region
    if (q->head >= q->size) {
        CCC;
    }

    // Wake one thread waiting on `writeable`
    pthread_cond_signal(&q->writeable);
    pthread_mutex_unlock(&q->lock);

    return m.len;
}

#endif

請補完程式碼。

作答區

AAA = ?

  • (a) q->head
  • (b) q->size - (q->tail - q->head)
  • (c) q->tail + sizeof(message_t)
  • (d) sizeof(message_t)
  • (e) size + sizeof(message_t)

BBB = ?

  • (a) q->head
  • (b) q->size - (q->tail - q->head)
  • (c) q->tail + sizeof(message_t)
  • (d) sizeof(message_t)
  • (e) size + sizeof(message_t)

CCC = ?

  • (a) q->head -= q->size; q->tail -= q->size;
  • (b) q->head -= sizeof(message_t); q->tail -= sizeof(message_t);
  • (c) q->head -= (q->tail - q->head); q->tail -= (q->tail - q->head);

延伸問題:

  1. 解釋上述程式碼運作原理,並探討效能表現
  2. 在 Linux 核心原始程式碼找出類似改善 ring buffer 效能的程式碼並解析