目的: 檢驗學員對 UNIX 作業系統 fork/exec 系統呼叫的前世今生和「並行和多執行緒程式設計」的 Atomics 操作的認知
作答表單: 測驗 1 (針對 Linux 核心「設計」/「實作」課程)
1
考慮以下程式碼藉由 fork(2) 和 mmap(2) 系統呼叫來實作並行版本的合併排序,限制 fork 的次數不超過 5。假設 calloc
總是會成功,且預期由小到大排列。程式碼如下: (部分遮蔽)
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>
static int *merge(const int *left_half,
const int left_len,
const int *right_half,
const int right_len)
{
int *merged = calloc(left_len + right_len, sizeof(int));
int left_idx = 0, right_idx = 0, cur_idx = 0;
while (left_idx < left_len && right_idx < right_len) {
if (left_half[left_idx] <= right_half[right_idx]) {
merged[cur_idx++] = left_half[left_idx++];
} else {
merged[cur_idx++] = right_half[right_idx++];
}
}
while (left_idx < left_len)
merged[cur_idx++] = left_half[left_idx++];
while (right_idx < right_len)
merged[cur_idx++] = right_half[right_idx++];
return merged;
}
static int fork_count = 0;
void merge_sort(int *arr, const int len)
{
if (len == 1)
return;
const int mid = len / 2;
const int left_len = len - mid;
const int right_len = mid;
/* If forked too often, it gets way too slow. */
if (fork_count < 5) {
pid_t pid = fork();
XAAA;
if (pid == 0) { /* Child process */
merge_sort(arr, left_len);
exit(0);
}
/* Parent process */
merge_sort(XBBB, XCCC);
waitpid(pid, NULL, 0);
} else {
merge_sort(arr, left_len);
merge_sort(XDDD, XEEE);
}
memcpy(arr, merge(arr, left_len, arr + left_len, right_len),
len * sizeof(int));
}
typedef struct {
uint32_t a, b, c, d;
} rand_context_t;
/* See https://burtleburtle.net/bob/rand/smallprng.html */
#define ROT(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
uint32_t rand_next(rand_context_t *x)
{
uint32_t e = x->a - ROT(x->b, 27);
x->a = x->b ^ ROT(x->c, 17);
x->b = x->c + x->d;
x->c = x->d + e;
x->d = e + x->a;
return x->d;
}
void rand_init(rand_context_t *x, uint32_t seed)
{
x->a = 0xf1ea5eed, x->b = x->c = x->d = seed;
for (size_t i = 0; i < 20; ++i)
(void) rand_next(x);
}
int iabs(int n)
{
int mask = n >> 31;
return (mask & -n) | (~mask & n);
}
#define N_ITEMS 1000000
int main(int argc, char **argv)
{
rand_context_t r;
rand_init(&r, (uintptr_t) &main ^ getpid());
/* shared by forked processes */
int *arr = mmap(NULL, N_ITEMS * sizeof(int), PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, 0, 0);
for (int i = 0; i < N_ITEMS; ++i)
arr[i] = iabs((int) rand_next(&r));
merge_sort(arr, N_ITEMS);
for (int i = 1; i < N_ITEMS; ++i) {
if (arr[i] < arr[i - 1]) {
fprintf(stderr, "Ascending order is expected.\n");
exit(1);
}
}
printf("OK!\n");
return 0;
}
假設 calloc
總會成功,且 mmap
映射的記憶體區塊亦可存取。
作答規範:
2
考慮我們即將為 llama.cpp 撰寫 Linux 核心的加速運算模組,名為 matmul.ko
,程式碼如下: (部分遮蔽)
matmul.c
(Linux 核心模組)#include <linux/completion.h>
#include <linux/delay.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/ioctl.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/mutex.h>
#include <linux/proc_fs.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#include <linux/version.h>
MODULE_LICENSE("Dual MIT/GPL");
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_DESCRIPTION("matrix multiplication");
MODULE_VERSION("0.1");
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0)
#define HAVE_PROC_OPS
#endif
#define MAT_SIZE 100
/* submatrix size for concurrent computation */
#define SUBMAT_SIZE 10
#define MATRIX_IOCTL_MAGIC 'm'
#define MATRIX_IOCTL_SET_A _IOW(MATRIX_IOCTL_MAGIC, 1, int)
#define MATRIX_IOCTL_SET_B _IOW(MATRIX_IOCTL_MAGIC, 2, int)
#define MATRIX_IOCTL_COMPUTE _IO(MATRIX_IOCTL_MAGIC, 3)
static int matrix_a[MAT_SIZE][MAT_SIZE];
static int matrix_b[MAT_SIZE][MAT_SIZE];
static int result[MAT_SIZE][MAT_SIZE];
static struct mutex matrix_mutex;
static struct completion computation_done; /* for synchronization */
static int worker_thread(void *data)
{
int start_row = *(int *) data;
int end_row = start_row + SUBMAT_SIZE;
int i, j, k;
for (i = start_row; i < end_row; ++i) {
for (j = 0; j < MAT_SIZE; ++j) {
result[i][j] = 0;
for (k = 0; k < MAT_SIZE; ++k)
result[i][j] += matrix_a[i][k] * matrix_b[k][j];
}
}
complete(&computation_done);
return 0;
}
static long matrix_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
switch (cmd) {
case MATRIX_IOCTL_SET_A:
/* Copy user data to kernel buffer (matrix_a) */
if (XFFF(matrix_a, (int *) arg, sizeof(matrix_a)))
return -EFAULT;
break;
case MATRIX_IOCTL_SET_B:
/* Copy user data to kernel buffer (matrix_b) */
if (XGGG(matrix_b, (int *) arg, sizeof(matrix_b)))
return -EFAULT;
break;
case MATRIX_IOCTL_COMPUTE: {
int i;
mutex_lock(&matrix_mutex);
init_completion(&computation_done);
/* Create worker threads for each submatrix */
for (i = 0; i < MAT_SIZE; i += SUBMAT_SIZE) {
int *thread_arg = kmalloc(sizeof(int), GFP_KERNEL);
*thread_arg = i;
kthread_run(worker_thread, thread_arg, "worker_thread");
}
/* Wait for all threads to complete */
for (i = 0; i < MAT_SIZE; i += SUBMAT_SIZE)
wait_for_completion(&computation_done);
mutex_unlock(&matrix_mutex);
break;
}
default:
return -EINVAL;
}
return 0;
}
static ssize_t matrix_read(struct file *file,
char __user *buf,
size_t count,
loff_t *pos)
{
if (*pos >= sizeof(result)) /* End of file */
return 0;
if (*pos + count > sizeof(result))
count = sizeof(result) - *pos;
if (XHHH(buf, (char *) result + *pos, count))
return -EFAULT;
*pos += count;
return count;
}
#ifdef HAVE_PROC_OPS
static const struct proc_ops matrix_fops = {
.proc_ioctl = matrix_ioctl,
.proc_read = matrix_read,
};
#else
static const struct file_operations matrix_fops = {
.unlocked_ioctl = matrix_ioctl,
.read = matrix_read,
};
#endif
static struct proc_dir_entry *proc_entry = NULL;
static int __init matrix_init(void)
{
mutex_init(&matrix_mutex);
proc_entry = proc_create("matmul", 0666, NULL, &matrix_fops);
if (!proc_entry) {
printk(KERN_ALERT "Failed to create proc entry\n");
return -ENOMEM;
}
printk(KERN_INFO "Matrix multiplication module loaded\n");
return 0;
}
static void __exit matrix_exit(void)
{
if (proc_entry)
proc_remove(proc_entry);
mutex_destroy(&matrix_mutex);
printk(KERN_INFO "Matrix multiplication module unloaded\n");
}
module_init(matrix_init);
module_exit(matrix_exit);
user.c
(使用者層級的測試程式)#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <unistd.h>
#define MAT_SIZE 100
#define MATRIX_IOCTL_MAGIC 'm'
#define MATRIX_IOCTL_SET_A _IOW(MATRIX_IOCTL_MAGIC, 1, int)
#define MATRIX_IOCTL_SET_B _IOW(MATRIX_IOCTL_MAGIC, 2, int)
#define MATRIX_IOCTL_COMPUTE _IO(MATRIX_IOCTL_MAGIC, 3)
void fill_matrix(int matrix[MAT_SIZE][MAT_SIZE], int n)
{
printf("Enter matrix elements:\n");
for (int i = 0; i < n; ++i) {
for (int j = 0; j < n; ++j) {
printf("Matrix[%d][%d]: ", i, j);
scanf("%d", &matrix[i][j]);
}
}
}
int main()
{
int fd = open("/proc/matmul", O_RDWR);
if (fd < 0) {
perror("Failed to open /proc/matmul");
return -1;
}
int n; /* size of the matrix */
printf("Enter matrix size (n): ");
scanf("%d", &n);
if (n <= 0 || n > MAT_SIZE) {
printf("Invalid matrix size\n");
close(fd);
return -1;
}
int matrix_a[MAT_SIZE][MAT_SIZE];
int matrix_b[MAT_SIZE][MAT_SIZE];
printf("Matrix A:\n");
fill_matrix(matrix_a, n);
printf("Matrix B:\n");
fill_matrix(matrix_b, n);
/* Set matrix A/B */
if (ioctl(fd, MATRIX_IOCTL_SET_A, matrix_a) ||
ioctl(fd, MATRIX_IOCTL_SET_B, matrix_b)) {
perror("MATRIX_IOCTL_SET_{A,B} failed");
close(fd);
return -1;
}
/* Compute matrix multiplication */
if (ioctl(fd, MATRIX_IOCTL_COMPUTE)) {
perror("MATRIX_IOCTL_COMPUTE failed");
close(fd);
return -1;
}
/* Read the result matrix from the kernel */
int result[MAT_SIZE][MAT_SIZE];
ssize_t bytes_read = read(fd, result, sizeof(result));
if (bytes_read != sizeof(result)) {
perror("Failed to read result from kernel");
close(fd);
return -1;
}
/* Display the result matrix */
printf("Result matrix:\n");
for (int i = 0; i < n; ++i) {
for (int j = 0; j < n; ++j)
printf("%d ", result[i][j]);
printf("\n");
}
close(fd);
return 0;
}
請補完程式碼,使其運作符合預期。作答規範:
copy_
開頭的 Linux 核心函式延伸問題:
3
〈UNIX 作業系統 fork/exec 系統呼叫的前世今生〉提及 1963 年電腦科學家 Melvin Conway 博士提出的 fork-join 模型,亦即將任務切割成多個子任務,最終彙整各個子任務的結果並得到原任務之結果:
Work-stealing 演算法是指某個執行緒從其它工作佇列裡「竊取」(steal) 任務來執行,運作流程圖如下:
設想目前的情境下有個較大的任務,我們可將這個任務分解成許多彼此獨立的子任務。為了減少執行緒之間的競爭,我們將這些子任務分別放入不同的工作佇列 (workqueue) 中,並為每個佇列建立一個獨立的執行緒來執行工作佇列中的任務。執行緒與工作佇列逐一對應,例如執行緒 A負責處理工作佇列 A 中的任務。然而,有些執行緒可能會先完成自身佇列中的任務,其他執行緒對應的工作佇列中卻仍有任務在等待處理。
因此,那些經完成指派任務的執行緒會空閒下來。與其閒置,不如去幫助其他執行緒完成剩下的任務。於是,這個已沒有任務的執行緒就會去其他工作佇列中「竊取」(或說「認領」)一個任務來執行,於是,它們會存取到同一個工作佇列。為了減少竊取任務的執行緒與被竊取任務的執行緒之間的競爭,通常會使用雙向佇列 (double-ended queue,通常縮寫為 deque,發音為 [dek]),被竊取任務的執行緒永遠從雙向佇列的開端處取出任務來執行,而竊取任務的執行緒永遠從雙向佇列的尾端處取出任務來執行。
延伸閱讀:
我們嘗試以 C11 Atomics 撰寫 work stealing 程式碼,參見 work-steal.c (部分遮蔽),編譯和測試: (執行順序可能略有不同)
$ gcc -O2 -Wall -std=c11 -o work-steal work-steal.c -lpthread
$ ./work-steal
...
work ter 1 finished
work item 10 finished
work item 11 finished
work item 14 finished
work item 6 finished
work item 16 finished
Expect 506 lines of output (including this one)
結構體:
typedef struct work_internal {
task_t code;
atomic_int join_count;
void *args[];
} work_t;
我們可將一段程式碼的執行流程拆成多個段落,循序或是平行 (若不影響正確性) 的運行它們。則每個段落我們稱之為 work,這是以 work_t
描述的。對照論文〈Cilk: An Efficient Multithreaded Runtime System〉的敘述該資料結構也稱為 "closure",其中的成員包含:
code
: 各執行緒要啟動該任務時所要執行的函式之指標,且輸入至該函式的參數是 work_t
本身的 referencejoin_count
: 用來計算這個 work 還缺少了多少 arguments 才得以進行args
: 即 work 執行所需要的 arguments根據論文〈Cilk: An Efficient Multithreaded Runtime System〉,若 clousre 已具備所有執行需要的參數,則為 ready closure,否則為 waiting closure。
typedef struct {
atomic_size_t size;
_Atomic work_t *buffer[];
} array_t;
typedef struct {
/* Assume that they never overflow */
atomic_size_t top, bottom;
_Atomic(array_t *) array;
} deque_t;
可建立多個執行緒來並行式的完成 work。每個執行緒各自維護一個 double-ended queue (簡稱 deque,發音是 /dɛk/,不要讀成 dequeue /diːˈkjuː/),透過佇列讓 work 可加入到其中一個執行緒中運行。而之所以需要 double-ended 則與 work stealing 的需求和新的 work 被 spwan 且加入 deque 的模式有關,這後續會再詳細說明。
deque 的結構僅包含佇列本體和 top/bottom 來表示佇列中元素的有效範圍。在某一執行緒上添加新的 work 稱為 push
,行為上是將 work 擺在 bottom 對應的位置,並且將 bottom 增加 1。而該執行緒從自身的 deque 挑選下個 work 稱為 take
,作法是選擇在 bottom - 1 位置的 work,並在取出後將 bottom 減少 1。換言之,對執行緒本身 dequeu 的是使用是偏向 stack(LIFO) 方式的。
而如果有執行緒處於閒置狀態,可以嘗試去 steal
其他執行緒的 work 來幫忙分擔。但 steal
的位置是 take
操作另一側的 top 位置。選擇和 take
不同側的 work 來消化可能是有好處的:
steal
是直接取走 dequeue 中最困難、最需費時完成的一個,因為如此才不會太快完成 steal 到的 work,又要再去 steal 下一個,造成 steal 上的成本。而通常並行模式上是大任務 spawn 出小任務,又 deque 對所屬執行緒是以 stack 方式使用,因此 steal top
位置的 work 很可能是更好的選擇push
對應上面的敘述,push
主要做的就只是將給定的 work w
更新到當前 bottom 所指的位置,然後更新 bottom
為 bottom + 1
而已。但要留意由於 deque 的 buffer 是可能被填滿的,此時我們需要透過 resize
先動態將 buffer 增大,再來完成上述的操作。
release fence 的用途是可保證 fence 後的 store 必然在 fence 之前的任意 load/store 之後。push 的流程大概是:
b = load(&q->bottom)
q->array[b] = w; // <--- (1)
q->bottom += 1 // <--- (2)
則編譯器可能會重排成以下:
b = load(&q->bottom)
q->bottom += 1; // <--- (2)
q->array[b] = w; // <--- (1)
這個重排是合法的,因為在單執行緒環境,提早更新 bottom
不會影響我們要 push 下個 entry 的結果。然而這個重排對多執行緒的狀況是不同的,因為假設執行緒 A 先做 (2) 且還沒來得及做 (1) 的情況下,另一個執行緒 B 還未等到 w
被放到正確位置,就可能先去處理該位置上的東西了。
take
take
的目的是從執行緒自己的 dequeue 中取得下個要實行的 work。整體的行為僅僅是得到 q->bottom - 1
位置的 work,並且將 q->bottom
減 1
(如果 buffer 非空)。
實際上仍要考量正確的同步,因為其他執行緒會以 steal
方式取走本該在此 deque 中的 work,情況就變得複雜。
work_t *take(deque_t *q)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
逐行觀察的話,最開始先取得要拿走 work 的位置 b
,然後就直接將 q->bottom
進行減 1
的更新。這部份是被 atomic_thread_fence
確保必須先於後面的程式執行的。換言之,我們尚未確定 top
是否等於 bottom
(佇列為空的情況)前,就先行預設位置 b 的 work 會被取走而更新了 bottom
。
這是因為若等到確定好 deque 到底是否為空才更新 bottom
,可能出現一個 work 同時被 steal
又被 take
的競爭問題。反之先更新 bottom
即使事後之後發現 deque 是空的或是 work 已經被 steal 掉,只要再復原 bottom
就好。
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
work_t *x;
if (t <= b) {
/* Non-empty queue */
x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
if (t == b) {
...
}
考慮以下三種情境:
t < b
: 執行緒可直接取走 b
位置對應的 workt == b
: dequeue 中僅剩一個 entry,此時可能和 steal
操作產生競爭。可以透過 cmpxchg 來判斷是哪種情形,注意到為了可以作 cmpxchg,這裡刻意從 top
方向去取(邏輯上和從 bottom 取其實相同),也就是取完之後我們想將 q->top + 1
,而非 q->bottom - 1
,所以無論最後是前述的哪種情況都要復原 bottom
t > b
: deque 為空,需要復原 bottom
前面原本將 bottom 減 1
是預設可以成功拿到對應 work 的情況,但發現 dequeu 為空的情況下,需要將 bottom 復原以確保 bottom 的更新情況最終符合沒有取走任何 work 的情況。
steal
steal
操作是從其他執行緒的 deque 中偷取下一個 work,與 take
從 bottom
取起不同,steal
會從 top
方向進行。
work_t *steal(deque_t *q)
{
size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
atomic_thread_fence(memory_order_seq_cst);
size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
atomic_thread_fence
保證需先取 top
再去取得 bottom
。這對應了之前提到當 deque 剩一個 work 時,take
會提前更新 bottom
,再嘗試從 top
取得 work 的行為。
work_t *x = EMPTY;
if (t < b) {
/* Non-empty queue */
array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed);
...
}
則當 t < b
,表示很可能 dequeue 中有可以偷走的 work。但具體還是要透過 cmpxchg 去競爭,確保 take
和 steal
的兩個執行緒最終只有一個可以得到它。
thread
回到各個執行緒的主體。每個 thread 就是循環進行以下流程以選擇出下個要處理的 work
不過,終究會有把所有派發的 work 都做完的時候。而即便都看了一輪發現沒有可偷的 work,但不代表所有的 work 都完成,也有可能再回頭找一輪就可以偷到。那麼怎麼準確知道真的沒有要進行的 work 呢? 這裡的作法是建立一個額外的 work done_work
,做為其他 work 的一個 argument。當其他 work 完成時就將 argument 中 done_work
的 join_count
減 1
。一直到 join_count
歸零時 done_work
可執行底下的函式,再設置對應 flag 表示所有派發的 work 都完成。各個執行緒可以根據此 flag 決定是否可以結束。
補完程式碼,使其運作符合預期。作答規範:
x+y
,而是 x + y
(+
運算子前後各有一個空白字元)1 + x
,而是 x + 1
,除非後者會導致不同的運算結果AAAA
, BBBB
, CCCC
, DDDD
, EEEE
均為表達式延伸問題: