# select 與 device driver 的關係 ## 背景 在 kxo 中使用了以下程式碼 ```c FD_ZERO(&readset); FD_SET(STDIN_FILENO, &readset); FD_SET(device_fd, &readset); int result = select(max_fd + 1, &readset, NULL, NULL, NULL); ``` 它用 `select` 去追蹤 `STDIN_FILENO` 跟 `kxo` 然而根據 `select` 的敘述 >select() allows a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform a corresponding I/O operation (e.g., read(2), or a sufficiently small write(2)) without blocking. 但實際使用 kxo 的時候,可以發現它會被 `read` block 住 十分直觀的體驗就是當 kxo 使用 ctrl + q 離開時,可能會卡一下,這很可能是 mcts 害的 而這件事情可以用 ftrace 觀察,使用者端因為 `read` 而陷入等待 ``` 6) | kxo_read [kxo]() { 6) 0.180 us | find_tid_data [kxo](); 6) | get_user_data [kxo]() { 6) 0.168 us | find_tid_data [kxo](); 6) 0.731 us | } /* get_user_data [kxo] */ 1) + 34.845 us | timer_handler [kxo](); 1) | game_tasklet_func [kxo]() { 1) 4.103 us | user_list_queue_work [kxo](); 1) 5.675 us | } /* game_tasklet_func [kxo] */ ------------------------------------------ 1) <idle>-0 => kworker-11466 ------------------------------------------ 1) | ai_work_func [kxo]() { 1) | mcts [kxo]() { ... 1) @ 764443.8 us | } /* mcts [kxo] */ 1) @ 764450.5 us | } /* ai_work_func [kxo] */ 6) @ 865334.2 us | } /* kxo_read [kxo] */ ``` 這與 `select` 的敘述相左,於是我問了 charGPT ,它告訴我可能是因為 kxo 沒有實作好的 poll 來處理 select ,去查 [`poll(2)`](https://man7.org/linux/man-pages/man2/poll.2.html) 有稍微提到這件事情 >Being "ready" means that the requested operation will not block; thus, poll()ing regular files, block devices, and other files with no reasonable polling semantic always returns instantly as ready to read and write. 然而, `poll` 實際上要怎麼實作 lkmpg 沒講, `select` 具體會怎麼用到它官方文件沒說,這個筆記就來探討這件事情。 ## 測試程式碼 ### test_select_user ```c #include <stdio.h> #include <fcntl.h> #include <sys/select.h> #include <sys/time.h> #include <stdlib.h> #include <unistd.h> #define DEVICE_PATH "/dev/test_select" int main() { int retval; fd_set rfds; struct timeval tv; int fd = open(DEVICE_PATH, O_RDWR | O_NONBLOCK); printf("fd: %d\n", fd); FD_ZERO(&rfds); FD_SET(fd, &rfds); /* Wait up to five seconds. */ tv.tv_sec = 5; tv.tv_usec = 0; retval = select(fd + 1, &rfds, NULL, NULL, &tv); if (retval == -1) perror("select()"); else if (FD_ISSET(fd, &rfds)) printf("Read is available now.\n"); else printf("Read is not available now.\n"); close(fd); return 0; } ``` ### 核心模組程式碼 這裡核心模組只要存在 open, release, read, init, exit 的功能就能進行測試 不用真的實作 因此去 [lkmpg](https://sysprog21.github.io/lkmpg/) 上拿 6.5 chardev.c 的範例來用即可 ## 追蹤 select 利用 ftrace 追蹤 `test_select_user.c` 的 x86 系統呼叫 ### ftrace shell code ``` #!/bin/bash TRACING_DIR=/sys/kernel/debug/tracing TRACER=function_graph TARGET_PROG=./user # Reset trace echo 0 | sudo tee $TRACING_DIR/tracing_on echo nop | sudo tee $TRACING_DIR/current_tracer echo > $TRACING_DIR/trace echo > $TRACING_DIR/set_ftrace_filter echo > $TRACING_DIR/set_graph_function echo > $TRACING_DIR/set_ftrace_pid # Set tracer echo $TRACER | sudo tee $TRACING_DIR/current_tracer echo x64_sys_call | sudo tee $TRACING_DIR/set_graph_function # Start controlled child process (sleep 1 -> exec user) bash -c "sleep 1; exec $TARGET_PROG" & TARGET_PID=$! # Set trace PID echo $TARGET_PID | sudo tee $TRACING_DIR/set_ftrace_pid # Start tracing echo 1 | sudo tee $TRACING_DIR/tracing_on echo "[*] Tracing PID $TARGET_PID..." wait $TARGET_PID # Stop tracing echo 0 | sudo tee $TRACING_DIR/tracing_on # Dump output sudo cat $TRACING_DIR/trace > trace_user_select.txt cat trace_user_select.txt ``` ### select 的呼叫途徑 在 `trace_user_select.txt` ctrl + f 尋找 select 可以看到 ``` 6) | x64_sys_call() { 6) | __x64_sys_pselect6() { 6) | do_pselect.constprop.0() { 6) 0.722 us | get_timespec64(); 6) 0.714 us | ktime_get_ts64(); 6) 0.768 us | timespec64_add_safe(); 6) 0.756 us | set_user_sigmask(); 6) | core_sys_select() { 6) 0.625 us | __rcu_read_lock(); 6) 0.706 us | __rcu_read_unlock(); 6) | __check_object_size() { 6) | __check_object_size.part.0() { 6) 0.753 us | check_stack_object(); 6) 1.748 us | } /* __check_object_size.part.0 */ 6) 2.831 us | } /* __check_object_size */ 6) | do_select() { 6) 0.549 us | __rcu_read_lock(); 6) 0.611 us | __rcu_read_unlock(); 6) | select_estimate_accuracy() { 6) 0.802 us | ktime_get_ts64(); 6) 0.858 us | set_normalized_timespec64(); 6) 3.435 us | } /* select_estimate_accuracy */ 6) 0.817 us | __fdget(); 6) 0.713 us | __cond_resched(); 6) 0.953 us | poll_freewait(); 6) + 11.564 us | } /* do_select */ 6) | __check_object_size() { 6) | __check_object_size.part.0() { 6) 0.623 us | check_stack_object(); 6) 1.748 us | } /* __check_object_size.part.0 */ 6) 2.614 us | } /* __check_object_size */ 6) + 22.701 us | } /* core_sys_select */ 6) | poll_select_finish() { 6) 0.607 us | ktime_get_ts64(); 6) 0.682 us | set_normalized_timespec64(); 6) 0.964 us | put_timespec64(); 6) 4.839 us | } /* poll_select_finish */ 6) + 34.425 us | } /* do_pselect.constprop.0 */ 6) + 35.694 us | } /* __x64_sys_pselect6 */ 6) + 37.165 us | } /* x64_sys_call */ ``` 在 bootlin 上追蹤 [`do_pselect`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L728) -> [`core_sys_select`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L621) -> [`do_select`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L483) -> [`select_poll_one`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L465) -> [`vfs_poll`](https://elixir.bootlin.com/linux/v6.15.4/source/include/linux/poll.h#L78) #### 觀察 `core_sys_select` `fds.in`, `fds.out`, `fds.ex` 用來存放來自使用者的 `readfds`, `writefds`, `exceptfds` ,利用 `get_fd_set` 來做到 `copy_from_user` 在經過 `do_select` 的處理後,再將 `fds.res_in`, `fds.res_out`, `fds.res_ex` 利用 `set_fd_set` 複製到使用者空間 接下來可以開始觀察 `do_select` #### 觀察 `do_select` 最外層的迴圈會實現 `select` 在沒有任何觀測的裝置就緒時,進入等待的情況 實際上會藉由 `poll_schedule_timeout` 實現,其中呼叫了 [`schedule_hrtimeout_range`](https://elixir.bootlin.com/linux/v6.15.4/source/kernel/time/sleep_timeout.c#L227) ,根據註解,其功能就是讓任務睡眠,直到接收到訊號,或是指定的時間到達。 接下來會把 n 個裝置都做檢查 每次檢查 `BITS_PER_LONG` 個裝置 在開始檢查前,會利用 `all_bits = in | out | ex;` 確保至少有一個事件需要檢查 進入檢查迴圈後,利用 `mask = select_poll_one(i, wait, in, out, bit, busy_flag);` 取得 i 對應裝置的 `mask` ,根據這個 `mask` 去設定 `res_in`, `res_out`, `res_ex` ,最後再將這次的結果複製到 `fds` #### 觀察 `select_poll_one` `CLASS(fd, f)(fd);` 不確定在幹麻 (找不到哪裡有 `class_f_t`) 但從 `vfs_poll(fd_file(f), wait);` 的使用推測,它應該是取得 `int fd` 對應的 `file` #### 觀察 `vfs_poll` ```c if (unlikely(!file->f_op->poll)) return DEFAULT_POLLMASK; return file->f_op->poll(file, pt); ``` 它會使用 `f_op->poll` 這個操作,這就是 `struct file_operations` 下的 `__poll_t (*poll) (struct file *, struct poll_table_struct *);` 若 `file->f_op->poll` 未定義,就會回傳 `DEFAULT_POLLMASK` ,展開就會變成 `(EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM)` ,代表此裝置已經就緒,與 kxo 的觀察結果相符 ## 利用 poll 讓 select 以為 read 現在 not ready ### poll 使用 由於官方文件找不到 poll 的說明,參考網路上的教學 [embetronicx.com](https://embetronicx.com/tutorials/linux/device-drivers/poll-linux-example-device-driver/) ```c static unsigned int etx_poll(struct file *filp, struct poll_table_struct *wait) { __poll_t mask = 0; poll_wait(filp, &wait_queue_etx_data, wait); pr_info("Poll function\n"); if( can_read ) { can_read = false; mask |= ( POLLIN | POLLRDNORM ); } if( can_write ) { can_write = false; mask |= ( POLLOUT | POLLWRNORM ); } return mask; } ``` 然後記得註冊 ```c static struct file_operations fops = { .owner = THIS_MODULE, .read = etx_read, .write = etx_write, .open = etx_open, .release = etx_release, .poll = etx_poll }; ``` `mask` 作為回傳值與前面對程式碼的觀察是相符的,但 `poll_wait` 是啥? ### poll_wait ```c static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc) { p->_qproc(filp, wait_address, p); /* * This memory barrier is paired in the wq_has_sleeper(). * See the comment above prepare_to_wait(), we need to * ensure that subsequent tests in this thread can't be * reordered with __add_wait_queue() in _qproc() paths. */ smp_mb(); } } ``` 回頭看 `do_select` ```c poll_initwait(&table); wait = &table.pt; ... mask = select_poll_one(i, wait, in, out, bit, busy_flag); ``` 找到 `poll_initwait` ,發現 `pt` 被設成 `__pollwait` ,所以 `p->_qproc` 實際上會執行 `__pollwait` ### __poll_wait ```c /* Add a new entry */ static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; add_wait_queue(wait_address, &entry->wait); } ``` trace 這個 entry 會發現, `wait` 的型別是 `wait_queue_entry_t` ,它定義在 `wait.h` ,然而,沒有官方文件明確的解釋 `add_wait_queue`, `init_waitqueue_func_entry` 等機制,這邊嘗試在[別的筆記](https://hackmd.io/@weiso131/B190-Wg8gx)分析 `wait.h` 的部份機制。 :::info 這邊結合我的筆記嘗試提供 `poll_wait` 的解釋: 首先,一個合理的 poll 會利用 poll_wait 的將 poll_get_entry 產生的 entry_wait 加入該裝置的 wait_head 若 do_select 一次迭代都找不到 ready 的 device 會進入睡眠 此時,一個裝置變成 ready 就會使用 wake 去喚醒那個 head 底下的 entry 與 head select 進入睡眠的執行緒就會被喚醒 ::: ## kxo 中利用 select 追蹤 /dev/kxo read 的必要性 關於 `read(0, &input, 1)`,它其實就是從 `STDIN` 的裝置讀取一段長度的資料,若沒有資料,它就會陷入阻塞。因此, kxo 使用 `select` 的目的最初就是為了避免標準輸入監測造成的阻塞。然而, kxo 的 read 在等待 mcts 下棋的時間所造成的阻塞,會導致使用者在 `ctrl+Q`, `ctrl+P` 操作上的明顯延遲。 目前有兩種解決方法: 第一種方法是直接將 kxo `read` 改成非阻塞操作,若讀取失敗救回傳無效值,這樣即使不實作 `poll` 也能避免使用者體驗到暫停與停止的阻塞。然而,這會導致這個執行緒長期空轉,佔用 cpu 資源。 第二種方法就是乖乖把 `poll` 實作出來,這能確保兩個裝置沒有資源存取需求時,執行緒能夠進入睡眠。至於我自己的期末專題該如何搭配 `select` ,可以將 `poll` 實作成偵測某個 `pid` 之下的所有 `user` ,任一能夠成功讀取就代表就緒, 再將 `read` 實作成非阻塞,在使用者端檢查每個 `user` 的狀況。