Try   HackMD

select 與 device driver 的關係

背景

在 kxo 中使用了以下程式碼

FD_ZERO(&readset);
FD_SET(STDIN_FILENO, &readset);
FD_SET(device_fd, &readset);

int result = select(max_fd + 1, &readset, NULL, NULL, NULL);

它用 select 去追蹤 STDIN_FILENOkxo
然而根據 select 的敘述

select() allows a program to monitor multiple file descriptors,
waiting until one or more of the file descriptors become "ready"
for some class of I/O operation (e.g., input possible). A file
descriptor is considered ready if it is possible to perform a
corresponding I/O operation (e.g., read(2), or a sufficiently
small write(2)) without blocking.

但實際使用 kxo 的時候,可以發現它會被 read block 住
十分直觀的體驗就是當 kxo 使用 ctrl + q 離開時,可能會卡一下,這很可能是 mcts 害的
而這件事情可以用 ftrace 觀察,使用者端因為 read 而陷入等待

 6)               |  kxo_read [kxo]() {
 6)   0.180 us    |    find_tid_data [kxo]();
 6)               |    get_user_data [kxo]() {
 6)   0.168 us    |      find_tid_data [kxo]();
 6)   0.731 us    |    } /* get_user_data [kxo] */
 1) + 34.845 us   |  timer_handler [kxo]();
 1)               |  game_tasklet_func [kxo]() {
 1)   4.103 us    |    user_list_queue_work [kxo]();
 1)   5.675 us    |  } /* game_tasklet_func [kxo] */
 ------------------------------------------
 1)    <idle>-0    => kworker-11466 
 ------------------------------------------

 1)               |  ai_work_func [kxo]() {
 1)               |    mcts [kxo]() {
...
 1) @ 764443.8 us |    } /* mcts [kxo] */
 1) @ 764450.5 us |  } /* ai_work_func [kxo] */
 6) @ 865334.2 us |  } /* kxo_read [kxo] */

這與 select 的敘述相左,於是我問了 charGPT ,它告訴我可能是因為 kxo 沒有實作好的 poll 來處理 select ,去查 poll(2) 有稍微提到這件事情

Being "ready" means that the requested operation will not block;
thus, poll()ing regular files, block devices, and other files with
no reasonable polling semantic always returns instantly as ready
to read and write.

然而, poll 實際上要怎麼實作 lkmpg 沒講, select 具體會怎麼用到它官方文件沒說,這個筆記就來探討這件事情。

測試程式碼

test_select_user

#include <stdio.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <stdlib.h>
#include <unistd.h>

#define DEVICE_PATH "/dev/test_select"

int main()
{
    int             retval;
    fd_set          rfds;
    struct timeval  tv;

    int fd = open(DEVICE_PATH, O_RDWR | O_NONBLOCK);
    printf("fd: %d\n", fd);
    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);

    /* Wait up to five seconds. */

    tv.tv_sec = 5;
    tv.tv_usec = 0;

    retval = select(fd + 1, &rfds, NULL, NULL, &tv);

    if (retval == -1)
        perror("select()");
    else if (FD_ISSET(fd, &rfds))
        printf("Read is available now.\n");
    else
        printf("Read is not available now.\n");

    close(fd);
    return 0;
}

核心模組程式碼

這裡核心模組只要存在 open, release, read, init, exit 的功能就能進行測試
不用真的實作
因此去 lkmpg 上拿 6.5 chardev.c 的範例來用即可

追蹤 select

利用 ftrace 追蹤 test_select_user.c 的 x86 系統呼叫

ftrace shell code

#!/bin/bash

TRACING_DIR=/sys/kernel/debug/tracing
TRACER=function_graph
TARGET_PROG=./user

# Reset trace
echo 0 | sudo tee $TRACING_DIR/tracing_on
echo nop | sudo tee $TRACING_DIR/current_tracer
echo > $TRACING_DIR/trace
echo > $TRACING_DIR/set_ftrace_filter
echo > $TRACING_DIR/set_graph_function
echo > $TRACING_DIR/set_ftrace_pid

# Set tracer
echo $TRACER | sudo tee $TRACING_DIR/current_tracer
echo x64_sys_call | sudo tee $TRACING_DIR/set_graph_function

# Start controlled child process (sleep 1 -> exec user)
bash -c "sleep 1; exec $TARGET_PROG" &
TARGET_PID=$!

# Set trace PID
echo $TARGET_PID | sudo tee $TRACING_DIR/set_ftrace_pid

# Start tracing
echo 1 | sudo tee $TRACING_DIR/tracing_on

echo "[*] Tracing PID $TARGET_PID..."
wait $TARGET_PID

# Stop tracing
echo 0 | sudo tee $TRACING_DIR/tracing_on

# Dump output
sudo cat $TRACING_DIR/trace > trace_user_select.txt
cat trace_user_select.txt

select 的呼叫途徑

trace_user_select.txt ctrl + f 尋找 select 可以看到

 6)               |  x64_sys_call() {
 6)               |    __x64_sys_pselect6() {
 6)               |      do_pselect.constprop.0() {
 6)   0.722 us    |        get_timespec64();
 6)   0.714 us    |        ktime_get_ts64();
 6)   0.768 us    |        timespec64_add_safe();
 6)   0.756 us    |        set_user_sigmask();
 6)               |        core_sys_select() {
 6)   0.625 us    |          __rcu_read_lock();
 6)   0.706 us    |          __rcu_read_unlock();
 6)               |          __check_object_size() {
 6)               |            __check_object_size.part.0() {
 6)   0.753 us    |              check_stack_object();
 6)   1.748 us    |            } /* __check_object_size.part.0 */
 6)   2.831 us    |          } /* __check_object_size */
 6)               |          do_select() {
 6)   0.549 us    |            __rcu_read_lock();
 6)   0.611 us    |            __rcu_read_unlock();
 6)               |            select_estimate_accuracy() {
 6)   0.802 us    |              ktime_get_ts64();
 6)   0.858 us    |              set_normalized_timespec64();
 6)   3.435 us    |            } /* select_estimate_accuracy */
 6)   0.817 us    |            __fdget();
 6)   0.713 us    |            __cond_resched();
 6)   0.953 us    |            poll_freewait();
 6) + 11.564 us   |          } /* do_select */
 6)               |          __check_object_size() {
 6)               |            __check_object_size.part.0() {
 6)   0.623 us    |              check_stack_object();
 6)   1.748 us    |            } /* __check_object_size.part.0 */
 6)   2.614 us    |          } /* __check_object_size */
 6) + 22.701 us   |        } /* core_sys_select */
 6)               |        poll_select_finish() {
 6)   0.607 us    |          ktime_get_ts64();
 6)   0.682 us    |          set_normalized_timespec64();
 6)   0.964 us    |          put_timespec64();
 6)   4.839 us    |        } /* poll_select_finish */
 6) + 34.425 us   |      } /* do_pselect.constprop.0 */
 6) + 35.694 us   |    } /* __x64_sys_pselect6 */
 6) + 37.165 us   |  } /* x64_sys_call */

在 bootlin 上追蹤 do_pselect -> core_sys_select -> do_select -> select_poll_one -> vfs_poll

觀察 core_sys_select

fds.in, fds.out, fds.ex 用來存放來自使用者的 readfds, writefds, exceptfds ,利用 get_fd_set 來做到 copy_from_user

在經過 do_select 的處理後,再將 fds.res_in, fds.res_out, fds.res_ex 利用 set_fd_set 複製到使用者空間

接下來可以開始觀察 do_select

觀察 do_select

最外層的迴圈會實現 select 在沒有任何觀測的裝置就緒時,進入等待的情況
實際上會藉由 poll_schedule_timeout 實現,其中呼叫了 schedule_hrtimeout_range ,根據註解,其功能就是讓任務睡眠,直到接收到訊號,或是指定的時間到達。

接下來會把 n 個裝置都做檢查
每次檢查 BITS_PER_LONG 個裝置
在開始檢查前,會利用 all_bits = in | out | ex; 確保至少有一個事件需要檢查
進入檢查迴圈後,利用 mask = select_poll_one(i, wait, in, out, bit, busy_flag); 取得 i 對應裝置的 mask ,根據這個 mask 去設定 res_in, res_out, res_ex ,最後再將這次的結果複製到 fds

觀察 select_poll_one

CLASS(fd, f)(fd); 不確定在幹麻 (找不到哪裡有 class_f_t)
但從 vfs_poll(fd_file(f), wait); 的使用推測,它應該是取得 int fd 對應的 file

觀察 vfs_poll

if (unlikely(!file->f_op->poll))
    return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);

它會使用 f_op->poll 這個操作,這就是 struct file_operations 下的 __poll_t (*poll) (struct file *, struct poll_table_struct *);

file->f_op->poll 未定義,就會回傳 DEFAULT_POLLMASK ,展開就會變成
(EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM) ,代表此裝置已經就緒,與 kxo 的觀察結果相符

利用 poll 讓 select 以為 read 現在 not ready

poll 使用

由於官方文件找不到 poll 的說明,參考網路上的教學
embetronicx.com

static unsigned int etx_poll(struct file *filp, struct poll_table_struct *wait)
{
  __poll_t mask = 0;
  
  poll_wait(filp, &wait_queue_etx_data, wait);
  pr_info("Poll function\n");
  
  if( can_read )
  {
    can_read = false;
    mask |= ( POLLIN | POLLRDNORM );
  }
  
  if( can_write )
  {
    can_write = false;
    mask |= ( POLLOUT | POLLWRNORM );
  }
    
  return mask;
}

然後記得註冊

static struct file_operations fops =
{
  .owner          = THIS_MODULE,
  .read           = etx_read,
  .write          = etx_write,
  .open           = etx_open,
  .release        = etx_release,
  .poll           = etx_poll
};

mask 作為回傳值與前面對程式碼的觀察是相符的,但 poll_wait 是啥?

poll_wait

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && p->_qproc) {
		p->_qproc(filp, wait_address, p);
		/*
		 * This memory barrier is paired in the wq_has_sleeper().
		 * See the comment above prepare_to_wait(), we need to
		 * ensure that subsequent tests in this thread can't be
		 * reordered with __add_wait_queue() in _qproc() paths.
		 */
		smp_mb();
	}
}

回頭看 do_select

poll_initwait(&table);
wait = &table.pt;
...
            mask = select_poll_one(i, wait, in, out, bit,
						       busy_flag);

找到 poll_initwait ,發現 pt 被設成 __pollwait ,所以 p->_qproc 實際上會執行 __pollwait

__poll_wait

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	add_wait_queue(wait_address, &entry->wait);
}

trace 這個 entry 會發現, wait 的型別是 wait_queue_entry_t ,它定義在 wait.h ,然而,沒有官方文件明確的解釋 add_wait_queue, init_waitqueue_func_entry 等機制,這邊嘗試在別的筆記分析 wait.h 的部份機制。

這邊結合我的筆記嘗試提供 poll_wait 的解釋:
首先,一個合理的 poll 會利用 poll_wait 的將 poll_get_entry 產生的 entry_wait 加入該裝置的 wait_head
若 do_select 一次迭代都找不到 ready 的 device
會進入睡眠
此時,一個裝置變成 ready 就會使用 wake 去喚醒那個 head 底下的 entry 與 head
select 進入睡眠的執行緒就會被喚醒

kxo 中利用 select 追蹤 /dev/kxo read 的必要性

關於 read(0, &input, 1),它其實就是從 STDIN 的裝置讀取一段長度的資料,若沒有資料,它就會陷入阻塞。因此, kxo 使用 select 的目的最初就是為了避免標準輸入監測造成的阻塞。然而, kxo 的 read 在等待 mcts 下棋的時間所造成的阻塞,會導致使用者在 ctrl+Q, ctrl+P 操作上的明顯延遲。
目前有兩種解決方法:
第一種方法是直接將 kxo read 改成非阻塞操作,若讀取失敗救回傳無效值,這樣即使不實作 poll 也能避免使用者體驗到暫停與停止的阻塞。然而,這會導致這個執行緒長期空轉,佔用 cpu 資源。
第二種方法就是乖乖把 poll 實作出來,這能確保兩個裝置沒有資源存取需求時,執行緒能夠進入睡眠。至於我自己的期末專題該如何搭配 select ,可以將 poll 實作成偵測某個 pid 之下的所有 user ,任一能夠成功讀取就代表就緒, 再將 read 實作成非阻塞,在使用者端檢查每個 user 的狀況。