# 2021q3 Homework3 (vpoll)
contributed by < `hankluo6` >

:::spoiler
### Reference answers
* `MMM` = 1
* `NNN` = 2
* `WWW` = `wake_up_interruptible_poll`
* `ZZZ` = 10
:::

## How it works
[`epoll_create1`](https://man7.org/linux/man-pages/man2/epoll_create.2.html) creates a new epoll instance and returns a file descriptor that refers to it. [`epoll_ctl`](https://man7.org/linux/man-pages/man2/epoll_ctl.2.html) wraps `vpoll` in an `epitem` and registers it inside epoll, so that `epoll_wait` can later find `vpoll`.

`epoll_ctl` calls the `poll` file operation of `vpoll` to set up the wait queue and the corresponding hook function.

Call order: `epoll_ctl` $\to$ `do_epoll_ctl` $\to$ `ep_insert` $\to$ `ep_item_poll` $\to$ `vfs_poll`
```c
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
    if (unlikely(!file->f_op->poll))
        return DEFAULT_POLLMASK;
    return file->f_op->poll(file, pt);
}
```
This ends up calling `file->f_op->poll()`, which is the `vpoll_poll` callback specified in `vpoll`.
```c
static __poll_t vpoll_poll(struct file *file, struct poll_table_struct *wait)
{
    struct vpoll_data *vpoll_data = file->private_data;

    poll_wait(file, &vpoll_data->wqh, wait);

    return READ_ONCE(vpoll_data->events);
}
```
Tracing `poll_wait` shows that it adds an entry to the wait queue inside `vpoll` and registers `ep_poll_callback` as that entry's callback function:
```c
static int ep_insert(...)
{
    ...
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    ...
}

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->_qproc = qproc;
    pt->_key   = ~(__poll_t)0; /* all events enabled */
}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}

/*
 * This is the callback that is used to add our wait queue to the
 * target file wakeup lists.
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                                 poll_table *pt)
{
    ...
    pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    ...
    if (epi->event.events & EPOLLEXCLUSIVE)
        add_wait_queue_exclusive(whead, &pwq->wait);
    else
        add_wait_queue(whead, &pwq->wait);
    ...
}

static inline void init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
    wq_entry->flags   = 0;
    wq_entry->private = NULL;
    wq_entry->func    = func;
}
```
When our `vpoll` receives a message, it can run `ep_poll_callback` by waking up the wait queue inside `vpoll`.
```c
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
    struct epitem *epi = ep_item_from_wait(wait);
    ...
    if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
        if (chain_epi_lockless(epi))
            ep_pm_stay_awake_rcu(epi);
    } else if (!ep_is_linked(epi)) {
        /* In the usual case, add event to ready list. */
        if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
            ep_pm_stay_awake_rcu(epi);
    }

    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
    if (waitqueue_active(&ep->wq)) {
        ...
        wake_up(&ep->wq);
    }
    ...
}
```
`ep_poll_callback` then uses `list_add_tail_lockless` to add the `epitem` corresponding to `vpoll` to epoll's `rdllist`, which marks `vpoll` as having an event that is waiting to be handled. If a task is currently waiting on epoll's own wait queue (that is, after `epoll_wait` has been called), the callback also wakes up the task sleeping in `ep_poll`.

`epoll_wait` waits for an event to occur on `vpoll`, using epoll's own wait queue to do the waiting.

Call order: `epoll_wait` $\to$ `do_epoll_wait` $\to$ `ep_poll`
```c
/**
 * ep_poll - Retrieves ready events, and delivers them to the caller-supplied
 *           event buffer.
 *
 * @ep: Pointer to the eventpoll context.
 * @events: Pointer to the userspace buffer where the ready events should be
 *          stored.
 * @maxevents: Size (in terms of number of events) of the caller event buffer.
 * @timeout: Maximum timeout for the ready events fetch operation, in
 *           timespec. If the timeout is zero, the function will not block,
 *           while if the @timeout ptr is NULL, the function will block
 *           until at least one event has been retrieved (or an error
 *           occurred).
 *
 * Return: the number of ready events which have been fetched, or an
 *          error code, in case of error.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, struct timespec64 *timeout)
{
    wait_queue_entry_t wait;
    ...
    /*
     * This call is racy: We may or may not see events that are being added
     * to the ready list under the lock (e.g., in IRQ callbacks). For cases
     * with a non-zero timeout, this thread will check the ready list under
     * lock and will add to the wait queue.  For cases with a zero
     * timeout, the user by definition should not care and will have to
     * recheck again.
     */
    eavail = ep_events_available(ep);

    while (1) {
        if (eavail) {
            /*
             * Try to transfer events to user space. In case we get
             * 0 events and there's still timeout left over, we go
             * trying again in search of more luck.
             */
            res = ep_send_events(ep, events, maxevents);
            if (res)
                return res;
        }

        init_wait(&wait);

        eavail = ep_events_available(ep);
        if (!eavail)
            __add_wait_queue_exclusive(&ep->wq, &wait);

        if (!eavail)
            timed_out = !schedule_hrtimeout_range(to, slack,
                                                  HRTIMER_MODE_ABS);

        eavail = 1;

        if (!list_empty_careful(&wait.entry)) {
            write_lock_irq(&ep->lock);
            /*
             * If the thread timed out and is not on the wait queue,
             * it means that the thread was woken up after its
             * timeout expired before it could reacquire the lock.
             * Thus, when wait.entry is empty, it needs to harvest
             * events.
             */
            if (timed_out)
                eavail = list_empty(&wait.entry);
            __remove_wait_queue(&ep->wq, &wait);
            write_unlock_irq(&ep->lock);
        }
    }
}
```
When there is currently no event to handle, `schedule_hrtimeout_range` voluntarily yields the CPU to other processes. When `schedule_hrtimeout_range` returns, the task has either been woken up or has timed out, and the `if` block that follows determines whether it was actually woken up.

If an event occurs on `vpoll`, the earlier `ep_poll_callback` wakes this process up. From then on `ep_events_available` returns a non-zero value, and `ep_send_events` copies the events that occurred back to user space.
```c
static inline int ep_events_available(struct eventpoll *ep)
{
    return !list_empty_careful(&ep->rdllist) ||
        READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
}
```
As shown, `ep_events_available` checks whether `rdllist` is non-empty (or whether `ovflist` is in use), which tells whether any events are pending.

The `ioctl` in `module.c` is the point where an event occurs, so `vpoll` must wake up its internal wait queue there, so that `ep_poll_callback` runs and notifies epoll.
```c
static long vpoll_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
    ...
    if (res >= 0) {
        res = vpoll_data->events;
        if (waitqueue_active(&vpoll_data->wqh))
            wake_up_locked_poll(&vpoll_data->wqh, vpoll_data->events);
    }
    ...
}
```
:::warning
Why can't `wake_up_interruptible_poll` be used?
:::

This eventually calls `__wake_up_common`, which runs the function attached to each entry on the `wait_queue_head`, here `ep_poll_callback`.

The `events` argument filled in when `epoll_wait` returns is obtained through `ep_send_events`, which calls `vfs_poll` once more. Its return value is the event mask, that is, the return value of `vpoll_poll` in `vpoll`, masked by the events the caller registered for (see `ep_item_poll`).
```c
/*
 * Differs from ep_eventpoll_poll() in that internal callers already have
 * the ep->mtx so we need to start from depth=1, such that mutex_lock_nested()
 * is correctly annotated.
 */
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
                             int depth)
{
    struct file *file = epi->ffd.file;
    __poll_t res;

    pt->_key = epi->event.events;
    if (!is_file_epoll(file))
        res = vfs_poll(file, pt);
    else
        res = __ep_eventpoll_poll(file, pt, depth);
    return res & epi->event.events;
}

static int ep_send_events(struct eventpoll *ep,
                          struct epoll_event __user *events, int maxevents)
{
    ...
    list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
        ...
        revents = ep_item_poll(epi, &pt, 1);
        if (!revents)
            continue;

        if (__put_user(revents, &events->events) ||
            __put_user(epi->event.data, &events->data)) {
            list_add(&epi->rdllink, &txlist);
            ep_pm_stay_awake(epi);
            if (!res)
                res = -EFAULT;
            break;
        }
    }
    ...
}
```
Flowchart:
```graphviz
digraph G {
    node [fontname = "Handlee"];
    edge [fontname = "Handlee"];

    vpoll [label = "vpoll" shape=box];
    ioctl [label = "vpoll_ioctl" shape=box];
    wake_epoll [label = "wake up epoll" shape=box];
    insert [label = "insert epitem \nto rdllist" shape=box];

    epoll [label = "epoll" shape=box];
    ep_poll [label = "ep_poll" shape=box];
    handle [label = "receive events \nfrom rdllist" shape=box];
    return [label = "return" shape=box];

    vpoll -> ioctl [style=dashed arrowhead=none];
    ioctl -> wake_epoll;
    ioctl -> insert;

    epoll -> ep_poll [style=dashed arrowhead=none];
    ep_poll -> ep_poll [label = " sleep"];
    ep_poll -> handle;
    handle -> return;

    wake_epoll -> ep_poll;

    { rank=same; vpoll; epoll; }
    { rank=same; ioctl; ep_poll; }
}
```
The analysis above can be verified with ftrace:
```c
              |  __x64_sys_epoll_ctl() {
              |    do_epoll_ctl() {
              |      ep_insert() {
              |        ep_item_poll.isra.0() {
              |          vpoll_poll() {
              |            ep_ptable_queue_proc() {
              |              kmem_cache_alloc() {
              |                _cond_resched() {
   0.187 us   |                  rcu_all_qs();
   0.635 us   |                }
   0.224 us   |                should_failslab();
   0.389 us   |                memcg_kmem_get_cache();
   0.203 us   |                memcg_kmem_put_cache();
   3.638 us   |              }
              |              add_wait_queue() {
   0.223 us   |                _raw_spin_lock_irqsave();
   0.225 us   |                __lock_text_start();
   1.126 us   |              }
   5.463 us   |            }
 + 21.730 us  |          }
 + 22.259 us  |        }
 + 27.386 us  |      }
 + 31.464 us  |    }
 + 32.281 us  |  }
```
`epoll_ctl` reaches the `vpoll_poll` we implemented, which in turn calls, through `poll_wait`, the `ep_ptable_queue_proc` that `ep_insert` registered earlier with `init_poll_funcptr`.
```c
              |  __x64_sys_ioctl() {
              |    ksys_ioctl() {
              |      __fdget() {
   0.217 us   |        __fget_light();
   0.520 us   |      }
   0.167 us   |      security_file_ioctl();
   0.300 us   |      do_vfs_ioctl();
              |      vpoll_ioctl() {
   0.204 us   |        _raw_spin_lock_irq();
              |        __wake_up_locked_key() {
              |          __wake_up_common() {
              |            ep_poll_callback() {
   0.214 us   |              _raw_read_lock_irqsave();
   0.171 us   |              _raw_read_unlock_irqrestore();
   0.912 us   |            }
   1.373 us   |          }
   1.681 us   |        }
   2.356 us   |      }
   4.321 us   |    }
   4.641 us   |  }
```
`ioctl` does reach `__wake_up_common`, which invokes the hook function registered earlier: `ep_poll_callback`.
```c
              |  __x64_sys_epoll_wait() {
              |    do_epoll_wait() {
              |      ep_poll() {
              |        schedule_hrtimeout_range() {
              |          schedule() {
              ...
 1000931 us   |          }
              |          hrtimer_try_to_cancel() {
   0.173 us   |            hrtimer_active();
   0.498 us   |          }
 1000935 us   |        }
 1000936 us   |      }
 1000939 us   |    }
 1000941 us   |  }
```
`epoll_wait` calls `ep_poll` and waits until `rdllist` is non-empty. While it is empty (for example, `user.c` waits for 1 second), the task enters the scheduler and another process is switched in. The trace confirms that `schedule` is called in between, and that `schedule` ran for `1000931 us`, roughly `1 s` of waiting, which matches the program.

###### tags: `linux2021`
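
The user-space half of the flow analyzed above (`epoll_create1`, `epoll_ctl`, then `epoll_wait` either timing out or returning an event mask) can be tried without building the `vpoll` module. The following is only a minimal sketch under stated assumptions: it uses an `eventfd` as a stand-in for the `vpoll` device, so a `write` to the `eventfd` plays the role of `vpoll_ioctl`, and `watch_fd` and `wait_events` are hypothetical helper names that do not appear in `user.c`.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/*
 * Hypothetical helper: create an epoll instance and register `fd` for
 * EPOLLIN. In the kernel this is the epoll_ctl -> ep_insert ->
 * ep_item_poll -> vfs_poll path described above.
 * Returns the epoll file descriptor, or -1 on error.
 */
int watch_fd(int fd)
{
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd < 0)
        return -1;

    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
        close(epfd);
        return -1;
    }
    return epfd;
}

/*
 * Hypothetical helper: wait for at most `timeout_ms` milliseconds.
 * Returns 0 on timeout (the ready list stayed empty), the reported event
 * mask if an event was delivered, or -1 on error.
 */
int wait_events(int epfd, int timeout_ms)
{
    struct epoll_event ev;
    int n = epoll_wait(epfd, &ev, 1, timeout_ms);

    if (n < 0)
        return -1;
    return n == 0 ? 0 : (int) ev.events;
}
```

With nothing written to the watched descriptor, `wait_events` sleeps for the whole timeout and returns 0, which corresponds to the roughly 1 s spent in `schedule` in the last trace. After a `write` to the `eventfd`, the next call returns immediately with `EPOLLIN` set.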