# 2025q1 Homework3 (kxo) contributed by < `ibat10clw` > {%hackmd NrmQUGbRQWemgwPfhzXj6g %} ## `kxo` 核心模組 `kxo` 核心模組的運作方式為透過 `mod_timer` 設置的 `timer_handler` 函式來驅動整個程式的運作。 當 `kxo` 首次被開啟時(`open_cnt` 為 1),就會啟動 timer ```c timer_setup(&timer, timer_handler, 0); static int kxo_open(struct inode *inode, struct file *filp) { pr_debug("kxo: %s\n", __func__); if (atomic_inc_return(&open_cnt) == 1) mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); pr_info("openm current cnt: %d\n", atomic_read(&open_cnt)); return 0; } ``` 根據 `timer_setup` 的註解可以得知,當 timer 到期時,就會觸發 `callback` 的函式。 > /** * timer_setup - prepare a timer for first use * @timer: the timer in question * @callback: the function to call when timer expires * @flags: any TIMER_* flags * * Regular timer initialization should use either DEFINE_TIMER() above, * or timer_setup(). For timers on the stack, timer_setup_on_stack() must * be used and must be balanced with a call to destroy_timer_on_stack(). */ 在 `timer_handler` 的函式中,會根據當前對局的狀態 `win` 去決定呼叫 `ai_game` 這個函式繼續對奕,或者是產生遊戲結果 ```c if (win == ' ') { ai_game(); mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); } ``` 此處 `ai_game` 透過 `tasklet_schedule(&game_tasklet);` 將兩個 AI 棋手的操作,以及繪製棋盤的函式加入 workqueue,讓 kernel thread 去執行這些任務。 ```c if (finish && turn == 'O') { WRITE_ONCE(finish, 0); smp_wmb(); queue_work(kxo_workqueue, &ai_one_work); } else if (finish && turn == 'X') { WRITE_ONCE(finish, 0); smp_wmb(); queue_work(kxo_workqueue, &ai_two_work); } queue_work(kxo_workqueue, &drawboard_work); ``` 當分出勝負的時候,則是直接呼叫繪圖相關的函式,產生要顯示的棋盤內容後將棋盤初始化。 ```c else { read_lock(&attr_obj.lock); if (attr_obj.display == '1') { int cpu = get_cpu(); pr_info("kxo: [CPU#%d] Drawing final board\n", cpu); put_cpu(); mutex_lock(&producer_lock); draw_board(table); mutex_unlock(&producer_lock); /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); produce_board(); mutex_unlock(&consumer_lock); wake_up_interruptible(&rx_wait); } if (attr_obj.end == '0') { memset(table, ' ', N_GRIDS); /* Reset the table so the game restart */ mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); } read_unlock(&attr_obj.lock); pr_info("kxo: %c win!!!\n", win); } ``` 不論是 `if` 或 `else` 的執行路徑,都會再度啟動 timer,讓核心模組可以繼續運行。 ### `kxo_read` 在對奕的過程中會將渲染後的棋盤狀態寫入 `kfifo` 中,使用者空間的程式可以透過讀取 `/dev/kxo` 來取得對奕的結果。 ```c do { ret = kfifo_to_user(&rx_fifo, buf, count, &read); if (unlikely(ret < 0)) break; if (read) break; if (file->f_flags & O_NONBLOCK) { ret = -EAGAIN; break; } ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo)); } while (ret == 0); ``` 此段程式碼為資料讀取的部份,這裡的流程如下 1. 嘗試從 `kfifo` 讀取資料 2. 若讀取結果異常就跳出迴圈 3. 若讀到資料就跳出迴圈 4. 若為非阻塞模式就挑出迴圈 5. 讓 user space 的程式轉為 sleep 狀態直到 `kfifo` 中有資料 > wait_event_interruptible - sleep until a condition gets true The process is put to sleep (TASK_INTERRUPTIBLE) until the condition evaluates to true or a signal is received. The condition is checked each time the waitqueue wq is woken up. :::info 這邊的 `read` 有一個問題是當多個使用者空間的程式嘗試讀取 `/dev/kxo` 的時候,可能會有讀到的棋盤狀況不連續的情形,例如: `fifo:[state1, state2, state3]` 然後 process A 讀到了 `state1`,但 `state2` 卻被 process B 給讀走,此時 process A 讀到 `state3`,就會少了中間的一步 ::: ## 減少核心與使用者空間的通訊成本 首先我們觀察資料是如何被寫入 `kfifo` 的 ```c /* Draw the board into draw_buffer */ static int draw_board(char *table) { int i = 0, k = 0; draw_buffer[i++] = '\n'; smp_wmb(); draw_buffer[i++] = '\n'; smp_wmb(); while (i < DRAWBUFFER_SIZE) { for (int j = 0; j < (BOARD_SIZE << 1) - 1 && k < N_GRIDS; j++) { draw_buffer[i++] = j & 1 ? '|' : table[k++]; smp_wmb(); } draw_buffer[i++] = '\n'; smp_wmb(); for (int j = 0; j < (BOARD_SIZE << 1) - 1; j++) { draw_buffer[i++] = '-'; smp_wmb(); } draw_buffer[i++] = '\n'; smp_wmb(); } return 0; } static void produce_board(void) { unsigned int len = kfifo_in(&rx_fifo, draw_buffer, sizeof(draw_buffer)); if (unlikely(len < sizeof(draw_buffer)) && printk_ratelimit()) pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(draw_buffer) - len); pr_debug("kxo: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` 這兩個函式在一局對奕結束的時候會被顯式的呼叫,對奕過程中則是由 workqueue 中的 `drawboard_work_func` 來執行,`draw_board` 根據 `table` 所紀錄的棋盤狀態來渲染可以被印出的完整棋盤,渲染完成後 `produce_board` 負責將資料寫入 `kfifo`。 這裡有一個關鍵是 `ai_xx_work_func` 和 `drawboard_work_func` 是平行在執行的,也就是說若下一次要渲染棋盤時,AI 棋手尚未更新 `table`,那麼 `kfifo` 中就會有兩筆相同的資料。 ### 將繪圖函式改由使用者空間執行 > commit [f849827](https://github.com/sysprog21/kxo/commit/f849827381ac49681ecfa2862d3f547b1d1c95e6) 原本的實作將繪圖執行在核心模組,因此使用者空間讀取資料時需讀取渲染過後的棋盤,大小為 66 bytes,繪圖函式僅仰賴大小為 16 bytes 的 `table` 即可完成,若將繪圖函式移動至 `xo_user.c` 那就可以將傳輸的資料量減少至原本的約 25% 左右。 ```diff --- a/xo-user.c +++ b/xo-user.c @@ -80,7 +80,25 @@ static void listen_keyboard_handler(void) } close(attr_fd); } +static char draw_buffer[DRAWBUFFER_SIZE]; +static int draw_board(char *table) +{ + int i = 0, k = 0; + draw_buffer[i++] = '\n'; + draw_buffer[i++] = '\n'; + while (i < DRAWBUFFER_SIZE) { + for (int j = 0; j < (BOARD_SIZE << 1) - 1 && k < N_GRIDS; j++) { + draw_buffer[i++] = j & 1 ? '|' : table[k++]; + } + draw_buffer[i++] = '\n'; + for (int j = 0; j < (BOARD_SIZE << 1) - 1; j++) { + draw_buffer[i++] = '-'; + } + draw_buffer[i++] = '\n'; + } + return 0; +} int main(int argc, char *argv[]) { if (!status_check()) @@ -90,8 +108,6 @@ int main(int argc, char *argv[]) int flags = fcntl(STDIN_FILENO, F_GETFL, 0); fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK); - char display_buf[DRAWBUFFER_SIZE]; - fd_set readset; int device_fd = open(XO_DEVICE_FILE, O_RDONLY); int max_fd = device_fd > STDIN_FILENO ? device_fd : STDIN_FILENO; @@ -115,8 +131,10 @@ int main(int argc, char *argv[]) } else if (read_attr && FD_ISSET(device_fd, &readset)) { FD_CLR(device_fd, &readset); printf("\033[H\033[J"); /* ASCII escape code to clear the screen */ - read(device_fd, display_buf, DRAWBUFFER_SIZE); - printf("%s", display_buf); + char table[N_GRIDS]; + read(device_fd, table, N_GRIDS); + draw_board(table); + printf("%s", draw_buffer); } } ``` `draw_board` 函式移除了用不到的 memory barrier,並且將讀取的資料大小由原先的 `DRAWBUFFER_SIZE` 改為 `N_GRIDS`,對應核心模組的修改如下 ```diff +static void produce_table(void) +{ + unsigned int len = kfifo_in(&rx_fifo, table, sizeof(table)); + if (unlikely(len < sizeof(table)) && printk_ratelimit()) + pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(table) - len); + + pr_debug("kxo: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); +} + ``` 新增了將 `table` 寫入 `kfifo` 的函式 ```diff @@ -176,13 +186,10 @@ static void drawboard_work_func(struct work_struct *w) } read_unlock(&attr_obj.lock); - mutex_lock(&producer_lock); - draw_board(table); - mutex_unlock(&producer_lock); /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); - produce_board(); + produce_table(); mutex_unlock(&consumer_lock); ``` 同時移除不再使用的 `draw_board` 並且把原先要寫入 `kfifo` 的函式替換成 `produce_table`。 在對奕結束時,顯式的函式呼叫也採取相同的處理 ```diff @@ -347,13 +354,9 @@ static void timer_handler(struct timer_list *__timer) pr_info("kxo: [CPU#%d] Drawing final board\n", cpu); put_cpu(); - mutex_lock(&producer_lock); - draw_board(table); - mutex_unlock(&producer_lock); - /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); - produce_board(); + produce_table(); mutex_unlock(&consumer_lock); ``` 到此便將繪圖的工作由核心模組轉至 `xo_user` 的程式執行。 :::danger 提供數學分析 ::: ## 使用 bitops 進一步減少通訊成本 > commit [6bba946](https://github.com/ibat10clw/kxo/commit/6bba9464a8572a159e116b50c0454bb87d6bcd7a) 目前的實作將整個 `table` 寫入 `kfifo`,但其實只需要寫入當前要改變的格子狀態,若有 t - 1 這個時間點的棋盤狀態,那麼加入要改變的狀態 x ,就可以成為時間點 t 的棋盤狀態。 考慮棋盤格數為 `n` 的時候,需要 $log(n)$ 個 bits 來表示要對哪個格子操作,另外還需要 2 個用於表示狀態的 bit 去表示當前是 `O` 還是 `X` 和是否對奕結束。 ```diff --- a/game.h +++ b/game.h +struct command { + unsigned char pos : BOARD_SIZE; + unsigned char turn : 1; + unsigned char reset : 1; +}; + ``` 新增的 `command` 結構體,之後當 AI 棋手完成移動時,將改將 `command` 寫入 `kfifo` ```diff --- a/main.c +++ b/main.c +#define CMD_INIT(name, pos_val, turn_val, reset_val) \ + struct command name = { \ + .pos = (pos_val), .turn = (turn_val), .reset = (reset_val)} +static void send_cmd(int pos, int turn, int reset) { + CMD_INIT(cmd, pos, turn, reset); + unsigned int len = + kfifo_in(&rx_fifo, (const unsigned char *) &cmd, sizeof(cmd)); + if (unlikely(len < sizeof(cmd)) && printk_ratelimit()) + pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(cmd) - len); pr_debug("kxo: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` 此處為用於建立 `command` 的巨集,以及將 `command` 寫入 `kfifo` 的函式。 當 AI 棋手完成操作時,就可以產生對應的 `command` 並寫入 `kfifo` ```diff @@ -217,8 +149,13 @@ static void ai_one_work_func(struct work_struct *w) smp_mb(); - if (move != -1) + if (move != -1) { WRITE_ONCE(table[move], 'O'); + mutex_lock(&consumer_lock); + send_cmd(move, 0, 0); + mutex_unlock(&consumer_lock); + wake_up_interruptible(&rx_wait); + } WRITE_ONCE(turn, 'X'); WRITE_ONCE(finish, 1); @@ -251,8 +188,13 @@ static void ai_two_work_func(struct work_struct *w) smp_mb(); - if (move != -1) + if (move != -1) { WRITE_ONCE(table[move], 'X'); + mutex_lock(&consumer_lock); + send_cmd(move, 1, 0); + mutex_unlock(&consumer_lock); + wake_up_interruptible(&rx_wait); + } ``` 當對奕結束時,則是將 `reset` 設為 1,令使用者空間的程式得以重置棋盤。 ```diff @@ -356,7 +296,7 @@ static void timer_handler(struct timer_list *__timer) /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); - produce_table(); + send_cmd(0, 0, 1); mutex_unlock(&consumer_lock); ``` 在使用者空間的繪圖邏輯由一個事先宣告的 `board` 為基底 ```diff +char board[] = + "\n\n" + " | | | \n" + "-------\n" + " | | | \n" + "-------\n" + " | | | \n" + "-------\n" + " | | | \n" + "-------\n"; +static void update_board(int pos, char val) +{ + unsigned row = pos >> 2; + unsigned col = pos & 3; + unsigned index = 2 + (row << 4) + (col << 1); + board[index] = val; +} +static int reset_board() +{ + for (int i = 0; i < N_GRIDS; ++i) + update_board(i, ' '); +} ``` :::danger 用更少的 string literal 繪製棋盤 ::: 當讀取到 `command` 的時候,將操作的位置傳入函式中,可以透過 bitwise 運算取得在棋盤中對應的 index 其中 `>>2` 相當於除以 4,因為 row 大小為 4,`&3` 的操作則相當於 `%4`,可以計算出屬於哪一個 column。 但我們的棋盤不是單純的 `char board[16]`,其中還參雜了換行符號以及邊框的字元,因此還需要再一層的加工 * `2 + `: 用於跳過最前面的兩個換行符號 * `<< 4`: 用於跳過兩個 `board` 的 row,可越過中間 `------` 的部份 * `<< 1` 用於跳過一個 `board` 的 col,可越過中間的 `|` 到此繪製棋盤的部份就完成了。 最後是讀取 `command` 的修改 ```diff - char table[N_GRIDS]; - read(device_fd, table, N_GRIDS); - draw_board(table); - printf("%s", draw_buffer); + struct command cmd; + read(device_fd, &cmd, sizeof(cmd)); + update_board(cmd.pos, cmd.turn ? 'X' : 'O'); + printf("%s", board); + if (cmd.reset) { + reset_board(); + ``` 此處同樣可以使用 `read` 讀取對應的 `fd`,之後存取 `command` 的成員以獲得要操作的內容,並且在接獲 `reset` 時,將棋盤重置。 從最原先整個 `draw_buffer` 寫入 `kfifo` 需要 66 bytes 的資料量,現在降到僅需要 1 個 byte,就可以完成棋盤的繪製。