# Linux 核心專題: 改進 kxo > 執行人: weiso131, Ian-Yen > [Github](https://github.com/weiso131/kxo) > [專題解說錄影](https://youtu.be/xeFu7JVGMrY) > [專題發表直播](https://www.youtube.com/watch?v=AI8shjBfUoE&t=11135s) ### Reviewed by `tsaiiuo` 在減少 lock 依賴那邊,可以量化新增過後的效能影響嗎? >原本的標題寫 `kxo 如何減少 lock 依賴並支援多使用者`,但實際內文主要是探討如何調整並行機制以支援多使用者,標題可能導致誤解,已修改。至於效能影響,由於這邊探討的是「如何支援多使用者」這個原本不存在的功能要如何實現,追求的並非效能提昇,直接將其與原本的單使用者實作比較並不合理,因此這邊無法提供量化的效能影響。 [name=weiso131] ### Reviewed by `HeLunWu0317` 請問關於update_state_value()與原公式 TD-learning的不同,會不會可能是這個 state 更新是需要轉移視角的,和 negamax 原理類似,減號是要作為視點轉換, next 會變成對弈中的另一方視角。 >你說的沒錯,稍早發的 [issue](https://github.com/jserv/ttt/issues/44) 也收到類似回覆,確實是我少考慮了這個部份。 [name=weiso131] ### Reviewed by `Hande1004` 關於[通訊方法討論](https://hackmd.io/XWDjMx0bTmmNIWvlNHsvKw?view#%E9%80%9A%E8%A8%8A%E6%96%B9%E6%B3%95%E8%A8%8E%E8%AB%96)這部分想請問雖然把棋盤所需的 byte 數降低至 4 bytes,但大多情況 kernel 端傳給 user 端是用一個 page 為單位,也就是就算降低棋盤的 byte 數,傳輸的大小成本依然一樣,那請問實際上有提高甚麼樣的效率或真正節省到哪部分嗎? >我嘗試利用 bootlin 追蹤 `linux v6.15.4` 中 x86 (大部分人電腦的處理器應該都是 x86 架構)的 `copy_to_user` 實作,發現實際上進行複製的函式為 [`copy_user_generic`](https://elixir.bootlin.com/linux/v6.15.4/source/arch/x86/include/asm/uaccess_64.h#L119) ,其中,若存在 FSRM feature ,會使用 `rep_movs` 這個 x86 指令,根據 [`felixcloutier.com`](https://www.felixcloutier.com/x86/movs:movsb:movsw:movsd:movsq) 的描述,它會一次複製一個 byte 。關於「 kernel 端傳給 user 端是用一個 page 為單位」這件事情,我沒有找到相關資料佐證,嘗試追蹤 `copy_to_user` 的實作也發現它在 x86 架構上並非一次複製一個 page ,希望同學能提出相關的參考資料來說明。 至於降低棋盤 bytes 數是否有效率提昇或真正節省到哪部分?效率提昇的部份目前缺乏量化分析,然而,儘管經過測試後發現它真的能帶來效能提昇,對使用者而言可能不明顯,因為目前最大的效能瓶頸是 MCTS 演算法。對於我所使用的「用 1 bytes 傳遞步數的方法」,除了能減少傳遞的 bytes ,這個方法也直接的提供使用者下棋的先後順序,不用做額外的處理,這讓使用者維護棋盤歷史資料的成本降低。[name=weiso131] ## 任務描述 以[第三份作業](https://hackmd.io/@sysprog/linux2025-kxo)為基礎、搭配課堂討論和觀摩學員的成果,提出 kxo 的改進方案並落實,至少該涵蓋以下: * 允許核心模組切換 negamax, reinforcement learning (RL), Monte Carlo tree search (MCTS), TD learning 等演算法 (可追加更多演算法的實作),並得以在使用者空間進行訓練 * 縮減使用者和核心空間通訊的成本 * 改進棋盤展現的方式,並修訂核心通訊介面 * 支援多使用者並強化安全機制 * 在不犧牲棋力的前提,降低 CPU 使用率 * 探討 lock-free 的設計 * 充分展現多核處理器的任務執行狀況,善用 Ftrace 和 eBPF 一類的工具 ## 允許核心模組切換多種演算法 ### 切換算法的 ioctl 核心通訊界面的 `GET_USER_ID` ### 利用執行參數調整使用的演算法 >[de8614b](https://github.com/weiso131/kxo/commit/de8614b487a9952a178febf3a1beac10f98ed139) #### `Userspace` 結構 >[4c2eece](https://github.com/weiso131/kxo/commit/4c2eecedb009d3ebe0724e224129432e392a9be8) >[5548f1a](https://github.com/weiso131/kxo/commit/5548f1a43c6cf277ba7fd30a81f19fa7eb240644) ```c typedef struct userspace { int device_fd; uint32_t board; unsigned char user_id; PlayerPermission player1, player2; char turn; History history; } Userspace; ``` 原本 `xo-user.c` 與核心模組互動的變數被四散在各處,使用一個結構體將其整合,為之後要支援的`參數調整演算法`與`使用者端的多使用者功能`做準備,具體功能如下 - Userspace 儲存與核心互動的必要資訊(device_fd, user_id),棋盤當前的狀態 (board) 與歷史紀錄 (history) - 提供 `mod_control` 與 `user_control` 兩個函式,讓核心模組的 read, write 操作更直觀,同時切換 userspace 中 turn 的狀態 - 提供 `get_permission` 取得當前使用者 Todo: 1. turn 的維護應該要交給核心 #### 實現參數切換演算法 >[de8614b](https://github.com/weiso131/kxo/commit/de8614b487a9952a178febf3a1beac10f98ed139) ### [ttt](https://github.com/jserv/ttt) 的 TD learning 整合 #### 原本實作可能的錯誤 - 更新公式錯誤 TD_learning 的更新公式: $V(S_t) \leftarrow V(S_t) + \alpha \left[ R_{t+1} + \gamma V(S_{t+1}) - V(S_t) \right]$ 但 `update_state_value` 實作如下 ```c static float update_state_value(int after_state_hash, float reward, float next, rl_agent_t *agent) { float curr = reward - GAMMA * next; // curr is TD target in TD learning // and return/gain in MC learning. agent->state_value[after_state_hash] = (1 - LEARNING_RATE) * agent->state_value[after_state_hash] + LEARNING_RATE * curr; #if MONTE_CARLO return curr; #else return agent->state_value[after_state_hash]; #endif } ``` 展開會變成 $V(S_t) \leftarrow V(S_t) + \alpha \left[ R_{t+1} - \gamma V(S_{t+1}) - V(S_t) \right]$ 然而,在 xo 屬於 minimax 遊戲,應該盡可能降低對手的得分,一開始沒考慮到這件事情,誤以為它是 bug 。 #### 整合工作 >[dde5e70](https://github.com/weiso131/kxo/commit/dde5e7025f619a3467a5bd4a763e70e7ab6e94c8) - 移除多餘函式 - 勝負判斷的功能已經由核心模組維護,因此 `game.h` 中的 `check_win` 不再需要 - `drawboard` 已由 xo-user 的函式維護,因此可以移除 - `userspace.h` 已經提供 `random_get_move` 供 `get_action_epsilon_greedy` 使用,這也導致 `get_available_moves` 不再需要 - 將 `char table[16]` 改成使用 `uint32` 的 `board` - 為了搭配使用 `Userspace` 中的 `board` - 使用 `user_control` 實現與核心的 write 互動 最後訓練工作在 `xo-train.c` 進行, `make` 之後執行 `./xo-train` 即可開始訓練 ### negamax 多使用者會出問題 某次開兩個終端機與 kxo 模組互動,兩邊都用到 `NEGAMAX` ,結果核心就發生了 Segment fault , 觀察 negamax 有大量全域變數使用,推測可能是因為錯誤的寫入發生問題 #### kxo 的 negamax_predict 實作大致流程 1. 初始化 `history_score_sum` 與 `history_count` 2. 對各種深度進行 negamax (到 `MAX_SEARCH_DEPTCH` 之前都是用來更新 history_score_sum 與 history_count) 3. negamax 1. 若遊戲結束,計算 score 並回傳 2. 利用 `zobrist_get` 搜尋這個盤面是否已經被計算過,用來節省計算 3. 對 `available_move` 進行排序 (`history_score_sum` 和 `history_count` 用在這裡) 4. 對每個 `available_move` 搜尋 5. 更新 table 與 hash_value,對下一步進行 `negamax` 遞迴, `negamax` 需要最小化對方得分並最大化我方得分,因此加上負號 6. 遞迴結束之後,還原 table 與 hash_value ,更新 `history_score_sum`, `history_count`, `best_move` 4. `zobrist_clear()` 釋放 `hlist_node` 觀察上述過程可以發現,實際上這個演算法用到的所有全域變數(`hash_value`, `history_score_sum`, `history_count`, `zobrist_table`, `hash_table`)都只是拿來暫存資料,結束之後就會還原成最初的樣子,可以將其移動至 `stack` 或是,放在 `heap` ,結束時再釋放 #### 實作改動 > [ad94247](https://github.com/sysprog21/kxo/commit/ad94247b43205138eb73de024c9e7fb42b584bfe) > [ba9d427](https://github.com/sysprog21/kxo/commit/ba9d4276aad1ee2b7d4175c1cb7608867d16632d) > [f0871a1](https://github.com/sysprog21/kxo/commit/f0871a190b48a4f21d50664733092306e95db03d) 1. 消除全域變數依賴,使 negamax 支援多使用者同時執行。 2. 引入 context 結構體 negamax_context_t 封裝所有需要的狀態。 3. 使用者自行管理 context 生命周期,記憶體由使用者分配與釋放。 4. 確保遞迴過程內部使用的狀態皆封裝在 stack 或 heap 中,保證 thread-safe。 5. 核心函數說明 `negamax_context_t *ctx` 傳遞 所有函數(如 `negamax`, `negamax_sort`, `zobrist_get` 等)都改為顯式接受 `context` 指標 `ctx`,完全移除任何全域狀態存取。 `negamax_sort` 使用歷史評分來排序 move,以加快剪枝效率。評分資料從 ctx->`history_score_sum` 中取得,避免干涉他人執行。 `negamax` 主遞迴搜尋函式,採用 `alpha-beta` 剪枝 + `zobrist` 快取。所有表格與歷史資訊皆由 `ctx` 提供與還原。 ## 縮減使用者和核心空間通訊的成本 ### 通訊方法討論 有兩種方法可以有效的傳遞棋盤資訊 1. 傳遞當下完整棋盤資訊 一個位子有三種狀態 (``'o', 'x', ' '``),有 16 個位子就會有 $3^{16}=43046721$ 種組合,需要使用 4 bytes 來傳遞 2. 只傳遞一步棋的資訊 一步棋的資料包含 1. 誰下的,有 2 種可能 2. 下在哪,考慮第一步的情況有 16 種可能 3. 是否結束,有 2 種可能 4. 是否和局,有 2 種可能 總共需要 7 bits 來表達一步棋的資料 一步棋的資料顯然可以用更少的 bytes 做資料傳遞,但是需要使用者維護過去的棋盤資訊。不過,如果能假設使用者會自己維護歷史資料,在實現 `ctrl + q` 結束並印出歷史資料的功能就會變得非常簡單,核心模組的 namespace 不需要額外維護歷史資料,也不再需要使用 sysfs (這導致沒辦法將模組變成全域可讀可寫),因此最後選擇傳遞一步棋的資訊 ### 詳細討論通訊內容 考慮多使用者的情形,雖然可藉由 task_id 區分不同 task 的要求 但考慮一個 task 多次開啟 kxo ,仍需要傳遞使用者 id 來區分身份 此時 read 所需的 buffer 大小就是 `max(id 所需 bit, 64)` write (使用者端下棋) 所需的 buffer 大小就是 `id 所需 bit + 64` ## 改進棋盤展現的方式,並修訂核心通訊介面 ### 在使用者端繪製棋盤 ### 紀錄棋盤歷史紀錄 > [e0111e3](https://github.com/sysprog21/kxo/commit/e0111e31077e40bbc153a3a5de6e8ebe859f0bc5) > 將原本以 `printf("%s", display_buf)` 輸出整段字串的作法,改為使用 `display_board()` 函數解析單一位元編碼的棋步資訊並以棋盤形式顯示。此改動的主要目的是為了減少使用者空間與核心空間之間的通訊成本,僅傳遞一個位元編碼即可描述一次落子動作,並在使用者空間進行畫面重建,同時結合歷史記錄模組進行棋局狀態更新。 display_board(char table) 的輸入參數 table 為一個 8-bit 資料,透過位元操作取得如下資訊: `(table & 0xF)` 表示落子位置(0~15) `(table >> 4)` & 1 表示玩家(1 為 X,0 為 O) `(table >> 5)` 若為 1 表示重設棋盤(新一局開始) 棋盤狀態由 board 這個 32-bit 變數管理,其中高 16 位記錄格子是否有棋子,低 16 位記錄該棋子為 X 或 O。顯示邏輯使用雙層迴圈輸出 4x4 棋盤,每一格以 ASCII 字元顯示對應棋子,並加上適當分隔符號與橫線來模擬棋盤外觀。 在傳入帶有重設旗標的落子資料時,會清空棋盤並透過 history_new_table() 建立新的歷史節點。此方式能夠減少核心回傳資料的長度,也將顯示 邏輯移至使用者空間。 #### 實作歷史紀錄 > [f2bf83f](https://github.com/sysprog21/kxo/commit/f2bf83fdded8db272cd26813e40eab1935737d3c) > [de08cd5](https://github.com/sysprog21/kxo/commit/de08cd58eab5515ea5bb95684c95a17d9542f8e9) > [0a43687](https://github.com/sysprog21/kxo/commit/0a436870f5c81051a4d194d50314acc24e9c0cd9) history 模組用來記錄使用者的操作過程,例如下棋或輸入指令。實作上,使用一個 linked list(鏈結串列)來串接多個 history 節點,每個節點負責記錄一段操作序列。 一開始會建立一個初始節點,作為整段歷史的開頭與目前記錄的對象。每次操作都會被壓縮成 4 bits,依序儲存在節點的 value 欄位中,並用 count 記錄目前已存幾筆。 當一個節點記錄的數量達上限(因為 value 是 64 位元,一次最多存 16 筆),或者邏輯上需要分段時,系統會建立新的節點接到串列後面,並移動 current 指標指向它。若整體節點數超過最大限制(128),會釋放最舊的節點以控制記憶體使用。 列印歷史時,會從最早的節點開始,逐筆解碼其中的操作並輸出。釋放歷史時,則會一路釋放整條 linked list,避免記憶體洩漏。 ### ctrl + P 與 ctrl + Q 控制 #### 刪除 kxo_state 由於棋盤歷史紀錄已交由使用者端儲存,只要持續執行 `read` 操作, `ctrl+P` 暫停的功能可以透過控制是否繼續繪製新的棋盤來實現,不需要 `attr_obj.display` 來決定是否傳送新的棋盤。而 `ctrl+Q` 的功能其實可以單純的結束迴圈並關閉 kxo ,核心不需要針對 `ctrl+Q` 進行額外的處理,因此 `attr_obj.end` 也沒有保留的必要。至於 `attr_obj.resume` 原本在 `main.c` 就沒有被使用。 此外,在 [\[PATCH\] sysfs: tightened sysfs permission checks](https://www.spinics.net/lists/linux-next/msg32846.html) 中, sysfs 預設不開放 other 的寫入權限,這沒辦法單純透過調整核心模組程式碼來更改,這對於核心模組而言可能是不必要的限制。 考慮上述的情況,應該將 kxo_state 移除。 >[2c63b8e](https://github.com/weiso131/kxo/commit/2c63b8e942bcfebde2ab24d1aeee2333171a7baf) >[7047748](https://github.com/weiso131/kxo/commit/70477489930f340118673de2f071d9ffe65ad261) >[43a6056](https://github.com/weiso131/kxo/commit/43a60562dac952cf9853acd4d2626b403c3c34db) ### 實作 poll 改進 kxo 的體驗 [關於 select 的探討](https://hackmd.io/@weiso131/linux_select) 原本的 kxo 因為沒有實作 poll 的功能,導致 select 不管 kxo 的 read 是否會阻塞,都當作他是 ready (參考 [linux/poll.h](https://github.com/torvalds/linux/blob/master/include/linux/poll.h#L78)) #### 嘗試貢獻原本的 kxo ```c static __poll_t kxo_poll(struct file *filp, struct poll_table_struct *wait) { __poll_t mask = 0; poll_wait(filp, &rx_wait, wait); if(kfifo_len(&rx_fifo)) mask |= EPOLLRDNORM | EPOLLIN; return mask; } ``` - `poll_wait` 會建立一個 `wait_entry` 並加入 `wait_head` ,當資源就緒時,核心模組使用 `wake_up_interruptible(&rx_wait);` 即可喚醒 `select` 的睡眠(詳細參考 [select 與 device driver 的關係](https://hackmd.io/@weiso131/linux_select)) - `EPOLLIN` 可參考 [`POLLIN_SET` 的定義](https://github.com/torvalds/linux/blob/master/fs/select.c#L459) 的定義與 [`poll(2)`](https://man7.org/linux/man-pages/man2/poll.2.html) 有關 `POLLIN` 的使用 在 `ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));` 之前使用 `pr_info` 印出訊息觀察 `poll` 是否有效 ```diff + pr_info("kxo: read_wait\n"); ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo)); ``` 在註冊 `poll` 之前, `sudo ./xo-user` 並 `sudo dmesg | grep read_wait` ,可觀查到 `kxo: read_wait` 在註冊 `poll` 之後,在只執行一個 `xo-user` 的情況下, `sudo dmesg | grep read_wait` 會沒有任何訊息,這代表 `poll` 實作生效, `select` 正常運作 然而,若執行兩個以上的 `xo-user` ,可能會發生兩個執行緒 `select` 檢查時資源皆就緒,但只有其中一個 `read` 取得資訊,另一個則進入等待, `sudo dmesg | grep read_wait` 會看到 `kxo: read_wait` 。但這其實是符合預期的, `select` 只能保證檢查的當下資源是否能夠存取,之後的先後順序它本來就沒辦法處理 >[Implement poll to make select work correctly](https://github.com/sysprog21/kxo/pull/24) #### 修改當前的 kxo 由於我的 kxo 要支援多使用者,要實作好的 `poll` 會比較麻煩 由於 `select` 最多只能藉由 `current` 取得 `tid` , 沒辦法每個 `user_id` 追蹤,需要藉由使用者端利用迴圈檢查當前執行緒的所有使用者,因此, `read` 必須變成非阻塞操作。雖然在使用者端做迴圈檢查有點浪費,但只要確保它的時間不會造成明顯延遲即可,而且這個操作也不太可能比 `mcts` 造成的影響嚴重,之後在使用者端實現多使用者後可以做時間測量。 為了確保 `select` 能在任意使用者就緒時被喚醒, `user_data` 需要把原本的 `wait_queue_head` 換成指標,並指向 `tid_data` 中的 `wait_queue_head` ,每次呼叫 `poll` 時會利用 `find_tid_data` 找到對應的 `tid_data` 做檢查 觀察到原本的 `kxo_read` 存在 `file->f_flags & O_NONBLOCK` 的檢查,這能透過使用者端使用 `fcntl` 來操作 `f_flags` 的值,這邊正是檢查是否有非阻塞的 flags ,這點出了是否要保留阻塞版本的 `read` 供使用者選擇。 若需要每個使用者的 `read` 都做到阻塞操作,需要恢復原本 `user_data` 中 `wait_queue_head` 的使用,然而在使用者端多使用者的情況,讓 `read` 阻塞不是個好主意,阻塞操作只有在使用者端保證只有一個使用者的情況下有效益(不用額外檢查是否成功),因此直接使用各個 `tid_data` 的 `wait_queue_head` 即可。 然而實際修改會遇到一個問題,因為 `wait_queue_head` 被放在 `tid_data` ,當某個執行緒 close kxo 時, `tid_data` 會直接被釋放,這導致還在 `workqueue` 內的 `ai_work_func` 使用 `wake_up` 時,會存取到無效指標。若使用 `unuse` 作為判斷是否可以使用 `wake_up` 的依據,可能會發生檢查當下可用,但 `wake_up` 時已經變成無效指標。 目前的解法是藉由延後釋放來避免這個問題,在 `user_list_queue_work` 將 `user_data` 移出 `user_list` 時,將 `user_cnt` ,也就是計算 `tid_data` 有多少個使用者的計數器減 1 ,當它變成 0 時,就能釋放 `tid_data` 。 由於這個實作會遇到循環引用的問題( `user_data` 下面需要 `tid_data` 的指標),因此將相關結構定義移動到 `type.h` ,方便管理 >[ee76c5a](https://github.com/weiso131/kxo/commit/ee76c5ab85d90e84bae638829562a087c5bd4907) ### 利用 time_handler 定時更新棋盤 Todo ### 同時顯示多個棋盤 ### 核心通訊介面 #### 列表 - ioctl get_id - ioctl set id permission - ioctl release id - read - write - open - release #### permission 用一個 byte 儲存權限資料 前 4 個 bit 代表 player2 要使用的權限 前 4 個 bit 代表 player1 要使用的權限 若權限設為 `USER_CTL` ,輪到對應的玩家時,就能使用 write 操作,但不能使用 read 操作。 設成其他權限時,會使用對應的演算法,此時不能使用 write 操作但可以使用 read 操作。 相關的定義在 `kxo_ioctl.h`。 #### ioctl get_id >[879d5fb](https://github.com/weiso131/kxo/commit/879d5fb8c711048b0c785338661f9b42e6265a7b) - 取得一個全新的 user id - buffer 內部要放置 permission 的設定 #### ioctl set id permission - 更改某個 id 的 permission ,達成動態切換 ai 算法 - 需要 lock #### ioctl release id - 釋放某個 id 的 user data #### read - 根據使用者端傳送的 buffer 中的 user_id ,取得對應 user 棋盤的更新 - 若 user_id 不存在,會回傳 `EFAULT` - 若輪到 `USER_CTL` 權限的玩家,會回傳 `-EPERM` - 複製一步棋的資料到使用者傳送的 buffer ,格式參考`縮減使用者和核心空間通訊的成本` #### write >[6aa6fc7](https://github.com/weiso131/kxo/commit/6aa6fc77bb6e610d2be51b3132f8d335c5c8e0c9) - 使用者端下棋 - 若 user_id 不存在,會回傳 `EFAULT` - 若當前輪到的玩家不能由使用者端操控,會回傳 `-EPERM` - 若下的位置非法,會回傳 `-EPERM` - 將移動資料複製到使用者傳送的 buffer ,格式與 `read` 操作相同 #### open - 同個 pid 的 thread 只能 open 一次,避免 release 出問題 #### release - 釋放該 pid 下的所有 user_data ## 探討 lock-free 的設計 ### lock-free vs lock-less vs wait-less [並行程式設計: Lock-Free Programming](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-lockfree#A-lock-free-vs-wait-free) - wait-free 是 non-blocking 之中強度最高的,它保證所有的指令可以 bound 在某個時間內完成,也就是所有的執行緒經過一段足夠的時間後,一定會有所進展 - lock-free 相較於 wait-free 稍微弱一點,它允許部份執行緒被暫停,但仍然保證整體程式的其他部份會有進展。最差的情況下,就算只有一個執行緒在執行,其他所有執行緒都在等待,仍然屬於 lock-free - lockless: 避免使用 lock 而能夠安全無誤地操作共用資料,不使用 mutex lock (e.g. busy waiting),不代表就是 lock-free - obstruction-free non-blocking 中最弱的,僅保證在沒有競爭情況下,執行緒可以持續運作,倘若發生競爭,則 obstruction-free 無法提供任何保證。換言之,可能會發生 livelock ## 支援多使用者並強化安全機制 ### 棋盤狀態 - 核心模組: 儲存當前棋盤與相關狀態 - 使用者: 儲存當前棋盤狀態、所有棋局的移動紀錄 - 棋盤資料傳遞: 誰下了哪一步棋 在我原本的 kxo 專案,為了讓不同的使用者觀察到同一個盤面,且在任一使用者按下 Ctrl + Q 時同步輸出紀錄,移動紀錄是被存放在核心的。 然而,現在的情況是考慮多個使用者獨立的狀態,而移動紀錄也只會被存取一次,那將移動紀錄存放在核心空間就顯得不必要,而且移動到使用者端會有以下優點: 1. 在任何使用者按下 ctrl + Q 之後,就不需要再跟核心模組有任何互動,這能減少核心模組的工作負擔 2. 減少核心 heap 空間浪費 3. 讓使用者端保有棋盤紀錄,能讓資料傳遞只需要 `誰下了哪一步棋` 此時核心模組的主要功能就是裁判,詳細功能如下: 1. 檢查新的這步棋是否合法 2. 判斷勝負 3. namespace 4. 使用者身分驗證 第一個功能考慮到之後需要做到 `人工智慧程式可指定運作於使用者/核心層級` 從使用者傳過來的移動不能保證是合法的,因此有驗證的必要 第二個功能是原本就存在的功能 第三、四個功能會獨立出來討論 ### 使用者身分驗證 [參考 `rota1001`](https://hackmd.io/Tcy6NPtnQiubDGyrRUnSdw#rota1001) 最直觀的想法 - 實作一個 `ioctl` 讓使用者決定一些資訊,同時回傳 `user_id` 給使用者 如果有攻擊者反覆呼叫 ioctl ,這個系統會很容易被灌爆記憶體? - 若在建立 id 時就分配 namespace ,明顯會被灌爆 - 若在要求第一步的時候分配 namespace ,攻擊者同樣可以只呼叫第一步再進行 ioctl 跟 `rota1001` 討論 我提出可能可以藉由得知 task id 來作為 ban 使用者的依據 他也認為這個解法可能可行,只要能夠得知哪個 process 建立了一堆 child process 但他也提出這個問題應該是可以不要考慮的,因為 kxo 的使用者想搞事,消耗的也是自己的資源 後來想想,若在這個前提下能拿到 task id 等等跟 `task struct` 綁在一起的參數,就不用另外生成核心模組的 `uid` 然而,在 5/20 與老師下課的討論時,問到作業要求引入 corutine 的意義,他希望我們可以利用 corutine 做到在同個畫面中顯示多個棋局,在此情況下,一個 thread 就可能有多個 user ,然而一個 thread 就只有一個 task id ,在此情況下仍要引入其他機制來辨識使用者身份。 ### kxo 如何調整並行的機制以支援多使用者 原先的 work_function, tasklet_function, time_handler 的設計並沒有考慮不同使用者的問題,要讓它們能夠正確執行,必須進行修改。 #### work_function 原先的 `work_function` 分成 `ai1`, `ai2`, `drawboard` 並且在全域存取 `table` 與 `turn` 要處理多使用者必須將棋盤相關資訊放在該使用者對應的結構 然而, work_func 沒辦法藉由 queue_work 的時候取得參數 觀察 work_func 的格式 ```c void work_func(struct work_struct *w) ``` 它會傳遞 `work_struct *` ,也就是 work item 作為參數 因此只要把 `work_struct` 放進 `user_data` ,就能在 `work_func` 內利用 `container_of` 取得 `user_data` 的位址 `user_data` 內的 `work_struct` 可利用 `INIT_WORK` 進行初始化 觀察原本的 `ai_one_work_func` 與 `ai_two_work_func` 的程式可發現,它們的差異只有使用的 AI 算法和棋子的圖案相反,剩下的實作邏輯完全一樣 考慮之後需要動態切換 ai 演算法,實作一個通用的 ai_work_func 然後存取 `user_data` 當下的資訊決定輪到誰比較合理 至於 `drawboard` 這個函式,其原本的功能是繪製出用來顯示的棋盤,並放入 kfifo ,然而,在往後的修改中,需要放入 kfifo 的資訊只有下了哪一步棋 ,如果能跟 `ai_work_func` 放在一起就能直接取得結果,不用額外的 buffer 存放。 `drawboard` 還有另外一個重要的功能,就是 `wake_up_interruptible(&rx_wait);` 使用者在 `read` 的時候,若 kfifo 內沒有東西,會進入睡眠狀態,直到 `wake_up_interruptible(&rx_wait);` 被呼叫,此時再嘗試取得 kfifo 的內容。 在多個使用者的情況下,若共用一個 `rx_wait` 會導致多個 `read` 被頻繁喚醒,但只有部份的 `read` 能真正結束等待,比較合理的作法是讓 `user_data` 也保有一個 `rx_wait` #### time_handler 原本的 time_handler 負責的是確認棋盤時否分出勝負 來決定要重置棋盤,還是呼叫 tasklet_function 將 work_item 放進 workqueue 然而,判斷是否分出勝負以及重製棋盤這兩個功能,其實可以在 `work_func` 完成,若保留在 time_handler 還會有兩個缺點 1. 每個 user 都會需要 consumer, producer lock 來維護 :::info 在有多個 cpu 的情況下, time_interrupt 與 work 可能發生在不同 cpu ,導致 racing 嗎? ::: 2. 隨著使用者越來越多, time_handler 的執行時間會變更長,但 time_interrupt 是沒辦法被排程的,因此執行時間不應該那麼長 因此,原本的 time_handler 的功能只有呼叫 tasklet_function 會被保留 #### tasklet_function 原本的 tasklet 會需要一個 finish ,確保下棋完成 否則可能會出現連下兩步的情況 然而,只有一種 `work_item` 則沒有這個問題 由於 tasklet 物件不會被重複排列的特性,可以確保系統不需要反覆的存取所有 work 並且嘗試放進 workqueue 為了方便存取,可以利用 `hlist` 將所有 `user_data` 串起來 :::danger 注意用語: access 是「存取」,而非「訪問」(visit) ::: 若現在輪到使用者下棋,則不排入 work_queue 除此之外,在多使用者的情況下,它還需要做到回收使用者資料的功能 在原本的程式中,只有單個使用者的情況下 release ,可以無腦的把 work_queue 清空 然而,在多使用者的情況下不能這麼做,而且需要釋放不再使用的使用者 若需要釋放的使用者已經不在 work_queue 中,可以直接將它釋放 反之,則等待它完成工作,此時就與上述情況相同 這會需要一個變數紀錄它是否已經不再使用 然而,在實際考慮到 `work_queue API` 之後,發現直接在 `tasklet_function` 內直接進行 `release` 是不可行的。 `work_queue` 提供 `cancel_work_sync` ,可確保在這個函式回傳之後,可以安全的釋放 `work_item` ,然而,這個函式可能會進入睡眠, tasklet function 是不允許睡眠的。 因此,應該要將 release 的工作放在其他地方,可藉由將需要 release 的 user_data 放入一個 trash_list ,再藉由其他方法釋放記憶體 ### user_data >[b7fdce5](https://github.com/weiso131/kxo/commit/b7fdce5b60da6ccd89857f78bb0b474ee4017675) >[f0242a7](https://github.com/weiso131/kxo/commit/f0242a7c4e71187becbb174f6efef59c5dbc31c3) ```c typedef struct user_data { char table[16]; char turn; //'O' or 'X' atomic_t unuse; // tasklet function will release user_data if unuse ai_func_t ai1_func; //'O', if NULL mean user space control ai_func_t ai2_func; //'X', if NULL mean user space control struct work_struct work; DECLARE_KFIFO_PTR(user_fifo, unsigned char); /* Wait queue to implement blocking I/O from userspace */ struct wait_queue_head rx_wait; struct hlist_node hlist; } UserData; ``` #### unuse 由於 `UserData` 會被 `TidData` 管理,在使用 `delete_tid_data` 時很有可能有些 `UserData` 的 `work` 剛好還在 worl_pool 中執行,即使使用 `cancel_work_sync` 仍然需要等待它執行完畢,若很不幸的遇到執行很久的 `MCTS` ,會導致 `delete_tid_data` 很久才能完成,因此先將其標記成 `unuse` ,之後搭配其他功能延後釋放是更好的選擇 #### work [doc](https://docs.kernel.org/core-api/workqueue.html) [workqueue.h](https://github.com/torvalds/linux/blob/master/include/linux/workqueue.h) [workqueue_type.h](https://github.com/torvalds/linux/blob/master/include/linux/workqueue_types.h) 由 `struct work_struct work;` 是 `workqueue` 安排任務的結構體,可藉由 `INIT_WORK(&user_data->work, work_func);` 初始化,並指定要執行的任務 `work` 藉由 `queue_work` 來將任務安排進 `workqueue` , 而 `queue_work` 能確保同個 `work` 不會在所有 `workqueue` 中出現兩次,為了能充分使用 `workqueue` 的能力,每個 user 有自己的 work_item 才能確保 `workqueue` 可以盡力處理多個 user 的要求。 此外,由於 work function 被定義成 `void (*work_func_t)(struct work_struct *work);` ,只有 `work` 作為參數傳遞 為了更新 user 的棋盤狀態與玩家切換,將 `work_struct` 定義在 `UserData` 中可以利用 `contain_of` 取得結構體的地址,這就能取得與 `user` 相關的所有資訊並進行更新。 #### kfifo 藉由 `DECLARE_KFIFO_PTR` 定義, API 使用參考 [FIFO Buffer](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html#id-1.8.2) :::info `DECLARE_KFIFO_PTR(fifo)` 會定義一個 kfifo 指標 `fifo` `kfifo_alloc`, `kfifo_free`, `kfifo_in`, `kfifo_to_user` 原始碼註釋與文件上都表示要傳送 fifo 的指標 然而原本的 kxo 實際使用時卻加上取址操作(`&`),而這樣會正確執行,有可能是文件有問題? ::: 關於為什麼會選擇讓每個使用者擁有自己的 kfifo 。一個很明顯的好處是可以減少 lock 的使用, `kfifo` 雖然提供 `kfifo_out_peek` 可以先判斷 kfifo 的第一項是不是當前使用者在等待的資料,再決定要不要拿出來。 但 `kfifo` 只保證 SPSC 的情況下是 lock free 的,這種情況得加上 lock。 #### rx_wait #### hlist 前面提到 `unuse` 的功能,那要如何在 `TidData` 消失後仍可以存取到 `UserData` ? 此時可以利用 `include/linux/list.h` 中的 `hlist` 來維護鍊結串列,利用 tasklet function 定期檢查 ### namespace 討論 #### namespace 資料結構選擇 根據前面的討論, namespace 需要區分不同的 tid ,同個 tid 也會有多個不同的使用者,某個 tid 使用 close 時,需要將對應 tid 之下的所有 user_data 釋放。 若將 tid 與 user_id 以某種方式相加並取得一個 id ,沒辦法直接反應 tid 與 user_id 的親子關係,在新增或釋放 user_data 時,需要額外的資料結構維護,因此存在一個結構,紀錄某個 tid 之下有哪些 user 會是比較好的選擇。 為了 O(1) 取得 tid 對應的結構,使用陣列是個好選擇,但是直接開一個 `pid_max` 大小的陣列顯得很浪費,使用 hash table 能更有效的節省空間。 當 hash table 發生碰撞時,可以使用 hlist 處理 namespace 使用 hlist 需要讀寫鎖, add, del 需要 write_lock ,存取只需要 read_lock ,為了避免一次阻塞大量的 thread ,可以每一個 hash_table 的 hlist_head 搭配一個 rw_lock #### 維護 user_id 完全使用 tasklet 來管理 userdata 該在 user_list 還是 trash_list - unuse 可以 atomic - hlist 維護需要 lock (tasklet 跟 user 建立),此外,由於過程會有 tasklet function 參預 lock 競爭,須使用 spin_lock 避免進入睡眠 建立與刪除使用者 1. 使用爆搜取得使用者 id ,可以用 CAS 來避免 lock - 建立 user 的操作的次數不會那麼頻繁,而且一個 tid 的 user 數量是有限的 - 好實作 2. 使用 queue 維護 user_id ,會需要一個 lock 來避免衝突 - O(1) 建立 user - 刪除 user 仍然需要額外的資料結構維護(例如:紅黑樹),才能做到 $O(ln(n))$ ### kxo_namespace >[124ceaa](https://github.com/weiso131/kxo/commit/124ceaa805e5e4134c6da54c34b34acd3953b958) >[aab3436](https://github.com/weiso131/kxo/commit/aab3436d1aee4bce98cf16ed581202a59f911926)(此 commit 未實作 remove_user) #### struct ```c typedef struct tid_data { pid_t tid; UserData **user_data_list; struct hlist_node hlist; } TidData; ``` ```c struct kxo_namespace { struct hlist_head head; rwlock_t lock; }; ``` - `kxo_namespace` 定義在 `kxo_namespace.c` 內,`main.c` 透過函式與 `struct kxo_namespace kxo_namespace[NAMESPACE_MAX];` 互動 #### add_tid_data - 如討論所述,需要將讀寫鎖用 `write_lock` 鎖住對應的 `kxo_namespace` - 若 `vmalloc` 失敗回傳 `-1` ,否則回傳 `0` #### delete_tid_data - 逐一將該 `TidData` 之下的所有 `UserData` 使用 `atomic_set` 設成 `unuse` ,需要使用 `atomic` 操作是因為 `unuse` 可能被 `user_list_queue_work` 存取 #### find_tid_data - 將 `kxo_namespace` 的讀寫鎖用 `read_lock` 鎖住 #### add_user - 由於 `USER_MAX` 目前為 `32` ,逐個檢查是否未使用並不會花太久 - init_user_data 失敗回傳 -1 - 考慮之後刪除單個使用者的情況可能的衝突,利用 CAS 操作(`cmpxchg`) 嘗試檢查與交換,若到最後都失敗則回傳 -1 - 將 `user_data->hlist` 加入 `user_list_head` 時使用 `spin_lock` 保護,避免 `user_list_queue_work` 同時存取 #### get_user_data - 根據 `user_id` 回傳對應 `UserData` - 在 [386e914](https://github.com/weiso131/kxo/commit/386e914b4d398d605890ff9f0abb083d0d41b09a) 增加檢查 `user_id` 是否超過 `USER_MAX` #### user_list_queue_work - 被 tasklet_function 呼叫的函式,用來處理所有 user work 的 `queuework` - 當發現一個 `user` 是 `unuse` 時,不會立刻刪除,會將它放到 `trash_list_head` ,之後透過回收機制再真正回收,原因參考核心模組並行討論 #### release_namespace 這只能在 `rmmod` 時被呼叫,目的是真正清空 user_list 與 trash_list 上所有 uesr_data 動態配置的記憶體,在未來可以考慮加入定期清理的函式,但這個函式能確保記憶體被徹底清空,始終有存在的必要 - clean_namespace Todo ### 討論當前設計因為哪些部份無法讓整個系統為 lock-free #### namespace 裡面的 lock 在維護 `kxo_namespace` 與 `user_list_head` 時,我使用到了 lock ,即使 lock 住的內容都是鍊結串列操作,由於相關函式主要都被使用在核心模組通訊界面,這是會被排程的(可以從能夠使用 current 這個 task_struct 得知),雖然現代排程器不太可能讓一個任務飢餓太久,但探討 lock-free 我們不能考慮排程器的優化,這個前提下,如果一個執行緒搶到 lock 之後馬上被排程,然後陷入無盡的等待,整個系統將完全不會有進展(`kxo_namespace` 要剛好所有 user 都發生碰撞,假設一個 user 沒發生碰撞,那它會有進展) 至於這件事情是否會對效能造成很大的影響? `kxo_namespace` 由於不會一次鎖住全部,降低了因為其中的 `rwlock` 鎖住整個 `kxo_namespace` 的風險 而 `user_list_head` critical section 的操作甚至只包含了單純的 `hlist_add_head` 與 `hlist_entry_safe` ,不會讓 critical section 持續太久 因此,即時它沒作到 lock free ,考慮現代排程器會維護公平性,實際上目前架構不會對系統造成非常大的影響,但仍有改進的空間 :::info 可以發現,這邊用到 lock 的原因都跟 hlist 脫不了關係,由於 hlist 要同時維護 `next` 跟 `prev` ,很難讓刪改操作是 atomic 的 考量這邊的使用情境,是否可以自訂 linklist 結構,使其操作 lock free? [參考 lock free programing 案例分析](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-lockfree#%E6%A1%88%E4%BE%8B%E5%88%86%E6%9E%9010) ::: #### 其他沒有 lock 但能導致沒辦法 lock-free 的因素 目前沒想到 ### lock-free 鍊結串列探討 參考: [並行程式設計: Lock-Free Programming](https://hackmd.io/@sysprog/concurrency-lockfree#%E6%A1%88%E4%BE%8B%E5%88%86%E6%9E%90) #### 與 hlist 比較 hlist 會維護一個 prev 來方便刪除操作,然而,為了達到 lock-free ,沒辦法保留 prev ,因為 next, prev 的插入與移除皆需要 `CAS` 來維護,以確保左節點與右節點相鄰,維護兩個指標可能會導致 next 成功,但在 prev 未成功時被存取導致錯誤,無法確保 atomic 。 #### add_head 文中維護的是一個有大小順序的鍊結串列,使用 `insert` 需要先 `search`,考慮 kxo_namespace 使用情境, `add_head` 就夠用了,讓操作能夠 O(1) ```c static inline void lf_list_add_head(struct lf_list *head, struct lf_list *node) { while (1) { struct lf_list *next = head->next; WRITE_ONCE(node->next, next); if (cmpxchg(&head->next, next, node) == next) break; } } ``` #### mark_remove 考慮 kxo_namespace 使用情境, remove 時能夠保證能先知道目標的位址 利用記憶體對齊的特性,可以確保不會發生位址的最後一個 bit 被使用到的狀況 (因為都是 4 的倍數 + 必須對齊),因此可以將最後 1 bit 用來標示是否被 remove 需要利用 CAS 確保 mark 的指標正確 ```c #define mark_remove(next) next | 1 #define unmark_remove(next) next & 0Xfffffffe #define is_mark(next) next & 1 ``` #### remove 這個 remove 必須在遍歷時使用,為了避免要刪除的節點仍被存取時釋放,造成 segment fault ,因此先使用 `mark_remove` 標記,在這個節點下次被存取時,才真的改變 `last->next` 達成 remove 。 為了確保刪除時左節點沒有被刪除,需要用 CAS 檢查它是否還存在。因為這個原因, remove 時沒辦法馬上將其釋放,然而除了系統結束的時候,沒有其他時間點能保證此時釋放不會出問題(釋放的節點不會是其他要刪除節點的左節點)。 為了達成 $O(1)$ `remove` ,當 `last` 為 `head` 時需要做額外檢查,需要利用 `CAS` 保證 `head` 真正指向要刪除的節點,才能將其刪除,否則放棄這次的刪除操作。 ```c void lf_list_remove(struct lf_list *last, struct lf_list *node, struct lf_list *head) ``` #### 討論 考慮目前使用 lock 的情況 - user_list_head 此處只有 `add_head` 會發生競爭, `remove` 的部份只有 `user_list_queue_work` 會用到, `remove` 時能夠保證不會有其他執行緒要存取目標節點,之後可以搭配回收機制在系統尚未結束時釋放記憶體,因此這個情況適合使用 lock-free 鍊結串列。 - 回收機制 目前還未真正實現,但情況跟 user_list_head 差不多,執行回收時只要先用 CAS 檢查並把 head 的 next 設成 NULL , 再把後面的鍊結串列整個釋放掉就結束了 - kxo_namespace 由於可能有多個執行緒進行 `remove` ,沒辦法在系統結束前釋放記憶體,若系統長時間運行會累積大量無用的記憶體。原本的作法由於只會某個 index,只要確保 hash_function 分佈足夠分散,實務上很難真的阻塞整個系統,已經達到接近 lock-free 的效能,因此這個結構不使用 lock-free 鍊結串列維護 ## 在不犧牲棋力的前提,降低 CPU 使用率 ### CMWQ [CMWQ 筆記](https://hackmd.io/@weiso131/ryBufYOzxl) >[20f3760](https://github.com/weiso131/kxo/commit/20f3760a3e2d472515fc5a0917977e34f1ba1b5e) - `WQ_CPU_INTENSIVE` 的影響經過實測,不會影響排程策略跟 nice 值,但跑 mcts 的時候電腦變得不卡了,根據 doc 的描述,它會讓這個 workqueue 的 work 不會阻礙同個 work pool 其他 work 的執行,這讓那些需要快速回應的任務(例如:滑鼠事件)反應變快,使用者就比較不會感受到電腦變卡的問題 >runnable CPU intensive work items will not prevent other work items in the same worker-pool from starting execution # 充分展現多核處理器的任務執行狀況 ## 兩個使用者 ai_work_func 的交互執行 - 開啟兩個使用者 - 為了方便觀察,一個使用者兩方都是 `mcts` ,一個使用者兩方都是 `negamax` ### trace_ai_func.sh ```shell #!/bin/bash TRACE_DIR=/sys/kernel/debug/tracing # clear echo 0 > $TRACE_DIR/tracing_on echo > $TRACE_DIR/set_graph_function echo > $TRACE_DIR/set_ftrace_filter echo nop > $TRACE_DIR/current_tracer sudo sh -c 'echo > /sys/kernel/tracing/set_graph_notrace' # setting echo function_graph > $TRACE_DIR/current_tracer echo 2 > $TRACE_DIR/max_graph_depth echo ai_work_func > $TRACE_DIR/set_graph_function # execute echo 1 > $TRACE_DIR/tracing_on ``` - 執行前先執行 `chmod +x trace_ai_func.sh` ### 部份輸出節錄 ``` 5) | ai_work_func [kxo]() { 5) 3.847 us | _printk(); 5) 0.144 us | ktime_get(); 5) 2.647 us | _printk(); 5) @ 528050.0 us | mcts [kxo](); 7) | ai_work_func [kxo]() { 7) 3.085 us | _printk(); 7) 0.118 us | ktime_get(); 7) 2.711 us | _printk(); 7) # 3809.916 us | negamax_move [kxo](); 7) 0.272 us | check_win [kxo](); 7) 4.582 us | _printk(); 7) 1.036 us | __wake_up(); 7) 0.120 us | ktime_get(); 7) 3.315 us | _printk(); 7) # 3829.471 us | } /* ai_work_func [kxo] */ ... //這裡都在 cpu 7 上跑 negamax 的 ai_work_func 7) ! 369.253 us | } /* ai_work_func [kxo] */ 5) 0.268 us | check_win [kxo](); 5) 6.721 us | _printk(); 5) + 15.621 us | __wake_up(); 5) 0.149 us | ktime_get(); 5) 3.555 us | _printk(); 5) @ 528086.1 us | } /* ai_work_func [kxo] */ ``` - 以上為開啟效能模式的執行結果 - 效能模式下, workqueue 會傾向讓同個 work 在同個 cpu 上執行 - 若電腦開啟省電模式,會容易觀察到不同的 work 在同個 cpu 運行,或是 work 使用的 cpu 切換,推測是排程器會在省電模式下盡可能維持 cpu idle 的狀態來達到省電的效果 ## 兩個使用者的 kxo 並行 ### trace_kxo.sh ```shell #!/bin/bash TRACE_DIR=/sys/kernel/debug/tracing # clear echo 0 > $TRACE_DIR/tracing_on echo > $TRACE_DIR/set_graph_function echo > $TRACE_DIR/set_ftrace_filter echo nop > $TRACE_DIR/current_tracer echo 10 > $TRACE_DIR/max_graph_depth sudo sh -c 'echo > /sys/kernel/tracing/set_graph_notrace' # setting echo function_graph | sudo tee $TRACE_DIR/current_tracer echo display-graph | sudo tee $TRACE_DIR/trace_options echo funcgraph-tail | sudo tee $TRACE_DIR/trace_options echo -e "available_moves\nxoro_jump\nxoro_next\nnew_node\ncalculate_win_value\ncheck_win\nnegamax\nnegamax_predict\nnegamax_init" | sudo tee /sys/kernel/tracing/set_graph_notrace echo '*:mod:kxo' | sudo tee $TRACE_DIR/set_ftrace_filter # execute echo 1 > $TRACE_DIR/tracing_on ``` ### 部份輸出節錄 #### timer_handler + tasklet 的問題 ``` 1) | ai_work_func [kxo]() { 1) | mcts [kxo]() { 1) 3.788 us | timer_handler [kxo](); 1) | game_tasklet_func [kxo]() { 1) 0.457 us | user_list_queue_work [kxo](); 1) 1.635 us | } /* game_tasklet_func [kxo] */ ... //觸發很多次 timer_handler + tasklet_func 1) 1.369 us | } /* game_tasklet_func [kxo] */ 1) # 1061.848 us | free_node [kxo](); 1) @ 764443.8 us | } /* mcts [kxo] */ 1) @ 764450.5 us | } /* ai_work_func [kxo] */ ``` - 目前的 timer delay 是 100 ms ,這個時間是 negamax 執行時間的數倍 - 但也不能因此把 delay 設太小,否則中間可能會遇到多次 context_switch - 考慮將 `user_list_queue_work` 的功能放進 `work_struct` ,內部反覆執行 `user_list_queue_work` 與睡眠一段時間 :::info 待驗證: - `worker pool` 會傾向讓 cpu 有事情做就好,放在 work 比較不會發生 `ai_work_func` 的搶佔 - 若 cpu 資源被用完,也不需要一直執行 `user_list_queue_work` - 唯一的問題可能是 cpu 資源被佔滿的同時, unuse 也很多,因此可能需要把 `work_struct` 放進有 `WQ_MEM_RECLAIM` 的 flag 的 workqueue :::