
# 2025q1 Homework 3 (kxo)
contributed by <[LambertWSJ](https://github.com/LambertWSJ)>
[Demo 影片](https://youtube.com/shorts/-Sv_Q79XyAg?feature=share)
# 準備工作
## 合併 compilation database
[clangd](https://clangd.llvm.org/) 搭配 compilation database (`compile_commands.json`) 方便開發者追蹤原始碼,為方便書寫,本文 compilation database 改為 compilation DB。
通常 Kernel API 的說明在實作的註解比較容易找到,標頭檔比較少會寫,如果希望可以跳到 Kernel API 的實作而不是跳到標頭檔的宣告,可以 clone 和開發核心模組相同 Kernel 版本的原始碼,然後再編譯出 Kernel 的 compilation DB。
資列夾結構如下,不必將 linux 原始碼 clone 在 kxo 裡面,在其它路徑也可以,依個人程式碼管理習慣調整即可。
```
/home/lambert-wu/repos
├── kxo
│ └── compile_commands.json
└── linux
└── compile_commands.json
```
將 linux 的 compilation DB 複製到 kxo,再用 [jq](https://jqlang.org/manual/) 合併 linux 和 kxo 的 compilation DB
```shell
kxo$ cp compile_commands.json kxo_db.json
kxo$ cp ~/repos/linux/compile_commands.json linux_db.json
kxo$ jq -s 'add' ./linux_db.json kxo_db.json > compile_commands.json
```
最好要備份合併完的 compile_commands.json,否則 make clean 會把它刪除。
接著 restart clangd 應該會看到 VSCode 底下的 status bar 在建立索引(indexing)中

如果你的 linux 資料夾沒有 `.cache` 的話,才會看到有幾千個檔案在建立索引,否則 clangd 只會幫 kxo 建立索引,如果跳轉有問題的話,可以把 `.cache` 刪掉再 restart 一次
### issue
原本在 kxo 用 .clangd 設定兩個 fragments(`---` 區隔),但這樣會導致 User space 的程式也使用 linux 的 compilation DB,User space API 和 headers 會出現 not found 的錯誤,過去有參考 clangd 的 [issue](https://github.com/clangd/clangd/issues/1092) 設定 PathMatch 也沒試成功,暫時沒有設定 .clangd 的解法,剩下繞點彎路的用 jq 合併 compilation DB 的辦法。
這樣的 .clangd 會用 linux kernel 的 compilation DB 而不是當前 project 的。
```yaml
CompileFlags:
Add: [-xc++, -Wall]
---
CompileFlags:
CompilationDatabase: /home/lambert-wu/repos/linux
```
# 縮減核心模組和使用者的通訊成本
>[commit: 84ca1db](https://github.com/LambertWSJ/kxo/commit/84ca1dbec5c33c7132da5d8ea540096c721d3dae)
kxo 在模組處理好棋盤畫面 `draw_buffer`,再等待使用著讀取,但是這樣的通訊成本太大了,一次需要讀取 66 位元組,應改為讀取棋盤的狀態,再交由使用者繪製棋盤,為此要設計編碼來描述棋盤的狀態。
井字遊戲的棋盤的每一格 3 種狀態 $\to$ `O`, `X`, ` `(空格),可以用 2 個位元來描述這 3 種狀態,kxo 棋盤有 16 格,剛好可以用 32 位元的數值來儲存。
| 格子狀態 | 編碼 |
| -------- | ------ |
| 空格 | $00_2$ |
| `O` | $01_2$ |
| `X` | $10_2$ |
空格除了用 0 很直觀外,AI 演算法會計算對手勝率時,會用 xor 做切換棋手的操作,而任一數值對 0 做 xor 還是保持原值,這樣的 AI 不會考慮別人,只會想到自己。
原本想用 32 位元的整數拆成 2 個 16 位元來描述執 `O` 和 `X` 玩家的棋盤,1 表示自己下的棋子,0 表示沒有棋子,但是 0 不代表對手沒有落子在那一格上面,要額外用對手的棋盤狀態來判斷,才知道 0 是不是真的沒有棋子在上面,這樣的編碼會需要更多的 bitops,所以改用 2 位元描述每一格的狀態,更能夠直觀的得知棋局全貌。
編碼決定好之後,需要考慮下面的問題
- 存取棋盤 $\to$ 讀取/寫入格子
- 如何判斷勝利和對弈中
- xo-user 如何依據這套編碼繪製棋盤
- 如何讓 AI 玩家使用這套編碼
## 存取棋盤
```c
#define CELL_EMPTY 0u
#define CELL_O 1u
#define CELL_X 2u
#define CELL_D 3u
#define GEN_O_WINMASK(a, b, c) \
((CELL_O << (a * 2)) | (CELL_O << (b * 2)) | (CELL_O << (c * 2)))
#define TABLE_SET_CELL(table, pos, cell) (table |= (cell) << ((pos) * 2))
#define TABLE_GET_CELL(table, pos) \
({ \
__typeof__(pos) _pos = (pos) * 2; \
((table & (0b11 << _pos)) >> _pos); \
})
```
<!--
說明為什麼這樣設計 GET/SET,why __typeof__ instead of typeof
[Alternate Keywords (Using the GNU Compiler Collection (GCC))](https://gcc.gnu.org/onlinedocs/gcc/Alternate-Keywords.html#Alternate-Keywords)
[Clang Compiler User’s Manual — Clang 22.0.0git documentation](https://clang.llvm.org/docs/UsersManual.html#c-language-features)
-->
#### 繪製棋盤
將繪製棋盤的任務從 kernel module 交遊 user space 負責,kernel module 專注在對弈。`draw_board` 依據 table 上每一格的編碼用查表法轉換成對應的字元,寫入到 display_buf 裡面。
```c=
static int draw_board(unsigned int table)
{
char cell_tlb[] = {' ', 'O', 'X'};
unsigned int i = 0, k = 0;
display_buf[i++] = '\n';
display_buf[i++] = '\n';
while (i < DRAWBUFFER_SIZE) {
for (int j = 0; j < (BOARD_SIZE << 1) - 1 && k < N_GRIDS; j++) {
display_buf[i++] =
j & 1 ? '|' : cell_tlb[TABLE_GET_CELL(table, k++)];
}
display_buf[i++] = '\n';
for (int j = 0; j < (BOARD_SIZE << 1) - 1; j++) {
display_buf[i++] = '-';
}
display_buf[i++] = '\n';
}
return 0;
}
```
`while` 裡面將棋盤上的每一列寫入到 buffer 裡面。第 8 行在一開始先寫入格子的狀態,彼此用 `|` 區隔 (e.g. `O| | |X`),接著加入換行符號 `\n`,14 行的迴圈繪製底線 `-`,畫完之後再換行。
因此進入下一次 while 的判斷式之前,會寫入好一列的棋盤到 buffer,如下所示
```
O| | |X
--------
```
第 10 行,`TABLE_GET_CELL` 的 MACRO 傳入 k++,一般來說不建議傳入遞增/遞減運算子到巨集,因為很容易造成 double evaluation 的問題,針對這個問題,可以應用 [Linux 核心原始程式碼巨集: max, min](https://hackmd.io/@sysprog/linux-macro-minmax) 提到的 typeof 技巧避免這個問題。
```c
#define TABLE_GET_CELL(table, pos) \
({ \
__typeof__(pos) _pos = (pos) * 2; \
((table & (0b11 << _pos)) >> _pos); \
})
```
## 判斷勝利
這套編碼中,`O` 和 `X` 只差一個 bit,只要位移一個 bit 就會換成對手的旗子,換句話說,勝利棋局只要存 `O` 或 `X` 其中一組就能夠檢查是那個玩家勝利或是還在對弈中。
將所有 `O` 勝利的遮罩存入 `win_patterns`,檢查勝利時再用 `table` 和每一個勝利的 pattern 做 AND 操作,如果等於勝利的棋局,就表示贏了,只要將 pattern 左移 1 bit 就能檢查 `X` 是不是贏了,所有 pattern 都檢查完,沒有勝利的話表示還在對弈中,按原本的實作用 CELL_EMPTY 表示 `' '`。
```c
static uint32_t win_patterns[] = {
/* ROW */
GEN_O_WINMASK(0, 1, 2),
GEN_O_WINMASK(1, 2, 3),
GEN_O_WINMASK(4, 5, 6),
/* ... */
};
char check_win(unsigned int table)
{
int len = ARRAY_SIZE(win_patterns);
for (int i = 0; i < len; i++) {
unsigned int patt = win_patterns[i];
/* check O is win */
if ((table & patt) == patt)
return CELL_O;
/* check X is win */
patt <<= 1;
if ((table & patt) == patt)
return CELL_X;
}
for_each_empty_grid(i, table)
return CELL_EMPTY;
return CELL_D;
}
```
原本的實作沒有用到 `for_each_empty_grid`,於是改寫 MACRO 後,取代原本 for 迴圈的寫法。
```c
#define for_each_empty_grid(i, table) \
for (int i = 0; i < N_GRIDS; i++) \
if (TABLE_GET_CELL(table, i) == CELL_EMPTY)
```
### 改進
>Commit: [0596c8e](https://github.com/LambertWSJ/kxo/commit/0596c8e1aced514f8c4f488adf15219029cb5137)
#### 預先計算表格
<!-- TODO -->
將 `win_patterns` 寫死在表格是不良的慣例(bad practice),如果表格從 4x4 擴展到 10x10,程式碼會有更大一片區域都是勝利條件的遮罩,顯然該預先計算好所有 NxN 的 win patterns 再遊戲開始前計算好。
#### 簡化判斷勝利
table 改成數值後,如果是 0 就可以直接回傳 `CELL_EMPTY`,不用進到迴圈裡面判斷勝利條件,如果進到迴圈檢查出棋局還沒分出勝負的話,再用 `detect_empty_cell` 檢查棋盤上是否還有可以落子的空格。
```c
char check_win(unsigned int table) {
if (!table)
return CELL_EMPTY;
int len = ARRAY_SIZE(win_patterns);
for (int i = 0; i < len; i++) {
unsigned int patt = win_patterns[i];
/* check O is win */
if ((table & patt) == patt)
return CELL_O;
/* check X is win */
patt <<= 1;
if ((table & patt) == patt)
return CELL_X;
}
return detect_empty_cell(table) ? CELL_EMPTY : CELL_D;
}
```
`detect_empty_cell` 的實做如下,`lo` 抓出低位 `0b01`,`hi` 抓出高位 `0b10`,如果高位 `hi` 右移 1 個位元和 `lo` 做 or 運算不等於 `0x55555555` 表示棋盤還有空格。
```c
static int detect_empty_cell(unsigned int table) {
unsigned int lo = table & 0x55555555;
unsigned int hi = table & 0xAAAAAAAA;
return (lo | (hi >> 1)) != 0x55555555;
}
```
<!-- TODO: -->
<!-- 以 `table = 0x` 為例, -->
## 應用編碼到 AI 演算法實作
kxo 運用 [MCTS](https://en.wikipedia.org/wiki/Monte_Carlo_tree_search) 和 [Negamax](https://en.wikipedia.org/wiki/Negamax) 兩種 AI 演算法進行對奕,彼此共用一個 16 位元組的棋盤 `table`,改寫為 4 byte 的整數,要小心翼翼的把原本用字元 `'O'`, `'X'` 表示的棋子,還有存取陣列的操作都轉換為 bitops,一個不小心會變成一個 AI 在落子,另一個 AI 不下棋的情況。
改為用 4 位元組整數 table 傳進演算法裡面做運算,紀錄棋盤後,帶來下面的好處
- 優化複製和初始化的速度
MCTS 使用的 `simulate` 和 `mcts` 兩個函式會複製 table,原本的實作要呼叫 `memcpy` 複製整個棋盤,編碼因為是整數,只要指派就完成複製。
```diff
-static fixed_point_t simulate(const char *table, char player)
+static fixed_point_t simulate(uint32_t table, char player)
{
char current_player = player;
- char temp_table[N_GRIDS];
- memcpy(temp_table, table, N_GRIDS);
+ uint32_t temp_table = table;
/* ... */
}
```
- 降低函式呼叫成本
原本的 `table` 以指標傳遞,實際參數大小依架構而異,在 32 位元系統上為 4 byte,在 64 位元系統上為 8 byte。改用整數型別後,雖然在 32 位元架構下與指標大小相同,但在 64 位元架構下,可節省 4 byte 的參數傳遞空間,進一步提升呼叫效率。
### 優化 `get_score`
>Commit [8b5b16f](https://github.com/LambertWSJ/kxo/commit/8b5b16f780810c231e31955fb5d04e32b3c7eaf8)
<!-- TODO: -->
## 效能分析
這裡分兩部分測量,測量核心模組和使用者程式 `xo-user`,前者用 ftrace 測量,後者用 perf,各測量 60 秒
<!-- TODO explain -->
### 使用者程式
#### 編碼前
```
Performance counter stats for './xo-user -t 60' (3 runs):
279 context-switches ( +- 0.90% )
43,417,295 cpu_atom/cycles/ ( +- 0.96% ) (80.72%)
47,978,573 cpu_core/cycles/ ( +- 1.59% ) (19.28%)
15,568,811 cpu_atom/instructions/ # 0.36 insn per cycle ( +- 1.41% ) (80.72%)
20,383,916 cpu_core/instructions/ # 0.42 insn per cycle ( +- 0.94% ) (19.28%)
798,963 cpu_atom/cache-misses/ ( +- 2.54% ) (80.72%)
851,619 cpu_core/cache-misses/ ( +- 5.52% ) (19.28%)
64 page-faults ( +- 1.38% )
378 syscalls:sys_enter_read ( +- 0.54% )
60.002514 +- 0.000404 seconds time elapsed ( +- 0.00% )
```
#### 編碼後
```
Performance counter stats for './xo-user -t 60' (3 runs):
602 context-switches ( +- 0.35% )
80,425,744 cpu_atom/cycles/ ( +- 0.94% ) (94.11%)
109,147,313 cpu_core/cycles/ ( +- 2.67% ) (5.89%)
27,768,690 cpu_atom/instructions/ # 0.35 insn per cycle ( +- 0.28% ) (94.11%)
48,707,777 cpu_core/instructions/ # 0.45 insn per cycle ( +- 3.10% ) (5.89%)
1,392,524 cpu_atom/cache-misses/ ( +- 0.67% ) (94.11%)
1,509,690 cpu_core/cache-misses/ ( +- 3.28% ) (5.89%)
64 page-faults ( +- 1.04% )
581 syscalls:sys_enter_read
60.0996 +- 0.0768 seconds time elapsed ( +- 0.13% )
```
編碼後只需要 4 byte,核心複製資料到使用者空間更快,1 分鐘內呼叫的次數比編碼前高了 53%,但是也因為每次只讀了 4 byte,因此會更頻繁的像核心模組要資料,導致更多的 context switches,
### 核心模組
核心模組會測量編碼相關的函式,觀察改寫成棋盤編碼後能為效能帶來多少的改進。執行 `xo-user` 60 秒,在這期間用 ftrace 一次測量一個函式。
為了避免表格出現橫向卷軸,下表函式名稱做些簡化
ai_one_work_func $\to$ ai_one
ai_two_work_func $\to$ ai_two
drawboard_work_func $\to$ drawboard
| 函式名稱 | 版本 | 執行次數 | 平均執行時間 | 標準差 | 總耗時(us) |
| ------------------- | ------ | -------- | ------------ | --------- | ----------- |
| check_win | base | 208113 | 0.17 | 0.18 | 34422.99 |
| | encode | 226929 | 0.12 | 0.10 | 26836.84 |
| mcts | base | 110 | 365283.94 | 220413.31 | 40181233.66 |
| | encode | 131 | 268343.12 | 179813.17 | 35152948.75 |
| negamax | base | 135 | 1296.84 | 2222.73 | 175073.91 |
| | encode | 143 | 1036.69 | 1349.74 | 148246.36 |
| timer_handler | base | 576 | 28.00 | 27.08 | 16127.07 |
| | encode | 576 | 33.23 | 28.00 | 19140.98 |
| produce_board | base | 379 | 1.63 | 1.66 | 619.66 |
| | encode | 576 | 3.47 | 2.00 | 1995.95 |
| kxo_read | base | 369 | 162555.13 | 228511.56 | 59982843.62 |
| | encode | 577 | 103904.06 | 549.67 | 59952645.07 |
| ai_one | base | 112 | 359648.34 | 218211.86 | 40280613.83 |
| | encode | 132 | 264752.14 | 177650.80 | 34947282.25 |
| ai_two | base | 113 | 3262.96 | 3129.28 | 368714.91 |
| | encode | 129 | 2487.72 | 1735.59 | 320916.36 |
| drawboard | base | 338 | 111851.44 | 189695.43 | 37805788.28 |
| | encode | 537 | 20.52 | 15.24 | 11021.58 |
除了 `timer_handler` 這類定時觸發的函式之外,編碼版的執行次數都比原版的多,其它如平均執行時間幾乎都比原版好,只有 `timer_handler` 和 `produce_board` 的執行時間比原版久,這部份還在做實驗找原因中。
`drawboard_work_func` 編碼板和原版的執行時間差很多是因為編碼版不需要在裡面等待 mutex lock 釋放,用 ftrace 追蹤發現是在等待 `ai_one_work_func` 做完 `mcts` 並釋放 mutex lock, `drawboard_work_func` 才可以繼續執行 `draw_board`。
```diff
static void drawboard_work_func(struct work_struct *w) {
/* ... */
- mutex_lock(&producer_lock);
- draw_board(table);
- mutex_unlock(&producer_lock);
/* Store data to the kfifo buffer */
mutex_lock(&consumer_lock);
produce_board();
mutex_unlock(&consumer_lock);
wake_up_interruptible(&rx_wait);
}
```
可以用 grep 將 ftrace 的 log 提取出標記為 `@` 的執行時間,就能觀察出 `mcts` 是 `drawboard_work_func` 的瓶頸,原版要等到 `mcts` 執行完畢並釋放鎖才能讓其它函式有所進展,因為編碼版把繪製棋盤的責任交由使用者之後,`drawboard_work_func` 才不會因為等待 `producer_lock` 解鎖而卡住。
```shellscript
$ grep -n "@" ./trace
129: 7) @ 852999.3 us | } /* mcts [kxo] */
130: 7) @ 853036.5 us | } /* ai_one_work_func [kxo] */
135: 9) @ 852993.6 us | } /* mutex_lock */
136: 9) @ 853019.5 us | } /* drawboard_work_func [kxo] */
204: 9) @ 615072.3 us | } /* mcts [kxo] */
205: 9) @ 615102.0 us | } /* ai_one_work_func [kxo] */
206: 2) @ 615154.8 us | } /* mutex_lock */
207: 2) @ 615185.8 us | } /* drawboard_work_func [kxo] */
272: 2) @ 154949.5 us | } /* mcts [kxo] */
273: 2) @ 154975.9 us | } /* ai_one_work_func [kxo] */
278: 0) @ 154873.8 us | } /* mutex_lock */
279: 0) @ 154899.7 us | } /* drawboard_work_func [kxo] */
```
有了數據之後,接著計算改進幅度,平均執行時間, 標準差, 總耗時改善幅度計算方式
$$
improve = 100 \times \frac{base - encode}{base}
$$
執行次數則改為
$$
improve = 100 \times \frac{encode - base}{base}
$$
| 函式 | 執行次數 | 平均執行時間 | 標準差 | 總耗時 |
| -------------| -------- | ------------ | ------ | ------ |
| check_win | +9.04 | +29.41 | +44.44 | +22.04 |
| mcts | +19.09 | -26.54 | -18.42 | -12.51 |
| negamax | +5.93 | +20.06 | +39.28 | +15.32 |
| timer_handler | 0 | -18.68 | -3.4 | -18.69 |
| produce_board | +51.98 | -112.88 | -20.48 | -222.1 |
| kxo_read | +56.37 | +36.08 | +99.76 | +0.05 |
| ai_one | +17.86 | +26.39 | +18.59 | +13.24 |
| ai_two | +14.16 | +23.76 | +44.54 | +12.96 |
| drawboard | +58.88 | +100 | +100 | +100 |
# 多個 AI 對弈
>[commit: e65ab69](https://github.com/LambertWSJ/kxo/commit/e65ab699a4600eb0f072bfaa242aaad761a2fd5a)
參考 [BigMickey](https://hackmd.io/@BigMickey/linux2025-homework3) 的實作,將下圖單一棋局對弈的流程如,擴展到多個 AI 對弈。
```mermaid
flowchart TB
timer_handler e1@==> ai_game
ai_game e3@==> ai_one_work_func
ai_game e4@==> ai_two_work_func
ai_one_work_func e5@==> drawboard_work_func
ai_two_work_func e6@==> drawboard_work_func
drawboard_work_func e7@==> produce_board
subgraph get_cpu
ai_one_work_func
ai_two_work_func
end
classDef animate stroke-dasharray: 9,5,stroke-dashoffset: 900,animation: dash 15s linear infinite;
class e1 animate
class e3 animate
class e4 animate
class e5 animate
class e6 animate
class e7 animate
```
為了後續實作不同功能,先定義好每場 AI 棋局所需的結構。
- `game.h` 中的 `xo_table` 結構用於存放棋盤資料與對應的棋局 ID,核心模組與使用者空間(xo-user)都會使用它,方便雙方都能清楚知道對應的是哪一局遊戲的棋盤。
- `ai_game.h` 中的 `ai_game` 結構則只在核心模組使用,將原本單局棋局的控制變數集中管理,包含輪到哪方、遊戲是否結束、記錄數據等,並整合與 AI 運算相關的 work_struct 與 mutex。
- 這樣的設計使得未來擴充到多場 AI 對奕時,程式碼更容易維護與改寫。
```c
/* game.h */
struct xo_table {
int id;
unsigned int table;
};
/* ai_game.h */
struct ai_game {
struct xo_table xo_tlb;
char turn;
u8 finish;
struct mutex lock;
struct work_struct ai_one_work;
struct work_struct ai_two_work;
struct work_struct drawboard_work;
};
```
## 標記未完成棋局並安排 tasklet 進行 AI 對弈
原版的 timer handler 會判斷棋局分出勝負的話,就將 `table` 存入 `kfifo` 等待 `xo-user` 取出來繪製棋局,否則會呼叫 `ai_game` 透過 `tasklet` 在對弈一次,玩家落子後就把 table 存入 `kfifo`。
同樣的設計擴展到多場井字遊戲的棋局的話,變成收集還沒分出勝負的棋局交由 tasklet 安排對弈,而分出勝負的棋局就存入 xo_table 到 kfifo 待取。
透過 bitops 能夠將上述的點子有效率的實作出來,檢查每場遊戲是否分出勝負,還沒結束的遊戲用 OR 運算紀錄第 `i` 場到變數 `unfini` 裡面,分出勝負的遊戲就把 `xo_tlb` 存入到 `kfifo`。
```c
static void timer_handler(struct timer_list *__timer)
{
char cell_tlb[] = {'O', 'X'};
unsigned int unfini = 0;
for (int i = 0; i < N_GAMES; i++) {
struct ai_game *game = &games[i];
struct xo_table *xo_tlb = &game->xo_tlb;
uint8_t win = check_win(xo_tlb->table);
if (win == CELL_EMPTY)
unfini |= (1u << i);
else {
pr_info("kxo: game-%d %c win!!!\n", xo_tlb->id, cell_tlb[win - 1]);
read_lock(&attr_obj.lock);
if (attr_obj.display == '1') {
int cpu = get_cpu();
pr_info("kxo: [CPU#%d] Drawing final board\n", cpu);
put_cpu();
/* Store data to the kfifo buffer */
mutex_lock(&game->lock);
produce_board(&game->xo_tlb);
mutex_unlock(&game->lock);
wake_up_interruptible(&rx_wait);
}
if (attr_obj.end == '0')
game->xo_tlb.table = 0;
read_unlock(&attr_obj.lock);
}
}
/* ... */
}
```
上述迴圈做完後,將還沒結束的遊戲 `unfini` 指派到 `game_tasklet.data` 作為 `game_tasklet_func` 的參數,接著呼叫 `ai_game` 對 `game_tasklet` 排程,觸發 softirq 時,會把要對奕的棋局推進 work queue 等待執行。
要判斷哪些遊戲已經結束,其實不需要額外宣告一個 fini 變數,然後在迴圈中像這樣 `fini |= (1u << i)` 逐一紀錄。
由於已經用 `unfini` 這個變數紀錄了「尚未結束」的遊戲,因此只要對 `unfini` 取反(使用 `~unfini`),就能得到「已經結束」的遊戲。
接著,再與 `GENMASK(N_GAMES - 1, 0)` 做 AND 運算,確保只保留第 0 到第 N_GAMES-1 場的有效位元,這樣就能正確取得所有已分出勝負的遊戲。
如果 kxo 還沒結束,而且有些遊戲已經分出勝負了,就安排下次 timer handler 觸發時間。
```c
static void timer_handler(struct timer_list *__timer) {
for (int i = 0; i < N_GAMES; i++) {
/* .... */
}
const unsigned int fini = ~unfini & GENMASK(N_GAMES - 1, 0);
if (unfini) {
WRITE_ONCE(game_tasklet.data, unfini);
ai_game();
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
} else if (attr_obj.end == '0' && fini)
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
}
```
## Game tasklet 驅動 AI 對弈
當 tasklet 被觸發就表示還有沒結束的遊戲,那些沒結束的遊戲已經在 timer handler 找出來存入到變數 `unfini`,並傳入到 tasklet 當作參數,可以用 32 位元的漢明權重 `hweight32` 得出沒結束的遊戲有幾個,再逐一對每個遊戲做下面的動作
1. 從 unfini 的低位開始,用 ffs 逐一取出遊戲的 ID
2. 從 unfini 移除這場遊戲,這場遊戲的 AI 要準備下棋了
3. 依據是否下完棋和現在換誰下,決定推入 $AI_1$ 或 $AI_2$ 到 work queue
4. 推入落子完的棋盤工作到 work queue,棋盤會推入到 kfifo 等待 xo-user 繪製
```c
static void game_tasklet_func(unsigned long unfini) {
/* ... */
int n = hweight32(unfini);
for (int i = 0; i < n; i++) {
int id = ffs(unfini) - 1;
unfini &= ~(1u << id);
struct ai_game *game = &games[id];
READ_ONCE(game->finish);
READ_ONCE(game->turn);
smp_rmb();
if (game->finish && game->turn == 'O') {
WRITE_ONCE(game->finish, 0);
smp_wmb();
queue_work(kxo_workqueue, &game->ai_one_work);
} else if (game->finish && game->turn == 'X') {
WRITE_ONCE(game->finish, 0);
smp_wmb();
queue_work(kxo_workqueue, &game->ai_two_work);
}
queue_work(kxo_workqueue, &game->drawboard_work);
}
/* ... */
}
```
TODO: 解釋上述用到 memory barrier API 的用途,為什麼這樣用?
## AI 落子
AI 落子的函式為 `ai_one_work_func` 和 `ai_two_work_func`,兩個函式實作的程式碼很相像,差別在於使用的演算法和棋子,這裡以 `ai_two_work_func` 舉例。
<!--
TODO: explain ai_two_work_func
get_cpu # preempt_disable
put_cpu # preempt_enable
-->
```c
static void ai_two_work_func(struct work_struct *w) {
int cpu;
struct ai_game *game = container_of(w, struct ai_game, ai_two_work);
struct xo_table *xo_tlb = &game->xo_tlb;
unsigned int table = xo_tlb->table;
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());
cpu = get_cpu();
mutex_lock(&game->lock);
int move;
WRITE_ONCE(move, negamax_predict(table, CELL_X).move);
smp_mb();
if (move != -1)
xo_tlb->table = VAL_SET_CELL(table, move, CELL_X);
WRITE_ONCE(game->turn, 'O');
WRITE_ONCE(game->finish, 1);
smp_wmb();
mutex_unlock(&game->lock);
put_cpu();
}
```
## module 沒寫好不只核心會 crash,重開機也會有問題
程式碼修到 `ai_one/two_work_func` 後,嘗試編譯並載入到核心執行,開始用 xo-user 觀看結果時,跳出一大堆 kernel panic 的錯誤訊息,看一下 kxo 安息的地方是執行 `__slab_free` 發生的
```shell
$ make
$ sudo rmmod kxo
$ sudo insmod kxo.ko
$ sudo ./xo-user
```
用另一個終端機下 `sudo dmesg -w` 觀察 kxo 的錯誤訊息
```log
[193899.779069] kxo: [CPU#1] game-3 ai_two_work_func completed in 1056 usec
[193899.779071] Workqueue: kxod ai_two_work_func [kxo]
[193899.779081] RIP: 0010:__slab_free+0x152/0x280
...
[193899.779113] Call Trace:
[193899.779115] <TASK>
[193899.779119] ? zobrist_put+0x7c/0xd0 [kxo]
[193899.779124] ? zobrist_clear+0x53/0x8d [kxo]
[193899.779128] kfree+0x2c6/0x360
[193899.779132] zobrist_clear+0x53/0x8d [kxo]
[193899.779136] negamax_predict+0x65/0x90 [kxo]
[193899.779140] ai_two_work_func+0x81/0x160 [kxo]
[193899.779179] </TASK>
```
此時 xo-user 是卡住的狀態,即便我按下數次 Ctrl+C 也關不了,只好用 `sudo kill -9 359{6,8,9}` 關掉 3 個 xo-user 行程(為什麼執行一次 xo-user 會有 3 個行程?),打開 htop 再檢查一次,還是沒有把 xo-user 完全關掉,程式的狀態為 D(uninterruptible sleep)。
```c
PID USER PRI NI VIRT RES SHR S CPU% MEM TIME Command(merged)
3596 root 20 0 0 0 0 D 0.0 0.30 0:00:00 xo-user
```
先不管 xo-user,嘗試移除 kxo 又卡住了,用 lsmod 發現 kxo 的 Used by 為 -1,這好像沒得挽救了,只剩下重開機的選擇,重開機又發生一件神奇的事情。
>我把 Ubuntu 24 裝在迷你電腦,kxo 和其它程式都在上面開發,要寫程式再透過 VSCode 的 SSH 連線過去。
>
靜待一段時間後,用 SSH 連不上 Ubuntu,再等一下好了,SSD 開機很快的,再連過去 N 次還是一樣的訊息
```
Connection closed by xxx.xxx.x.x port 22
```
把螢幕線接到迷你電腦來看怎麼一回事,原來卡在 reboot,因為還在 `kill xo-user`,慢慢等還是可以重開機的,但不知道會等多久,我選擇強制關機再重開,又可以順利連過去。
```log
systemd-shutdown[1]: Syncing filesystems and block devices.
systemd-shutdown[1]: Sending SIGTERM to remaining processes...
systemd-shutdown[1]: Received SIGTERM from PID 1 (systemd-shutdown).
workqueue: console: callbc hogged CPU for >1000us 5 times, consider switching to TQ_UNBOUND
systemd-shutdown[1]: Waiting for process: 3596 (xo-user)
systemd-shutdown[1]: Sending SIGKILL to PID 3596 (xo-user)
systemd-shutdown[1]: Process 4000 (plymouthd-fd-es) has been marked to be excluded from killing.
systemd-shutdown[1]: Waiting for process: 3596 (xo-user)
```
kernel module 寫不好,不只核心會 crash,連帶 user space 程式也會出問題,user space 程式關不掉又導致重開機有問題,每個錯誤環環相扣,最後變成整個系統不穩定。這就是為什麼寫 kernel module 要格外謹慎,錯一步就可能把整個系統拖下水。
## 修復 kernel panic 的問題
# 設計 TUI 顯示多場棋局
>繪製多個棋局和 logo $\to$ >Commit [235ff35](https://github.com/LambertWSJ/kxo/commit/235ff351e2e6c8c34efbdd75b81de3f03965888a)
核心模組已實作完多個 AI 對弈的畫面,為此設計了 [TUI](https://gist.github.com/LambertWSJ/3055b0cdc3469d45723b65932ebe5c3e) 讓使用者程式連續顯示多場遊戲,並研究 [auto-tetris](https://github.com/jserv/auto-tetris) 怎麼繪製 TUI 畫面,從中引入基本 TUI 繪圖用的函式,再加入一些 escape 控制字元來實作。
為了提升整體 TUI(文字使用者介面)的可讀性與視覺一致性,這份設計圖中使用 [Box-drawing characters](https://en.wikipedia.org/wiki/Box-drawing_characters) 來繪製 tic-tac-toe 棋盤與介面邊框,取代傳統的 +, -, | 等 ASCII 字元。
傳統字元在顯示多個井字遊戲的棋盤時容易產生大量空隙,看起來雜亂、不對齊。而 box-drawing characters(如 `─`, `│`, `┼` 等)則能提供更緊密、對齊清晰的表格線條,使畫面結構更集中、不鬆散,也更貼近圖形化的佈局效果。
## 從 auto-tetris 移植 TUI 函式
從 [auto-tetris](https://github.com/jserv/auto-tetris) 移植基礎的 TUI 繪製函式和 escape 控制字元到 kxo 來實作設計圖。
outbuf 為 4096 byte 大小的記憶體區塊,內部紀錄的是 `outbuf_printf`, `outbuf_write` 寫入的資料,當超過 `FLUSH_THRESHOLD` 的時候,會把寫入到 stdout 顯示 outbuf 的內容,如果需要即時顯示的話,可呼叫 `outbuf_flush` 將當前的 buffer 寫入到 stdout。
```c
#define ESC "\033"
#define CLEAR_SCREEN ESC "[2J" ESC "[1;1H"
#define HIDE_CURSOR ESC "[?25l"
#define SHOW_CURSOR ESC "[?25h"
#define RESET_COLOR ESC "[0m"
#define OUTBUF_SIZE 4096
#define FLUSH_THRESHOLD 2048 /* Flush when half-full for optimal latency */
#define RESET "\033[0m"
#define RED "\033[31m"
#define GREEN "\033[32m"
#define o_ch GREEN "O" RESET
#define x_ch RED "X" RESET
static struct {
char buf[OUTBUF_SIZE];
size_t len;
bool disabled; /* Emergency fallback when buffer management fails */
} outbuf = {0};
void outbuf_flush();
void outbuf_printf(const char *format, ...);
void outbuf_write(const char *data, size_t data_len);
void safe_write(int fd, const void *buf, size_t count);
void gotoxy(int x, int y)
```
為了方便繪製特定遊戲的棋局,新增儲存和恢復座標的控制字元,後續再繪製第 N 個 table 的時候,可以先跳過去把棋局繪製完成後再返回,避免繪製的位置跑掉
```c
#define SAVE_XY ESC "[s"
#define RESTORE_XY ESC "[u"
void save_xy() {
outbuf_write(SAVE_XY, strlen(SAVE_XY));
}
void restore_xy() {
outbuf_write(RESTORE_XY, strlen(RESTORE_XY));
}
```
在 TUI(文字使用者介面)中進行繪製的方式,某種程度上類似於 Web 前端使用 Canvas API:需要手動控制繪製起始座標、處理相對與絕對位置的移動,並管理座標的儲存與恢復。但與 Canvas 相比,TUI 顯得陽春許多。
TUI 並不處理複雜的動畫或幾何圖形,而是著重於透過文字、控制字元、Unicode(如 box-drawing characters)等方式,來結構化地呈現資訊。這樣的設計更適合用於資源有限的環境,或需要在純文字介面中提供直覺化操作與清晰視覺效果的應用。
## 繪製 logo
設計圖規劃的 KXO logo 可以上網搜尋 `text to ascii art` 找喜歡的字體輸出成 logo,我選用 Larry 3D 作為範例,把複製好的文字存成文字檔,再藉由 `lolcat -f` 把鮮豔的 color code 一同存到檔案裡面。
```shell
$ cat logo.txt | lolcat -f > logof.txt && cat ./logof.txt
```
後面補上的 `&& cat ./logof.txt` 是為了預覽 logo,接著執行 xo-user 時把 logo 載入後繪製出來
```c
int main(int argc, char *argv[]) {
/* ... */
char *logo = load_logo("logof.txt");
/* ... */
render_logo(logo);
/* ... */
}
```
因為要一行行的把 logo 印出來,strchr 用換行(`\n`)做字串分割,又因為 strchr 一定要先執行過才知道能不能找到 `\n`,所以用 do-while 迴圈而不是 for 迴圈。
`x = 38` 則是自己依照設計圖中 logo 的起始位置設定的,k 是換行用的變數,當繪製完就換行,避免所有的字元都擠在同一行。
```c
void render_logo(char *logo) {
char *beg = logo;
const char *pos;
int ln = '\n';
int k = 0;
int x = 38;
do {
pos = strchr(beg, ln);
k++;
gotoxy(x, k);
size_t len = pos - beg;
outbuf_write(beg, len);
beg += len + 1;
} while (pos);
gotoxy(x, k);
outbuf_flush();
}
```
當找到換行的位置 pos 後,將位置移動到 `gotoxy(x, k)`,x 是固定的,所以遞增 k 就好,這段要給 `outbuf_wrtie` 的字串長度為 `pos - beg`,寫入完後要更新開頭 `beg` 到換行的下個字元 `len + 1` 並檢查 `pos`,只要不是 NULL 就繼續從 beg 開始尋找換行並繪製 logo,重複這個過程直到 logo 繪製完成。
:::info
小訣竅: VSCode 反白一段文字後,視窗的右下角會顯示反白選中多少個字元。
以下圖為例,反白到繪製 logo 的起始位置,右下角顯示`(38 selected)`,Ln 367 則是當前行數,這樣就可以知道要繪製的 X 和 Y 的位置,**千萬不要用方向鍵慢慢默數到你要的位置**。

:::
## 繪製多場遊戲
因為有多個棋盤要繪製,暫時先用不精準的詞彙來區分 `BOARD` 和 `TABLE`,兩者以下圖來呈現
`BOARD` $\to$ 用來區分每一個棋盤的分界線,顯示遊戲的 ID、棋局、用什麼演算法對弈。
`TABLE` $\to$ 4x4 的棋盤,顯示當前的棋局,下圖中 `+2` 表示 row 落子要遞增的長度,反之亦然,`+4` 表示 col
下圖標註了邊長和棋盤從左上角為起點,每一格 x, y 遞增的量
fig.`BT_design`
```
(BOARD)
├─────────────── 35 ──────────────┤
┌─────────────────────────────────┐ ─┬─ (TABLE)
│ 15 Game-1 13 │ ─┬─ │ 17
│ ┌───┬───┬───┬───┐ │ │ │ ┌───┬───┬───┬───┐
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ ├───┼───┼───┼───┤ │ │ │ ├───┼───┼───┼───┤
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ 9 ├───┼───┼───┼───┤ 8 │ 11 13 +2 ├───┼───┼───┼───┤ 9
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ ├───┼───┼───┼───┤ │ │ │ ├───┼───┼───┼───┤
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ └───┴───┴───┴───┘ │ │ │ └───┴───┴───┴───┘
│ 12 mcts vs nega 10 │ ─┴─ │ +4
└─────────────────────────────────┘ ─┴─
```
TODO: 解釋很多行的 `render_boards_temp` 實作
使用 `render_boards_temp` 繪製出來,因為是固定的,所以只繪製一次,不會進到 while 迴圈裡重複繪製。
```c
int main(int argc, char *argv[]) {
/* ... */
render_logo(logo);
render_boards_temp(N_GAMES);
tui_init();
while (!end_attr) {
/* ... */
}
}
```
分隔線繪製完成後,接著要繪製棋盤,要先從核心模組取得棋局,才能夠在使用者空間繪製出來,當讀到棋局 `xo_tlb`,則需要依據 ID 將開始繪製的位置 gotoxy 過去,當繪製完成後再恢復到原本的開始繪製位置,避免之後都是從 gotoxy 的位置開始繪製,導致 TUI 畫出一堆亂碼
```c
int main(int argc, char *argv[]) {
/* ... */
while (!end_attr) {
/* ... */
if (FD_ISSET(STDIN_FILENO, &readset)) {
/* ... */
} else if (read_attr && FD_ISSET(device_fd, &readset)) {
FD_CLR(device_fd, &readset);
read(device_fd, &xo_tlb, sizeof(struct xo_table));
save_xy();
update_table(&xo_tlb);
restore_xy();
}
/* ... */
}
}
```
繪製棋局的第一步要把 ID 轉換到 BOARD 開頭的位置,才能對照 fig. `BT_design` 標註的邊長和位移量開始繪圖。
```c
#define UI_COLS 3
#define BOARD_W 35
#define BOARD_H 13
#define BOARD_BASEY 10
void update_table(const struct xo_table *xo_tlb) {
const char *cell_tlb[] = {" ", o_ch, x_ch};
int id = xo_tlb->id;
unsigned int table = xo_tlb->table;
int y = BOARD_BASEY + (id / UI_COLS) * (BOARD_H - 1);
int x = (id % UI_COLS) * BOARD_W + 1;
gotoxy(x + 15, y + 2);
outbuf_printf("Geme-%d\n", id);
}
```
第 n 個棋局起始 x, y 位置
x $\to$ 第 n 個 col 乘上 BOARD 寬度
y $\to$ BOARD_BASEY(logo 的下一行) + 第 n 個 row 乘上 BOARD 高度
有了 Game ID 的起始位置,只要對照上圖 fig. `BT_design` 把 x, y 加上位移就能輕鬆繪製期局和資訊了
### 顯示遊戲 ID
```c
void update_table(const struct xo_table *xo_tlb) {
/* ... */
gotoxy(x + 15, y + 2);
outbuf_printf("Geme-%d\n", id);
/* ... */
}
```
- 繪製 AI 玩家用的演算法
目前還沒實作動態切換演算法,所以都先固定用原本 player1 和 player2 用的演算法
```c
void update_table(const struct xo_table *xo_tlb) {
/* ... */
gotoxy(x + 12, y + 12);
outbuf_printf("MCTS vs NEGA\n");
/* ... */
}
```
### 繪製棋盤
### 繪製棋局
棋盤繪製完成後,再用 bitops 把 table 上的每一個棋子依序繪製到棋盤上。
按照上圖 `BT_design` 的標註,x, y 的位移量分別為 2 和 4,表示這兩個方向在棋盤上格子的位移量,只要將棋子 x 和 y 的位置計算好再乘上位移量,就能在棋盤的格子上繪製棋子,不會畫到棋盤外面。
```c
void update_table(const struct xo_table *xo_tlb) {
/* ... */
const char *cell_tlb[] = {" ", o_ch, x_ch};
int tlb_x = x + 9;
int tlb_y = y + 3;
const int cell_x = tlb_x + 2;
const int cell_y = tlb_y + 1;
const int stepx = 4;
const int stepy = 2;
for (int i = 0; i < N_GRIDS; i++) {
const int pos_x = cell_x + (i & (BOARD_SIZE - 1)) * stepx;
const int pos_y = cell_y + (i / BOARD_SIZE) * stepy;
gotoxy(pos_x, pos_y);
outbuf_printf("%s", cell_tlb[TABLE_GET_CELL(table, i)]);
}
/* ... */
}
```
初步把設計圖棋盤繪製的部分刻出來了,展示效果如下

### 繪製分頁
> Commit: [10aef25](https://github.com/LambertWSJ/kxo/commit/10aef2516a97931decfd9e605f293b787f45ef88)
為了要顯示 Records 和 load average 又不必占用太多的畫面,改成用分頁的方式來切換顯示,使用者可以按下快捷鍵 Q 和 W 分別切換 Records 和 load average。
定義好分頁會用到的 union,使用者只要知道有哪些分頁,把分頁傳入 tui_update_tab 更新分頁畫面。
TAB_TOTLEN 的用途用來知道有多少個分頁,日後如何還有新的分頁加進 tui_tab 就會自動更新,不用手動去數有幾個分頁。
```c
/* tui.h */
enum tui_tab {
XO_TAB_RECORD,
XO_TAB_LOADAVG,
TAB_TOTLEN,
};
void tui_update_tab(enum tui_tab, const struct xo_table *tlb);
```
預設顯示 Records,當偵測到使用者按下 Q 或 W,會在 `listen_keyboard_handler` 指派特定分頁到 cur_tab,等迴圈下次呼叫 `tui_update_tab` 就能更新畫面。
```c
int main(int argc, char *argv[]) {
if (!status_check())
exit(1);
/* ... */
curtab = XO_TAB_RECORD;
while (!end_attr) {
FD_ZERO(&readset);
FD_SET(STDIN_FILENO, &readset);
FD_SET(device_fd, &readset);
print_now();
tui_update_tab(curtab);
/* ... */
if (FD_ISSET(STDIN_FILENO, &readset)) {
FD_CLR(STDIN_FILENO, &readset);
listen_keyboard_handler();
} else if (read_attr && FD_ISSET(device_fd, &readset)) {
/* ... */
}
/* ... */
}
}
```
#### 實作分頁切換
繪製分頁需要知道標題,還有占用的高度,避免資訊超出分頁的畫面。`TAB_TOTLEN` 是分頁的數量,在 tui_tabs 宣告陣列長度時可以用上,如果不小心多一個元素,可以在編譯階段就抱錯。tui_tabs 運用 [Designated Initializers](https://gcc.gnu.org/onlinedocs/gcc/Designated-Inits.html) 預先設定好每個分頁的標題和高度。
```c
/* tui.c */
struct xo_tab {
char *title;
int high;
};
static struct xo_tab tui_tabs[TAB_TOTLEN] = {
[XO_TAB_RECORD] = {.title = "Records", .high = 11},
[XO_TAB_LOADAVG] = {.title = "Load avg", .high = 12},
};
```
分頁的繪製分成下面的步驟
1. 繪製分頁的標題 $\to$ 繪製每個分頁的標題,如果分頁和傳入的 tab 是一樣的就把底部清成空白
2. 繪製當前分頁的資訊 $\to$ 呼叫對應的函式,先清空分頁畫面,再繪製分頁內容
```c
void tui_update_tab(enum tui_tab tab) {
assert(tab < TAB_TOTLEN);
draw_tab_label(tab);
/* request data and render */
switch (tab) {
case XO_TAB_RECORD:
xo_record(tab);
break;
case XO_TAB_LOADAVG:
xo_loadavg(tab);
break;
default:
assert(0);
}
}
```
切換分頁時要先把前個分頁的畫面清空再畫上對應高度的框線,不然舊有資訊會殘留在畫面上,這任務交由 `draw_tab_border`,分頁的函式專注在如何繪製內容,程式碼會更好維護。
```c
static void xo_record(const enum tui_tab tab) {
draw_tab_border(tui_tabs[tab].high);
}
static void xo_loadavg(const enum tui_tab tab) {
draw_tab_border(tui_tabs[tab].high);
}
```
### 顯示下棋紀錄
>Commit [7ab469a](https://github.com/LambertWSJ/kxo/commit/7ab469a430eccb136e12cf6d0057d822c0136b9d)
為了要充分利用已有的程式碼,而且要盡量降低核心模組和使用者的通訊成本,將 xo_table 的結構做些調整。
```diff
struct xo_table {
- int id;
+ int attr;
unsigned int table;
+ unsigned long moves;
};
```
- 把 `id` 改成 attr 可以在多存一些資訊在變數裡面,未來可以依據需求擴充。這裡我將 `[8:4]` 定義為遊戲的總下棋數 `steps`,這樣就能節省空間,不必再新增屬性到 `xo_table`。
- 新增無號 64 位元整數 `moves`,從低位元開始,每 4 個位元用來紀錄旗子在棋盤上的位置
```graphviz
digraph G {
node [shape=none]
rankdir="LR";compound=true;
subgraph cluster {
label=xo_table;
attr;moves;
attr_title;mv_title
color=transparent;
}
moves [label=<
<table border="0" cellborder="1" cellspacing="0">
<tr>
<td width="31.25">15</td>
<td width="31.25">14</td>
<td width="31.25">13</td>
<td width="31.25">12</td>
<td width="31.25">11</td>
<td width="31.25">10</td>
<td width="31.25">9</td>
<td width="31.25">8</td>
<td width="31.25">7</td>
<td width="31.25">6</td>
<td width="31.25">5</td>
<td width="31.25">4</td>
<td width="31.25">3</td>
<td width="31.25">2</td>
<td width="31.25">1</td>
<td width="31.25">0</td>
</tr>
</table>
>];
mv_title[label="moves",fontsize=16];
mv_title -> moves[color=transparent]
attr [label=<
<table border="0" cellborder="1" cellspacing="0">
<tr>
<td width="249.2" border="0"></td>
<td width="187.5">reserved</td>
<td width="31.5">steps</td>
<td width="31.5">id</td>
</tr>
</table>
>];
attr_title[label="attr",fontsize=15];
attr_title -> attr[color=transparent]
}
```
新增位元處裡的 macro 放便存取 steps 和 moves。由於大部分針對 xo_table 的 macro 都是把變數某一段位元區塊讀取或寫入,所以新增 get/set_bis 系列的函式來取代重複的程式碼
```c
/* game.h */
#define XO_ATTR_ID(attr) (attr & ATTR_MSK)
#define XO_ATTR_STEPS(attr) get_bits(attr, 0xf, 4)
#define XO_SET_ATTR_STEPS(attr, type) set_bits(attr, type, 0xf, 0x4)
#define SET_RECORD_CELL(moves, step, n) set_bits64(moves, step, 0xful, n * 4u)
#define GET_RECORD_CELL(moves, id) get_bits64(moves, 0xful, id * 4u)
static inline int set_bits(int x, int value, int mask, int n) {
return (x & ~(mask << n)) | (value << n);
}
static inline unsigned int get_bits(unsigned int x, int mask, int n) {
return (x & (mask << n)) >> n;
}
```
處理 steps 和 moves 基礎函式準備好後,接者修改 `ai_one/two_work_func`,這兩個函式程式碼幾乎一模一樣,這裡以修改 ai_one_work_func 為例。
```c
static void ai_one_work_func(struct work_struct *w) {
/* ... */
int attr = xo_tlb->attr;
int id = XO_ATTR_ID(attr);
int steps = XO_ATTR_STEPS(attr);
int move;
WRITE_ONCE(move, mcts(table, CELL_O));
if (move != -1) {
WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O));
WRITE_ONCE(xo_tlb->moves,
SET_RECORD_CELL(xo_tlb->moves, move, steps++));
WRITE_ONCE(xo_tlb->attr, XO_SET_ATTR_STEPS(attr, steps));
}
WRITE_ONCE(game->turn, 'X');
WRITE_ONCE(game->finish, 1);
pr_info("[one]move: [%d, %x, %lx]\n", steps - 1, move, xo_tlb->moves);
/* ... */
}
```
取得當前遊戲的 attr 後,先紀錄 steps,如果 mcts 有從 table 搜尋到下一步(move),就將這一步寫到棋盤上,再儲存到 moves 並將 steps + 1 記錄回 xo_tlb 的 attr。
#### 顯示歷史紀錄
先前已經實做好切換分頁的實作,現在則做一些擴充,將 switch-case 內每個繪製分頁的函式都加入參數 xo_table 否則沒有必要資訊可以繪製到畫面上,另外為了區分畫面做了切換,新增變數 prev_tab 來記錄前一個分頁。
```c
enum tui_tab prev_tab = TAB_TOTLEN;
void tui_update_tab(enum tui_tab tab, const struct xo_table *tlb)
{
assert(tab < TAB_TOTLEN);
draw_tab_label(tab);
/* request data and render */
switch (tab) {
case XO_TAB_RECORD:
xo_record(tab, tlb);
break;
case XO_TAB_LOADAVG:
xo_loadavg(tab, tlb);
break;
default:
assert(0);
}
}
```
xo-user 向 kxo 讀取 xo_table 時,只要從裡面取得 steps, id, moves 就能透過 bitops 還原每一步歷史紀錄。
繪製步驟:
1. row by row 印出 `Game-id:`
1. 移動到 x = 11 和 y + id 的位置(紅色游標處) $\to$ Game-id: <span style="color:red">|</span>
1. 清空 Game id 的歷史紀錄(18 ~ 19 行),並回到 Game-id: <span style="color:red">|</span>
1. 用 Game 的 steps 和 moves 繪製座標
```c=
static void xo_record(const enum tui_tab tab, const struct xo_table *tlb) {
if (tab != prev_tab) {
draw_tab_border(tab);
}
prev_tab = tab;
unsigned long moves = tlb->moves;
int steps = XO_ATTR_STEPS(tlb->attr);
int id = XO_ATTR_ID(tlb->attr);
char xy[2];
int y = 53;
gotoxy(4, y);
for (int i = 0; i < N_GAMES; i++) {
outbuf_printf("GAME-%d: ", i);
gotoxy(4, y + i + 1);
}
int x = 11;
gotoxy(x, y + id);
outbuf_printf("%77s", " ");
gotoxy(x, y + id);
for (int i = 0; i < steps; i++) {
uint8_t mv = GET_RECORD_CELL(moves, i);
xy[0] = 'A' + (mv & 3);
xy[1] = '1' + (mv / BOARD_SIZE);
outbuf_printf(" %s %s", xy, i == steps - 1 ? " " : "🠮");
}
}
```
# Load average
> Commit [6c8a5ff](https://github.com/LambertWSJ/kxo/commit/6c8a5ffd823338cad97a0a3e485ab3996cb816f5)
# 動態切換 AI 演算法
> Commit [c1c3f32](https://github.com/LambertWSJ/kxo/commit/c1c3f32c37d411f88d7cbd273f1e7bc063488851)
目前還沒從 ttt 移植強化學習(RL),不過可以先實作動態切換不同 AI 演算法對弈。切換 AI 的時機可以是每下完棋就換一個,或是分出勝負再切換,前者過於頻繁,所以切換的時機選在分出勝負的時候。
## 擴充 attr
觀察對弈用的 `mcts` 和 `negamax_predict` 的參數都是用 table 和 player,又可以用查表法了,把演算法的索引存入到 xo_table 的 attr。
`ai1` $\to$ `attr[9:8]`
`ai2` $\to$ `attr[11:10]`
新增存取 ai1, ai2 的 macro
```c
#define XO_ATTR_AI_ALG(attr) get_bits(attr, 0xf, 8)
#define XO_SET_ATTR_AI_ALG(attr, ai1, ai2) \
set_bits(attr, (ai1) | (ai2) << 2, 0xf, 0x8)
```
擴充後的 attr 如下圖所示,搭配 `XO_ATTR_AI_ALG`, `XO_SET_ATTR_AI_ALG` 就能方便存取 ai1, ai2 的索引來做查表法。
```graphviz
digraph G {
node [shape=none]
myNode [label=<
<table border="0" cellborder="1" cellspacing="0">
<tr>
<td width="312.5">reserved</td>
<td width="31.25">ai2</td>
<td width="31.25">ai1</td>
<td width="62.5">steps</td>
<td width="62.5">id</td>
</tr>
</table>
>];
}
```
## 查表法
negamax_predict 回傳值要從 move_t 改為 int,才能和 mcts 有一致的介面,有一致的介面才能建立表格。
```c
/* negamax.h */
int negamax_predict(unsigned int table, char player);
/* game.h */
enum {
XO_AI_MCTS,
XO_AI_NEGAMAX,
XO_AI_TOT,
};
/* ai_game.h */
typedef int (*ai_alg)(unsigned int table, char player);
/* main.c */
static ai_alg ai_algs[XO_AI_TOT] = {
[XO_AI_MCTS] = mcts,
[XO_AI_NEGAMAX] = negamax_predict,
};
```
### 隨機索引
#### kxo_init
在 kxo_init 會先初始化每個 `xo_table`,用 get_random_u32() 隨機數,給每個遊戲都隨機指定 AI
```c
static int __init kxo_init(void) {
/* ... */
for (int i = 0; i < N_GAMES; i++) {
unsigned int attr;
u32 rnd = get_random_u32();
struct ai_game *game = &games[i];
game->xo_tlb.table = 0;
attr = i; /* set game ID in attr */
attr =
XO_SET_ATTR_AI_ALG(attr, rnd % XO_AI_TOT, (rnd >> 16) % XO_AI_TOT);
game->xo_tlb.attr = attr;
}
}
```
id 是從 bit 0 開始,所以執行 `attr = i` 即可,接著 `XO_SET_ATTR_AI_ALG` 將隨機數的高 16 位和低 16 位和演算法的總數做模除即為 ai1 和 ai2 的索引。
#### timer_handler
做法和 kxo_init 一樣,但只有在分出勝負的時候才更換 AI 演算法的索引,還在對弈中的棋局則不影響
```c
static void timer_handler(struct timer_list *__timer) {
char cell_tlb[] = {'O', 'X'};
for (int i = 0; i < N_GAMES; i++) {
struct ai_game *game = &games[i];
struct xo_table *xo_tlb = &game->xo_tlb;
unsigned int attr = xo_tlb->attr;
uint8_t win = check_win(xo_tlb->table);
uint8_t id = XO_ATTR_ID(attr);
if (win == CELL_EMPTY)
unfini |= (1u << i);
else {
pr_info("kxo: game-%d %c win!!!\n", id, cell_tlb[win - 1]);
/* ... */
if (attr_obj.end == '0') {
u32 rnd = get_random_u32();
attr &= ATTR_MSK;
attr = XO_SET_ATTR_AI_ALG(attr, rnd % XO_AI_TOT,
(rnd >> 16) % XO_AI_TOT);
xo_tlb->attr = attr;
}
}
}
}
```
### 對弈
當對弈的函式執行的時候,從 attribute 取出索引給 ai_algs 取得 AI 演算法來呼叫即可。
```diff
static void ai_one_work_func(struct work_struct *w){
/* ... */
int move;
- WRITE_ONCE(move, mcts(table, CELL_O));
+ int alg = XO_ATTR_AI_ALG(attr) & 3;
+ WRITE_ONCE(move, ai_algs[alg](table, CELL_O));
/* ... */
}
static void ai_two_work_func(struct work_struct *w)
{
/* ... */
int move;
- WRITE_ONCE(move, negamax_predict(table, CELL_X).move);
+ int alg = XO_ATTR_AI_ALG(attr) >> 2;
+ WRITE_ONCE(move, ai_algs[alg](table, CELL_X));
/* ... */
}
```
# 從 ttt 移植 Reinforcement Learning
> Commit: [28a3a73](https://github.com/LambertWSJ/kxo/commit/28a3a73e76c0690ef5a5bcbd50150727fe8c1bc8)
參考 [ttt](https://github.com/jserv/ttt)/[train.c](https://github.com/jserv/ttt/blob/main/train.c) 怎麼初始化和更新狀態空間,適合做為移植到核心模組的範例。
移植強化學習到核心模組要解決下面的問題
- 浮點數運算換成定點數
- 棋盤的狀態從陣列轉成編碼版
- 配置 $4\times4$ 狀態空間需要很大的空間,超出 `kmalloc` 的限制 $\to$ $3^{16}=43046721$ 個狀態
### 浮點數運算換成定點數
RL 用到的浮點數運算只用到加減乘除,沒有 MCTS 那麼複雜,因此轉換成定點數運算不會太困難,利用現有的程式碼修改後,在 userspace 實驗完就再把程式碼合併到核心模組即可。
為了充分運用現有程式碼,將 `game.h` 定義好的 `fixed_point_t` 改為 Q16.16. 的格式,作為 RL 使用的定點數。
定點數加減可以直接運算,乘法運算完要除以 $b^n$ ,除法運算完則要除以 $b^n$。RL 移植只需要用到乘法,如果乘數不是定點數的話則要乘以定點數的 1.0(`RL_FIXED_1`) 轉換成定點數。
```c
#define FIXED_SCALE_BITS 16
#define FIXED_MAX (~0U)
#define FIXED_MIN (0U)
#define RL_FIXED_1 (1 << FIXED_SCALE_BITS)
typedef unsigned fixed_point_t;
static inline fixed_point_t fixed_mul(fixed_point_t a, fixed_point_t b)
{
return ((s64) a * b) >> FIXED_SCALE_BITS;
}
static inline fixed_point_t fixed_mul_s32(fixed_point_t a, s32 b)
{
b = ((s64) b * RL_FIXED_1);
return fixed_mul(a, b);
}
```
### 配置狀態空間
狀態空間 state_value 要配置的空間太大,無法用 kmalloc,只能採用 vmalloc 配置一大塊實體上不連續的記憶體,狀態空間配置完後,用 `get_score` 作為 Value function 計算這個玩家在所有可能的狀態下的初始動作可以獲得的分數,比其從零開始的狀態空間,這麼做可以讓 agent 對每場可能的棋局有概括的資訊,學習效率自然也會比零基礎的 agent 來的快。
```c
void init_rl_agent(rl_agent_t *agent, unsigned int state_num, char player) {
agent->player = player;
agent->state_value = vmalloc(sizeof(fixed_point_t) * state_num);
if (!(agent->state_value))
pr_info("Failed to allocate memory");
for (unsigned int i = 0; i < state_num; i++) {
agent->state_value[i] = fixed_mul_s32(
INITIAL_MUTIPLIER, get_score(hash_to_table(i), player));
}
}
```
但是這麼做也是有缺點的,就是初始化狀態空間的速度太慢,如果 `insmod` 馬上 `xo-user`,會很明顯卡住,因為要設定 $3^{16}$ 的初始值需要一段時間,後來改用 kthread 去初始化狀態空間,等初始化完才加入到遊戲,再等隨機索引動態切換成 RL。
### 對弈
RL agent 下棋是用 `play_rl` 回傳玩家在這個棋局評估的下一步的落子位置,get_action_exploit 是在所有空格下一次棋,評估哪一格會有最高的分數,就是下一步要落子的位置。
```c
static int get_action_exploit(unsigned int table, const rl_agent_t *agent) {
int max_act = -1;
fixed_point_t max_q = FIXED_MIN;
const fixed_point_t *state_value = agent->state_value;
int candidate_count = 1;
int last_e = 0;
for_each_empty_grid(i, table) {
last_e = i;
table = VAL_SET_CELL(table, i, agent->player);
fixed_point_t new_q = state_value[table_to_hash(table)];
if (new_q == max_q) {
++candidate_count;
if (get_random_u32() % candidate_count == 0) {
max_act = i;
}
} else if (new_q > max_q) {
candidate_count = 1;
max_q = new_q;
max_act = i;
}
table = VAL_SET_CELL(table, i, CELL_EMPTY);
}
/* If there is no best action, then choose the last empty cell */
return max_act == -1 ? last_e : max_act;
}
unsigned int play_rl(unsigned int *table, const rl_agent_t *agent) {
unsigned int tlb = *table;
int move = get_action_exploit(tlb, agent);
*table = VAL_SET_CELL(tlb, move, agent->player);
return move;
}
```
上面的修改其實也是錯的,如果是棋盤沒有空格,會回傳 0,但這樣是不合法的,只能保證不會出現 UB。
如果初始化 RL agent 沒有設定狀態空間的初始值,而是用記憶體殘值來當做初始值,會造成 get_action_exploit 回傳 -1,導致執行時期出現未定義行為。
```
[ 81.123328] UBSAN: shift-out-of-bounds in game.h:86:40
[ 81.123332] shift exponent -2 is negative
```
#### AI 下棋
`play_rl` 的介面,沒有辦法加入到查表法理面,所以在 `ai_one/two_func_work` 裡面只能用判斷式知道目前的 AI 演算法是不是用 RL,如果是 RL 且下一步 (move) 是合法的話就更新 episode_moves 和 reward。
episode_moves $\to$ 紀錄遊戲每一步落子後的棋局,將這場棋局轉換成狀態空間索引,後面才知道要更新的是哪個狀態。
<!-- reward $\to$ 紀錄這一步評估出來的分數乘上 -->
```c
static void ai_one_work_func(struct work_struct *w) {
int id = XO_ATTR_ID(attr);
int alg = XO_ATTR_AI_ALG(attr) & 3;
bool is_rl = alg == XO_AI_RL;
WRITE_ONCE(move, is_rl ? play_rl(&table, &rl_agents[0])
: ai_algs[alg](table, CELL_O));
if (move != -1) {
WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O));
WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps));
if (is_rl) {
table = xo_tlb->table;
u8 win = check_win(table);
episode_moves[id][steps] = table_to_hash(table);
fixed_point_t score = fixed_mul_s32((RL_FIXED_1 - REWARD_TRADEOFF),
get_score(table, CELL_O));
reward[id][steps] = score + calculate_win_value(win, CELL_O);
}
WRITE_ONCE(xo_tlb->attr, XO_SET_ATTR_STEPS(attr, steps + 1));
}
/* ... */
}
```
### 更新狀態空間
當分出勝負之後,就用 episode_moves 和 reward 來更新 agent 的狀態空間
```c
static void timer_handler(struct timer_list *__timer) {
/* ... */
for (int i = 0; i < N_GAMES; i++) {
struct ai_game *game = &games[i];
struct xo_table *xo_tlb = &game->xo_tlb;
unsigned int attr = xo_tlb->attr;
uint8_t win = check_win(xo_tlb->table);
if (win == CELL_EMPTY)
unfini |= (1u << i);
else {
int winner = win - 1;
pr_info("kxo: game-%d %c win!!!\n", id, cell_tlb[winner]);
if (attr_obj.end == '0') {
int steps = XO_ATTR_STEPS(xo_tlb->attr);
u32 rnd = get_random_u32();
attr &= ATTR_MSK;
attr = XO_SET_ATTR_AI_ALG(attr, rnd % XO_AI_TOT,
(rnd >> 16) % XO_AI_TOT);
xo_tlb->attr = attr;
xo_tlb->table = 0;
xo_tlb->moves = 0;
fixed_point_t next = 0;
for (int j = steps - 1; j >= 0; j--)
next = update_state_value(episode_moves[i][j], reward[i][j],
next, &rl_agents[winner]);
}
}
}
/* ... */
}
```
## 重構函式
> Commit: [8e34ecd](https://github.com/LambertWSJ/kxo/commit/8e34ecd66dde7d559a97f59bc581d708ab2975ee)
初步移植 RL 到核心模組的任務完成後,開始去蕪存菁整理程式碼,修掉下面已知的問題
- 封裝 RL agent 在內部模組裡面,不要接露到外部
- play_rl 可能回傳錯誤的結果,而且函式介面無法和查表法定義的型態不符
- 初始化造成核心模組初始化卡住,應改用 kthread 處理,初始化完畢再投入到遊戲
- `update_state_value` 不應該每次都傳值進去更新,應傳入陣列和 steps,在函式內部一口氣更新完 state value
### 封裝 RL agent
RL agent 只有下 O 或 X 的玩家,函式介面可以用 player 傳入 CELL_O 或 CELL_X 來索引 agent,不需要傳指標到 RL 模組的函式。
```diff
-unsigned int play_rl(unsigned int *table, const rl_agent_t *agent);
+int play_rl(unsigned int table, char player);
-void init_rl_agent(rl_agent_t *agent, unsigned int state_num, char player);
+void init_rl_agent(unsigned int state_num, char player);
fixed_point_t update_state_value(int after_state_hash,
fixed_point_t reward,
fixed_point_t next,
- rl_agent_t *agent);
+ char player);
```
init_rl_agent 可以省下一個參數傳遞,其餘從指標 `agent` 改成字元 `player` 對於 CPU 來說沒有函式呼叫成本的差異,但是在寫程式上,會比較簡單些,只要傳入 CELL_O/CELL_X,不需要用 address of 運算子寫 &rl_agent。
以 play_rl 來說,第二行改進後,程式碼好讀很多,清楚知道是 CELL_X 的玩家要下棋,也沒有像第一行有很多符號還有思考索引 1 是哪個玩家。
```c
play_rl(&table, &rl_agents[1]);
play_rl(table, CELL_X);
```
### 重構 `play_rl`
觀察原本的 `play_rl` 和 `ai_one/two_work_func` 的流程,發現 move 已經在外面判斷是否等於 -1,所以不用修改原版的 get_action_exploit 也不用在 play_rl 裡面更新 table 也能保證是正確的結果。
```c
static void ai_one_work_func(struct work_struct *w) {
int move;
int alg = XO_ATTR_AI_ALG(attr) & 3;
bool is_rl = alg == XO_AI_RL;
WRITE_ONCE(move, is_rl ? play_rl(&table, &rl_agents[0])
: ai_algs[alg](table, CELL_O));
if (move != -1) {
WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O));
WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps));
/* ... */
}
}
```
那麼 play_rl 可以直接回傳 get_action_exploit 的結果就好了。
```c=
int play_rl(unsigned int table, char player) {
return get_action_exploit(table, player);
}
int play_rl(unsigned int table, char player)
__attribute__((alias("get_action_exploit")));
```
第五行是看到原本既然要回傳 `get_action_exploit` 不如用 alias 好了,寫完又發現不如直接把 `get_action_exploit` 改名為 `play_rl` 就好。
```diff
-static int get_action_exploit(unsigned int table, const rl_agent_t *agent) {
+int play_rl(unsigned int table, char player) {
int max_act = -1;
/* ... */
for_each_empty_grid(i, table) {
table = VAL_SET_CELL(table, i, agent->player);
/* ... */
table = VAL_SET_CELL(table, i, CELL_EMPTY);
}
return max_act;
}
```
修好 play_rl 的型態後,可以把它加進 `ai_algs` 做查表。第 11 行還是要做判斷是不是 RL,因為改成 RL 初始化完狀態空間才能夠使用,還有如果是 RL 才會記錄 episode_moves 和 reward。
```c=
static ai_alg ai_algs[XO_AI_TOT] = {
[XO_AI_MCTS] = mcts,
[XO_AI_NEGAMAX] = negamax_predict,
[XO_AI_RL] = play_rl,
};
static void ai_one_work_func(struct work_struct *w) {
/* ... */
int move;
int alg = (XO_ATTR_AI_ALG(attr) % (XO_AI_TOT - !rl_inited));
bool is_rl = alg == XO_AI_RL && rl_inited;
pr_debug("[two]: alg=%d, rl_init=%d\n", alg, rl_inited);
WRITE_ONCE(move, ai_algs[alg](table, CELL_O));
if (move != -1) {
WRITE_ONCE(xo_tlb->table, VAL_SET_CELL(table, move, CELL_O));
WRITE_ONCE(xo_tlb->moves, SET_RECORD_CELL(xo_tlb->moves, move, steps));
if (is_rl) {
table = xo_tlb->table;
u8 win = check_win(table);
episode_moves[id][steps] = table_to_hash(table);
fixed_point_t score = fixed_mul_s32((RL_FIXED_1 - REWARD_TRADEOFF),
get_score(table, CELL_O));
reward[id][steps] = score + calculate_win_value(win, CELL_O);
}
}
/* ... */
}
```
### RL agents 初始化
要避免載入核心模組期間卡在初始化 RL agent 狀態空間的問題,只能讓這個任務委派給 kthread 來執行,做完再加入到遊戲中對弈和訓練。
因為初始化要一段時間,不可能在載入核心模組的時候就馬上做完,所以 tot_alg 才會是演算法總數減一,避免 ai_game 一開始就選到 RL 導致核心模組崩潰。
```c
static int __init kxo_init(void) {
rl_inited = false;
rl_init_thr = kthread_run(init_agents, NULL, "init_rl_agents");
if (IS_ERR(rl_init_thr)) {
ret = -ENOMEM;
goto error_kthread;
}
/* RL state space not yet init */
const int tot_alg = XO_AI_TOT - 1;
for (int i = 0; i < N_GAMES; i++) {
unsigned int attr;
u32 rnd = get_random_u32();
struct ai_game *game = &games[i];
game->xo_tlb.table = 0;
attr = i; /* set game ID in attr */
attr = XO_SET_ATTR_AI_ALG(attr, rnd % tot_alg, (rnd >> 16) % tot_alg);
game->xo_tlb.attr = attr;
game->turn = 'O';
/* ... */
}
}
```
給 kthread 初始化 RL agent 函式為 `init_agents`,當初始化完兩位 agent 就對 rl_inited 寫入 true,表示狀態空間初始化完畢,可以讓 agent 加入遊戲。
<!-- kthread usage -->
```c
static int init_agents(void __attribute__((unused)) * arg) {
pr_debug("[%s] init...\n", __FUNCTION__);
int state_sum = 1;
CALC_STATE_NUM(state_sum);
init_rl_agent(state_sum, CELL_O);
init_rl_agent(state_sum, CELL_X);
pr_debug("[%s] init done!\n", __FUNCTION__);
WRITE_ONCE(rl_inited, true);
kthread_complete_and_exit(&rl_comp, 0);
}
```
卸載核心模組的時候,如果還沒 thread 還沒結束的話,則要等它執行完畢,才能夠釋放 RL 狀態空間佔用的記憶體。
```c
static void __exit kxo_exit(void) {
wait_for_completion(&rl_comp);
free_rl_agent(CELL_O);
free_rl_agent(CELL_X);
pr_info("kxo: unloaded\n");
}
```
### 改進 `update_state_value`
原版是每次更新都要呼叫 `update_state_value`,最差的情況下要呼叫 16 次,應改為傳入 steps 和陣列,在函式內部做完更新的操作。
稍作修改後把原本 update_state_value 改名為 step_update_state_value,這樣就不用把複雜的程式碼搬到同個函式裡面做改名和排版,程式碼看上去也很直觀,還能夠交由編譯器把 step_update_state_value 展開到 update_state_value 底下。
```c
static inline fixed_point_t step_update_state_value(int after_state_hash,
fixed_point_t reward,
fixed_point_t next,
char player) {
rl_agent_t *agent = &rl_agents[player - 1];
fixed_point_t curr =
reward - fixed_mul(GAMMA, next); // curr is TD target in TD learning
// and return/gain in MC learning.
agent->state_value[after_state_hash] =
fixed_mul((RL_FIXED_1 - LEARNING_RATE),
agent->state_value[after_state_hash]) +
fixed_mul(LEARNING_RATE, curr);
return agent->state_value[after_state_hash];
}
void update_state_value(const int *after_state_hash,
const fixed_point_t *reward,
int steps,
char player) {
mutex_lock(&rl_locks[player - 1]);
fixed_point_t next = 0;
for (int j = steps - 1; j >= 0; j--)
next = step_update_state_value(after_state_hash[j], reward[j], next,
player);
mutex_unlock(&rl_locks[player - 1]);
}
```
### 降低狀態空間
4x4 井字遊戲的 RL agent 需要 $3^{16}$ 個狀態,這些狀態包含下面幾種分類
* 合理的棋譜
* X 或 O 的先手
* X 或 O 勝利的棋譜
* 對弈中或平手
* 犯規的棋譜
* 全是 O 或 X
* 其中一個旗子下的更多
雖然初步移植成功,但想到是在 kernel space 配置這麼一大塊空間,而且無論會不會用到,每個狀態都要計算初始值,RL 才算是初始化完畢,如果沒有用多執行緒,核心模組在初始化階段就會阻塞住。
這些顯著的缺點讓我耿耿於懷,有沒有什麼辦法解決這些問題?下面是我一步步分析後,想到的一個折衷方案,雖無法涵蓋全部的狀態空間,但是大幅減小對記憶體的需求,而且不需要 kthread 馬上就能使用。
kxo 是兩位玩家輪流落子,因此第一步先分析合理的棋譜有幾個?先估計要配置多大的空間給 RL agent 使用,後面再想辦法把棋譜映射到狀態。
撰寫[模擬程式](https://gist.github.com/LambertWSJ/ac1cb9f34c936eb1a9cc45e593cf5da6)計算所有合法的棋譜有 6036017 個,約佔了所有狀態空間的 14%,即便狀態空間降低了很多,但是對 kernel 來說,依舊是不小的要求。
估算好所需的空間後,來思考一下如何把棋譜映射對應的狀態?目前的棋譜都編碼成 32 位元的整數,所以能夠使用 key-value 的資料結構來更新狀態空間,立刻能想到紅黑樹和雜湊表
key $\to$ table
value $\to$ `O` 和 `X` 的狀態空間中的分數
如果 table 存在的話就返回 agent 的分數來做下棋或更新分數,沒有的話就配置一個新狀態空間並計算初始分數再返回,總之**一定會涵蓋所有狀態,還不會在初始化的時候就不會阻塞整個模組運行**。
接著來分析 kxo 的情境來選擇紅黑樹或雜湊表來管理狀態。
<!-- TODO: 分析 rbtree, ht -->
執行 `xo-user` 的時候發現有些棋譜是反覆出現的,這 603 萬個狀態空間或許有很多棋譜是很少在實際情況下出現的,甚至幾乎不會出現的,那麼該如何從這些冷門棋譜中在省出一點空間來用?同時又能一直更新常出現的熱門棋譜。
為此我想到一個折衷方案,不需要一口氣配置所有狀態空間,而是決定最多要保留幾個狀態,並配置對應大小的記憶體,當記憶體滿了,再清掉一定比例的冷門棋譜就好。
記憶體管理:配置狀態陣列搭配 bitmap 來管理下面這些事情
- 哪個位置可以用
- 目前總共有多少個狀態
- 配置和刪除時更新對應位置的 bit
bitmap 中 1 表示 st_buff 使用中,反之則是可以用的空間
```c
static unsigned long st_map[ST_MAP_SZ] = {0};
void init_rl_agent(void) {
LIST_HEAD(order);
size_t node_sz = MAX_STATES * sizeof(struct xo_state);
st_buff = vzalloc(ALIGN(node_sz, PAGE_SIZE));
}
```
冷門與熱門棋譜:運用鏈結串列 `order` 新增和移動節點達到相對冷門和熱門
選用串列的話,可以定義成愈後面的節點愈熱門,反之愈開頭的節點愈冷門,當 table 有映射到狀態的話,就把狀態往後移,如果沒有狀態的話,就把新增的狀態放在開頭,刪除時直接從頭開始刪掉一定比例的節點就可以了。
如果剛 table 好找到對應的狀態,就把這個狀態移動到串列尾端,並回傳這個狀態的分數。
```c
rl_fxp *find_rl_state(const u32 table) {
const u32 key = hash_32(table, HT_BITS);
struct xo_state *st = NULL;
/* search state */
hash_for_each_possible(states, st, link, key) {
if (st->table == table) {
list_move_tail(&st->list, &orders);
return st->scores;
}
}
/* ... */
}
```
如果 table 沒有找到對應的狀態,表示沒有記錄到,可能過有出現過,但是被刪掉了,要建立一個新的狀態。
先檢查 `st_buff` 有沒有足夠的空間可以建立新的節點,用 `bitmap_weight` 可以快速得知 `st_buff` 目前已經配置幾個節點,如果滿了就清掉一定比例的節點,再從第一個可以使用的位置建立新的節點並更新 bitmap 再完成下面這些事情
- 建立節點 $\to$ 設定 table, `O` 和 `X` 在這個狀態下的初始值
- 加入到 hash table 和 linked list
- 返回新的節點
```c
static void clean_state(void) {
struct xo_state *st, *safe;
u32 pos, i = 0;
list_for_each_entry_safe(st, safe, &orders, list) {
if (i >= CLEAN_N_STATES)
break;
pos = ((uintptr_t) st - (uintptr_t) st_buff) / sizeof(struct xo_state);
hash_del(&st->link);
list_del(&st->list);
st->table = st->scores[AGENT_O] = st->scores[AGENT_X] = 0;
bitmap_clear(st_map, pos, 1);
i++;
}
}
rl_fxp *find_rl_state(const u32 table) {
/* ... */
/* check states size */
const u32 tot = bitmap_weight(st_map, ht_len);
if (tot >= (MAX_STATES - 1))
clean_state();
/* add state into hashtable */
u32 pos = find_first_zero_bit(st_map, ht_len);
st = st_buff + pos;
bitmap_set(st_map, pos, 1);
st->table = table;
st->scores[AGENT_O] =
fixed_mul_s32(INITIAL_MUTIPLIER, get_score(table, CELL_O));
st->scores[AGENT_X] =
fixed_mul_s32(INITIAL_MUTIPLIER, get_score(table, CELL_X));
INIT_LIST_HEAD(&st->list);
list_add(&st->list, &orders);
hash_add(states, &st->link, key);
return st->scores;
}
```
實作好找出狀態分數的函式後,更新 RL 模組用的 `play_rl` 和 `update_state_value`。
```c
int play_rl(unsigned int table, char player) {
u8 id = player - 1;
for_each_empty_grid(i, table) {
table = VAL_SET_CELL(table, i, player);
rl_fxp new_q = find_rl_state(table)[id];
if (new_q == max_q) {
++candidate_count;
/* ... */
}
return max_act;
}
static inline rl_fxp step_update_state_value(int after_state_hash,
rl_fxp reward,
rl_fxp next,
u8 player)
{
rl_fxp curr = reward - fixed_mul(GAMMA, next);
rl_fxp *scores = find_rl_state(after_state_hash);
scores[player] = fixed_mul((RL_FIXED_1 - LEARNING_RATE), scores[player]) +
fixed_mul(LEARNING_RATE, curr);
rl_fxp *scores = find_rl_state(after_state_hash);
return scores[player];
}
```
為了練習 API 我還有再另外實作紅黑樹版本,可用編譯來切換不同的實作。
```shell
$ make RL_USE_HT=NO
```
# 引入若干 Coroutine 顯示多場遊戲
> Commit: [55ab755](https://github.com/LambertWSJ/kxo/commit/55ab755723dbefbff35f3b15fce5307c46a5519a)
# 現階段成果

# 後續改進
- 目前的 TUI 要在夠大終端機才能完整顯示,不然會印出一大堆亂碼,應設計成像 RWD 網頁,在不同的寬高會調整 UI 排版或顯示方式。
- TUI 還是會出現一大片紅色或是有時候顯示不全,推測是因為 outbuf 到達閾值後,剛好 buffer 的最後又是寫到一半的控指碼就輸出到 stdout,暫時沒想到方法來處理這問題。
- 核心模組和使用者溝通成本降低,但是沒有做呼叫頻率的控制導致頻繁的向核心模組讀取資料,造成 System Call Overhead,有時候顯示速度過快,中間的過程沒有一步步印出來。