![kxo-logo](https://hackmd.io/_uploads/H1_dY2h3xg.png) # 2025q1 Homework 3 (kxo) contributed by <[LambertWSJ](https://github.com/LambertWSJ)> [Demo 影片](https://youtube.com/shorts/-Sv_Q79XyAg?feature=share) # 準備工作 ## 合併 compilation database [clangd](https://clangd.llvm.org/) 搭配 compilation database (`compile_commands.json`) 方便開發者追蹤原始碼,為方便書寫,本文 compilation database 改為 compilation DB。 通常 Kernel API 的說明在實作的註解比較容易找到,標頭檔比較少會寫,如果希望可以跳到 Kernel API 的實作而不是跳到標頭檔的宣告,可以 clone 和開發核心模組相同 Kernel 版本的原始碼,然後再編譯出 Kernel 的 compilation DB。 資列夾結構如下,不必將 linux 原始碼 clone 在 kxo 裡面,在其它路徑也可以,依個人程式碼管理習慣調整即可。 ``` /home/lambert-wu/repos ├── kxo │ └── compile_commands.json └── linux └── compile_commands.json ``` 將 linux 的 compilation DB 複製到 kxo,再用 [jq](https://jqlang.org/manual/) 合併 linux 和 kxo 的 compilation DB ```shell kxo$ cp compile_commands.json kxo_db.json kxo$ cp ~/repos/linux/compile_commands.json linux_db.json kxo$ jq -s 'add' ./linux_db.json kxo_db.json > compile_commands.json ``` 最好要備份合併完的 compile_commands.json,否則 make clean 會把它刪除。 接著 restart clangd 應該會看到 VSCode 底下的 status bar 在建立索引(indexing)中 ![image](https://hackmd.io/_uploads/BynDvXaFxg.png) 如果你的 linux 資料夾沒有 `.cache` 的話,才會看到有幾千個檔案在建立索引,否則 clangd 只會幫 kxo 建立索引,如果跳轉有問題的話,可以把 `.cache` 刪掉再 restart 一次 ### issue 原本在 kxo 用 .clangd 設定兩個 fragments(`---` 區隔),但這樣會導致 User space 的程式也使用 linux 的 compilation DB,User space API 和 headers 會出現 not found 的錯誤,過去有參考 clangd 的 [issue](https://github.com/clangd/clangd/issues/1092) 設定 PathMatch 也沒試成功,暫時沒有設定 .clangd 的解法,剩下繞點彎路的用 jq 合併 compilation DB 的辦法。 這樣的 .clangd 會用 linux kernel 的 compilation DB 而不是當前 project 的。 ```yaml CompileFlags: Add: [-xc++, -Wall] --- CompileFlags: CompilationDatabase: /home/lambert-wu/repos/linux ``` # 縮減核心模組和使用者的通訊成本 >[commit: 84ca1db](https://github.com/LambertWSJ/kxo/commit/84ca1dbec5c33c7132da5d8ea540096c721d3dae) kxo 在模組處理好棋盤畫面 `draw_buffer`,再等待使用著讀取,但是這樣的通訊成本太大了,一次需要讀取 66 位元組,應改為讀取棋盤的狀態,再交由使用者繪製棋盤,為此要設計編碼來描述棋盤的狀態。 井字遊戲的棋盤的每一格 3 種狀態 $\to$ `O`, `X`, ` `(空格),可以用 2 個位元來描述這 3 種狀態,kxo 棋盤有 16 格,剛好可以用 32 位元的數值來儲存。 | 格子狀態 | 編碼 | | -------- | ------ | | 空格 | $00_2$ | | `O` | $01_2$ | | `X` | $10_2$ | 空格除了用 0 很直觀外,AI 演算法會計算對手勝率時,會用 xor 做切換棋手的操作,而任一數值對 0 做 xor 還是保持原值,這樣的 AI 不會考慮別人,只會想到自己。 原本想用 32 位元的整數拆成 2 個 16 位元來描述執 `O` 和 `X` 玩家的棋盤,1 表示自己下的棋子,0 表示沒有棋子,但是 0 不代表對手沒有落子在那一格上面,要額外用對手的棋盤狀態來判斷,才知道 0 是不是真的沒有棋子在上面,這樣的編碼會需要更多的 bitops,所以改用 2 位元描述每一格的狀態,更能夠直觀的得知棋局全貌。 編碼決定好之後,需要考慮下面的問題 - 存取棋盤 $\to$ 讀取/寫入格子 - 如何判斷勝利和對弈中 - xo-user 如何依據這套編碼繪製棋盤 - 如何讓 AI 玩家使用這套編碼 ## 存取棋盤 ```c #define CELL_EMPTY 0u #define CELL_O 1u #define CELL_X 2u #define CELL_D 3u #define GEN_O_WINMASK(a, b, c) \ ((CELL_O << (a * 2)) | (CELL_O << (b * 2)) | (CELL_O << (c * 2))) #define TABLE_SET_CELL(table, pos, cell) (table |= (cell) << ((pos) * 2)) #define TABLE_GET_CELL(table, pos) \ ({ \ __typeof__(pos) _pos = (pos) * 2; \ ((table & (0b11 << _pos)) >> _pos); \ }) ``` <!-- 說明為什麼這樣設計 GET/SET,why __typeof__ instead of typeof [Alternate Keywords (Using the GNU Compiler Collection (GCC))](https://gcc.gnu.org/onlinedocs/gcc/Alternate-Keywords.html#Alternate-Keywords) [Clang Compiler User’s Manual — Clang 22.0.0git documentation](https://clang.llvm.org/docs/UsersManual.html#c-language-features) --> #### 繪製棋盤 將繪製棋盤的任務從 kernel module 交遊 user space 負責,kernel module 專注在對弈。`draw_board` 依據 table 上每一格的編碼用查表法轉換成對應的字元,寫入到 display_buf 裡面。 ```c= static int draw_board(unsigned int table) { char cell_tlb[] = {' ', 'O', 'X'}; unsigned int i = 0, k = 0; display_buf[i++] = '\n'; display_buf[i++] = '\n'; while (i < DRAWBUFFER_SIZE) { for (int j = 0; j < (BOARD_SIZE << 1) - 1 && k < N_GRIDS; j++) { display_buf[i++] = j & 1 ? '|' : cell_tlb[TABLE_GET_CELL(table, k++)]; } display_buf[i++] = '\n'; for (int j = 0; j < (BOARD_SIZE << 1) - 1; j++) { display_buf[i++] = '-'; } display_buf[i++] = '\n'; } return 0; } ``` `while` 裡面將棋盤上的每一列寫入到 buffer 裡面。第 8 行在一開始先寫入格子的狀態,彼此用 `|` 區隔 (e.g. `O| | |X`),接著加入換行符號 `\n`,14 行的迴圈繪製底線 `-`,畫完之後再換行。 因此進入下一次 while 的判斷式之前,會寫入好一列的棋盤到 buffer,如下所示 ``` O| | |X -------- ``` 第 10 行,`TABLE_GET_CELL` 的 MACRO 傳入 k++,一般來說不建議傳入遞增/遞減運算子到巨集,因為很容易造成 double evaluation 的問題,針對這個問題,可以應用 [Linux 核心原始程式碼巨集: max, min](https://hackmd.io/@sysprog/linux-macro-minmax) 提到的 typeof 技巧避免這個問題。 ```c #define TABLE_GET_CELL(table, pos) \ ({ \ __typeof__(pos) _pos = (pos) * 2; \ ((table & (0b11 << _pos)) >> _pos); \ }) ``` ## 判斷勝利 這套編碼中,`O` 和 `X` 只差一個 bit,只要位移一個 bit 就會換成對手的旗子,換句話說,勝利棋局只要存 `O` 或 `X` 其中一組就能夠檢查是那個玩家勝利或是還在對弈中。 將所有 `O` 勝利的遮罩存入 `win_patterns`,檢查勝利時再用 `table` 和每一個勝利的 pattern 做 AND 操作,如果等於勝利的棋局,就表示贏了,只要將 pattern 左移 1 bit 就能檢查 `X` 是不是贏了,所有 pattern 都檢查完,沒有勝利的話表示還在對弈中,按原本的實作用 CELL_EMPTY 表示 `' '`。 ```c static uint32_t win_patterns[] = { /* ROW */ GEN_O_WINMASK(0, 1, 2), GEN_O_WINMASK(1, 2, 3), GEN_O_WINMASK(4, 5, 6), /* ... */ }; char check_win(unsigned int table) { int len = ARRAY_SIZE(win_patterns); for (int i = 0; i < len; i++) { unsigned int patt = win_patterns[i]; /* check O is win */ if ((table & patt) == patt) return CELL_O; /* check X is win */ patt <<= 1; if ((table & patt) == patt) return CELL_X; } for_each_empty_grid(i, table) return CELL_EMPTY; return CELL_D; } ``` 原本的實作沒有用到 `for_each_empty_grid`,於是改寫 MACRO 後,取代原本 for 迴圈的寫法。 ```c #define for_each_empty_grid(i, table) \ for (int i = 0; i < N_GRIDS; i++) \ if (TABLE_GET_CELL(table, i) == CELL_EMPTY) ``` ### 改進 >Commit: [0596c8e](https://github.com/LambertWSJ/kxo/commit/0596c8e1aced514f8c4f488adf15219029cb5137) #### 預先計算表格 <!-- TODO --> 將 `win_patterns` 寫死在表格是不良的慣例(bad practice),如果表格從 4x4 擴展到 10x10,程式碼會有更大一片區域都是勝利條件的遮罩,顯然該預先計算好所有 NxN 的 win patterns 再遊戲開始前計算好。 #### 簡化判斷勝利 table 改成數值後,如果是 0 就可以直接回傳 `CELL_EMPTY`,不用進到迴圈裡面判斷勝利條件,如果進到迴圈檢查出棋局還沒分出勝負的話,再用 `detect_empty_cell` 檢查棋盤上是否還有可以落子的空格。 ```c char check_win(unsigned int table) { if (!table) return CELL_EMPTY; int len = ARRAY_SIZE(win_patterns); for (int i = 0; i < len; i++) { unsigned int patt = win_patterns[i]; /* check O is win */ if ((table & patt) == patt) return CELL_O; /* check X is win */ patt <<= 1; if ((table & patt) == patt) return CELL_X; } return detect_empty_cell(table) ? CELL_EMPTY : CELL_D; } ``` `detect_empty_cell` 的實做如下,`lo` 抓出低位 `0b01`,`hi` 抓出高位 `0b10`,如果高位 `hi` 右移 1 個位元和 `lo` 做 or 運算不等於 `0x55555555` 表示棋盤還有空格。 ```c static int detect_empty_cell(unsigned int table) { unsigned int lo = table & 0x55555555; unsigned int hi = table & 0xAAAAAAAA; return (lo | (hi >> 1)) != 0x55555555; } ``` <!-- TODO: --> <!-- 以 `table = 0x` 為例, --> ## 應用編碼到 AI 演算法實作 kxo 運用 [MCTS](https://en.wikipedia.org/wiki/Monte_Carlo_tree_search) 和 [Negamax](https://en.wikipedia.org/wiki/Negamax) 兩種 AI 演算法進行對奕,彼此共用一個 16 位元組的棋盤 `table`,改寫為 4 byte 的整數,要小心翼翼的把原本用字元 `'O'`, `'X'` 表示的棋子,還有存取陣列的操作都轉換為 bitops,一個不小心會變成一個 AI 在落子,另一個 AI 不下棋的情況。 改為用 4 位元組整數 table 傳進演算法裡面做運算,紀錄棋盤後,帶來下面的好處 - 優化複製和初始化的速度 MCTS 使用的 `simulate` 和 `mcts` 兩個函式會複製 table,原本的實作要呼叫 `memcpy` 複製整個棋盤,編碼因為是整數,只要指派就完成複製。 ```diff -static fixed_point_t simulate(const char *table, char player) +static fixed_point_t simulate(uint32_t table, char player) { char current_player = player; - char temp_table[N_GRIDS]; - memcpy(temp_table, table, N_GRIDS); + uint32_t temp_table = table; /* ... */ } ``` - 降低函式呼叫成本 原本的 `table` 以指標傳遞,實際參數大小依架構而異,在 32 位元系統上為 4 byte,在 64 位元系統上為 8 byte。改用整數型別後,雖然在 32 位元架構下與指標大小相同,但在 64 位元架構下,可節省 4 byte 的參數傳遞空間,進一步提升呼叫效率。 ### 優化 `get_score` >Commit [8b5b16f](https://github.com/LambertWSJ/kxo/commit/8b5b16f780810c231e31955fb5d04e32b3c7eaf8) <!-- TODO: --> ## 效能分析 這裡分兩部分測量,測量核心模組和使用者程式 `xo-user`,前者用 ftrace 測量,後者用 perf,各測量 60 秒 <!-- TODO explain --> ### 使用者程式 #### 編碼前 ``` Performance counter stats for './xo-user -t 60' (3 runs): 279 context-switches ( +- 0.90% ) 43,417,295 cpu_atom/cycles/ ( +- 0.96% ) (80.72%) 47,978,573 cpu_core/cycles/ ( +- 1.59% ) (19.28%) 15,568,811 cpu_atom/instructions/ # 0.36 insn per cycle ( +- 1.41% ) (80.72%) 20,383,916 cpu_core/instructions/ # 0.42 insn per cycle ( +- 0.94% ) (19.28%) 798,963 cpu_atom/cache-misses/ ( +- 2.54% ) (80.72%) 851,619 cpu_core/cache-misses/ ( +- 5.52% ) (19.28%) 64 page-faults ( +- 1.38% ) 378 syscalls:sys_enter_read ( +- 0.54% ) 60.002514 +- 0.000404 seconds time elapsed ( +- 0.00% ) ``` #### 編碼後 ``` Performance counter stats for './xo-user -t 60' (3 runs): 602 context-switches ( +- 0.35% ) 80,425,744 cpu_atom/cycles/ ( +- 0.94% ) (94.11%) 109,147,313 cpu_core/cycles/ ( +- 2.67% ) (5.89%) 27,768,690 cpu_atom/instructions/ # 0.35 insn per cycle ( +- 0.28% ) (94.11%) 48,707,777 cpu_core/instructions/ # 0.45 insn per cycle ( +- 3.10% ) (5.89%) 1,392,524 cpu_atom/cache-misses/ ( +- 0.67% ) (94.11%) 1,509,690 cpu_core/cache-misses/ ( +- 3.28% ) (5.89%) 64 page-faults ( +- 1.04% ) 581 syscalls:sys_enter_read 60.0996 +- 0.0768 seconds time elapsed ( +- 0.13% ) ``` 編碼後只需要 4 byte,核心複製資料到使用者空間更快,1 分鐘內呼叫的次數比編碼前高了 53%,但是也因為每次只讀了 4 byte,因此會更頻繁的像核心模組要資料,導致更多的 context switches, ### 核心模組 核心模組會測量編碼相關的函式,觀察改寫成棋盤編碼後能為效能帶來多少的改進。執行 `xo-user` 60 秒,在這期間用 ftrace 一次測量一個函式。 為了避免表格出現橫向卷軸,下表函式名稱做些簡化 ai_one_work_func $\to$ ai_one ai_two_work_func $\to$ ai_two drawboard_work_func $\to$ drawboard | 函式名稱 | 版本 | 執行次數 | 平均執行時間 | 標準差 | 總耗時(us) | | ------------------- | ------ | -------- | ------------ | --------- | ----------- | | check_win | base | 208113 | 0.17 | 0.18 | 34422.99 | | | encode | 226929 | 0.12 | 0.10 | 26836.84 | | mcts | base | 110 | 365283.94 | 220413.31 | 40181233.66 | | | encode | 131 | 268343.12 | 179813.17 | 35152948.75 | | negamax | base | 135 | 1296.84 | 2222.73 | 175073.91 | | | encode | 143 | 1036.69 | 1349.74 | 148246.36 | | timer_handler | base | 576 | 28.00 | 27.08 | 16127.07 | | | encode | 576 | 33.23 | 28.00 | 19140.98 | | produce_board | base | 379 | 1.63 | 1.66 | 619.66 | | | encode | 576 | 3.47 | 2.00 | 1995.95 | | kxo_read | base | 369 | 162555.13 | 228511.56 | 59982843.62 | | | encode | 577 | 103904.06 | 549.67 | 59952645.07 | | ai_one | base | 112 | 359648.34 | 218211.86 | 40280613.83 | | | encode | 132 | 264752.14 | 177650.80 | 34947282.25 | | ai_two | base | 113 | 3262.96 | 3129.28 | 368714.91 | | | encode | 129 | 2487.72 | 1735.59 | 320916.36 | | drawboard | base | 338 | 111851.44 | 189695.43 | 37805788.28 | | | encode | 537 | 20.52 | 15.24 | 11021.58 | 除了 `timer_handler` 這類定時觸發的函式之外,編碼版的執行次數都比原版的多,其它如平均執行時間幾乎都比原版好,只有 `timer_handler` 和 `produce_board` 的執行時間比原版久,這部份還在做實驗找原因中。 `drawboard_work_func` 編碼板和原版的執行時間差很多是因為編碼版不需要在裡面等待 mutex lock 釋放,用 ftrace 追蹤發現是在等待 `ai_one_work_func` 做完 `mcts` 並釋放 mutex lock, `drawboard_work_func` 才可以繼續執行 `draw_board`。 ```diff static void drawboard_work_func(struct work_struct *w) { /* ... */ - mutex_lock(&producer_lock); - draw_board(table); - mutex_unlock(&producer_lock); /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); produce_board(); mutex_unlock(&consumer_lock); wake_up_interruptible(&rx_wait); } ``` 可以用 grep 將 ftrace 的 log 提取出標記為 `@` 的執行時間,就能觀察出 `mcts` 是 `drawboard_work_func` 的瓶頸,原版要等到 `mcts` 執行完畢並釋放鎖才能讓其它函式有所進展,因為編碼版把繪製棋盤的責任交由使用者之後,`drawboard_work_func` 才不會因為等待 `producer_lock` 解鎖而卡住。 ```shellscript $ grep -n "@" ./trace 129: 7) @ 852999.3 us | } /* mcts [kxo] */ 130: 7) @ 853036.5 us | } /* ai_one_work_func [kxo] */ 135: 9) @ 852993.6 us | } /* mutex_lock */ 136: 9) @ 853019.5 us | } /* drawboard_work_func [kxo] */ 204: 9) @ 615072.3 us | } /* mcts [kxo] */ 205: 9) @ 615102.0 us | } /* ai_one_work_func [kxo] */ 206: 2) @ 615154.8 us | } /* mutex_lock */ 207: 2) @ 615185.8 us | } /* drawboard_work_func [kxo] */ 272: 2) @ 154949.5 us | } /* mcts [kxo] */ 273: 2) @ 154975.9 us | } /* ai_one_work_func [kxo] */ 278: 0) @ 154873.8 us | } /* mutex_lock */ 279: 0) @ 154899.7 us | } /* drawboard_work_func [kxo] */ ``` 有了數據之後,接著計算改進幅度,平均執行時間, 標準差, 總耗時改善幅度計算方式 $$ improve = 100 \times \frac{base - encode}{base} $$ 執行次數則改為 $$ improve = 100 \times \frac{encode - base}{base} $$ | 函式 | 執行次數 | 平均執行時間 | 標準差 | 總耗時 | | -------------| -------- | ------------ | ------ | ------ | | check_win | +9.04 | +29.41 | +44.44 | +22.04 | | mcts | +19.09 | -26.54 | -18.42 | -12.51 | | negamax | +5.93 | +20.06 | +39.28 | +15.32 | | timer_handler | 0 | -18.68 | -3.4 | -18.69 | | produce_board | +51.98 | -112.88 | -20.48 | -222.1 | | kxo_read | +56.37 | +36.08 | +99.76 | +0.05 | | ai_one | +17.86 | +26.39 | +18.59 | +13.24 | | ai_two | +14.16 | +23.76 | +44.54 | +12.96 | | drawboard | +58.88 | +100 | +100 | +100 | # 多個 AI 對弈 >[commit: e65ab69](https://github.com/LambertWSJ/kxo/commit/e65ab699a4600eb0f072bfaa242aaad761a2fd5a) 參考 [BigMickey](https://hackmd.io/@BigMickey/linux2025-homework3) 的實作,將下圖單一棋局對弈的流程如,擴展到多個 AI 對弈。 ```mermaid flowchart TB timer_handler e1@==> ai_game ai_game e3@==> ai_one_work_func ai_game e4@==> ai_two_work_func ai_one_work_func e5@==> drawboard_work_func ai_two_work_func e6@==> drawboard_work_func drawboard_work_func e7@==> produce_board subgraph get_cpu ai_one_work_func ai_two_work_func end classDef animate stroke-dasharray: 9,5,stroke-dashoffset: 900,animation: dash 15s linear infinite; class e1 animate class e3 animate class e4 animate class e5 animate class e6 animate class e7 animate ``` 為了後續實作不同功能,先定義好每場 AI 棋局所需的結構。 - `game.h` 中的 `xo_table` 結構用於存放棋盤資料與對應的棋局 ID,核心模組與使用者空間(xo-user)都會使用它,方便雙方都能清楚知道對應的是哪一局遊戲的棋盤。 - `ai_game.h` 中的 `ai_game` 結構則只在核心模組使用,將原本單局棋局的控制變數集中管理,包含輪到哪方、遊戲是否結束、記錄數據等,並整合與 AI 運算相關的 work_struct 與 mutex。 - 這樣的設計使得未來擴充到多場 AI 對奕時,程式碼更容易維護與改寫。 ```c /* game.h */ struct xo_table { int id; unsigned int table; }; /* ai_game.h */ struct ai_game { struct xo_table xo_tlb; char turn; u8 finish; struct mutex lock; struct work_struct ai_one_work; struct work_struct ai_two_work; struct work_struct drawboard_work; }; ``` ## 標記未完成棋局並安排 tasklet 進行 AI 對弈 原版的 timer handler 會判斷棋局分出勝負的話,就將 `table` 存入 `kfifo` 等待 `xo-user` 取出來繪製棋局,否則會呼叫 `ai_game` 透過 `tasklet` 在對弈一次,玩家落子後就把 table 存入 `kfifo`。 同樣的設計擴展到多場井字遊戲的棋局的話,變成收集還沒分出勝負的棋局交由 tasklet 安排對弈,而分出勝負的棋局就存入 xo_table 到 kfifo 待取。 透過 bitops 能夠將上述的點子有效率的實作出來,檢查每場遊戲是否分出勝負,還沒結束的遊戲用 OR 運算紀錄第 `i` 場到變數 `unfini` 裡面,分出勝負的遊戲就把 `xo_tlb` 存入到 `kfifo`。 ```c static void timer_handler(struct timer_list *__timer) { char cell_tlb[] = {'O', 'X'}; unsigned int unfini = 0; for (int i = 0; i < N_GAMES; i++) { struct ai_game *game = &games[i]; struct xo_table *xo_tlb = &game->xo_tlb; uint8_t win = check_win(xo_tlb->table); if (win == CELL_EMPTY) unfini |= (1u << i); else { pr_info("kxo: game-%d %c win!!!\n", xo_tlb->id, cell_tlb[win - 1]); read_lock(&attr_obj.lock); if (attr_obj.display == '1') { int cpu = get_cpu(); pr_info("kxo: [CPU#%d] Drawing final board\n", cpu); put_cpu(); /* Store data to the kfifo buffer */ mutex_lock(&game->lock); produce_board(&game->xo_tlb); mutex_unlock(&game->lock); wake_up_interruptible(&rx_wait); } if (attr_obj.end == '0') game->xo_tlb.table = 0; read_unlock(&attr_obj.lock); } } /* ... */ } ``` 上述迴圈做完後,將還沒結束的遊戲 `unfini` 指派到 `game_tasklet.data` 作為 `game_tasklet_func` 的參數,接著呼叫 `ai_game` 對 `game_tasklet` 排程,觸發 softirq 時,會把要對奕的棋局推進 work queue 等待執行。 要判斷哪些遊戲已經結束,其實不需要額外宣告一個 fini 變數,然後在迴圈中像這樣 `fini |= (1u << i)` 逐一紀錄。 由於已經用 `unfini` 這個變數紀錄了「尚未結束」的遊戲,因此只要對 `unfini` 取反(使用 `~unfini`),就能得到「已經結束」的遊戲。 接著,再與 `GENMASK(N_GAMES - 1, 0)` 做 AND 運算,確保只保留第 0 到第 N_GAMES-1 場的有效位元,這樣就能正確取得所有已分出勝負的遊戲。 如果 kxo 還沒結束,而且有些遊戲已經分出勝負了,就安排下次 timer handler 觸發時間。 ```c static void timer_handler(struct timer_list *__timer) { for (int i = 0; i < N_GAMES; i++) { /* .... */ } const unsigned int fini = ~unfini & GENMASK(N_GAMES - 1, 0); if (unfini) { WRITE_ONCE(game_tasklet.data, unfini); ai_game(); mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); } else if (attr_obj.end == '0' && fini) mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); } ``` ## Game tasklet 驅動 AI 對弈 當 tasklet 被觸發就表示還有沒結束的遊戲,那些沒結束的遊戲已經在 timer handler 找出來存入到變數 `unfini`,並傳入到 tasklet 當作參數,可以用 32 位元的漢明權重 `hweight32` 得出沒結束的遊戲有幾個,再逐一對每個遊戲做下面的動作 1. 從 unfini 的低位開始,用 ffs 逐一取出遊戲的 ID 2. 從 unfini 移除這場遊戲,這場遊戲的 AI 要準備下棋了 3. 依據是否下完棋和現在換誰下,決定推入 $AI_1$ 或 $AI_2$ 到 work queue 4. 推入落子完的棋盤工作到 work queue,棋盤會推入到 kfifo 等待 xo-user 繪製 ```c static void game_tasklet_func(unsigned long unfini) { /* ... */ int n = hweight32(unfini); for (int i = 0; i < n; i++) { int id = ffs(unfini) - 1; unfini &= ~(1u << id); struct ai_game *game = &games[id]; READ_ONCE(game->finish); READ_ONCE(game->turn); smp_rmb(); if (game->finish && game->turn == 'O') { WRITE_ONCE(game->finish, 0); smp_wmb(); queue_work(kxo_workqueue, &game->ai_one_work); } else if (game->finish && game->turn == 'X') { WRITE_ONCE(game->finish, 0); smp_wmb(); queue_work(kxo_workqueue, &game->ai_two_work); } queue_work(kxo_workqueue, &game->drawboard_work); } /* ... */ } ``` TODO: 解釋上述用到 memory barrier API 的用途,為什麼這樣用? ## AI 落子 AI 落子的函式為 `ai_one_work_func` 和 `ai_two_work_func`,兩個函式實作的程式碼很相像,差別在於使用的演算法和棋子,這裡以 `ai_two_work_func` 舉例。 <!-- TODO: explain ai_two_work_func get_cpu # preempt_disable put_cpu # preempt_enable --> ```c static void ai_two_work_func(struct work_struct *w) { int cpu; struct ai_game *game = container_of(w, struct ai_game, ai_two_work); struct xo_table *xo_tlb = &game->xo_tlb; unsigned int table = xo_tlb->table; WARN_ON_ONCE(in_softirq()); WARN_ON_ONCE(in_interrupt()); cpu = get_cpu(); mutex_lock(&game->lock); int move; WRITE_ONCE(move, negamax_predict(table, CELL_X).move); smp_mb(); if (move != -1) xo_tlb->table = VAL_SET_CELL(table, move, CELL_X); WRITE_ONCE(game->turn, 'O'); WRITE_ONCE(game->finish, 1); smp_wmb(); mutex_unlock(&game->lock); put_cpu(); } ``` ## module 沒寫好不只核心會 crash,重開機也會有問題 程式碼修到 `ai_one/two_work_func` 後,嘗試編譯並載入到核心執行,開始用 xo-user 觀看結果時,跳出一大堆 kernel panic 的錯誤訊息,看一下 kxo 安息的地方是執行 `__slab_free` 發生的 ```shell $ make $ sudo rmmod kxo $ sudo insmod kxo.ko $ sudo ./xo-user ``` 用另一個終端機下 `sudo dmesg -w` 觀察 kxo 的錯誤訊息 ```log [193899.779069] kxo: [CPU#1] game-3 ai_two_work_func completed in 1056 usec [193899.779071] Workqueue: kxod ai_two_work_func [kxo] [193899.779081] RIP: 0010:__slab_free+0x152/0x280 ... [193899.779113] Call Trace: [193899.779115] <TASK> [193899.779119] ? zobrist_put+0x7c/0xd0 [kxo] [193899.779124] ? zobrist_clear+0x53/0x8d [kxo] [193899.779128] kfree+0x2c6/0x360 [193899.779132] zobrist_clear+0x53/0x8d [kxo] [193899.779136] negamax_predict+0x65/0x90 [kxo] [193899.779140] ai_two_work_func+0x81/0x160 [kxo] [193899.779179] </TASK> ``` 此時 xo-user 是卡住的狀態,即便我按下數次 Ctrl+C 也關不了,只好用 `sudo kill -9 359{6,8,9}` 關掉 3 個 xo-user 行程(為什麼執行一次 xo-user 會有 3 個行程?),打開 htop 再檢查一次,還是沒有把 xo-user 完全關掉,程式的狀態為 D(uninterruptible sleep)。 ```c PID USER PRI NI VIRT RES SHR S CPU% MEM TIME Command(merged) 3596 root 20 0 0 0 0 D 0.0 0.30 0:00:00 xo-user ``` 先不管 xo-user,嘗試移除 kxo 又卡住了,用 lsmod 發現 kxo 的 Used by 為 -1,這好像沒得挽救了,只剩下重開機的選擇,重開機又發生一件神奇的事情。 >我把 Ubuntu 24 裝在迷你電腦,kxo 和其它程式都在上面開發,要寫程式再透過 VSCode 的 SSH 連線過去。 > 靜待一段時間後,用 SSH 連不上 Ubuntu,再等一下好了,SSD 開機很快的,再連過去 N 次還是一樣的訊息 ``` Connection closed by xxx.xxx.x.x port 22 ``` 把螢幕線接到迷你電腦來看怎麼一回事,原來卡在 reboot,因為還在 `kill xo-user`,慢慢等還是可以重開機的,但不知道會等多久,我選擇強制關機再重開,又可以順利連過去。 ```log systemd-shutdown[1]: Syncing filesystems and block devices. systemd-shutdown[1]: Sending SIGTERM to remaining processes... systemd-shutdown[1]: Received SIGTERM from PID 1 (systemd-shutdown). workqueue: console: callbc hogged CPU for >1000us 5 times, consider switching to TQ_UNBOUND systemd-shutdown[1]: Waiting for process: 3596 (xo-user) systemd-shutdown[1]: Sending SIGKILL to PID 3596 (xo-user) systemd-shutdown[1]: Process 4000 (plymouthd-fd-es) has been marked to be excluded from killing. systemd-shutdown[1]: Waiting for process: 3596 (xo-user) ``` kernel module 寫不好,不只核心會 crash,連帶 user space 程式也會出問題,user space 程式關不掉又導致重開機有問題,每個錯誤環環相扣,最後變成整個系統不穩定。這就是為什麼寫 kernel module 要格外謹慎,錯一步就可能把整個系統拖下水。 ## 修復 kernel panic 的問題 # 設計 TUI 顯示多場棋局 >繪製多個棋局和 logo $\to$ >Commit [235ff35](https://github.com/LambertWSJ/kxo/commit/235ff351e2e6c8c34efbdd75b81de3f03965888a) 核心模組已實作完多個 AI 對弈的畫面,為此設計了 [TUI](https://gist.github.com/LambertWSJ/3055b0cdc3469d45723b65932ebe5c3e) 讓使用者程式連續顯示多場遊戲,並研究 [auto-tetris](https://github.com/jserv/auto-tetris) 怎麼繪製 TUI 畫面,從中引入基本 TUI 繪圖用的函式,再加入一些 escape 控制字元來實作。 為了提升整體 TUI(文字使用者介面)的可讀性與視覺一致性,這份設計圖中使用 [Box-drawing characters](https://en.wikipedia.org/wiki/Box-drawing_characters) 來繪製 tic-tac-toe 棋盤與介面邊框,取代傳統的 +, -, | 等 ASCII 字元。 傳統字元在顯示多個井字遊戲的棋盤時容易產生大量空隙,看起來雜亂、不對齊。而 box-drawing characters(如 `─`, `│`, `┼` 等)則能提供更緊密、對齊清晰的表格線條,使畫面結構更集中、不鬆散,也更貼近圖形化的佈局效果。 ## 從 auto-tetris 移植 TUI 函式 從 [auto-tetris](https://github.com/jserv/auto-tetris) 移植基礎的 TUI 繪製函式和 escape 控制字元到 kxo 來實作設計圖。 outbuf 為 4096 byte 大小的記憶體區塊,內部紀錄的是 `outbuf_printf`, `outbuf_write` 寫入的資料,當超過 `FLUSH_THRESHOLD` 的時候,會把寫入到 stdout 顯示 outbuf 的內容,如果需要即時顯示的話,可呼叫 `outbuf_flush` 將當前的 buffer 寫入到 stdout。 ```c #define ESC "\033" #define CLEAR_SCREEN ESC "[2J" ESC "[1;1H" #define HIDE_CURSOR ESC "[?25l" #define SHOW_CURSOR ESC "[?25h" #define RESET_COLOR ESC "[0m" #define OUTBUF_SIZE 4096 #define FLUSH_THRESHOLD 2048 /* Flush when half-full for optimal latency */ #define RESET "\033[0m" #define RED "\033[31m" #define GREEN "\033[32m" #define o_ch GREEN "O" RESET #define x_ch RED "X" RESET static struct { char buf[OUTBUF_SIZE]; size_t len; bool disabled; /* Emergency fallback when buffer management fails */ } outbuf = {0}; void outbuf_flush(); void outbuf_printf(const char *format, ...); void outbuf_write(const char *data, size_t data_len); void safe_write(int fd, const void *buf, size_t count); void gotoxy(int x, int y) ``` 為了方便繪製特定遊戲的棋局,新增儲存和恢復座標的控制字元,後續再繪製第 N 個 table 的時候,可以先跳過去把棋局繪製完成後再返回,避免繪製的位置跑掉 ```c #define SAVE_XY ESC "[s" #define RESTORE_XY ESC "[u" void save_xy() { outbuf_write(SAVE_XY, strlen(SAVE_XY)); } void restore_xy() { outbuf_write(RESTORE_XY, strlen(RESTORE_XY)); } ``` 在 TUI(文字使用者介面)中進行繪製的方式,某種程度上類似於 Web 前端使用 Canvas API:需要手動控制繪製起始座標、處理相對與絕對位置的移動,並管理座標的儲存與恢復。但與 Canvas 相比,TUI 顯得陽春許多。 TUI 並不處理複雜的動畫或幾何圖形,而是著重於透過文字、控制字元、Unicode(如 box-drawing characters)等方式,來結構化地呈現資訊。這樣的設計更適合用於資源有限的環境,或需要在純文字介面中提供直覺化操作與清晰視覺效果的應用。 ## 繪製 logo 設計圖規劃的 KXO logo 可以上網搜尋 `text to ascii art` 找喜歡的字體輸出成 logo,我選用 Larry 3D 作為範例,把複製好的文字存成文字檔,再藉由 `lolcat -f` 把鮮豔的 color code 一同存到檔案裡面。 ```shell $ cat logo.txt | lolcat -f > logof.txt && cat ./logof.txt ``` 後面補上的 `&& cat ./logof.txt` 是為了預覽 logo,接著執行 xo-user 時把 logo 載入後繪製出來 ```c int main(int argc, char *argv[]) { /* ... */ char *logo = load_logo("logof.txt"); /* ... */ render_logo(logo); /* ... */ } ``` 因為要一行行的把 logo 印出來,strchr 用換行(`\n`)做字串分割,又因為 strchr 一定要先執行過才知道能不能找到 `\n`,所以用 do-while 迴圈而不是 for 迴圈。 `x = 38` 則是自己依照設計圖中 logo 的起始位置設定的,k 是換行用的變數,當繪製完就換行,避免所有的字元都擠在同一行。 ```c void render_logo(char *logo) { char *beg = logo; const char *pos; int ln = '\n'; int k = 0; int x = 38; do { pos = strchr(beg, ln); k++; gotoxy(x, k); size_t len = pos - beg; outbuf_write(beg, len); beg += len + 1; } while (pos); gotoxy(x, k); outbuf_flush(); } ``` 當找到換行的位置 pos 後,將位置移動到 `gotoxy(x, k)`,x 是固定的,所以遞增 k 就好,這段要給 `outbuf_wrtie` 的字串長度為 `pos - beg`,寫入完後要更新開頭 `beg` 到換行的下個字元 `len + 1` 並檢查 `pos`,只要不是 NULL 就繼續從 beg 開始尋找換行並繪製 logo,重複這個過程直到 logo 繪製完成。 :::info 小訣竅: VSCode 反白一段文字後,視窗的右下角會顯示反白選中多少個字元。 以下圖為例,反白到繪製 logo 的起始位置,右下角顯示`(38 selected)`,Ln 367 則是當前行數,這樣就可以知道要繪製的 X 和 Y 的位置,**千萬不要用方向鍵慢慢默數到你要的位置**。 ![image](https://hackmd.io/_uploads/rkXRF76peg.png) ::: ## 繪製多場遊戲 因為有多個棋盤要繪製,暫時先用不精準的詞彙來區分 `BOARD` 和 `TABLE`,兩者以下圖來呈現 `BOARD` $\to$ 用來區分每一個棋盤的分界線,顯示遊戲的 ID、棋局、用什麼演算法對弈。 `TABLE` $\to$ 4x4 的棋盤,顯示當前的棋局,下圖中 `+2` 表示 row 落子要遞增的長度,反之亦然,`+4` 表示 col 下圖標註了邊長和棋盤從左上角為起點,每一格 x, y 遞增的量 fig.`BT_design` ``` (BOARD) ├─────────────── 35 ──────────────┤ ┌─────────────────────────────────┐ ─┬─ (TABLE) │ 15 Game-1 13 │ ─┬─ │ 17 │ ┌───┬───┬───┬───┐ │ │ │ ┌───┬───┬───┬───┐ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├───┼───┼───┼───┤ │ │ │ ├───┼───┼───┼───┤ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ 9 ├───┼───┼───┼───┤ 8 │ 11 13 +2 ├───┼───┼───┼───┤ 9 │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├───┼───┼───┼───┤ │ │ │ ├───┼───┼───┼───┤ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └───┴───┴───┴───┘ │ │ │ └───┴───┴───┴───┘ │ 12 mcts vs nega 10 │ ─┴─ │ +4 └─────────────────────────────────┘ ─┴─ ``` TODO: 解釋很多行的 `render_boards_temp` 實作 使用 `render_boards_temp` 繪製出來,因為是固定的,所以只繪製一次,不會進到 while 迴圈裡重複繪製。 ```c int main(int argc, char *argv[]) { /* ... */ render_logo(logo); render_boards_temp(N_GAMES); tui_init(); while (!end_attr) { /* ... */ } } ``` 分隔線繪製完成後,接著要繪製棋盤,要先從核心模組取得棋局,才能夠在使用者空間繪製出來,當讀到棋局 `xo_tlb`,則需要依據 ID 將開始繪製的位置 gotoxy 過去,當繪製完成後再恢復到原本的開始繪製位置,避免之後都是從 gotoxy 的位置開始繪製,導致 TUI 畫出一堆亂碼 ```c int main(int argc, char *argv[]) { /* ... */ while (!end_attr) { /* ... */ if (FD_ISSET(STDIN_FILENO, &readset)) { /* ... */ } else if (read_attr && FD_ISSET(device_fd, &readset)) { FD_CLR(device_fd, &readset); read(device_fd, &xo_tlb, sizeof(struct xo_table)); save_xy(); update_table(&xo_tlb); restore_xy(); } /* ... */ } } ``` 繪製棋局的第一步要把 ID 轉換到 BOARD 開頭的位置,才能對照 fig. `BT_design` 標註的邊長和位移量開始繪圖。 ```c #define UI_COLS 3 #define BOARD_W 35 #define BOARD_H 13 #define BOARD_BASEY 10 void update_table(const struct xo_table *xo_tlb) { const char *cell_tlb[] = {" ", o_ch, x_ch}; int id = xo_tlb->id; unsigned int table = xo_tlb->table; int y = BOARD_BASEY + (id / UI_COLS) * (BOARD_H - 1); int x = (id % UI_COLS) * BOARD_W + 1; gotoxy(x + 15, y + 2); outbuf_printf("Geme-%d\n", id); } ``` 第 n 個棋局起始 x, y 位置 x $\to$ 第 n 個 col 乘上 BOARD 寬度 y $\to$ BOARD_BASEY(logo 的下一行) + 第 n 個 row 乘上 BOARD 高度 有了 Game ID 的起始位置,只要對照上圖 fig. `BT_design` 把 x, y 加上位移就能輕鬆繪製期局和資訊了 ### 顯示遊戲 ID ```c void update_table(const struct xo_table *xo_tlb) { /* ... */ gotoxy(x + 15, y + 2); outbuf_printf("Geme-%d\n", id); /* ... */ } ``` - 繪製 AI 玩家用的演算法 目前還沒實作動態切換演算法,所以都先固定用原本 player1 和 player2 用的演算法 ```c void update_table(const struct xo_table *xo_tlb) { /* ... */ gotoxy(x + 12, y + 12); outbuf_printf("MCTS vs NEGA\n"); /* ... */ } ``` ### 繪製棋盤 ### 繪製棋局 棋盤繪製完成後,再用 bitops 把 table 上的每一個棋子依序繪製到棋盤上。 按照上圖 `BT_design` 的標註,x, y 的位移量分別為 2 和 4,表示這兩個方向在棋盤上格子的位移量,只要將棋子 x 和 y 的位置計算好再乘上位移量,就能在棋盤的格子上繪製棋子,不會畫到棋盤外面。 ```c void update_table(const struct xo_table *xo_tlb) { /* ... */ const char *cell_tlb[] = {" ", o_ch, x_ch}; int tlb_x = x + 9; int tlb_y = y + 3; const int cell_x = tlb_x + 2; const int cell_y = tlb_y + 1; const int stepx = 4; const int stepy = 2; for (int i = 0; i < N_GRIDS; i++) { const int pos_x = cell_x + (i & (BOARD_SIZE - 1)) * stepx; const int pos_y = cell_y + (i / BOARD_SIZE) * stepy; gotoxy(pos_x, pos_y); outbuf_printf("%s", cell_tlb[TABLE_GET_CELL(table, i)]); } /* ... */ } ``` 初步把設計圖棋盤繪製的部分刻出來了,展示效果如下 ![output](https://hackmd.io/_uploads/ByJ_3HjCxl.gif) ### 繪製分頁 > Commit: [10aef25](https://github.com/LambertWSJ/kxo/commit/10aef2516a97931decfd9e605f293b787f45ef88) 為了要顯示 Records 和 load average 又不必占用太多的畫面,改成用分頁的方式來切換顯示,使用者可以按下快捷鍵 Q 和 W 分別切換 Records 和 load average。 定義好分頁會用到的 union,使用者只要知道有哪些分頁,把分頁傳入 tui_update_tab 更新分頁畫面。 TAB_TOTLEN 的用途用來知道有多少個分頁,日後如何還有新的分頁加進 tui_tab 就會自動更新,不用手動去數有幾個分頁。 ```c /* tui.h */ enum tui_tab { XO_TAB_RECORD, XO_TAB_LOADAVG, TAB_TOTLEN, }; void tui_update_tab(enum tui_tab, const struct xo_table *tlb); ``` 預設顯示 Records,當偵測到使用者按下 Q 或 W,會在 `listen_keyboard_handler` 指派特定分頁到 cur_tab,等迴圈下次呼叫 `tui_update_tab` 就能更新畫面。 ```c int main(int argc, char *argv[]) { if (!status_check()) exit(1); /* ... */ curtab = XO_TAB_RECORD; while (!end_attr) { FD_ZERO(&readset); FD_SET(STDIN_FILENO, &readset); FD_SET(device_fd, &readset); print_now(); tui_update_tab(curtab); /* ... */ if (FD_ISSET(STDIN_FILENO, &readset)) { FD_CLR(STDIN_FILENO, &readset); listen_keyboard_handler(); } else if (read_attr && FD_ISSET(device_fd, &readset)) { /* ... */ } /* ... */ } } ``` #### 實作分頁切換 繪製分頁需要知道標題,還有占用的高度,避免資訊超出分頁的畫面。`TAB_TOTLEN` 是分頁的數量,在 tui_tabs 宣告陣列長度時可以用上,如果不小心多一個元素,可以在編譯階段就抱錯。tui_tabs 運用 [Designated Initializers](https://gcc.gnu.org/onlinedocs/gcc/Designated-Inits.html) 預先設定好每個分頁的標題和高度。 ```c /* tui.c */ struct xo_tab { char *title; int high; }; static struct xo_tab tui_tabs[TAB_TOTLEN] = { [XO_TAB_RECORD] = {.title = "Records", .high = 11}, [XO_TAB_LOADAVG] = {.title = "Load avg", .high = 12}, }; ``` 分頁的繪製分成下面的步驟 1. 繪製分頁的標題 $\to$ 繪製每個分頁的標題,如果分頁和傳入的 tab 是一樣的就把底部清成空白 2. 繪製當前分頁的資訊 $\to$ 呼叫對應的函式,先清空分頁畫面,再繪製分頁內容 ```c void tui_update_tab(enum tui_tab tab) { assert(tab < TAB_TOTLEN); draw_tab_label(tab); /* request data and render */ switch (tab) { case XO_TAB_RECORD: xo_record(tab); break; case XO_TAB_LOADAVG: xo_loadavg(tab); break; default: assert(0); } } ``` 切換分頁時要先把前個分頁的畫面清空再畫上對應高度的框線,不然舊有資訊會殘留在畫面上,這任務交由 `draw_tab_border`,分頁的函式專注在如何繪製內容,程式碼會更好維護。 ```c static void xo_record(const enum tui_tab tab) { draw_tab_border(tui_tabs[tab].high); } static void xo_loadavg(const enum tui_tab tab) { draw_tab_border(tui_tabs[tab].high); } ``` ### 顯示下棋紀錄 >Commit [7ab469a](https://github.com/LambertWSJ/kxo/commit/7ab469a430eccb136e12cf6d0057d822c0136b9d) 為了要充分利用已有的程式碼,而且要盡量降低核心模組和使用者的通訊成本,將 xo_table 的結構做些調整。 ```diff struct xo_table { - int id; + int attr; unsigned int table; + unsigned long moves; }; ``` - 把 `id` 改成 attr 可以在多存一些資訊在變數裡面,未來可以依據需求擴充。這裡我將 `[8:4]` 定義為遊戲的總下棋數 `steps`,這樣就能節省空間,不必再新增屬性到 `xo_table`。 - 新增無號 64 位元整數 `moves`,從低位元開始,每 4 個位元用來紀錄旗子在棋盤上的位置 ```graphviz digraph G { node [shape=none] rankdir="LR";compound=true; subgraph cluster { label=xo_table; attr;moves; attr_title;mv_title color=transparent; } moves [label=< <table border="0" cellborder="1" cellspacing="0"> <tr> <td width="31.25">15</td> <td width="31.25">14</td> <td width="31.25">13</td> <td width="31.25">12</td> <td width="31.25">11</td> <td width="31.25">10</td> <td width="31.25">9</td> <td width="31.25">8</td> <td width="31.25">7</td> <td width="31.25">6</td> <td width="31.25">5</td> <td width="31.25">4</td> <td width="31.25">3</td> <td width="31.25">2</td> <td width="31.25">1</td> <td width="31.25">0</td> </tr> </table> >]; mv_title[label="moves",fontsize=16]; mv_title -> moves[color=transparent] attr [label=< <table border="0" cellborder="1" cellspacing="0"> <tr> <td width="249.2" border="0"></td> <td width="187.5">reserved</td> <td width="31.5">steps</td> <td width="31.5">id</td> </tr> </table> >]; attr_title[label="attr",fontsize=15]; attr_title -> attr[color=transparent] } ``` 新增位元處裡的 macro 放便存取 steps 和 moves。由於大部分針對 xo_table 的 macro 都是把變數某一段位元區塊讀取或寫入,所以新增 get/set_bis 系列的函式來取代重複的程式碼 ```c /* game.h */ #define XO_ATTR_ID(attr) (attr & ATTR_MSK) #define XO_ATTR_STEPS(attr) get_bits(attr, 0xf, 4) #define XO_SET_ATTR_STEPS(attr, type) set_bits(attr, type, 0xf, 0x4) #define SET_RECORD_CELL(moves, step, n) set_bits64(moves, step, 0xful, n * 4u) #define GET_RECORD_CELL(moves, id) get_bits64(moves, 0xful, id * 4u) static inline int set_bits(int x, int value, int mask, int n) { return (x & ~(mask << n)) | (value << n); } static inline unsigned int get_bits(unsigned int x, int mask, int n) { return (x & (mask << n)) >> n; } ``` 處理 steps 和 moves 基礎函式準備好後,接者修改 `ai_one/two_work_func`,這兩個函式程式碼幾乎一模一樣,這裡以修改 ai_one_work_func 為例。 ```c static void ai_one_work_func(struct work_struct *w) { /* ... */ int attr = xo_tlb->attr; int id = XO_ATTR_ID(attr); int steps = XO_ATTR_STEPS(attr); int move; WRITE_ONCE(move, mcts(table, CELL_O)); if (move != -1) { WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O)); WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps++)); WRITE_ONCE(xo_tlb->attr, XO_SET_ATTR_STEPS(attr, steps)); } WRITE_ONCE(game->turn, 'X'); WRITE_ONCE(game->finish, 1); pr_info("[one]move: [%d, %x, %lx]\n", steps - 1, move, xo_tlb->moves); /* ... */ } ``` 取得當前遊戲的 attr 後,先紀錄 steps,如果 mcts 有從 table 搜尋到下一步(move),就將這一步寫到棋盤上,再儲存到 moves 並將 steps + 1 記錄回 xo_tlb 的 attr。 #### 顯示歷史紀錄 先前已經實做好切換分頁的實作,現在則做一些擴充,將 switch-case 內每個繪製分頁的函式都加入參數 xo_table 否則沒有必要資訊可以繪製到畫面上,另外為了區分畫面做了切換,新增變數 prev_tab 來記錄前一個分頁。 ```c enum tui_tab prev_tab = TAB_TOTLEN; void tui_update_tab(enum tui_tab tab, const struct xo_table *tlb) { assert(tab < TAB_TOTLEN); draw_tab_label(tab); /* request data and render */ switch (tab) { case XO_TAB_RECORD: xo_record(tab, tlb); break; case XO_TAB_LOADAVG: xo_loadavg(tab, tlb); break; default: assert(0); } } ``` xo-user 向 kxo 讀取 xo_table 時,只要從裡面取得 steps, id, moves 就能透過 bitops 還原每一步歷史紀錄。 繪製步驟: 1. row by row 印出 `Game-id:` 1. 移動到 x = 11 和 y + id 的位置(紅色游標處) $\to$ Game-id: <span style="color:red">|</span> 1. 清空 Game id 的歷史紀錄(18 ~ 19 行),並回到 Game-id: <span style="color:red">|</span> 1. 用 Game 的 steps 和 moves 繪製座標 ```c= static void xo_record(const enum tui_tab tab, const struct xo_table *tlb) { if (tab != prev_tab) { draw_tab_border(tab); } prev_tab = tab; unsigned long moves = tlb->moves; int steps = XO_ATTR_STEPS(tlb->attr); int id = XO_ATTR_ID(tlb->attr); char xy[2]; int y = 53; gotoxy(4, y); for (int i = 0; i < N_GAMES; i++) { outbuf_printf("GAME-%d: ", i); gotoxy(4, y + i + 1); } int x = 11; gotoxy(x, y + id); outbuf_printf("%77s", " "); gotoxy(x, y + id); for (int i = 0; i < steps; i++) { uint8_t mv = GET_RECORD_CELL(moves, i); xy[0] = 'A' + (mv & 3); xy[1] = '1' + (mv / BOARD_SIZE); outbuf_printf(" %s %s", xy, i == steps - 1 ? " " : "🠮"); } } ``` # Load average > Commit [6c8a5ff](https://github.com/LambertWSJ/kxo/commit/6c8a5ffd823338cad97a0a3e485ab3996cb816f5) # 動態切換 AI 演算法 > Commit [c1c3f32](https://github.com/LambertWSJ/kxo/commit/c1c3f32c37d411f88d7cbd273f1e7bc063488851) 目前還沒從 ttt 移植強化學習(RL),不過可以先實作動態切換不同 AI 演算法對弈。切換 AI 的時機可以是每下完棋就換一個,或是分出勝負再切換,前者過於頻繁,所以切換的時機選在分出勝負的時候。 ## 擴充 attr 觀察對弈用的 `mcts` 和 `negamax_predict` 的參數都是用 table 和 player,又可以用查表法了,把演算法的索引存入到 xo_table 的 attr。 `ai1` $\to$ `attr[9:8]` `ai2` $\to$ `attr[11:10]` 新增存取 ai1, ai2 的 macro ```c #define XO_ATTR_AI_ALG(attr) get_bits(attr, 0xf, 8) #define XO_SET_ATTR_AI_ALG(attr, ai1, ai2) \ set_bits(attr, (ai1) | (ai2) << 2, 0xf, 0x8) ``` 擴充後的 attr 如下圖所示,搭配 `XO_ATTR_AI_ALG`, `XO_SET_ATTR_AI_ALG` 就能方便存取 ai1, ai2 的索引來做查表法。 ```graphviz digraph G { node [shape=none] myNode [label=< <table border="0" cellborder="1" cellspacing="0"> <tr> <td width="312.5">reserved</td> <td width="31.25">ai2</td> <td width="31.25">ai1</td> <td width="62.5">steps</td> <td width="62.5">id</td> </tr> </table> >]; } ``` ## 查表法 negamax_predict 回傳值要從 move_t 改為 int,才能和 mcts 有一致的介面,有一致的介面才能建立表格。 ```c /* negamax.h */ int negamax_predict(unsigned int table, char player); /* game.h */ enum { XO_AI_MCTS, XO_AI_NEGAMAX, XO_AI_TOT, }; /* ai_game.h */ typedef int (*ai_alg)(unsigned int table, char player); /* main.c */ static ai_alg ai_algs[XO_AI_TOT] = { [XO_AI_MCTS] = mcts, [XO_AI_NEGAMAX] = negamax_predict, }; ``` ### 隨機索引 #### kxo_init 在 kxo_init 會先初始化每個 `xo_table`,用 get_random_u32() 隨機數,給每個遊戲都隨機指定 AI ```c static int __init kxo_init(void) { /* ... */ for (int i = 0; i < N_GAMES; i++) { unsigned int attr; u32 rnd = get_random_u32(); struct ai_game *game = &games[i]; game->xo_tlb.table = 0; attr = i; /* set game ID in attr */ attr = XO_SET_ATTR_AI_ALG(attr, rnd % XO_AI_TOT, (rnd >> 16) % XO_AI_TOT); game->xo_tlb.attr = attr; } } ``` id 是從 bit 0 開始,所以執行 `attr = i` 即可,接著 `XO_SET_ATTR_AI_ALG` 將隨機數的高 16 位和低 16 位和演算法的總數做模除即為 ai1 和 ai2 的索引。 #### timer_handler 做法和 kxo_init 一樣,但只有在分出勝負的時候才更換 AI 演算法的索引,還在對弈中的棋局則不影響 ```c static void timer_handler(struct timer_list *__timer) { char cell_tlb[] = {'O', 'X'}; for (int i = 0; i < N_GAMES; i++) { struct ai_game *game = &games[i]; struct xo_table *xo_tlb = &game->xo_tlb; unsigned int attr = xo_tlb->attr; uint8_t win = check_win(xo_tlb->table); uint8_t id = XO_ATTR_ID(attr); if (win == CELL_EMPTY) unfini |= (1u << i); else { pr_info("kxo: game-%d %c win!!!\n", id, cell_tlb[win - 1]); /* ... */ if (attr_obj.end == '0') { u32 rnd = get_random_u32(); attr &= ATTR_MSK; attr = XO_SET_ATTR_AI_ALG(attr, rnd % XO_AI_TOT, (rnd >> 16) % XO_AI_TOT); xo_tlb->attr = attr; } } } } ``` ### 對弈 當對弈的函式執行的時候,從 attribute 取出索引給 ai_algs 取得 AI 演算法來呼叫即可。 ```diff static void ai_one_work_func(struct work_struct *w){ /* ... */ int move; - WRITE_ONCE(move, mcts(table, CELL_O)); + int alg = XO_ATTR_AI_ALG(attr) & 3; + WRITE_ONCE(move, ai_algs[alg](table, CELL_O)); /* ... */ } static void ai_two_work_func(struct work_struct *w) { /* ... */ int move; - WRITE_ONCE(move, negamax_predict(table, CELL_X).move); + int alg = XO_ATTR_AI_ALG(attr) >> 2; + WRITE_ONCE(move, ai_algs[alg](table, CELL_X)); /* ... */ } ``` # 從 ttt 移植 Reinforcement Learning > Commit: [28a3a73](https://github.com/LambertWSJ/kxo/commit/28a3a73e76c0690ef5a5bcbd50150727fe8c1bc8) 參考 [ttt](https://github.com/jserv/ttt)/[train.c](https://github.com/jserv/ttt/blob/main/train.c) 怎麼初始化和更新狀態空間,適合做為移植到核心模組的範例。 移植強化學習到核心模組要解決下面的問題 - 浮點數運算換成定點數 - 棋盤的狀態從陣列轉成編碼版 - 配置 $4\times4$ 狀態空間需要很大的空間,超出 `kmalloc` 的限制 $\to$ $3^{16}=43046721$ 個狀態 ### 浮點數運算換成定點數 RL 用到的浮點數運算只用到加減乘除,沒有 MCTS 那麼複雜,因此轉換成定點數運算不會太困難,利用現有的程式碼修改後,在 userspace 實驗完就再把程式碼合併到核心模組即可。 為了充分運用現有程式碼,將 `game.h` 定義好的 `fixed_point_t` 改為 Q16.16. 的格式,作為 RL 使用的定點數。 定點數加減可以直接運算,乘法運算完要除以 $b^n$ ,除法運算完則要除以 $b^n$。RL 移植只需要用到乘法,如果乘數不是定點數的話則要乘以定點數的 1.0(`RL_FIXED_1`) 轉換成定點數。 ```c #define FIXED_SCALE_BITS 16 #define FIXED_MAX (~0U) #define FIXED_MIN (0U) #define RL_FIXED_1 (1 << FIXED_SCALE_BITS) typedef unsigned fixed_point_t; static inline fixed_point_t fixed_mul(fixed_point_t a, fixed_point_t b) { return ((s64) a * b) >> FIXED_SCALE_BITS; } static inline fixed_point_t fixed_mul_s32(fixed_point_t a, s32 b) { b = ((s64) b * RL_FIXED_1); return fixed_mul(a, b); } ``` ### 配置狀態空間 狀態空間 state_value 要配置的空間太大,無法用 kmalloc,只能採用 vmalloc 配置一大塊實體上不連續的記憶體,狀態空間配置完後,用 `get_score` 作為 Value function 計算這個玩家在所有可能的狀態下的初始動作可以獲得的分數,比其從零開始的狀態空間,這麼做可以讓 agent 對每場可能的棋局有概括的資訊,學習效率自然也會比零基礎的 agent 來的快。 ```c void init_rl_agent(rl_agent_t *agent, unsigned int state_num, char player) { agent->player = player; agent->state_value = vmalloc(sizeof(fixed_point_t) * state_num); if (!(agent->state_value)) pr_info("Failed to allocate memory"); for (unsigned int i = 0; i < state_num; i++) { agent->state_value[i] = fixed_mul_s32( INITIAL_MUTIPLIER, get_score(hash_to_table(i), player)); } } ``` 但是這麼做也是有缺點的,就是初始化狀態空間的速度太慢,如果 `insmod` 馬上 `xo-user`,會很明顯卡住,因為要設定 $3^{16}$ 的初始值需要一段時間,後來改用 kthread 去初始化狀態空間,等初始化完才加入到遊戲,再等隨機索引動態切換成 RL。 ### 對弈 RL agent 下棋是用 `play_rl` 回傳玩家在這個棋局評估的下一步的落子位置,get_action_exploit 是在所有空格下一次棋,評估哪一格會有最高的分數,就是下一步要落子的位置。 ```c static int get_action_exploit(unsigned int table, const rl_agent_t *agent) { int max_act = -1; fixed_point_t max_q = FIXED_MIN; const fixed_point_t *state_value = agent->state_value; int candidate_count = 1; int last_e = 0; for_each_empty_grid(i, table) { last_e = i; table = VAL_SET_CELL(table, i, agent->player); fixed_point_t new_q = state_value[table_to_hash(table)]; if (new_q == max_q) { ++candidate_count; if (get_random_u32() % candidate_count == 0) { max_act = i; } } else if (new_q > max_q) { candidate_count = 1; max_q = new_q; max_act = i; } table = VAL_SET_CELL(table, i, CELL_EMPTY); } /* If there is no best action, then choose the last empty cell */ return max_act == -1 ? last_e : max_act; } unsigned int play_rl(unsigned int *table, const rl_agent_t *agent) { unsigned int tlb = *table; int move = get_action_exploit(tlb, agent); *table = VAL_SET_CELL(tlb, move, agent->player); return move; } ``` 上面的修改其實也是錯的,如果是棋盤沒有空格,會回傳 0,但這樣是不合法的,只能保證不會出現 UB。 如果初始化 RL agent 沒有設定狀態空間的初始值,而是用記憶體殘值來當做初始值,會造成 get_action_exploit 回傳 -1,導致執行時期出現未定義行為。 ``` [ 81.123328] UBSAN: shift-out-of-bounds in game.h:86:40 [ 81.123332] shift exponent -2 is negative ``` #### AI 下棋 `play_rl` 的介面,沒有辦法加入到查表法理面,所以在 `ai_one/two_func_work` 裡面只能用判斷式知道目前的 AI 演算法是不是用 RL,如果是 RL 且下一步 (move) 是合法的話就更新 episode_moves 和 reward。 episode_moves $\to$ 紀錄遊戲每一步落子後的棋局,將這場棋局轉換成狀態空間索引,後面才知道要更新的是哪個狀態。 <!-- reward $\to$ 紀錄這一步評估出來的分數乘上 --> ```c static void ai_one_work_func(struct work_struct *w) { int id = XO_ATTR_ID(attr); int alg = XO_ATTR_AI_ALG(attr) & 3; bool is_rl = alg == XO_AI_RL; WRITE_ONCE(move, is_rl ? play_rl(&table, &rl_agents[0]) : ai_algs[alg](table, CELL_O)); if (move != -1) { WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O)); WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps)); if (is_rl) { table = xo_tlb->table; u8 win = check_win(table); episode_moves[id][steps] = table_to_hash(table); fixed_point_t score = fixed_mul_s32((RL_FIXED_1 - REWARD_TRADEOFF), get_score(table, CELL_O)); reward[id][steps] = score + calculate_win_value(win, CELL_O); } WRITE_ONCE(xo_tlb->attr, XO_SET_ATTR_STEPS(attr, steps + 1)); } /* ... */ } ``` ### 更新狀態空間 當分出勝負之後,就用 episode_moves 和 reward 來更新 agent 的狀態空間 ```c static void timer_handler(struct timer_list *__timer) { /* ... */ for (int i = 0; i < N_GAMES; i++) { struct ai_game *game = &games[i]; struct xo_table *xo_tlb = &game->xo_tlb; unsigned int attr = xo_tlb->attr; uint8_t win = check_win(xo_tlb->table); if (win == CELL_EMPTY) unfini |= (1u << i); else { int winner = win - 1; pr_info("kxo: game-%d %c win!!!\n", id, cell_tlb[winner]); if (attr_obj.end == '0') { int steps = XO_ATTR_STEPS(xo_tlb->attr); u32 rnd = get_random_u32(); attr &= ATTR_MSK; attr = XO_SET_ATTR_AI_ALG(attr, rnd % XO_AI_TOT, (rnd >> 16) % XO_AI_TOT); xo_tlb->attr = attr; xo_tlb->table = 0; xo_tlb->moves = 0; fixed_point_t next = 0; for (int j = steps - 1; j >= 0; j--) next = update_state_value(episode_moves[i][j], reward[i][j], next, &rl_agents[winner]); } } } /* ... */ } ``` ## 重構函式 > Commit: [8e34ecd](https://github.com/LambertWSJ/kxo/commit/8e34ecd66dde7d559a97f59bc581d708ab2975ee) 初步移植 RL 到核心模組的任務完成後,開始去蕪存菁整理程式碼,修掉下面已知的問題 - 封裝 RL agent 在內部模組裡面,不要接露到外部 - play_rl 可能回傳錯誤的結果,而且函式介面無法和查表法定義的型態不符 - 初始化造成核心模組初始化卡住,應改用 kthread 處理,初始化完畢再投入到遊戲 - `update_state_value` 不應該每次都傳值進去更新,應傳入陣列和 steps,在函式內部一口氣更新完 state value ### 封裝 RL agent RL agent 只有下 O 或 X 的玩家,函式介面可以用 player 傳入 CELL_O 或 CELL_X 來索引 agent,不需要傳指標到 RL 模組的函式。 ```diff -unsigned int play_rl(unsigned int *table, const rl_agent_t *agent); +int play_rl(unsigned int table, char player); -void init_rl_agent(rl_agent_t *agent, unsigned int state_num, char player); +void init_rl_agent(unsigned int state_num, char player); fixed_point_t update_state_value(int after_state_hash, fixed_point_t reward, fixed_point_t next, - rl_agent_t *agent); + char player); ``` init_rl_agent 可以省下一個參數傳遞,其餘從指標 `agent` 改成字元 `player` 對於 CPU 來說沒有函式呼叫成本的差異,但是在寫程式上,會比較簡單些,只要傳入 CELL_O/CELL_X,不需要用 address of 運算子寫 &rl_agent。 以 play_rl 來說,第二行改進後,程式碼好讀很多,清楚知道是 CELL_X 的玩家要下棋,也沒有像第一行有很多符號還有思考索引 1 是哪個玩家。 ```c play_rl(&table, &rl_agents[1]); play_rl(table, CELL_X); ``` ### 重構 `play_rl` 觀察原本的 `play_rl` 和 `ai_one/two_work_func` 的流程,發現 move 已經在外面判斷是否等於 -1,所以不用修改原版的 get_action_exploit 也不用在 play_rl 裡面更新 table 也能保證是正確的結果。 ```c static void ai_one_work_func(struct work_struct *w) { int move; int alg = XO_ATTR_AI_ALG(attr) & 3; bool is_rl = alg == XO_AI_RL; WRITE_ONCE(move, is_rl ? play_rl(&table, &rl_agents[0]) : ai_algs[alg](table, CELL_O)); if (move != -1) { WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O)); WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps)); /* ... */ } } ``` 那麼 play_rl 可以直接回傳 get_action_exploit 的結果就好了。 ```c= int play_rl(unsigned int table, char player) { return get_action_exploit(table, player); } int play_rl(unsigned int table, char player) __attribute__((alias("get_action_exploit"))); ``` 第五行是看到原本既然要回傳 `get_action_exploit` 不如用 alias 好了,寫完又發現不如直接把 `get_action_exploit` 改名為 `play_rl` 就好。 ```diff -static int get_action_exploit(unsigned int table, const rl_agent_t *agent) { +int play_rl(unsigned int table, char player) { int max_act = -1; /* ... */ for_each_empty_grid(i, table) { table = VAL_SET_CELL(table, i, agent->player); /* ... */ table = VAL_SET_CELL(table, i, CELL_EMPTY); } return max_act; } ``` 修好 play_rl 的型態後,可以把它加進 `ai_algs` 做查表。第 11 行還是要做判斷是不是 RL,因為改成 RL 初始化完狀態空間才能夠使用,還有如果是 RL 才會記錄 episode_moves 和 reward。 ```c= static ai_alg ai_algs[XO_AI_TOT] = { [XO_AI_MCTS] = mcts, [XO_AI_NEGAMAX] = negamax_predict, [XO_AI_RL] = play_rl, }; static void ai_one_work_func(struct work_struct *w) { /* ... */ int move; int alg = (XO_ATTR_AI_ALG(attr) % (XO_AI_TOT - !rl_inited)); bool is_rl = alg == XO_AI_RL && rl_inited; pr_debug("[two]: alg=%d, rl_init=%d\n", alg, rl_inited); WRITE_ONCE(move, ai_algs[alg](table, CELL_O)); if (move != -1) { WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O)); WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps)); if (is_rl) { table = xo_tlb->table; u8 win = check_win(table); episode_moves[id][steps] = table_to_hash(table); fixed_point_t score = fixed_mul_s32((RL_FIXED_1 - REWARD_TRADEOFF), get_score(table, CELL_O)); reward[id][steps] = score + calculate_win_value(win, CELL_O); } } /* ... */ } ``` ### RL agents 初始化 要避免載入核心模組期間卡在初始化 RL agent 狀態空間的問題,只能讓這個任務委派給 kthread 來執行,做完再加入到遊戲中對弈和訓練。 因為初始化要一段時間,不可能在載入核心模組的時候就馬上做完,所以 tot_alg 才會是演算法總數減一,避免 ai_game 一開始就選到 RL 導致核心模組崩潰。 ```c static int __init kxo_init(void) { rl_inited = false; rl_init_thr = kthread_run(init_agents, NULL, "init_rl_agents"); if (IS_ERR(rl_init_thr)) { ret = -ENOMEM; goto error_kthread; } /* RL state space not yet init */ const int tot_alg = XO_AI_TOT - 1; for (int i = 0; i < N_GAMES; i++) { unsigned int attr; u32 rnd = get_random_u32(); struct ai_game *game = &games[i]; game->xo_tlb.table = 0; attr = i; /* set game ID in attr */ attr = XO_SET_ATTR_AI_ALG(attr, rnd % tot_alg, (rnd >> 16) % tot_alg); game->xo_tlb.attr = attr; game->turn = 'O'; /* ... */ } } ``` 給 kthread 初始化 RL agent 函式為 `init_agents`,當初始化完兩位 agent 就對 rl_inited 寫入 true,表示狀態空間初始化完畢,可以讓 agent 加入遊戲。 <!-- kthread usage --> ```c static int init_agents(void __attribute__((unused)) * arg) { pr_debug("[%s] init...\n", __FUNCTION__); int state_sum = 1; CALC_STATE_NUM(state_sum); init_rl_agent(state_sum, CELL_O); init_rl_agent(state_sum, CELL_X); pr_debug("[%s] init done!\n", __FUNCTION__); WRITE_ONCE(rl_inited, true); kthread_complete_and_exit(&rl_comp, 0); } ``` 卸載核心模組的時候,如果還沒 thread 還沒結束的話,則要等它執行完畢,才能夠釋放 RL 狀態空間佔用的記憶體。 ```c static void __exit kxo_exit(void) { wait_for_completion(&rl_comp); free_rl_agent(CELL_O); free_rl_agent(CELL_X); pr_info("kxo: unloaded\n"); } ``` ### 改進 `update_state_value` 原版是每次更新都要呼叫 `update_state_value`,最差的情況下要呼叫 16 次,應改為傳入 steps 和陣列,在函式內部做完更新的操作。 稍作修改後把原本 update_state_value 改名為 step_update_state_value,這樣就不用把複雜的程式碼搬到同個函式裡面做改名和排版,程式碼看上去也很直觀,還能夠交由編譯器把 step_update_state_value 展開到 update_state_value 底下。 ```c static inline fixed_point_t step_update_state_value(int after_state_hash, fixed_point_t reward, fixed_point_t next, char player) { rl_agent_t *agent = &rl_agents[player - 1]; fixed_point_t curr = reward - fixed_mul(GAMMA, next); // curr is TD target in TD learning // and return/gain in MC learning. agent->state_value[after_state_hash] = fixed_mul((RL_FIXED_1 - LEARNING_RATE), agent->state_value[after_state_hash]) + fixed_mul(LEARNING_RATE, curr); return agent->state_value[after_state_hash]; } void update_state_value(const int *after_state_hash, const fixed_point_t *reward, int steps, char player) { mutex_lock(&rl_locks[player - 1]); fixed_point_t next = 0; for (int j = steps - 1; j >= 0; j--) next = step_update_state_value(after_state_hash[j], reward[j], next, player); mutex_unlock(&rl_locks[player - 1]); } ``` ### 降低狀態空間 4x4 井字遊戲的 RL agent 需要 $3^{16}$ 個狀態,這些狀態包含下面幾種分類 * 合理的棋譜 * X 或 O 的先手 * X 或 O 勝利的棋譜 * 對弈中或平手 * 犯規的棋譜 * 全是 O 或 X * 其中一個旗子下的更多 雖然初步移植成功,但想到是在 kernel space 配置這麼一大塊空間,而且無論會不會用到,每個狀態都要計算初始值,RL 才算是初始化完畢,如果沒有用多執行緒,核心模組在初始化階段就會阻塞住。 這些顯著的缺點讓我耿耿於懷,有沒有什麼辦法解決這些問題?下面是我一步步分析後,想到的一個折衷方案,雖無法涵蓋全部的狀態空間,但是大幅減小對記憶體的需求,而且不需要 kthread 馬上就能使用。 kxo 是兩位玩家輪流落子,因此第一步先分析合理的棋譜有幾個?先估計要配置多大的空間給 RL agent 使用,後面再想辦法把棋譜映射到狀態。 撰寫[模擬程式](https://gist.github.com/LambertWSJ/ac1cb9f34c936eb1a9cc45e593cf5da6)計算所有合法的棋譜有 6036017 個,約佔了所有狀態空間的 14%,即便狀態空間降低了很多,但是對 kernel 來說,依舊是不小的要求。 估算好所需的空間後,來思考一下如何把棋譜映射對應的狀態?目前的棋譜都編碼成 32 位元的整數,所以能夠使用 key-value 的資料結構來更新狀態空間,立刻能想到紅黑樹和雜湊表 key $\to$ table value $\to$ `O` 和 `X` 的狀態空間中的分數 如果 table 存在的話就返回 agent 的分數來做下棋或更新分數,沒有的話就配置一個新狀態空間並計算初始分數再返回,總之**一定會涵蓋所有狀態,還不會在初始化的時候就不會阻塞整個模組運行**。 接著來分析 kxo 的情境來選擇紅黑樹或雜湊表來管理狀態。 <!-- TODO: 分析 rbtree, ht --> 執行 `xo-user` 的時候發現有些棋譜是反覆出現的,這 603 萬個狀態空間或許有很多棋譜是很少在實際情況下出現的,甚至幾乎不會出現的,那麼該如何從這些冷門棋譜中在省出一點空間來用?同時又能一直更新常出現的熱門棋譜。 為此我想到一個折衷方案,不需要一口氣配置所有狀態空間,而是決定最多要保留幾個狀態,並配置對應大小的記憶體,當記憶體滿了,再清掉一定比例的冷門棋譜就好。 記憶體管理:配置狀態陣列搭配 bitmap 來管理下面這些事情 - 哪個位置可以用 - 目前總共有多少個狀態 - 配置和刪除時更新對應位置的 bit bitmap 中 1 表示 st_buff 使用中,反之則是可以用的空間 ```c static unsigned long st_map[ST_MAP_SZ] = {0}; void init_rl_agent(void) { LIST_HEAD(order); size_t node_sz = MAX_STATES * sizeof(struct xo_state); st_buff = vzalloc(ALIGN(node_sz, PAGE_SIZE)); } ``` 冷門與熱門棋譜:運用鏈結串列 `order` 新增和移動節點達到相對冷門和熱門 選用串列的話,可以定義成愈後面的節點愈熱門,反之愈開頭的節點愈冷門,當 table 有映射到狀態的話,就把狀態往後移,如果沒有狀態的話,就把新增的狀態放在開頭,刪除時直接從頭開始刪掉一定比例的節點就可以了。 如果剛 table 好找到對應的狀態,就把這個狀態移動到串列尾端,並回傳這個狀態的分數。 ```c rl_fxp *find_rl_state(const u32 table) { const u32 key = hash_32(table, HT_BITS); struct xo_state *st = NULL; /* search state */ hash_for_each_possible(states, st, link, key) { if (st->table == table) { list_move_tail(&st->list, &orders); return st->scores; } } /* ... */ } ``` 如果 table 沒有找到對應的狀態,表示沒有記錄到,可能過有出現過,但是被刪掉了,要建立一個新的狀態。 先檢查 `st_buff` 有沒有足夠的空間可以建立新的節點,用 `bitmap_weight` 可以快速得知 `st_buff` 目前已經配置幾個節點,如果滿了就清掉一定比例的節點,再從第一個可以使用的位置建立新的節點並更新 bitmap 再完成下面這些事情 - 建立節點 $\to$ 設定 table, `O` 和 `X` 在這個狀態下的初始值 - 加入到 hash table 和 linked list - 返回新的節點 ```c static void clean_state(void) { struct xo_state *st, *safe; u32 pos, i = 0; list_for_each_entry_safe(st, safe, &orders, list) { if (i >= CLEAN_N_STATES) break; pos = ((uintptr_t) st - (uintptr_t) st_buff) / sizeof(struct xo_state); hash_del(&st->link); list_del(&st->list); st->table = st->scores[AGENT_O] = st->scores[AGENT_X] = 0; bitmap_clear(st_map, pos, 1); i++; } } rl_fxp *find_rl_state(const u32 table) { /* ... */ /* check states size */ const u32 tot = bitmap_weight(st_map, ht_len); if (tot >= (MAX_STATES - 1)) clean_state(); /* add state into hashtable */ u32 pos = find_first_zero_bit(st_map, ht_len); st = st_buff + pos; bitmap_set(st_map, pos, 1); st->table = table; st->scores[AGENT_O] = fixed_mul_s32(INITIAL_MUTIPLIER, get_score(table, CELL_O)); st->scores[AGENT_X] = fixed_mul_s32(INITIAL_MUTIPLIER, get_score(table, CELL_X)); INIT_LIST_HEAD(&st->list); list_add(&st->list, &orders); hash_add(states, &st->link, key); return st->scores; } ``` 實作好找出狀態分數的函式後,更新 RL 模組用的 `play_rl` 和 `update_state_value`。 ```c int play_rl(unsigned int table, char player) { u8 id = player - 1; for_each_empty_grid(i, table) { table = VAL_SET_CELL(table, i, player); rl_fxp new_q = find_rl_state(table)[id]; if (new_q == max_q) { ++candidate_count; /* ... */ } return max_act; } static inline rl_fxp step_update_state_value(int after_state_hash, rl_fxp reward, rl_fxp next, u8 player) { rl_fxp curr = reward - fixed_mul(GAMMA, next); rl_fxp *scores = find_rl_state(after_state_hash); scores[player] = fixed_mul((RL_FIXED_1 - LEARNING_RATE), scores[player]) + fixed_mul(LEARNING_RATE, curr); rl_fxp *scores = find_rl_state(after_state_hash); return scores[player]; } ``` 為了練習 API 我還有再另外實作紅黑樹版本,可用編譯來切換不同的實作。 ```shell $ make RL_USE_HT=NO ``` # 引入若干 Coroutine 顯示多場遊戲 > Commit: [55ab755](https://github.com/LambertWSJ/kxo/commit/55ab755723dbefbff35f3b15fce5307c46a5519a) # 現階段成果 ![output](https://hackmd.io/_uploads/S1_br4teZg.gif) # 後續改進 - 目前的 TUI 要在夠大終端機才能完整顯示,不然會印出一大堆亂碼,應設計成像 RWD 網頁,在不同的寬高會調整 UI 排版或顯示方式。 - TUI 還是會出現一大片紅色或是有時候顯示不全,推測是因為 outbuf 到達閾值後,剛好 buffer 的最後又是寫到一半的控指碼就輸出到 stdout,暫時沒想到方法來處理這問題。 - 核心模組和使用者溝通成本降低,但是沒有做呼叫頻率的控制導致頻繁的向核心模組讀取資料,造成 System Call Overhead,有時候顯示速度過快,中間的過程沒有一步步印出來。