# 2025q1 Homework3 (kxo) contributed by <`Andrewtangtang`> {%hackmd NrmQUGbRQWemgwPfhzXj6g %} ## 開發環境 ``` $ gcc --version gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0 $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 12 On-line CPU(s) list: 0-11 Vendor ID: GenuineIntel Model name: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz CPU family: 6 Model: 158 Thread(s) per core: 2 Core(s) per socket: 6 Socket(s): 1 Stepping: 10 CPU(s) scaling MHz: 18% CPU max MHz: 4500.0000 CPU min MHz: 800.0000 BogoMIPS: 5199.98 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_ tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pd cm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb pti ssbd ibrs ibpb stibp tpr_ shadow flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid mpx rdseed adx smap clflushopt intel_pt xsaveopt xsavec xgetbv1 xsave s dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp vnmi md_clear flush_l1d arch_capabilities Virtualization features: Virtualization: VT-x Caches (sum of all): L1d: 192 KiB (6 instances) L1i: 192 KiB (6 instances) L2: 1.5 MiB (6 instances) L3: 12 MiB (1 instance) NUMA: NUMA node(s): 1 NUMA node0 CPU(s): 0-11 Vulnerabilities: Gather data sampling: Mitigation; Microcode Itlb multihit: KVM: Mitigation: VMX disabled L1tf: Mitigation; PTE Inversion; VMX conditional cache flushes, SMT vulnerable Mds: Mitigation; Clear CPU buffers; SMT vulnerable Meltdown: Mitigation; PTI Mmio stale data: Mitigation; Clear CPU buffers; SMT vulnerable Reg file data sampling: Not affected Retbleed: Mitigation; IBRS Spec rstack overflow: Not affected Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization Spectre v2: Mitigation; IBRS; IBPB conditional; STIBP conditional; RSB filling; PBRSB-eIBRS Not affected; BHI Not affected Srbds: Mitigation; Microcode Tsx async abort: Not affected ``` ## 理解`kxo`核心程式碼 ### simrupt 流程 在核心程式碼 `main.c` 使用了一個 linux kernel 中的 `timer` 每100 ms 觸發一次 interrupt 來模擬週期性 `HardIRQ` , 而 `timer_hadler()` 這個函式就是被註冊作為當 timer interrupt 時需要進行的 Interrupt service routine,但因為這個過程本質上還是屬於`softIRQ context` 因此在 `timer_handler()` 的程式碼中對於於該 cpu disable interrupt 來模擬真正 `hardIRQ` (hardIRQ context 是不可以被排程與被其他 interrupt process 搶佔,在([linux/hardirq.h](https://github.com/torvalds/linux/blob/master/include/linux/hardirq.h) 可以看到 `#define __irq_enter()` 這個 macro 可以利用對 `preempt_count()` 增加來確保在中斷處理期間不會再被打斷)。 > Todo:Can softIRQ be interrupted by hardware interrupt? ```c #define __irq_enter() \ do { \ preempt_count_add(HARDIRQ_OFFSET); \ lockdep_hardirq_enter(); \ account_hardirq_enter(current); \ } while (0) ``` 緊接著在 `timer_handler()` 中我們實做需要即時檢查的一些程式碼,判斷棋局輸贏、重置遊戲、或對於 bottom half 部份使用 tasklet 進行排程。 ```c static void timer_handler(struct timer_list *__timer) { if (win == ' ') { ai_game();// schedule the bottom half task } else { if (attr_obj.display == '1') { // transmite the data to user spase } if (attr_obj.end == '0') {// reset the game } } ``` ### tasklet 與 workqueue 流程 在 `timer_handler()` 呼叫 `tasklet_schedule(&game_tasklet)`,這個 function 雖然有schedule 一字,但 tasklet 是不能被排程的在 [kernel/softirq.c](https://github.com/torvalds/linux/blob/master/include/linux/hardirq.h) 可以看到其內部的實做並不是把它放到 cpu runqueue ,而是把這個 tasklet 加入到 SoftIRQ 的 pending list。 ```c static void __tasklet_schedule_common(struct tasklet_struct *t, struct tasklet_head __percpu *headp, unsigned int softirq_nr) { struct tasklet_head *head; t->next = NULL; *head->tail = t; head->tail = &(t->next); } ``` 而在`game_tasklet` 註冊的 `game_tasklet_func` function 中會先透過 `WARN_ON_ONCE` 檢查當前是否確實處於 softirq atomic context,接著讀取 `finish` 和 `turn`(搭配 memory barrier `smp_rmb` 確保一致性),判斷該輪到哪一個 AI 玩家下棋。決定後,會將旗標 `finish` 清零後,透過 queue_work() 將對應玩家的 AI 計算 (`ai_one_work` 或 `ai_two_work`) 排程到可睡眠的工作佇列(workqueue)進行。這樣的設計可以必免較為耗時的 AI 演算法直接在 atomic context 執行而影響系統中斷處理的效能。最後,無論哪方 AI 完成下棋動作後,都會再額外排程 `drawboard_work` 工作,將棋盤更新資訊傳回使用者空間,完成一次的下棋迴圈。 ## 將棋盤的顯示移到使用者層級 ### 原先棋盤顯示的實作 :::danger 注意用語! ::: 當 AI 棋手任一方完成了下棋的動作後,都會進行 `drawboard_work` 這個 workqueue 任務,看一下該 workqueue 的 callback function `drawboard_work_func()` 可以看到 ```c /* Workqueue handler: executed by a kernel thread */ static void drawboard_work_func(struct work_struct *w) { mutex_lock(&producer_lock); draw_board(table); mutex_unlock(&producer_lock); /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); produce_board(); mutex_unlock(&consumer_lock); wake_up_interruptible(&rx_wait); } ``` 這裡先使用 `draw_board()` 根據目前下棋的 'O'、'X' 來繪製一整個棋盤後寫進 `kfifo` 中,接著喚醒在 wait_queue 中的 process,此處是 user_space 的 process,因為在user space read kxo module 時會執行到以下程式碼 ```c static ssize_t kxo_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) { ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo)); ``` 當 `kfifo` 長度為便會將該 process 加入 `rx_wait` 我們宣告的`wait_queue` 中。所以當我們喚醒後就會使用 `kfifo_to_user(&rx_fifo, buf, count, &read);` 將棋盤的整個畫面傳回user_space。 ### 降低通訊成本 將原本 `kfifo` 的長度因出來可以看到原本的程式碼將棋盤全部做傳輸的大小最小是 66 bytes。 :::danger 文字訊息不要用圖片展現! ::: ##### uint32_t packed > commit :[ef357cd](https://github.com/Andrewtangtang/kxo/commit/ef357cdfdab25aab8c75c193797a623e71d7b697) 我一開始的想法是,棋盤的大小是16而每個位置至多就只有三種狀態 'O'、'X'、' ',因此使用每個位置 2bits 剛好可以在 4 個bytes(2*16 bits)紀錄完,因此我在`drawboard_work_func()` 增加了將 `table` 根據其 index 紀錄在一個 uint32_t 的操作。 ```c /* pack the chess board into uint32_t */ static inline u32 pack_board(const char *table) { u32 out = 0; for (int i = 0; i < N_GRIDS; ++i) { u32 v = table[i] == 'O' ? 1 : (table[i] == 'X' ? 2 : 0); out |= (v << (i * 2)); } pr_debug("kxo: %s: %u\n", __func__, out); return out; } ``` 而在user_space 收到這個 uint32_t 後,根據每個位子的資訊分別對應的符號重新繪製出棋盤 ```c typedef uint32_t u32; static inline char decode_cell(u32 board, int index) { int bits = (board >> (index * 2)) & 3; return bits == 1 ? 'O' : (bits == 2 ? 'X' : ' '); } void print_board(u32 board) { printf("\033[H\033[J"); // clear screen for (int r = 0; r < BOARD_SIZE; ++r) { for (int c = 0; c < BOARD_SIZE; ++c) { putchar(decode_cell(board, r * BOARD_SIZE + c)); if (c != BOARD_SIZE - 1) putchar('|'); } putchar('\n'); if (r != BOARD_SIZE - 1) print_hline(); } } ``` 藉由這樣的改動可以將原本每次的通訊成本從原本的66 bytes 降低到 4 bytes。 #### 進一步探討需要傳遞的資訊 > commit [2822111](https://github.com/Andrewtangtang/kxo/commit/2822111a886ccd33645519d880a9d9177993a596) 後來與 <`ibat10clw`> 討論作業該部份後發現,對於更新棋盤畫面來說,kernerl 通知 user_space 每一步需要做的更新其實只有,該次的 turn 、位置以及分出勝負或棋局結束時重新清空頁面。如果只要傳這些資訊給 user_space 其實只要 1 byte 就可以傳完。而為了同時可以提升可讀性與壓縮使用空間,我使用了之前課程教材中出現的 `Bit fields` 技巧來定義我要來通訊的 struct `move_event_t` ,同時可以支援更大的棋盤。 ```c #if N_GRIDS <= 64 // Maximum 8x8 board typedef struct { uint8_t reset : 1; uint8_t player : 1; // 0 for 'X', 1 for 'O' uint8_t position : 6; // 6 bits sufficient for 0-63 } __attribute__((packed)) move_event_t; #elif N_GRIDS <= 256 // Maximum 16x16 board typedef struct { uint8_t reset : 1; uint8_t player : 1; // 0 for 'X', 1 for 'O' uint16_t position : 8; // 8 bits sufficient for 0-255 } __attribute__((packed)) move_event_t; #else // Larger boards typedef struct { uint8_t reset : 1; uint8_t player : 1; // 0 for 'X', 1 for 'O' uint16_t position; // Full 16 bits } __attribute__((packed)) move_event_t; #endif ``` 此外因為需要寫進 `kfifo` buffer 的任務從 66 bytes 減為 1 byte 我認為可以刪除原先繪製棋盤與寫入 buffer 額外創建所需要的排程任務`drawboard_work_func()` ,直接在 `ai_one_work` 、 `ai_two_work` 的 workqueue callback function 中將運算完的結果直接寫進`kfifo` buffer中即可。 ```diff +static void send_move_event(int position, char turn, int reset) +{ + move_event_t move_event; + move_event.reset = reset; + move_event.player = turn == 'O' ? 1 : 0; + move_event.position = position; + unsigned int len = + kfifo_in(&rx_fifo, (unsigned char *) &move_event, sizeof(move_event)); + // other code + } +static void ai_one_work_func(struct work_struct *w) +{ + if (move != -1) { + WRITE_ONCE(table[move], 'O'); + mutex_lock(&consumer_lock); + send_move_event(move, 'O', 0); + mutex_unlock(&consumer_lock); + wake_up_interruptible(&rx_wait); + } +} +static void ai_two_work_func(struct work_struct *w) +{ + if (move != -1) { + WRITE_ONCE(table[move], 'X'); + mutex_lock(&consumer_lock); + send_move_event(move, 'X', 0); + mutex_unlock(&consumer_lock); + wake_up_interruptible(&rx_wait); + } +} ``` 而在 user_space 端因為目前每一次收到的訊息都是棋盤上的更動而已而不是整個棋盤的資訊,因此在這裡我修改了 user_space 的邏輯,讓它只要開始執行時就會不停的接收更新資料,但按下 `ctrl+p` 時不會將本地的棋盤 buffer 進行繪製,以此方法來持續獲得 kernel 的即時更新。 ```diff +#define GET_INDEX(i, j) ((i) * (BOARD_SIZE) + (j)) +char board[N_GRIDS]; while (!end_attr) { FD_ZERO(&readset); FD_SET(STDIN_FILENO, &readset); if (FD_ISSET(STDIN_FILENO, &readset)) { FD_CLR(STDIN_FILENO, &readset); listen_keyboard_handler(); - } else if (read_attr && FD_ISSET(device_fd, &readset)) { + } + if (FD_ISSET(device_fd, &readset)) { FD_CLR(device_fd, &readset); - ssize_t n = read(device_fd, &board_buf, sizeof(board_buf)); - if (n == sizeof(board_buf)) - print_board(board_buf); + read(device_fd, &move_event, sizeof(move_event)); + if (move_event.reset) { + memset(board, ' ', N_GRIDS); + } else { + if (move_event.player == 0) { + board[move_event.position] = 'X'; + } else { + board[move_event.position] = 'O'; + } + } + if (read_attr) + print_board(); } ``` :::danger 提供數學分析 ::: ### 使用 ftrace 進行效能分析 我在本次實驗中使用了 Ftrace 來量測 kernel-space 傳送資料到 user-space 的通訊成本。Ftrace 提供了內建的函式追蹤(function tracer)與事件追蹤(event tracer)功能,可以精確地記錄特定函式的執行時間與呼叫關係。我原本想透過 function_graph 模式觀察kfifo_to_user 函式的執行時間來作為統計通訊成本的資料,擔當我把 `kxo` 核心模組塞進去後,我查看一下 ftrace 可以讓我追蹤的函式清單 ``` cat /sys/kernel/tracing/available_filter_functions ``` 發現 kfifo 的操作函式都沒有在清單內,目前如果想要追蹤通訊成本的相關函式只能追蹤 `kfifo_to_user()` 更上一層的 `kxo_read` 函式,但這樣我認為可能會量到不是通訊成本的時間,後來看了`linux/kfifo.h` , `kfifo_to_user()` 是一個 macro 並不是 function 所以並不能被設置成被追蹤的函式。 ```c #define kfifo_to_user(fifo, to, len, copied) \ __kfifo_int_must_check_helper( \ ({ \ typeof((fifo) + 1) __tmp = (fifo); \ void __user *__to = (to); \ unsigned int __len = (len); \ unsigned int *__copied = (copied); \ const size_t __recsize = sizeof(*__tmp->rectype); \ struct __kfifo *__kfifo = &__tmp->kfifo; \ (__recsize) ? \ __kfifo_to_user_r(__kfifo, __to, __len, __copied, __recsize) : \ __kfifo_to_user(__kfifo, __to, __len, __copied); \ }) \ ) ``` #### ktime_get() 為了還是可以先做出數學分析我在 `kfifo_to_user()` 的前後獲取 kernel 當下的時間戳,之後在用 `ktime_sub()` 將兩者時間戳相減獲取執行時間。 ```c ktime_t start_time = ktime_get(); ret = kfifo_to_user(&rx_fifo, buf, count, &read); ktime_t end_time = ktime_get(); if (read > 0){ num_read += 1; nsecs = (s64) ktime_to_ns(ktime_sub(end_time, start_time)); pr_info("kxo: [CPU#%d] %s in_user_read: %llu nsec num:%d read:%d\n ", smp_processor_id(), __func__, (unsigned long long) nsecs, num_read, read); } ``` 我對於原本 1bytes 版本的和 66 bytes 通訊成本的的資料各收取 5000 筆資料來做統計分析,結果如下。 ``` === kfifo_to_user === Version Samples Mean (ns) Std Dev (ns) p50 (ns) p90 (ns) p99 (ns) Max (ns) 1bytes 5000 2234.9602 1813.098946 1664.0 4107.2 7601.46 27854.0 66bytes 5000 1789.6858 2071.752232 1486.0 2620.8 7327.32 62323.0 ``` ![image](https://hackmd.io/_uploads/Sks9ORKxgx.png) 結果很意外的是原本 66 bytes 的通訊成本平均起來卻比我修改過得版本成本還低,我認為或許是我尚未完全排除使用者空間與核心執行環境中的干擾因素,例如 CPU 頻率調整、Turbo Boost 影響,或是 SMT 所造成的 cache 競爭,也或許是通訊成本要量測的函式不是 `kfifo_to_user()`。 ### 顯示過去 AI 歷史對戰紀錄 > commit: [58ed29d](https://github.com/sysprog21/kxo/commit/58ed29de3569eef702a5b30189b33f3bab5b1259) 因為上方的改動,只要核心的 AI 棋手有了新的下棋的動作變可以從核心傳回來的 `move_event_t` 得知下棋位址的紀錄,我們只要在 user_space 增加紀錄的結構在最後一次印出來即可。新創建了 log.[ch] 來實做有關於紀錄的歷史紀錄的操作。針對每一場的棋局我使用了 `game_record_t` 來紀錄,`moves` 是一個 uint8_t 的陣列, 每4個 bits 可以用來紀錄一個step 的 position index, move count 則是用來紀錄該場棋局移動了幾步。 ```diff +typedef struct { + uint8_t moves[MAX_MOVES / 2]; // each byte stores two moves (4 bits) + int move_count; // current move count +} game_record_t; ``` 接著使用另外一個 struct `game_record_list_t` 來對於運行期間的每一場棋局移動的位置進行紀錄, ```c typedef struct { game_record_t *data; // pointer to records size_t size; // number of used records size_t cap; // array capacity } game_record_list_t; ``` 在 user_space 中 log.[ch] 對於此結構體有新增相關操作函式可以對於紀錄遊戲以及繪製出其棋盤有相對應的操作。 ## 實作 coroutine 在與老師討論時,針對目前的 user space 程式碼,老師提出了兩個問題並建議我進行實作改進: 1. **如何同時實現固定週期的畫面刷新(例如每秒更新一次時間)以及即時處理來自 kernel 的非固定時間點事件(如 AI 下棋)?** 2. **如何讓畫面更新既穩定且具備即時性,而非受到事件發生時間不穩定影響?** 目前的 user space 程式碼採用以下結構: ```c while (!end_attr) { FD_ZERO(&readset); FD_SET(STDIN_FILENO, &readset); FD_SET(device_fd, &readset); int result = select(max_fd + 1, &readset, NULL, NULL, NULL); // handle each event... } ``` 這裡使用了 `select` 來同時監聽鍵盤輸入與 kernel 發送的 move 資訊。然而,目前時間顯示是透過 `<time.h>` 在畫面輸出時**順帶**取得並顯示,這會導致時間顯示的更新頻率**依賴於事件觸發的頻率**,造成秒數更新不穩定、忽快忽慢的問題。這是因為 AI 下棋的間隔本身不固定,導致畫面更新不連貫。 --- ### 改進方向 我重新思考了 user space 與 kernel space 的分工,目標是讓: - **固定週期性任務(如時間刷新)** 獨立運作,不依賴其他事件。 - **非固定事件(如 kernel 傳回 move )** 能即時觸發畫面更新。 考量到這類需求符合典型的 **事件驅動架構**,且在教材〈[針對事件驅動的 I/O 模型演化](https://hackmd.io/@sysprog/linux-io-model/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fevent-driven-server)〉中提到的設計理念,我計畫採用: - `epoll` 取代 `select`:提升 I/O 事件監聽效率,並支援更多事件類型(包括 timerfd)。 - 使用 `timerfd` 實現固定週期事件:專門負責每秒時間刷新,解耦畫面時間顯示與其他事件。 - 使用 coroutine 來執行對應事件:將不同事件類型(鍵盤、kernel move、時間刷新)封裝成獨立的 coroutine,由簡單的 scheduler 輪詢或基於事件喚醒執行。 - 只將最費時的 AI 運算留在核心,其餘工作的排程部份移動到使用者層級 ### user psace coroutine 使用共5條 coroutine 來執行對應的事件 - `ai_trigger_task` : 利用 `timer_fd` 100ms 醒來一次讓 AI1 和 AI2 輪流向核心發送目前棋盤狀態,請求最新的落子位置 - `clock_task` : 利用 `timer_fd` 1s 醒來一次更新要顯示的當下時間字串同時更新畫面。 - `kb_task` : 監聽鍵盤發出的訊號 - `kxo_rx_task` : 接收 kernel 回傳回來的落子位置訊息,同時將 `render_task` 加入 coroutine 的 waiting list 中 - `render_task` : 將棋盤的最新狀態繪製出來。 ### coro.[ch] 的實作 這部份我使用了 `setjmp/longjmp` 來實作 #### coroutine 結構體及全域變數 > commit: [26fb280](https://github.com/sysprog21/kxo/commit/26fb280c5c735b07cc8a7ff92cea9d64567217fa) 首先我先定義了一個結構體用來存放每個 coroutine 在執行時會需要使用到的資料,根據 [cppreference](https://en.cppreference.com/w/c/program/jmp_buf) 對於`jmp_buf` 的描述 > The stored information is sufficient to restore execution at the correct block of the program and invocation of that block. The state of floating-point status flags, or open files, or any other data is not stored in an object of type jmp_buf. 我們可以得知,jmp_buf 會儲存足以回復當前執行點所需的 register 狀態(如 PC、SP 等),但它不會記錄 local variables、file descriptor 或 Stack 內容本身。這意味著,如果每條 coroutine 要有獨立的執行堆疊(Stack),那麼我們就需要自己額外配置記憶體來模擬該 Stack,並管理其起始與當前位置。因此,我額外設計了兩個指標: - `stack_bottom` :指向 coroutine 建立時分配的 Stack 空間底部,用來後續釋放。 - `stack`:指向目前 coroutine 執行期間對應的 Stack Pointer 位置 而每個 coroutine 的結構如下,每個 coroutine 可以利用我在創建時使用 malloc 個別分配的獨立空間作為 Stack 獨立運行,避免因 longjmp 回到已釋放的 Stack 而產生 Undefined Behavior,並讓 coroutine 能在任意深度的函式呼叫中 yield、resume 可以順利的記住當下執行的狀態,而不受 call stack 層級的限制。 ```diff +typedef struct coro { + jmp_buf env; /* Execution context */ + void *stack; /* Stack pointer */ + void *stack_bottom; /* Original stack allocation for cleanup */ + coro_func_t fn; /* Function pointer */ + coro_state_t state; /* Current state */ + struct list_head list; /* For ready queue */ + void *user_data; /* Optional user data */ +} coro_t; ``` 此外為了可以更加有效率的維護 task queue ,我引入了 linux 核心風格的 list.h 檔來有效管理任務的鍊結串列。 #### coroutine 結構體及全域變數 在閱讀完 〈[並行程式設計:排程器原理](https://hackmd.io/@sysprog/linux2025-kxo/%2F%40sysprog%2Flinux2025-kxo-f%3Fstext%3D483%253A62%253A0%253A1745479177%253AZ1hja_)〉我有以下幾個疑惑的地方 - coroutine 既然可以透過自己本身決定要不要讓出 cpu 執行,那要怎麼通知核心 process 排程器目前的工作已經換人? - 這個問題我用了 [coro](https://github.com/sysprog21/concurrent-programs/tree/master/coro) 這個簡單的 user level coroutine 執行後便得知了結論,我將內部的等待時間拉長後執行 coro 後,接著執行 ![image](https://hackmd.io/_uploads/SJUflOwyex.png) ps -T -p $(pgrep coro) 後發現站在 kernel 的角度來說其實都只有一個 thread 在執行這個個程式,也就是說其實可以看成我們自己在 user_space 的地方去做許多個任務的並行,事實上對應到 kernel 當中都只有一個 thread 在執行而已,只是我們在 user_space 的地方的單獨任務可以去自己決定何時要讓出 CPU 的使用權。 ![image](https://hackmd.io/_uploads/Sk7JbuP1gg.png)