# select 與 device driver 的關係
## 背景
在 kxo 中使用了以下程式碼
```c
FD_ZERO(&readset);
FD_SET(STDIN_FILENO, &readset);
FD_SET(device_fd, &readset);
int result = select(max_fd + 1, &readset, NULL, NULL, NULL);
```
它用 `select` 去追蹤 `STDIN_FILENO` 跟 `kxo`
然而根據 `select` 的敘述
>select() allows a program to monitor multiple file descriptors,
waiting until one or more of the file descriptors become "ready"
for some class of I/O operation (e.g., input possible). A file
descriptor is considered ready if it is possible to perform a
corresponding I/O operation (e.g., read(2), or a sufficiently
small write(2)) without blocking.
但實際使用 kxo 的時候,可以發現它會被 `read` block 住
十分直觀的體驗就是當 kxo 使用 ctrl + q 離開時,可能會卡一下,這很可能是 mcts 害的
而這件事情可以用 ftrace 觀察,使用者端因為 `read` 而陷入等待
```
6) | kxo_read [kxo]() {
6) 0.180 us | find_tid_data [kxo]();
6) | get_user_data [kxo]() {
6) 0.168 us | find_tid_data [kxo]();
6) 0.731 us | } /* get_user_data [kxo] */
1) + 34.845 us | timer_handler [kxo]();
1) | game_tasklet_func [kxo]() {
1) 4.103 us | user_list_queue_work [kxo]();
1) 5.675 us | } /* game_tasklet_func [kxo] */
------------------------------------------
1) <idle>-0 => kworker-11466
------------------------------------------
1) | ai_work_func [kxo]() {
1) | mcts [kxo]() {
...
1) @ 764443.8 us | } /* mcts [kxo] */
1) @ 764450.5 us | } /* ai_work_func [kxo] */
6) @ 865334.2 us | } /* kxo_read [kxo] */
```
這與 `select` 的敘述相左,於是我問了 charGPT ,它告訴我可能是因為 kxo 沒有實作好的 poll 來處理 select ,去查 [`poll(2)`](https://man7.org/linux/man-pages/man2/poll.2.html) 有稍微提到這件事情
>Being "ready" means that the requested operation will not block;
thus, poll()ing regular files, block devices, and other files with
no reasonable polling semantic always returns instantly as ready
to read and write.
然而, `poll` 實際上要怎麼實作 lkmpg 沒講, `select` 具體會怎麼用到它官方文件沒說,這個筆記就來探討這件事情。
## 測試程式碼
### test_select_user
```c
#include <stdio.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <stdlib.h>
#include <unistd.h>
#define DEVICE_PATH "/dev/test_select"
int main()
{
int retval;
fd_set rfds;
struct timeval tv;
int fd = open(DEVICE_PATH, O_RDWR | O_NONBLOCK);
printf("fd: %d\n", fd);
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
/* Wait up to five seconds. */
tv.tv_sec = 5;
tv.tv_usec = 0;
retval = select(fd + 1, &rfds, NULL, NULL, &tv);
if (retval == -1)
perror("select()");
else if (FD_ISSET(fd, &rfds))
printf("Read is available now.\n");
else
printf("Read is not available now.\n");
close(fd);
return 0;
}
```
### 核心模組程式碼
這裡核心模組只要存在 open, release, read, init, exit 的功能就能進行測試
不用真的實作
因此去 [lkmpg](https://sysprog21.github.io/lkmpg/) 上拿 6.5 chardev.c 的範例來用即可
## 追蹤 select
利用 ftrace 追蹤 `test_select_user.c` 的 x86 系統呼叫
### ftrace shell code
```
#!/bin/bash
TRACING_DIR=/sys/kernel/debug/tracing
TRACER=function_graph
TARGET_PROG=./user
# Reset trace
echo 0 | sudo tee $TRACING_DIR/tracing_on
echo nop | sudo tee $TRACING_DIR/current_tracer
echo > $TRACING_DIR/trace
echo > $TRACING_DIR/set_ftrace_filter
echo > $TRACING_DIR/set_graph_function
echo > $TRACING_DIR/set_ftrace_pid
# Set tracer
echo $TRACER | sudo tee $TRACING_DIR/current_tracer
echo x64_sys_call | sudo tee $TRACING_DIR/set_graph_function
# Start controlled child process (sleep 1 -> exec user)
bash -c "sleep 1; exec $TARGET_PROG" &
TARGET_PID=$!
# Set trace PID
echo $TARGET_PID | sudo tee $TRACING_DIR/set_ftrace_pid
# Start tracing
echo 1 | sudo tee $TRACING_DIR/tracing_on
echo "[*] Tracing PID $TARGET_PID..."
wait $TARGET_PID
# Stop tracing
echo 0 | sudo tee $TRACING_DIR/tracing_on
# Dump output
sudo cat $TRACING_DIR/trace > trace_user_select.txt
cat trace_user_select.txt
```
### select 的呼叫途徑
在 `trace_user_select.txt` ctrl + f 尋找 select 可以看到
```
6) | x64_sys_call() {
6) | __x64_sys_pselect6() {
6) | do_pselect.constprop.0() {
6) 0.722 us | get_timespec64();
6) 0.714 us | ktime_get_ts64();
6) 0.768 us | timespec64_add_safe();
6) 0.756 us | set_user_sigmask();
6) | core_sys_select() {
6) 0.625 us | __rcu_read_lock();
6) 0.706 us | __rcu_read_unlock();
6) | __check_object_size() {
6) | __check_object_size.part.0() {
6) 0.753 us | check_stack_object();
6) 1.748 us | } /* __check_object_size.part.0 */
6) 2.831 us | } /* __check_object_size */
6) | do_select() {
6) 0.549 us | __rcu_read_lock();
6) 0.611 us | __rcu_read_unlock();
6) | select_estimate_accuracy() {
6) 0.802 us | ktime_get_ts64();
6) 0.858 us | set_normalized_timespec64();
6) 3.435 us | } /* select_estimate_accuracy */
6) 0.817 us | __fdget();
6) 0.713 us | __cond_resched();
6) 0.953 us | poll_freewait();
6) + 11.564 us | } /* do_select */
6) | __check_object_size() {
6) | __check_object_size.part.0() {
6) 0.623 us | check_stack_object();
6) 1.748 us | } /* __check_object_size.part.0 */
6) 2.614 us | } /* __check_object_size */
6) + 22.701 us | } /* core_sys_select */
6) | poll_select_finish() {
6) 0.607 us | ktime_get_ts64();
6) 0.682 us | set_normalized_timespec64();
6) 0.964 us | put_timespec64();
6) 4.839 us | } /* poll_select_finish */
6) + 34.425 us | } /* do_pselect.constprop.0 */
6) + 35.694 us | } /* __x64_sys_pselect6 */
6) + 37.165 us | } /* x64_sys_call */
```
在 bootlin 上追蹤 [`do_pselect`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L728) -> [`core_sys_select`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L621) -> [`do_select`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L483) -> [`select_poll_one`](https://elixir.bootlin.com/linux/v6.15.4/source/fs/select.c#L465) -> [`vfs_poll`](https://elixir.bootlin.com/linux/v6.15.4/source/include/linux/poll.h#L78)
#### 觀察 `core_sys_select`
`fds.in`, `fds.out`, `fds.ex` 用來存放來自使用者的 `readfds`, `writefds`, `exceptfds` ,利用 `get_fd_set` 來做到 `copy_from_user`
在經過 `do_select` 的處理後,再將 `fds.res_in`, `fds.res_out`, `fds.res_ex` 利用 `set_fd_set` 複製到使用者空間
接下來可以開始觀察 `do_select`
#### 觀察 `do_select`
最外層的迴圈會實現 `select` 在沒有任何觀測的裝置就緒時,進入等待的情況
實際上會藉由 `poll_schedule_timeout` 實現,其中呼叫了 [`schedule_hrtimeout_range`](https://elixir.bootlin.com/linux/v6.15.4/source/kernel/time/sleep_timeout.c#L227) ,根據註解,其功能就是讓任務睡眠,直到接收到訊號,或是指定的時間到達。
接下來會把 n 個裝置都做檢查
每次檢查 `BITS_PER_LONG` 個裝置
在開始檢查前,會利用 `all_bits = in | out | ex;` 確保至少有一個事件需要檢查
進入檢查迴圈後,利用 `mask = select_poll_one(i, wait, in, out, bit, busy_flag);` 取得 i 對應裝置的 `mask` ,根據這個 `mask` 去設定 `res_in`, `res_out`, `res_ex` ,最後再將這次的結果複製到 `fds`
#### 觀察 `select_poll_one`
`CLASS(fd, f)(fd);` 不確定在幹麻 (找不到哪裡有 `class_f_t`)
但從 `vfs_poll(fd_file(f), wait);` 的使用推測,它應該是取得 `int fd` 對應的 `file`
#### 觀察 `vfs_poll`
```c
if (unlikely(!file->f_op->poll))
return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);
```
它會使用 `f_op->poll` 這個操作,這就是 `struct file_operations` 下的 `__poll_t (*poll) (struct file *, struct poll_table_struct *);`
若 `file->f_op->poll` 未定義,就會回傳 `DEFAULT_POLLMASK` ,展開就會變成
`(EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM)` ,代表此裝置已經就緒,與 kxo 的觀察結果相符
## 利用 poll 讓 select 以為 read 現在 not ready
### poll 使用
由於官方文件找不到 poll 的說明,參考網路上的教學
[embetronicx.com](https://embetronicx.com/tutorials/linux/device-drivers/poll-linux-example-device-driver/)
```c
static unsigned int etx_poll(struct file *filp, struct poll_table_struct *wait)
{
__poll_t mask = 0;
poll_wait(filp, &wait_queue_etx_data, wait);
pr_info("Poll function\n");
if( can_read )
{
can_read = false;
mask |= ( POLLIN | POLLRDNORM );
}
if( can_write )
{
can_write = false;
mask |= ( POLLOUT | POLLWRNORM );
}
return mask;
}
```
然後記得註冊
```c
static struct file_operations fops =
{
.owner = THIS_MODULE,
.read = etx_read,
.write = etx_write,
.open = etx_open,
.release = etx_release,
.poll = etx_poll
};
```
`mask` 作為回傳值與前面對程式碼的觀察是相符的,但 `poll_wait` 是啥?
### poll_wait
```c
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc) {
p->_qproc(filp, wait_address, p);
/*
* This memory barrier is paired in the wq_has_sleeper().
* See the comment above prepare_to_wait(), we need to
* ensure that subsequent tests in this thread can't be
* reordered with __add_wait_queue() in _qproc() paths.
*/
smp_mb();
}
}
```
回頭看 `do_select`
```c
poll_initwait(&table);
wait = &table.pt;
...
mask = select_poll_one(i, wait, in, out, bit,
busy_flag);
```
找到 `poll_initwait` ,發現 `pt` 被設成 `__pollwait` ,所以 `p->_qproc` 實際上會執行 `__pollwait`
### __poll_wait
```c
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
entry->filp = get_file(filp);
entry->wait_address = wait_address;
entry->key = p->_key;
init_waitqueue_func_entry(&entry->wait, pollwake);
entry->wait.private = pwq;
add_wait_queue(wait_address, &entry->wait);
}
```
trace 這個 entry 會發現, `wait` 的型別是 `wait_queue_entry_t` ,它定義在 `wait.h` ,然而,沒有官方文件明確的解釋 `add_wait_queue`, `init_waitqueue_func_entry` 等機制,這邊嘗試在[別的筆記](https://hackmd.io/@weiso131/B190-Wg8gx)分析 `wait.h` 的部份機制。
:::info
這邊結合我的筆記嘗試提供 `poll_wait` 的解釋:
首先,一個合理的 poll 會利用 poll_wait 的將 poll_get_entry 產生的 entry_wait 加入該裝置的 wait_head
若 do_select 一次迭代都找不到 ready 的 device
會進入睡眠
此時,一個裝置變成 ready 就會使用 wake 去喚醒那個 head 底下的 entry 與 head
select 進入睡眠的執行緒就會被喚醒
:::
## kxo 中利用 select 追蹤 /dev/kxo read 的必要性
關於 `read(0, &input, 1)`,它其實就是從 `STDIN` 的裝置讀取一段長度的資料,若沒有資料,它就會陷入阻塞。因此, kxo 使用 `select` 的目的最初就是為了避免標準輸入監測造成的阻塞。然而, kxo 的 read 在等待 mcts 下棋的時間所造成的阻塞,會導致使用者在 `ctrl+Q`, `ctrl+P` 操作上的明顯延遲。
目前有兩種解決方法:
第一種方法是直接將 kxo `read` 改成非阻塞操作,若讀取失敗救回傳無效值,這樣即使不實作 `poll` 也能避免使用者體驗到暫停與停止的阻塞。然而,這會導致這個執行緒長期空轉,佔用 cpu 資源。
第二種方法就是乖乖把 `poll` 實作出來,這能確保兩個裝置沒有資源存取需求時,執行緒能夠進入睡眠。至於我自己的期末專題該如何搭配 `select` ,可以將 `poll` 實作成偵測某個 `pid` 之下的所有 `user` ,任一能夠成功讀取就代表就緒, 再將 `read` 實作成非阻塞,在使用者端檢查每個 `user` 的狀況。