# 2025q1 Homework3 (kxo) contributed by < [Mike1117](https://github.com/Mike1117/) > :::danger 說好的進度呢? ::: :::danger 這是開發紀錄。 Show me the code and progress. ::: ## 移植 AI 至 user-space 並以 coroutine 進行排程 根據[作業要求](https://hackmd.io/@sysprog/linux2025-kxo/%2F%40sysprog%2Flinux2025-kxo-f#-%E4%BD%9C%E6%A5%AD%E8%A6%81%E6%B1%82):`AI1, AI2 (具備玩井字遊戲的人工智慧程式,採取不同的演算法,可指定運作於使用者/核心層級)` ,程式應具備切換 AI 運作於 user/kernel space 的功能,但觀察作業區其他同學的進度,均未提到這一功能,不知道是否是我理解有誤,但我仍先實作了該功能。 >[Commit f6a9a7b](https://github.com/sysprog21/kxo/commit/f6a9a7bd3ead9dbff7656d066c66c1457b0d2e16) 首先將原本與 kernel mode 進行通訊及讀取鍵盤輸入的邏輯封裝為 `run_kernel_mode` ,再修改 `main()` 函式使其在啟動時可以以鍵盤輸入 1 或 2 選擇 AI 函式要執行在 kernel-mode 還是 user-mode 。 ```shell $ sudo ./xo-user Select AI mode: 1. Kernel AI (current default) 2. User-space AI (coroutine) Enter choice (1/2): ``` >[Commit c96a833](https://github.com/sysprog21/kxo/commit/c96a83337a291d4a5b3e926d4c149637e6d7fd3a) 隨後引入老師在 [〈並行程式設計:排程器原理〉](https://hackmd.io/@sysprog/concurrency-sched#%E5%8D%94%E5%90%8C%E5%BC%8F%E5%A4%9A%E5%B7%A5) 中提供的 `coro.c` ,實作 naive 的 coroutine 排程。 原本以為只需簡單的註冊任務並加入 `registered_task` 中即可,但實作中發現因為原本 AI 及遊戲均運作在 kernel 中,故大量 API 均為 kernel 專屬,遂重寫這些程式使其可以正常運行在 user-space 。 我將原本的遊戲與 AI 共拆分為五個任務,分別為負責兩個 AI 下棋、繪製棋盤及棋子、檢查是否勝利以及監聽鍵盤輸入,具體排程如下: ``` ai_one (MCTS) ↓ check_win ↓ listen_keyboard ↓ drawboard ↓ ai_two (Negamax) ↓ check_win ↓ listen_keyboard ↓ drawboard_work_func ``` 至此,每個 AI 下完棋後,就會檢查是否滿足勝利條件,若滿足,則清空棋盤進行下一輪,若否,則監聽是否有鍵盤輸入 (暫停或退出) ,再繪製出當前棋盤並交給下一個 AI 進行下棋。 ## 將棋盤的顯示移到使用者層級 可以看到在原實作中,`draw_board(char *table)` 這個函式是放在 `main.c` 中,意即他的運作會在 kernel space ,所以最後就需要將繪製完成的完整棋盤從 kernel space 傳至 user space ,此舉無疑為徒增通訊成本。 我們僅需將 `table` 傳輸至 user space ,再由 user space 完成完整棋盤的繪製,就可以大幅減少通訊成本,實作如下: >[Commit 45d08dc](https://github.com/sysprog21/kxo/commit/45d08dc5e6631bcf6079775bd60e5ab13c4eaa63) 首先刪去 `main.c` 中的 `draw_board` 函式及 `draw_buffer`,將 `produce_board` 中傳入 `kfifo` 的資料改為直接傳入 `table`。 而 `xo-user.c` 中則只要在接收到 kernel 傳來的 table 後,呼叫 `draw_board()` 將棋盤繪製完整後再印出即可。 實際執行後可以很容易看出下棋的速度較先前快。 ## 降低核心和使用者層級之間的通訊成本 原始 table 的長度為 16 byte,但實際上每個 byte 僅會有三種狀態 ( 'O', 'X', ' ' ),所以我們可以利用 bitops 對 table 進行編碼,進一步減少通訊成本。 針對三種狀態,我們可以分別編碼為 `00`, `01`, `10`,如此 16 格棋盤我們僅需 $16 \times 2 = 32$ bit 即可進行棋盤的傳輸。 每次傳輸都可節省 $16 \times 8 - 32 = 96$ bit 的傳輸量,為原本的 $1/4$ 。 具體實作如下: >[Commit da16bfb](https://github.com/sysprog21/kxo/commit/da16bfb9197602de704525ed5e1dd5358e91b78a) 先於 `main.c` 中建立新的函式以將棋盤壓縮,再透過 `produce_compressed_board` 寫入 `kfifo` 供 user space 讀取。 ```c static uint32_t compress_table(const char *table) { uint32_t bits = 0; for (int i = 0; i < N_GRIDS; i++) { uint32_t v = (table[i] == ' ') ? 0 : (table[i] == 'O' ? 1 : 2); bits |= (v & 0x3) << (i * 2); } return bits; } static void produce_compressed_board(void) { compressed_table = compress_table(table); unsigned int len = kfifo_in(&rx_fifo, (const unsigned char *) &compressed_table, sizeof(compressed_table)); if (unlikely(len < sizeof(compressed_table))) pr_warn_ratelimited("%s: %zu bytes dropped\n", __func__, sizeof(compressed_table) - len); pr_debug("kxo: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` 再於 `xo-user.c` 中建立 decompressed 的函式,以將壓縮的棋盤還原: ```c static void decompress_table(uint32_t bits, char *table) { for (int i = 0; i < N_GRIDS; i++) { uint32_t v = (bits >> (i * 2)) & 0x3; table[i] = (v == 0) ? ' ' : (v == 1) ? 'O' : 'X'; } } ``` 還原後再將棋盤繪製即可: ```diff + decompress_table(compressed_table, table_buf); + draw_board(table_buf); ``` ## 顯示多場對弈的過程 具體實作如下: >[Commit 5c66f07](https://github.com/sysprog21/kxo/commit/5c66f0757a58b7d2ce884dc91b7668a102c8e7cb) 想要在離開對弈時,顯示歷史對弈的過程,需要從兩個方面著手: · 記錄每次的 `move` · 維護一個可以記錄多局對弈 `move` 的陣列 針對前者我能想到的實現方法有兩種 1. kernel 每次除了傳輸壓縮過的 `table` 外,再額外向 `kfifo` 中存入當前的 `move` 資訊。 2. kernel 不額外傳輸當前的 `move` 資訊,由 user space 接收到壓縮的棋盤後,與上一次接收到的棋盤作比對,計算出當前的新棋子下在何處。 最後選擇的方法為 1. 首先在 `main.c` 中建立一個新的 struct 及全域變數 `last_move`: ```c static u8 last_move; struct kxo_frame { u32 compressed_table; u8 last_move; }; ``` `kxo_frame` 會儲存當前的 table 以及 當前落子的位置, `last_move` 則會被兩個 `ai_work_func` 更新: ```diff WRITE_ONCE(turn, 'O'); WRITE_ONCE(finish, 1); + WRITE_ONCE(last_move, move); ``` 再更新 `static void produce_compressed_board(void)` ,改為傳入 `kxo_frame` 至 `kfifo` 供 user space 讀取: ```diff static void produce_compressed_board(void) { + struct kxo_frame frame = { + .compressed_table = compress_table(table), + .last_move = last_move, + }; unsigned int len = kfifo_in(&rx_fifo, (const unsigned char *) &frame, sizeof(frame)); if (unlikely(len < sizeof(frame))) pr_warn_ratelimited("%s: %zu bytes dropped\n", __func__, sizeof(frame) - len); pr_debug("kxo: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` 這樣就可以在 user space 得知當前的落子位置,但實際上 user space 目前接收的資訊僅有棋盤及落子位置,它無從得知是否開始了新的對局,所以此處我在 `main.c` 中更新了對局結束時傳送的資訊,在傳輸最後的棋盤之後,額外傳輸一個代表對局結束的落子位置 `17` : ```c struct kxo_frame end_frame = { .compressed_table = compress_table(table), .last_move = 17, }; mutex_lock(&producer_lock); mutex_lock(&consumer_lock); kfifo_in(&rx_fifo, (const unsigned char *) &end_frame, sizeof(end_frame)); mutex_unlock(&consumer_lock); mutex_unlock(&producer_lock); ``` user space 接收到 `move = 17` 後,即可判斷當前對弈已結束,轉而記錄下一局的對弈數據。 而 `xo-user.c` 的部分則維護一個動態配置的二位陣列 `static uint8_t **move_log` 來記錄不同局對弈的 `move` 。最後由 `static void move_to_coordinate(int move, char *buf)` 將 `uint8_t` 的 `move` 轉換為 `字母+數字` 形式的座標並印出。 最後實際效果如下: ```shell |O| |O ------- O| | | ------- |X|X|X ------- | | | ------- Stopping the kernel space tic-tac-toe game... Game 1: B4 -> C2 -> A1 -> C3 -> D2 -> B3 -> A2 -> A3 Game 2: A3 -> D2 -> A1 -> D3 -> A2 Game 3: C3 -> D1 -> C2 -> D2 -> D3 -> B1 -> B3 Game 4: A3 -> D2 -> A1 -> D3 -> A2 Game 5: C3 -> B3 -> B2 -> A3 -> C2 -> B1 -> D2 Game 6: A1 -> B2 -> D3 -> C2 -> A4 -> B3 -> C1 -> B1 Game 7: C2 -> C3 -> D2 -> B2 -> D3 -> D4 ``` 目前實作的方法會導致 kernel 每次都需要多傳輸一組 8-bit 的整數,可能會導致整體效能的下降,可以考慮不傳輸額外的 data ,由 user space 透過比對當前與上次的棋盤進行落子位置與對局重置的判斷,具體的效能評比尚待實作。 ## 顯示當下的秒數 >[Commit cefe467](https://github.com/sysprog21/kxo/commit/cefe467379745c69c3f3f933766e5e59c176b815) 建立一個函式 `display_time()` 以顯示時間,並於執行 AI 前初始化 `start_time` 。 ```diff static void display_time() { time_t now = time(NULL); int elapsed = (int) difftime(now, start_time); printf("\nElapsed Time: %d seconds\n", elapsed); } ... + start_time = time(NULL); if (mode == MODE_KERNEL) { run_kernel_mode(); } else { run_user_mode(); } ``` ## 以 bitops 加速勝利與否的判斷 可以看到原始的實作中, `check_win` 函式是以三層迴圈來判定當前是否有 AI 達成勝利條件。 ```c char check_win(const char *t) { for (int i_line = 0; i_line < 4; ++i_line) { line_t line = lines[i_line]; for (int i = line.i_lower_bound; i < line.i_upper_bound; ++i) { for (int j = line.j_lower_bound; j < line.j_upper_bound; ++j) { char win = check_line_segment_win(t, i, j, line); if (win != ' ') return win; } } } for (int i = 0; i < N_GRIDS; i++) if (t[i] == ' ') return ' '; return 'D'; } ``` 而在 `time_handler` 中,我們可以看到 `check_win` 這個函式每次都會被呼叫,這樣每次都以三層迴圈來判定勝利與否效率非常低下,故此可以嘗試以 bitops 來快速判定棋盤的狀態。 我們在前面已經成功將 table 壓縮成 32-bit 的無號整數,故此我們可以根據已經壓縮的 table 來設計 mask 。