--- tags: linux2024 --- # [2024q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 9 週測驗題 :::info 目的: 檢驗學員對 [UNIX 作業系統 fork/exec 系統呼叫的前世今生](https://hackmd.io/@sysprog/unix-fork-exec)和「並行和多執行緒程式設計」的 [Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics)的認知 ::: ==[作答表單: 測驗 1](https://docs.google.com/forms/d/e/1FAIpQLSeHMV_qMgZQTXPiSR69LcCFFKmHe-APwL_G060NrtOrYIcadg/viewform)== (針對 Linux 核心「設計」/「實作」課程) ### 測驗 `1` 考慮以下程式碼藉由 [fork(2)](https://man7.org/linux/man-pages/man2/fork.2.html) 和 [mmap(2)](https://man7.org/linux/man-pages/man2/mmap.2.html) 系統呼叫來實作並行版本的合併排序,限制 fork 的次數不超過 5。假設 `calloc` 總是會成功,且預期由小到大排列。程式碼如下: (部分遮蔽) ```c #include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/mman.h> #include <sys/wait.h> #include <unistd.h> static int *merge(const int *left_half, const int left_len, const int *right_half, const int right_len) { int *merged = calloc(left_len + right_len, sizeof(int)); int left_idx = 0, right_idx = 0, cur_idx = 0; while (left_idx < left_len && right_idx < right_len) { if (left_half[left_idx] <= right_half[right_idx]) { merged[cur_idx++] = left_half[left_idx++]; } else { merged[cur_idx++] = right_half[right_idx++]; } } while (left_idx < left_len) merged[cur_idx++] = left_half[left_idx++]; while (right_idx < right_len) merged[cur_idx++] = right_half[right_idx++]; return merged; } static int fork_count = 0; void merge_sort(int *arr, const int len) { if (len == 1) return; const int mid = len / 2; const int left_len = len - mid; const int right_len = mid; /* If forked too often, it gets way too slow. */ if (fork_count < 5) { pid_t pid = fork(); XAAA; if (pid == 0) { /* Child process */ merge_sort(arr, left_len); exit(0); } /* Parent process */ merge_sort(XBBB, XCCC); waitpid(pid, NULL, 0); } else { merge_sort(arr, left_len); merge_sort(XDDD, XEEE); } memcpy(arr, merge(arr, left_len, arr + left_len, right_len), len * sizeof(int)); } typedef struct { uint32_t a, b, c, d; } rand_context_t; /* See https://burtleburtle.net/bob/rand/smallprng.html */ #define ROT(x, k) (((x) << (k)) | ((x) >> (32 - (k)))) uint32_t rand_next(rand_context_t *x) { uint32_t e = x->a - ROT(x->b, 27); x->a = x->b ^ ROT(x->c, 17); x->b = x->c + x->d; x->c = x->d + e; x->d = e + x->a; return x->d; } void rand_init(rand_context_t *x, uint32_t seed) { x->a = 0xf1ea5eed, x->b = x->c = x->d = seed; for (size_t i = 0; i < 20; ++i) (void) rand_next(x); } int iabs(int n) { int mask = n >> 31; return (mask & -n) | (~mask & n); } #define N_ITEMS 1000000 int main(int argc, char **argv) { rand_context_t r; rand_init(&r, (uintptr_t) &main ^ getpid()); /* shared by forked processes */ int *arr = mmap(NULL, N_ITEMS * sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0); for (int i = 0; i < N_ITEMS; ++i) arr[i] = iabs((int) rand_next(&r)); merge_sort(arr, N_ITEMS); for (int i = 1; i < N_ITEMS; ++i) { if (arr[i] < arr[i - 1]) { fprintf(stderr, "Ascending order is expected.\n"); exit(1); } } printf("OK!\n"); return 0; } ``` 假設 `calloc` 總會成功,且 `mmap` 映射的記憶體區塊亦可存取。 作答規範: * 以第一次作業風格書寫,儘量撰寫最精簡的形式,注意空白字元 * XAAA, XBBB, XCCC, XDDD, XEEE 皆為表示式 --- ### 測驗 `2` 考慮我們即將為 [llama.cpp](https://github.com/ggerganov/llama.cpp) 撰寫 Linux 核心的加速運算模組,名為 `matmul.ko`,程式碼如下: (部分遮蔽) - [ ] `matmul.c` (Linux 核心模組) ```c #include <linux/completion.h> #include <linux/delay.h> #include <linux/fs.h> #include <linux/init.h> #include <linux/ioctl.h> #include <linux/kernel.h> #include <linux/kthread.h> #include <linux/module.h> #include <linux/mutex.h> #include <linux/proc_fs.h> #include <linux/slab.h> #include <linux/uaccess.h> #include <linux/version.h> MODULE_LICENSE("Dual MIT/GPL"); MODULE_AUTHOR("National Cheng Kung University, Taiwan"); MODULE_DESCRIPTION("matrix multiplication"); MODULE_VERSION("0.1"); #if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0) #define HAVE_PROC_OPS #endif #define MAT_SIZE 100 /* submatrix size for concurrent computation */ #define SUBMAT_SIZE 10 #define MATRIX_IOCTL_MAGIC 'm' #define MATRIX_IOCTL_SET_A _IOW(MATRIX_IOCTL_MAGIC, 1, int) #define MATRIX_IOCTL_SET_B _IOW(MATRIX_IOCTL_MAGIC, 2, int) #define MATRIX_IOCTL_COMPUTE _IO(MATRIX_IOCTL_MAGIC, 3) static int matrix_a[MAT_SIZE][MAT_SIZE]; static int matrix_b[MAT_SIZE][MAT_SIZE]; static int result[MAT_SIZE][MAT_SIZE]; static struct mutex matrix_mutex; static struct completion computation_done; /* for synchronization */ static int worker_thread(void *data) { int start_row = *(int *) data; int end_row = start_row + SUBMAT_SIZE; int i, j, k; for (i = start_row; i < end_row; ++i) { for (j = 0; j < MAT_SIZE; ++j) { result[i][j] = 0; for (k = 0; k < MAT_SIZE; ++k) result[i][j] += matrix_a[i][k] * matrix_b[k][j]; } } complete(&computation_done); return 0; } static long matrix_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { switch (cmd) { case MATRIX_IOCTL_SET_A: /* Copy user data to kernel buffer (matrix_a) */ if (XFFF(matrix_a, (int *) arg, sizeof(matrix_a))) return -EFAULT; break; case MATRIX_IOCTL_SET_B: /* Copy user data to kernel buffer (matrix_b) */ if (XGGG(matrix_b, (int *) arg, sizeof(matrix_b))) return -EFAULT; break; case MATRIX_IOCTL_COMPUTE: { int i; mutex_lock(&matrix_mutex); init_completion(&computation_done); /* Create worker threads for each submatrix */ for (i = 0; i < MAT_SIZE; i += SUBMAT_SIZE) { int *thread_arg = kmalloc(sizeof(int), GFP_KERNEL); *thread_arg = i; kthread_run(worker_thread, thread_arg, "worker_thread"); } /* Wait for all threads to complete */ for (i = 0; i < MAT_SIZE; i += SUBMAT_SIZE) wait_for_completion(&computation_done); mutex_unlock(&matrix_mutex); break; } default: return -EINVAL; } return 0; } static ssize_t matrix_read(struct file *file, char __user *buf, size_t count, loff_t *pos) { if (*pos >= sizeof(result)) /* End of file */ return 0; if (*pos + count > sizeof(result)) count = sizeof(result) - *pos; if (XHHH(buf, (char *) result + *pos, count)) return -EFAULT; *pos += count; return count; } #ifdef HAVE_PROC_OPS static const struct proc_ops matrix_fops = { .proc_ioctl = matrix_ioctl, .proc_read = matrix_read, }; #else static const struct file_operations matrix_fops = { .unlocked_ioctl = matrix_ioctl, .read = matrix_read, }; #endif static struct proc_dir_entry *proc_entry = NULL; static int __init matrix_init(void) { mutex_init(&matrix_mutex); proc_entry = proc_create("matmul", 0666, NULL, &matrix_fops); if (!proc_entry) { printk(KERN_ALERT "Failed to create proc entry\n"); return -ENOMEM; } printk(KERN_INFO "Matrix multiplication module loaded\n"); return 0; } static void __exit matrix_exit(void) { if (proc_entry) proc_remove(proc_entry); mutex_destroy(&matrix_mutex); printk(KERN_INFO "Matrix multiplication module unloaded\n"); } module_init(matrix_init); module_exit(matrix_exit); ``` - [ ] `user.c` (使用者層級的測試程式) ```c #include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <sys/ioctl.h> #include <unistd.h> #define MAT_SIZE 100 #define MATRIX_IOCTL_MAGIC 'm' #define MATRIX_IOCTL_SET_A _IOW(MATRIX_IOCTL_MAGIC, 1, int) #define MATRIX_IOCTL_SET_B _IOW(MATRIX_IOCTL_MAGIC, 2, int) #define MATRIX_IOCTL_COMPUTE _IO(MATRIX_IOCTL_MAGIC, 3) void fill_matrix(int matrix[MAT_SIZE][MAT_SIZE], int n) { printf("Enter matrix elements:\n"); for (int i = 0; i < n; ++i) { for (int j = 0; j < n; ++j) { printf("Matrix[%d][%d]: ", i, j); scanf("%d", &matrix[i][j]); } } } int main() { int fd = open("/proc/matmul", O_RDWR); if (fd < 0) { perror("Failed to open /proc/matmul"); return -1; } int n; /* size of the matrix */ printf("Enter matrix size (n): "); scanf("%d", &n); if (n <= 0 || n > MAT_SIZE) { printf("Invalid matrix size\n"); close(fd); return -1; } int matrix_a[MAT_SIZE][MAT_SIZE]; int matrix_b[MAT_SIZE][MAT_SIZE]; printf("Matrix A:\n"); fill_matrix(matrix_a, n); printf("Matrix B:\n"); fill_matrix(matrix_b, n); /* Set matrix A/B */ if (ioctl(fd, MATRIX_IOCTL_SET_A, matrix_a) || ioctl(fd, MATRIX_IOCTL_SET_B, matrix_b)) { perror("MATRIX_IOCTL_SET_{A,B} failed"); close(fd); return -1; } /* Compute matrix multiplication */ if (ioctl(fd, MATRIX_IOCTL_COMPUTE)) { perror("MATRIX_IOCTL_COMPUTE failed"); close(fd); return -1; } /* Read the result matrix from the kernel */ int result[MAT_SIZE][MAT_SIZE]; ssize_t bytes_read = read(fd, result, sizeof(result)); if (bytes_read != sizeof(result)) { perror("Failed to read result from kernel"); close(fd); return -1; } /* Display the result matrix */ printf("Result matrix:\n"); for (int i = 0; i < n; ++i) { for (int j = 0; j < n; ++j) printf("%d ", result[i][j]); printf("\n"); } close(fd); return 0; } ``` 請補完程式碼,使其運作符合預期。作答規範: * 以第一次作業風格書寫,儘量撰寫最精簡的形式,不含空白字元 * XFFF, XGGG, XHHH 皆為 `copy_` 開頭的 Linux 核心函式 :::success 延伸問題: 1. 解釋程式碼運作原理 2. 以 [CMWQ](https://www.kernel.org/doc/html/next/core-api/workqueue.html) 重寫,並針對批次的矩陣乘法運算,提出有效的存取模型 3. 在 GitHub 找出矩陣乘法相關專案,如 [matmul](https://github.com/attractivechaos/matmul), [matmul-bench](https://github.com/tanakamura/matmul-bench), [matmul-cpu](https://github.com/jazliang/matmult-cpu), [libxsmm](https://github.com/libxsmm/libxsmm), [Matrix_Multiply_using_Arm_Neon_and_Avx](https://github.com/ruthreshx/Matrix_Multiply_using_Arm_Neon_and_Avx),並進行效能比較和實作分析,從而歸納出提升矩陣乘法的手法 4. 嘗試在 Linux 核心模組使用 SSE/AVX/NEON 等 SIMD 指令集並降低資料存取的延遲 5. 研讀 [LLaMA Now Goes Faster on CPUs](https://justine.lol/matmul/) 並歸納加速矩陣乘法的手段 ::: --- ### 測驗 `3` 〈[UNIX 作業系統 fork/exec 系統呼叫的前世今生](https://hackmd.io/@sysprog/unix-fork-exec)〉提及 1963 年電腦科學家 Melvin Conway 博士提出的 fork-join 模型,亦即將任務切割成多個子任務,最終彙整各個子任務的結果並得到原任務之結果: * Fork: 即把任務切割成多個子任務並並行運作 * Join: 合併切割後的子任務之執行結果,最終得到原任務之結果 ![](https://hackmd.io/_uploads/ByJ2vdonh.png) [Work-stealing](https://en.wikipedia.org/wiki/Work_stealing) 演算法是指某個執行緒從其它工作佇列裡「竊取」(steal) 任務來執行,運作流程圖如下: ![](https://hackmd.io/_uploads/B1qkOdsn2.png) 設想目前的情境下有個較大的任務,我們可將這個任務分解成許多彼此獨立的子任務。為了減少執行緒之間的競爭,我們將這些子任務分別放入不同的工作佇列 (workqueue) 中,並為每個佇列建立一個獨立的執行緒來執行工作佇列中的任務。執行緒與工作佇列逐一對應,例如執行緒 A負責處理工作佇列 A 中的任務。然而,有些執行緒可能會先完成自身佇列中的任務,其他執行緒對應的工作佇列中卻仍有任務在等待處理。 因此,那些經完成指派任務的執行緒會空閒下來。與其閒置,不如去幫助其他執行緒完成剩下的任務。於是,這個已沒有任務的執行緒就會去其他工作佇列中「竊取」(或說「認領」)一個任務來執行,於是,它們會存取到同一個工作佇列。為了減少竊取任務的執行緒與被竊取任務的執行緒之間的競爭,通常會使用雙向佇列 (double-ended queue,通常縮寫為 deque,發音為 [dek]),被竊取任務的執行緒永遠從雙向佇列的開端處取出任務來執行,而竊取任務的執行緒永遠從雙向佇列的尾端處取出任務來執行。 * 優點:充分地利用執行緒進行平行運算。 * 缺點:某些情況下仍然存在競爭,例如雙向佇列中只有一個任務時。除此之外,另一個缺點是它消耗更多的系統資源,例如建立多個執行緒及雙向佇列 延伸閱讀: * Linux kernel: [Workqueue](https://www.kernel.org/doc/html/next/core-api/workqueue.html) * [Can better task stealing make Linux faster?](https://blogs.oracle.com/linux/post/can-better-task-stealing-make-linux-faster) 我們嘗試以 C11 Atomics 撰寫 work stealing 程式碼,參見 [work-steal.c](https://gist.github.com/jserv/111304e7c5061b05d4d29a47571f7a98) (部分遮蔽),編譯和測試: (執行順序可能略有不同) ```shell $ gcc -O2 -Wall -std=c11 -o work-steal work-steal.c -lpthread $ ./work-steal ... work ter 1 finished work item 10 finished work item 11 finished work item 14 finished work item 6 finished work item 16 finished Expect 506 lines of output (including this one) ``` 結構體: ```c typedef struct work_internal { task_t code; atomic_int join_count; void *args[]; } work_t; ``` ![waiting closure](https://hackmd.io/_uploads/H1wq3dRgC.png) 我們可將一段程式碼的執行流程拆成多個段落,循序或是平行 (若不影響正確性) 的運行它們。則每個段落我們稱之為 work,這是以 `work_t` 描述的。對照論文〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的敘述該資料結構也稱為 "closure",其中的成員包含: * `code`: 各執行緒要啟動該任務時所要執行的函式之指標,且輸入至該函式的參數是 `work_t` 本身的 reference * `join_count`: 用來計算這個 work 還缺少了多少 arguments 才得以進行 * `args`: 即 work 執行所需要的 arguments 根據論文〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉,若 clousre 已具備所有執行需要的參數,則為 ready closure,否則為 waiting closure。 ```c typedef struct { atomic_size_t size; _Atomic work_t *buffer[]; } array_t; typedef struct { /* Assume that they never overflow */ atomic_size_t top, bottom; _Atomic(array_t *) array; } deque_t; ``` 可建立多個執行緒來並行式的完成 work。每個執行緒各自維護一個 [double-ended queue](https://en.wikipedia.org/wiki/Double-ended_queue) (簡稱 deque,發音是 /dɛk/,不要讀成 dequeue /diːˈkjuː/),透過佇列讓 work 可加入到其中一個執行緒中運行。而之所以需要 double-ended 則與 work stealing 的需求和新的 work 被 spwan 且加入 deque 的模式有關,這後續會再詳細說明。 ```graphviz digraph { top [label="top", color=white]; bottom [label="bottom", color=white]; node [shape=record, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; values [label="<f0> A[0] | <f1> A[1] | <f2> A[2] | <f3> ...... | <f4> A[n - 2] | <f5> A[n - 1]", color=blue, fillcolor=lightblue, style=filled]; edge [color=blue]; top -> values:f1 bottom -> values:f4 } ``` deque 的結構僅包含佇列本體和 top/bottom 來表示佇列中元素的有效範圍。在某一執行緒上添加新的 work 稱為 `push`,行為上是將 work 擺在 bottom 對應的位置,並且將 bottom 增加 1。而該執行緒從自身的 deque 挑選下個 work 稱為 `take`,作法是選擇在 bottom - 1 位置的 work,並在取出後將 bottom 減少 1。換言之,對執行緒本身 dequeu 的是使用是偏向 stack(LIFO) 方式的。 而如果有執行緒處於閒置狀態,可以嘗試去 `steal` 其他執行緒的 work 來幫忙分擔。但 `steal` 的位置是 `take` 操作另一側的 top 位置。選擇和 `take` 不同側的 work 來消化可能是有好處的: * 理想的 `steal` 是直接取走 dequeue 中最困難、最需費時完成的一個,因為如此才不會太快完成 steal 到的 work,又要再去 steal 下一個,造成 steal 上的成本。而通常並行模式上是大任務 spawn 出小任務,又 deque 對所屬執行緒是以 stack 方式使用,因此 steal `top` 位置的 work 很可能是更好的選擇 * 因為避開另一個執行緒選擇下個 work 的一側,可預期會有更少的競爭(contention),後者會有同步上的成本 - [ ] `push` 對應上面的敘述,`push` 主要做的就只是將給定的 work `w` 更新到當前 bottom 所指的位置,然後更新 `bottom` 為 `bottom + 1` 而已。但要留意由於 deque 的 buffer 是可能被填滿的,此時我們需要透過 `resize` 先動態將 buffer 增大,再來完成上述的操作。 release fence 的用途是可保證 fence 後的 store 必然在 fence 之前的任意 load/store 之後。push 的流程大概是: ```c b = load(&q->bottom) q->array[b] = w; // <--- (1) q->bottom += 1 // <--- (2) ``` 則編譯器可能會重排成以下: ```c b = load(&q->bottom) q->bottom += 1; // <--- (2) q->array[b] = w; // <--- (1) ``` 這個重排是合法的,因為在單執行緒環境,提早更新 `bottom` 不會影響我們要 push 下個 entry 的結果。然而這個重排對多執行緒的狀況是不同的,因為假設執行緒 A 先做 (2) 且還沒來得及做 (1) 的情況下,另一個執行緒 B 還未等到 `w` 被放到正確位置,就可能先去處理該位置上的東西了。 - [ ] `take` `take` 的目的是從執行緒自己的 dequeue 中取得下個要實行的 work。整體的行為僅僅是得到 `q->bottom - 1` 位置的 work,並且將 `q->bottom` 減 `1` (如果 buffer 非空)。 實際上仍要考量正確的同步,因為其他執行緒會以 `steal` 方式取走本該在此 deque 中的 work,情況就變得複雜。 ```c work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); ``` 逐行觀察的話,最開始先取得要拿走 work 的位置 `b`,然後就直接將 `q->bottom` 進行減 `1` 的更新。這部份是被 `atomic_thread_fence` 確保必須先於後面的程式執行的。換言之,我們尚未確定 `top` 是否等於 `bottom`(佇列為空的情況)前,就先行預設位置 b 的 work 會被取走而更新了 `bottom`。 這是因為若等到確定好 deque 到底是否為空才更新 `bottom`,可能出現一個 work 同時被 `steal` 又被 `take` 的競爭問題。反之先更新 `bottom` 即使事後之後發現 deque 是空的或是 work 已經被 steal 掉,只要再復原 `bottom` 就好。 ```c size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); work_t *x; if (t <= b) { /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { ... } ``` 考慮以下三種情境: * `t < b`: 執行緒可直接取走 `b` 位置對應的 work * `t == b`: dequeue 中僅剩一個 entry,此時可能和 `steal` 操作產生競爭。可以透過 cmpxchg 來判斷是哪種情形,注意到為了可以作 cmpxchg,這裡刻意從 `top` 方向去取(邏輯上和從 bottom 取其實相同),也就是取完之後我們想將 `q->top + 1`,而非 `q->bottom - 1`,所以無論最後是前述的哪種情況都要復原 `bottom` * `t > b`: deque 為空,需要復原 `bottom` 前面原本將 bottom 減 `1` 是預設可以成功拿到對應 work 的情況,但發現 dequeu 為空的情況下,需要將 bottom 復原以確保 bottom 的更新情況最終符合沒有取走任何 work 的情況。 - [ ] `steal` `steal` 操作是從其他執行緒的 deque 中偷取下一個 work,與 `take` 從 `bottom` 取起不同,`steal` 會從 `top` 方向進行。 ```c work_t *steal(deque_t *q) { size_t t = atomic_load_explicit(&q->top, memory_order_acquire); atomic_thread_fence(memory_order_seq_cst); size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); ``` `atomic_thread_fence` 保證需先取 `top` 再去取得 `bottom`。這對應了之前提到當 deque 剩一個 work 時,`take` 會提前更新 `bottom`,再嘗試從 `top` 取得 work 的行為。 ```c work_t *x = EMPTY; if (t < b) { /* Non-empty queue */ array_t *a = atomic_load_explicit(&q->array, memory_order_consume); x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); ... } ``` 則當 `t < b`,表示很可能 dequeue 中有可以偷走的 work。但具體還是要透過 cmpxchg 去競爭,確保 `take` 和 `steal` 的兩個執行緒最終只有一個可以得到它。 - [ ] `thread` 回到各個執行緒的主體。每個 thread 就是循環進行以下流程以選擇出下個要處理的 work 1. 先嘗試從自己的 dequeue 中取得任務,如果成功取得就執行之 2. 若失敗,嘗試在其他執行緒的 deque 中獲取,值得注意的是當 steal 得到的回傳是 ABORT 時,表示 steal 其實是發生 contention 只是最終失敗了而已,這時候允許在同一個 deque 重偷一次 3. 如果 steal 成功,則執行偷到的 work 不過,終究會有把所有派發的 work 都做完的時候。而即便都看了一輪發現沒有可偷的 work,但不代表所有的 work 都完成,也有可能再回頭找一輪就可以偷到。那麼怎麼準確知道真的沒有要進行的 work 呢? 這裡的作法是建立一個額外的 work `done_work`,做為其他 work 的一個 argument。當其他 work 完成時就將 argument 中 `done_work` 的 `join_count` 減 `1`。一直到 `join_count` 歸零時 `done_work` 可執行底下的函式,再設置對應 flag 表示所有派發的 work 都完成。各個執行緒可以根據此 flag 決定是否可以結束。 補完程式碼,使其運作符合預期。作答規範: * 使用一致的程式碼風格撰寫,並用最精簡的形式,例如不該寫作 `x+y`,而是 `x + y` (`+` 運算子前後各有一個空白字元) * 表示式若存在變數名稱和數值,應該讓變數名稱先出現,例如不該寫作 `1 + x`,而是 `x + 1`,除非後者會導致不同的運算結果 * `AAAA`, `BBBB`, `CCCC`, `DDDD`, `EEEE` 均為表達式 :::success 延伸問題: 1. 研讀程式碼註解提及的論文〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉,解釋程式碼運作原理 2. 指出上述程式碼可改進之處並著手進行,如記憶體釋放和量化分析性能和延展性 (scalability) 3. 利用 work stealing 改寫[第 6 次作業](https://hackmd.io/@sysprog/linux2024-integration)提及的並行化快速排序程式碼,在 Linux 使用者層級實作,並確保能充分運用硬體資源 4. 研讀 Linux 核心的 [CMWQ](https://www.kernel.org/doc/html/next/core-api/workqueue.html),討論其 work stealing 的實作手法 :::