# [2022q1](http://wiki.csie.ncku.edu.tw/sysprog/schedule) Week 12 Quiz
###### tags: `linux2022`
:::info
Purpose: assess students' understanding of **[concurrent and multithreaded programming](https://hackmd.io/@sysprog/concurrency)** and **[Linux kernel memory management](https://hackmd.io/@sysprog/linux-memory)**
:::
Answer forms:
* ==[Quiz `1`](https://docs.google.com/forms/d/e/1FAIpQLSf2ipYY7SLxDg3FCSyzLuaYPmUUqI00Q_CRoBwK2q6TpWAaCA/viewform)== (Linux Kernel Design)
* ==[Quiz `2`, `3`](https://docs.google.com/forms/d/e/1FAIpQLSerzDR4vLG0MaPJctukoiJmyGrhVWlBg3uIwYo9ISqGQX2OKA/viewform)== (Linux Kernel Implementation)
### Quiz `1`
Suppose we want to implement a lock-free single-producer/single-consumer (SPSC) concurrent program built on a [ring buffer](https://en.wikipedia.org/wiki/Circular_buffer) that avoids [false sharing](https://en.wikipedia.org/wiki/False_sharing). Reference output of the test program (the argument `4` specifies 4 processor cores):
```
$ ./spsc_test 4
producer 0: ---1----
consumer 2: ---4----
consumer 1: ---2----
consumer 3: ---6----
Consumer created...
Consumer created...
Consumer created...
consumer: 23 cycles/op
producer 7 cycles/op
consumer: 23 cycles/op
consumer: 23 cycles/op
Done!
```
The code is available at [SPSC](https://gist.github.com/jserv/13344287e6dfcbb73a3e02a1e3da31e2) (partially masked).
It uses the multi-core technique described in [spinlock: significant optimizations](https://en.wikipedia.org/wiki/Spinlock#Significant_optimizations); compare with [src/util/spinlock.h](https://github.com/sysprog21/cserv/blob/master/src/util/spinlock.h) from the cserv project.
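One of the optimizations in the linked Wikipedia section is to spin on a plain read and attempt the atomic write only once the lock looks free (often called test and test-and-set). The following is a minimal sketch of that idea with C11 atomics. The type `ttas_lock_t` and the `ttas_*` functions are hypothetical names chosen for illustration; they are not taken from the gist or from cserv.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical type and function names, for illustration only */
typedef struct {
    atomic_bool locked;
} ttas_lock_t;

static inline void ttas_init(ttas_lock_t *l)
{
    atomic_init(&l->locked, false);
}

static inline void ttas_lock(ttas_lock_t *l)
{
    for (;;) {
        /* "Test": spin on a read-only load so waiting cores do not keep
         * pulling the cache line into exclusive state. */
        while (atomic_load_explicit(&l->locked, memory_order_relaxed))
            ; /* a CPU pause/yield hint could be placed here */
        /* "Test-and-set": try the atomic exchange only when it looked free */
        if (!atomic_exchange_explicit(&l->locked, true, memory_order_acquire))
            return;
    }
}

/* Returns true if the lock was acquired without waiting */
static inline bool ttas_trylock(ttas_lock_t *l)
{
    return !atomic_load_explicit(&l->locked, memory_order_relaxed) &&
           !atomic_exchange_explicit(&l->locked, true, memory_order_acquire);
}

static inline void ttas_unlock(ttas_lock_t *l)
{
    atomic_store_explicit(&l->locked, false, memory_order_release);
}
```

A caller would initialize the lock once with `ttas_init`, then wrap each critical section in `ttas_lock` and `ttas_unlock`.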
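Given that execution never triggers an assert failure, complete the code so that it behaves as expected. Answer rules:
* `DDDD` and `EEEE` are both expressions
* Answer in the most concise form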
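
As background for the design goals stated at the start of this quiz, here is a small SPSC ring sketch written independently of the masked gist. It assumes a 64-byte cache line (`CACHE_LINE`), a power-of-two capacity (`RING_CAP`), and `int` payloads; all names are hypothetical. The producer and consumer indices are placed on separate cache lines so that updating one does not invalidate the line holding the other, which is the false-sharing concern.

```c
#include <assert.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define CACHE_LINE 64 /* assumed cache-line size */
#define RING_CAP 8    /* assumed capacity, must be a power of two */

typedef struct {
    /* Written only by the producer */
    alignas(CACHE_LINE) atomic_size_t tail;
    /* Written only by the consumer, kept on its own cache line */
    alignas(CACHE_LINE) atomic_size_t head;
    alignas(CACHE_LINE) int slots[RING_CAP];
} spsc_ring_t;

static void spsc_init(spsc_ring_t *r)
{
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

/* Producer side: returns false when the ring is full */
static bool spsc_push(spsc_ring_t *r, int v)
{
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_CAP)
        return false;
    r->slots[tail & (RING_CAP - 1)] = v;
    /* Release: publish the slot contents before the new tail is visible */
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer side: returns false when the ring is empty */
static bool spsc_pop(spsc_ring_t *r, int *out)
{
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head == tail)
        return false;
    *out = r->slots[head & (RING_CAP - 1)];
    /* Release: the slot may be reused only after it has been read */
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return true;
}
```

The sketch is only safe with exactly one thread calling `spsc_push` and one thread calling `spsc_pop`; it makes no claim about how the quiz code fills in `DDDD` and `EEEE`.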
---
### Quiz `2`
Consider a special [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) implementation that uses the [mmap system call](http://man7.org/linux/man-pages/man2/mmap.2.html) to simplify the handling of buffer boundaries.
> A circular-buffer implementation may be optimized by mapping the underlying buffer to two contiguous regions of virtual memory. (Naturally, the underlying buffer's length must then equal some multiple of the system's page size.) Reading from and writing to the circular buffer may then be carried out with greater efficiency by means of direct memory access; those accesses which fall beyond the end of the first virtual-memory region will automatically wrap around to the beginning of the underlying buffer. When the read offset is advanced into the second virtual-memory region, both offsets, read and write, are decremented by the length of the underlying buffer.
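Initializing the circular buffer calls mmap three times. Why?
* First call: ask the operating system for a region of memory large enough to hold twice the circular buffer
* Second call: map the backing file (created with `memfd_create` in the listing below; a temporary file from mkstemp would serve the same purpose) onto the address returned by the first call, with a mapping length equal to the buffer size
* Third call: request the address used by the second call (the one returned by the first call) plus the buffer size, again with a mapping length equal to the buffer size and the same fd (note that both calls pass an offset of 0)
* As a result, the second and third mappings refer to the same memory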
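
The three calls above can be tried in isolation with the sketch below. It is an illustration under stated assumptions, not the quiz's `queue_init`: the helpers `mirror_map` and `mirror_check` are hypothetical, and the backing file comes from `mkstemp` under `/tmp` (assumed writable) and is unlinked right away.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

/* Hypothetical helper: map `size` bytes of a temporary file twice, back to
 * back. `size` must be a multiple of the page size. Returns NULL on failure. */
static uint8_t *mirror_map(size_t size)
{
    char path[] = "/tmp/mirror-XXXXXX";
    int fd = mkstemp(path);
    if (fd == -1)
        return NULL;
    unlink(path); /* the open descriptor keeps the file alive */
    if (ftruncate(fd, (off_t) size) != 0) {
        close(fd);
        return NULL;
    }
    /* 1st mmap: reserve 2 * size bytes of contiguous address space */
    uint8_t *base = mmap(NULL, 2 * size, PROT_NONE,
                         MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (base == MAP_FAILED) {
        close(fd);
        return NULL;
    }
    /* 2nd and 3rd mmap: same fd, same offset 0, adjacent fixed addresses */
    void *lo = mmap(base, size, PROT_READ | PROT_WRITE,
                    MAP_SHARED | MAP_FIXED, fd, 0);
    void *hi = mmap(base + size, size, PROT_READ | PROT_WRITE,
                    MAP_SHARED | MAP_FIXED, fd, 0);
    close(fd); /* existing mappings stay valid after close */
    if (lo == MAP_FAILED || hi == MAP_FAILED) {
        munmap(base, 2 * size);
        return NULL;
    }
    return base;
}

/* Write six bytes straddling the end of the first region and check that the
 * last three appear at the start of the buffer. Returns 0 on success. */
static int mirror_check(void)
{
    size_t size = (size_t) sysconf(_SC_PAGESIZE);
    uint8_t *buf = mirror_map(size);
    if (!buf)
        return -1;
    memcpy(buf + size - 3, "abcdef", 6);
    int ok = memcmp(buf, "def", 3) == 0 &&
             memcmp(buf + size - 3, "abc", 3) == 0;
    munmap(buf, 2 * size);
    return ok ? 0 : -1;
}
```

Calling `mirror_check()` from a `main` function exercises the wrap-around: the single `memcpy` crosses the region boundary, yet its overflow lands at offset 0 of the same underlying pages.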
![](https://i.imgur.com/q0rsuMD.png)
> mmap is used to mirror the buffer
![](https://i.imgur.com/83N0jjH.png)
> the "mirrored" buffer is then placed beside the buffer. When the user polls the item it doesn't matter if the item crosses the buffer's boundary.
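With this property, memcpy calls that write to or read from the buffer no longer need to handle the boundary, which improves access efficiency. Otherwise every access would have to check whether the current index runs past the end of the buffer, at a cost in performance.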
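
For contrast, this is roughly what a write looks like on a plain ring buffer that is not mirrored: the copy has to be split at the physical end of the array. The helper `ring_write_split` is a hypothetical name, and the sketch assumes `pos < cap` and `len <= cap`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: copy `len` bytes into a non-mirrored ring of `cap`
 * bytes starting at offset `pos`. Returns the new write offset. */
static size_t ring_write_split(uint8_t *ring, size_t cap, size_t pos,
                               const void *src, size_t len)
{
    /* Bytes that fit before the physical end of the array */
    size_t first = cap - pos;
    if (first > len)
        first = len;
    memcpy(ring + pos, src, first);
    /* Remaining bytes wrap around to the start of the array */
    memcpy(ring, (const uint8_t *) src + first, len - first);
    return (pos + len) % cap;
}
```

With the mirrored mapping, the two `memcpy` calls and the bounds arithmetic collapse into a single `memcpy` at `ring + pos`.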
The test program (`test.c`):
```cpp
#include <stdio.h>
#include <unistd.h>
#include "queue.h"
#define BUFFER_SIZE (getpagesize())
#define NUM_THREADS (8)
#define MESSAGES_PER_THREAD (getpagesize() * 2)
static void *consumer_loop(void *arg)
{
queue_t *q = (queue_t *) arg;
size_t count = 0;
for (size_t i = 0; i < MESSAGES_PER_THREAD; i++) {
size_t x;
queue_get(q, (uint8_t *) &x, sizeof(size_t));
count++;
}
return (void *) count;
}
static void *publisher_loop(void *arg)
{
queue_t *q = (queue_t *) arg;
size_t i;
for (i = 0; i < NUM_THREADS * MESSAGES_PER_THREAD; i++)
queue_put(q, (uint8_t *) &i, sizeof(size_t));
return (void *) i;
}
int main(int argc, char *argv[])
{
queue_t q;
queue_init(&q, BUFFER_SIZE);
pthread_t publisher;
pthread_t consumers[NUM_THREADS];
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_create(&publisher, &attr, &publisher_loop, (void *) &q);
for (intptr_t i = 0; i < NUM_THREADS; i++)
pthread_create(&consumers[i], &attr, &consumer_loop, (void *) &q);
intptr_t sent;
pthread_join(publisher, (void **) &sent);
printf("publisher sent %ld messages\n", sent);
intptr_t recd[NUM_THREADS];
for (intptr_t i = 0; i < NUM_THREADS; i++) {
pthread_join(consumers[i], (void **) &recd[i]);
printf("consumer %ld received %ld messages\n", i, recd[i]);
}
pthread_attr_destroy(&attr);
queue_destroy(&q);
return 0;
}
```
Compile `test.c` above:
```shell
$ cc -Wall -std=c11 -D_GNU_SOURCE -o test test.c -lpthread
```
Reference output:
```
publisher sent 65536 messages
consumer 0 received 8192 messages
consumer 1 received 8192 messages
consumer 2 received 8192 messages
consumer 3 received 8192 messages
consumer 4 received 8192 messages
consumer 5 received 8192 messages
consumer 6 received 8192 messages
consumer 7 received 8192 messages
```
The listing of `queue.h` follows:
```cpp
#ifndef queue_h_
#define queue_h_
#include <pthread.h>
#include <stdint.h>
typedef struct {
// backing buffer and size
uint8_t *buffer;
size_t size;
// backing buffer's memfd descriptor
int fd;
// read / write indices
size_t head, tail;
// sequence number of next consumable message
size_t head_seq;
// sequence number of last written message
size_t tail_seq;
// synchronization primitives
pthread_cond_t readable, writeable;
pthread_mutex_t lock;
} queue_t;
#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/types.h>
/* Metadata (header) for a message in the queue */
typedef struct {
size_t len, seq;
} message_t;
/* Convenience wrappers for erroring out */
static inline void queue_error(const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
fprintf(stderr, "queue error: ");
vfprintf(stderr, fmt, args);
fprintf(stderr, "\n");
va_end(args);
abort();
}
static inline void queue_error_errno(const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
fprintf(stderr, "queue error: ");
vfprintf(stderr, fmt, args);
fprintf(stderr, " (errno %d)\n", errno);
va_end(args);
abort();
}
/** Initialize a blocking queue *q* of size *s* */
void queue_init(queue_t *q, size_t s)
{
/* We mmap two adjacent pages (in virtual memory) that point to the same
* physical memory. This lets us optimize memory access, so that we don't
* need to even worry about wrapping our pointers around until we go
* through the entire buffer.
*/
// Check that the requested size is a multiple of a page. If it isn't, we're
// in trouble.
if (s % getpagesize() != 0) {
queue_error(
"Requested size (%zu) is not a multiple of the page size (%d)", s,
getpagesize());
}
// Create an anonymous file backed by memory
if ((q->fd = memfd_create("queue_region", 0)) == -1)
queue_error_errno("Could not obtain anonymous file");
// Set buffer size
if (ftruncate(q->fd, s) != 0)
queue_error_errno("Could not set size of anonymous file");
// Ask mmap for a good address
if ((q->buffer = mmap(NULL, 2 * s, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS,
-1, 0)) == MAP_FAILED)
queue_error_errno("Could not allocate virtual memory");
// Mmap first region
if (mmap(q->buffer, s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED,
q->fd, 0) == MAP_FAILED)
queue_error_errno("Could not map buffer into virtual memory");
// Mmap second region, with exact address
if (mmap(q->buffer + s, s, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED,
q->fd, 0) == MAP_FAILED)
queue_error_errno("Could not map buffer into virtual memory");
// Initialize synchronization primitives
if (pthread_mutex_init(&q->lock, NULL) != 0)
queue_error_errno("Could not initialize mutex");
if (pthread_cond_init(&q->readable, NULL) != 0)
queue_error_errno("Could not initialize condition variable");
if (pthread_cond_init(&q->writeable, NULL) != 0)
queue_error_errno("Could not initialize condition variable");
// Initialize remaining members
q->size = s;
q->head = q->tail = 0;
q->head_seq = q->tail_seq = 0;
}
/** Destroy the blocking queue *q* */
void queue_destroy(queue_t *q)
{
if (munmap(q->buffer + q->size, q->size) != 0)
queue_error_errno("Could not unmap buffer");
if (munmap(q->buffer, q->size) != 0)
queue_error_errno("Could not unmap buffer");
if (close(q->fd) != 0)
queue_error_errno("Could not close anonymous file");
if (pthread_mutex_destroy(&q->lock) != 0)
queue_error_errno("Could not destroy mutex");
if (pthread_cond_destroy(&q->readable) != 0)
queue_error_errno("Could not destroy condition variable");
if (pthread_cond_destroy(&q->writeable) != 0)
queue_error_errno("Could not destroy condition variable");
}
/** Insert into queue *q* a message of *size* bytes from *buffer*
*
* Blocks until sufficient space is available in the queue.
*/
void queue_put(queue_t *q, uint8_t *buffer, size_t size)
{
pthread_mutex_lock(&q->lock);
// Wait for space to become available
while ((q->size - (q->tail - q->head)) < (size + sizeof(message_t)))
pthread_cond_wait(&q->writeable, &q->lock);
// Construct header
message_t m = {.len = size, .seq = q->tail_seq++};
// Write message
memcpy(&q->buffer[q->tail], &m, sizeof(message_t));
memcpy(&q->buffer[AAA], buffer, size);
// Increment write index
q->tail += BBB;
pthread_cond_signal(&q->readable);
pthread_mutex_unlock(&q->lock);
}
/** Retrieves a message of at most *max* bytes from queue *q* and writes
* it to *buffer*.
*
* Blocks until a message of no more than *max* bytes is available.
*
* Returns the number of bytes in the written message.
*/
size_t queue_get(queue_t *q, uint8_t *buffer, size_t max)
{
pthread_mutex_lock(&q->lock);
// Wait for a message that we can successfully consume to reach the front of
// the queue
message_t m;
for (;;) {
// Wait for a message to arrive
while ((q->tail - q->head) == 0)
pthread_cond_wait(&q->readable, &q->lock);
// Read message header
memcpy(&m, &q->buffer[q->head], sizeof(message_t));
// Message too long, wait for someone else to consume it
if (m.len > max) {
while (q->head_seq == m.seq)
pthread_cond_wait(&q->writeable, &q->lock);
continue;
}
// We successfully consumed the header of a suitable message, so proceed
break;
}
// Read message body
memcpy(buffer, &q->buffer[q->head + sizeof(message_t)], m.len);
// Consume the message by incrementing the read pointer
q->head += m.len + sizeof(message_t);
q->head_seq++;
// When read buffer moves into 2nd memory region, we can reset to the 1st
// region
if (q->head >= q->size) {
CCC;
}
pthread_cond_signal(&q->writeable);
pthread_mutex_unlock(&q->lock);
return m.len;
}
#endif
```
Complete the code.
==Answers==
AAA = ?
* `(a)` `q->head`
* `(b)` `q->size - (q->tail - q->head)`
* `(c)` `q->tail + sizeof(message_t)`
* `(d)` `sizeof(message_t)`
* `(e)` `size + sizeof(message_t)`
BBB = ?
* `(a)` `q->head`
* `(b)` `q->size - (q->tail - q->head)`
* `(c)` `q->tail + sizeof(message_t)`
* `(d)` `sizeof(message_t)`
* `(e)` `size + sizeof(message_t)`
CCC = ?
* `(a)` `q->head -= q->size; q->tail -= q->size;`
* `(b)` `q->head -= sizeof(message_t); q->tail -= sizeof(message_t);`
* `(c)` `q->head -= (q->tail - q->head); q->tail -= (q->tail - q->head);`
:::success
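Follow-up questions:
1. Explain how the code above works and discuss its performance
2. Find code in the Linux kernel source that improves ring buffer performance in a similar way, and analyze it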
:::
---
### Quiz `3`
Consider the following code, which uses mmap to implement fast file copying: `mmap-filecopy.c`
```cpp
/* copy modified blocks of source file to destination file efficiently
* using mmap.
*/
#include <assert.h>
#include <fcntl.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sysexits.h>
#include <unistd.h>
int main(int argc, char *argv[])
{
if (argc != 3) {
printf("Usage: %s <source> <destination>\n", argv[0]);
return EX_USAGE;
}
const char *src_name = argv[1];
const char *dst_name = argv[2];
int src_fd, dst_fd;
struct stat dst_stat = {0};
off_t src_len, dst_len;
src_fd = open(src_name, O_RDONLY);
if (src_fd == -1) {
perror(src_name);
return EX_DATAERR;
}
dst_fd = open(dst_name, O_RDWR | O_CREAT,
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
if (dst_fd == -1 || fstat(dst_fd, &dst_stat) != 0) {
perror(dst_name);
return EX_DATAERR;
}
src_len = lseek(src_fd, 0, SEEK_END);
if (src_len < 0) {
perror(src_name);
return EX_DATAERR;
}
dst_len = lseek(dst_fd, 0, SEEK_END);
if (dst_len < 0) {
perror(dst_name);
return EX_DATAERR;
}
if (dst_len > src_len) {
printf("Destination file is larger (%zd) than input file (%zd)\n",
dst_len, src_len);
return EX_DATAERR;
}
const size_t page_size =
dst_stat.st_blksize > 0 ? dst_stat.st_blksize : BUFSIZ;
const size_t len = src_len;
if (ftruncate(dst_fd, len) != 0) {
perror(dst_name);
return EX_DATAERR;
}
size_t read_count = 0;
size_t write_count = 0;
if (len > 0) {
const uint8_t *src;
uint8_t *dst;
src = mmap(NULL, len, PROT_READ, MAP_SHARED, src_fd, 0);
if (src == MAP_FAILED ||
posix_madvise((void *) src, len, POSIX_MADV_SEQUENTIAL) != 0) {
perror(src_name);
return EX_UNAVAILABLE;
}
dst = mmap(NULL, len, PROP, MAP_SHARED, dst_fd, 0);
if (dst == MAP_FAILED ||
posix_madvise(dst, len, POSIX_MADV_SEQUENTIAL) != 0) {
perror(dst_name);
return EX_UNAVAILABLE;
}
for (size_t i = 0; i < len; i += page_size) {
size_t block_size = (len - i) >= page_size ? page_size : (len - i);
if (memcmp(src + i, dst + i, block_size)) {
memcpy(dst + i, src + i, block_size);
write_count += block_size;
}
read_count += block_size;
}
if (munmap((void *) src, len) != 0) {
perror(src_name);
return EX_UNAVAILABLE;
}
if (msync(dst, len, MS_SYNC) != 0 || munmap(dst, len) != 0) {
perror(dst_name);
return EX_UNAVAILABLE;
}
}
if (close(src_fd) != 0) {
perror(src_name);
return EX_UNAVAILABLE;
}
if (close(dst_fd) != 0) {
perror(dst_name);
return EX_UNAVAILABLE;
}
printf("%zu bytes read\n", read_count);
printf("%zu bytes written\n", write_count);
return EXIT_SUCCESS;
}
```
Build:
```shell
$ gcc -std=c11 -D_POSIX_C_SOURCE=200809L -o mmap-filecopy mmap-filecopy.c
```
Assuming a file named `in` already exists and `out` does not exist in the current directory, run:
```shell
$ ./mmap-filecopy in out
```
This performs a fast file copy.
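
The core of the program is its block loop: each block of the source is compared with the destination and copied only when the two differ. The sketch below isolates that loop on ordinary in-memory buffers so it can be tried without files. The helper name `sync_blocks` is hypothetical; the logic follows the loop in `main` above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: make dst equal to src block by block, touching only
 * the blocks that differ. Returns the number of bytes written. */
static size_t sync_blocks(uint8_t *dst, const uint8_t *src, size_t len,
                          size_t block)
{
    size_t written = 0;
    for (size_t i = 0; i < len; i += block) {
        /* The last block may be shorter than `block` */
        size_t n = (len - i) >= block ? block : (len - i);
        if (memcmp(src + i, dst + i, n)) {
            memcpy(dst + i, src + i, n);
            written += n;
        }
    }
    return written;
}
```

Running the helper a second time on the same buffers writes nothing, which matches the program's behavior when the destination file already equals the source.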
Complete the code so that it behaves as expected.
==Answers==
PROP = ?
* `(a)` `PROT_READ | PROT_WRITE`
* `(b)` `PROT_READ`
:::success
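Follow-up questions:
1. Explain how the code above works and point out its shortcomings
2. Discuss how system calls such as sendfile and splice could be applied to the program above
* See [Achieving Zero-Copy with the sendfile and splice system calls](https://hackmd.io/@sysprog/linux2020-zerocopy)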
:::
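
As a starting point for the second follow-up question, here is a hedged sketch of a whole-file copy built on Linux `sendfile(2)`. It is not a drop-in replacement for `mmap-filecopy.c`: it always copies every byte rather than only changed blocks, and it relies on Linux 2.6.33 or later, where the output descriptor may be a regular file. The helper name `copy_with_sendfile` and the `0644` creation mode are assumptions made for illustration.

```c
#include <assert.h>
#include <fcntl.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: copy src_path to dst_path inside the kernel.
 * Returns the number of bytes copied, or -1 on error. */
static off_t copy_with_sendfile(const char *src_path, const char *dst_path)
{
    int in = open(src_path, O_RDONLY);
    if (in == -1)
        return -1;
    struct stat st;
    if (fstat(in, &st) != 0) {
        close(in);
        return -1;
    }
    int out = open(dst_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (out == -1) {
        close(in);
        return -1;
    }
    off_t done = 0;
    while (done < st.st_size) {
        /* NULL offset: the kernel reads from and advances in's file offset */
        ssize_t n = sendfile(out, in, NULL, (size_t) (st.st_size - done));
        if (n <= 0)
            break; /* error or unexpected end of input */
        done += n;
    }
    close(in);
    close(out);
    return done == st.st_size ? done : -1;
}
```

Unlike the mmap version, the data never passes through a user-space buffer, but unchanged blocks are rewritten as well; which approach is faster depends on how much of the destination already matches.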