# 2021q3 Homework (simrupt)
contributed by < [`linD026`](https://github.com/linD026) >
###### tags: `linux2021`
> [2021 年暑期 第 3 週隨堂測驗題目](https://hackmd.io/@sysprog/linux2021-summer-quiz3)
---
# simrupt
## 解釋本核心模組運作原理
### kfifo
```cpp
/* Data are stored into a kfifo buffer before passing them to the userspace */
static struct kfifo rx_fifo;
/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
* only one concurrent reader and one concurrent writer. Writes are serialized
* from the interrupt context, readers are serialized using this mutex.
*/
static DEFINE_MUTEX(read_lock);
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
```
* `kfifo_alloc(&rx_fifo, PAGE_SIZE, GFP_KERNEL)`
* `kfifo_free`
```cpp
/* Insert a value into the kfifo buffer */
static void produce_data(unsigned char val)
{
/* Implement a kind of circular FIFO here (skip oldest element if kfifo
* buffer is full).
*/
unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
if (unlikely(len < sizeof(val)) && printk_ratelimit())
pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);
pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
kfifo_len(&rx_fifo));
}
```
* `kfifo_in` — put data into the fifo
* `fifo` address of the fifo to be used
* `buf` the data to be added
* `n` number of elements to be added
為何 `produce_data` 會需要 `len` 來確認有沒有完整放入,是因為原始程式碼當中有做確認,若要放入的大小大於剩餘大小,則會以剩餘大小為主:
* [lib/kfifo.c](https://elixir.bootlin.com/linux/latest/source/lib/kfifo.c#L113)
```cpp
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
l = kfifo_unused(fifo);
if (len > l)
len = l;
kfifo_copy_in(fifo, buf, len, fifo->in);
fifo->in += len;
return len;
}
/*
* internal helper to calculate the unused elements in a fifo
*/
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
return (fifo->mask + 1) - (fifo->in - fifo->out);
}
```
而在 `kfifo_copy_in` 當中有使用到兩次 `memcpy` 是因為考量到在 `unused` 記憶體容許下,儲存物件會跨區:
```cpp
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 1) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(fifo->data + off, src, l);
memcpy(fifo->data, src + l, len - l);
/*
* make sure that the data in the fifo is up to date before
* incrementing the fifo->in index counter
*/
smp_wmb();
}
```
```cpp
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
```
```cpp
simrupt_read() {
...
ret = kfifo_to_user(&rx_fifo, buf, count, &read);
...
}
```
* `kfifo_to_user` — copies data from the fifo into user space
* `fifo` address of the fifo to be used
* `to` where the data must be copied
* `len` the size of the destination buffer
* `copied` pointer to output variable to store the number of copied bytes
* `kfifo` 單個 reader 以及單個 writer , [include/linux/kfifo.h](https://elixir.bootlin.com/linux/latest/source/include/linux/kfifo.h) :
```cpp
/*
* Note about locking: There is no locking required until only one reader
* and one writer is using the fifo and no kfifo_reset() will be called.
* kfifo_reset_out() can be safely used, until it will be only called
* in the reader thread.
* For multiple writer and one reader there is only a need to lock the writer.
* And vice versa for only one writer and multiple reader there is only a need
* to lock the reader.
*/
```
在 `simrupt.c` 也有提到:
```cpp
/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
* only one concurrent reader and one concurrent writer. Writes are serialized
* from the interrupt context, readers are serialized using this mutex.
*/
```
正常:
```shell
!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~ !"#$%&'()*+,-.^C
```
測試同時讀取:
```shell
ubuntu@ubuntu:~$ sudo cat /dev/simrupt > test.txt &
[1] 897
ubuntu@ubuntu:~$ sudo cat /dev/simrupt
QTWZ]`cfilorux{~"%(+.147:=@CFGJKNORUVYZ]^abefijmnqruvyz}~"#&'*+./2367:;>?BCFGJK^C
ubuntu@ubuntu:~$ cat test.txt
!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPRSUVXY[\^_abdeghjkmnpqstvwyz|} !#$&')*,-/0235689;<>?ABDEHILMPQSTWX[\_`cdghklopstwx{| !$%(),-014589<=@ADEHILMubuntu@ubuntu:~$
```
---
### fast circurlar buffer
```cpp
static struct circ_buf fast_buf;
fast_buf.buf = vmalloc(PAGE_SIZE);
```
> vmalloc — allocate virtually contiguous memory
* [/include/linux/circ_buf.h](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h)
```cpp
/* SPDX-License-Identifier: GPL-2.0 */
/*
* See Documentation/core-api/circular-buffers.rst for more information.
*/
#ifndef _LINUX_CIRC_BUF_H
#define _LINUX_CIRC_BUF_H 1
struct circ_buf {
char *buf;
int head;
int tail;
};
/* Return count in buffer. */
#define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1))
/* Return space available, 0..size-1. We always leave one free char
as a completely full buffer has head == tail, which is the same as
empty. */
#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))
/* Return count up to the end of the buffer. Carefully avoid
accessing head and tail more than once, so they can change
underneath us without returning inconsistent results. */
#define CIRC_CNT_TO_END(head,tail,size) \
({int end = (size) - (tail); \
int n = ((head) + end) & ((size)-1); \
n < end ? n : end;})
/* Return space available up to the end of the buffer. */
#define CIRC_SPACE_TO_END(head,tail,size) \
({int end = (size) - 1 - (head); \
int n = (end + (tail)) & ((size)-1); \
n <= end ? n : end+1;})
#endif /* _LINUX_CIRC_BUF_H */
```
```cpp
/* Clear all data from the circular buffer fast_buf */
static void fast_buf_clear(void)
{
fast_buf.head = fast_buf.tail = 0;
}
```
```cpp
/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
* run in workqueue handler (kernel thread context).
*/
static DEFINE_MUTEX(consumer_lock);
```
生產,消費
```cpp
static int fast_buf_get(void)
{
struct circ_buf *ring = &fast_buf;
/* prevent the compiler from merging or refetching accesses for tail */
// unsigned long head = READ_ONCE(), tail = READ_ONCE();
unsigned long head = (unsigned long)ring->head/*RRRR*/, tail = ring->tail;
int ret;
if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
return -ENOENT;
/* read index before reading contents at that index */
smp_read_barrier_depends();
/* extract item from the buffer */
// TTTT;
ret = ring->buf[ring->tail];
/* finish reading descriptor before incrementing tail */
smp_mb();
/* increment the tail pointer */
ring->tail = (tail + 1) & (PAGE_SIZE - 1);
return ret;
}
static int fast_buf_put(unsigned char val)
{
struct circ_buf *ring = &fast_buf;
unsigned long head = ring->head;
/* prevent the compiler from merging or refetching accesses for tail */
unsigned long tail = READ_ONCE(ring->tail);
/* is circular buffer full? */
if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
return -ENOMEM;
ring->buf[ring->head] = val;
/* commit the item before incrementing the head */
// MMBB;
smp_wmb();
/* update header pointer */
ring->head = (ring->head + 1) & (PAGE_SIZE - 1);
return 0;
}
```
---
### workqueue
```cpp
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
```
* `alloc_workqueue`
* `flush_workqueue`
* `destroy_workqueue`
```cpp
/* Workqueue handler: executed by a kernel thread */
static void simrupt_work_func(struct work_struct *w)
{
int val, cpu;
/* This code runs from a kernel thread, so softirqs and hard-irqs must
* be enabled.
*/
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());
/* Pretend to simulate access to per-CPU data, disabling preemption
* during the pr_info().
*/
cpu = get_cpu();
pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
put_cpu();
while (1) {
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
wake_up_interruptible(&rx_wait);
}
/* Workqueue for asynchronous bottom-half processing */
static struct workqueue_struct *simrupt_workqueue;
/* Work item: holds a pointer to the function that is going to be executed
* asynchronously.
*/
static DECLARE_WORK(work, simrupt_work_func);
```
---
### Softirqs
softirqs can not be used by device drivers, they are reserved for various kernel subsystems. Because of this there is a fixed number of softirqs defined at compile time. For the current kernel version we have the following types defined:
```cpp
enum {
HI_SOFTIRQ = 0,
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
IRQ_POLL_SOFTIRQ,
TASKLET_SOFTIRQ,
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ,
NR_SOFTIRQS
};
```
Each type has a specific purpose:
* `HI_SOFTIRQ` and `TASKLET_SOFTIRQ` - running tasklets
* `TIMER_SOFTIRQ` - running timers
* `NET_TX_SOFIRQ` and `NET_RX_SOFTIRQ` - used by the networking subsystem
* `BLOCK_SOFTIRQ` - used by the IO subsystem
* `BLOCK_IOPOLL_SOFTIRQ` - used by the IO subsystem to increase performance when the iopoll handler is invoked;
* `SCHED_SOFTIRQ` - load balancing
* `HRTIMER_SOFTIRQ` - implementation of high precision timers
* `RCU_SOFTIRQ` - implementation of RCU type mechanisms
The highest priority is the `HI_SOFTIRQ` type softirqs, followed in order by the other softirqs defined. `RCU_SOFTIRQ` has the lowest priority.
**Softirqs are running in interrupt context which means that they can not call blocking functions. If the sofitrq handler requires calls to such functions, work queues can be scheduled to execute these blocking calls.**
---
### tasklet
```cpp
/* Tasklet handler.
*
* NOTE: different tasklets can run concurrently on different processors, but
* two of the same type of tasklet cannot run simultaneously. Moreover, a
* tasklet always runs on the same CPU that schedules it.
*/
static void simrupt_tasklet_func(unsigned long __data)
{
ktime_t tv_start, tv_end;
s64 nsecs;
WARN_ON_ONCE(!in_interrupt());
WARN_ON_ONCE(!in_softirq());
tv_start = ktime_get();
queue_work(simrupt_workqueue, &work);
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
}
/* Tasklet for asynchronous bottom-half processing in softirq context */
static DECLARE_TASKLET(simrupt_tasklet, simrupt_tasklet_func, 0);
```
A tasklet is a special form of deferred work that runs in interrupt context, just like softirqs. The main difference between sofirqs and tasklets is that tasklets can be allocated dynamically and thus they can be used by device drivers. A tasklet is represented by `struct tasklet` and as many other kernel structures it needs to be initialized before being used. A pre-initialized tasklet can defined as following:
If we want to initialize the tasklet manually we can use the following approach:
```cpp
void handler(unsigned long data);
struct tasklet_struct tasklet;
tasklet_init(&tasklet, handler, data);
```
The data parameter will be sent to the handler when it is executed.
Programming tasklets for running is called scheduling. Tasklets are running from softirqs. Tasklets scheduling is done with:
```cpp
void tasklet_schedule(struct tasklet_struct *tasklet);
void tasklet_hi_schedule(struct tasklet_struct *tasklet);
```
When using `tasklet_schedule`, a `TASKLET_SOFTIRQ` softirq is scheduled and all tasklets scheduled are run. For `tasklet_hi_schedule`, a `HI_SOFTIRQ` softirq is scheduled.
**If a tasklet was scheduled multiple times and it did not run between schedules, it will run once. Once the tasklet has run, it can be re-scheduled, and will run again at a later timer. Tasklets can be re-scheduled from their handlers.**
Tasklets can be masked and the following functions can be used:
```cpp
void tasklet_enable(struct tasklet_struct * tasklet );
void tasklet_disable(struct tasklet_struct * tasklet );
```
Remember that since tasklets are running from softirqs, blocking calls can not be used in the handler function.
銷毀
* `tasklet_kill(&simrupt_tasklet);`
---
### timer
```cpp
static void process_data(void)
{
WARN_ON_ONCE(!irqs_disabled());
pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
fast_buf_put(update_simrupt_data());
pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
tasklet_schedule(&simrupt_tasklet);
}
static void timer_handler(struct timer_list *__timer)
{
ktime_t tv_start, tv_end;
s64 nsecs;
pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__);
/* We are using a kernel timer to simulate a hard-irq, so we must expect
* to be in softirq context here.
*/
WARN_ON_ONCE(!in_softirq());
/* Disable interrupts for this CPU to simulate real interrupt context */
local_irq_disable();
tv_start = ktime_get();
process_data();
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
local_irq_enable();
}
```
```cpp
/* Timer to simulate a periodic IRQ */
static struct timer_list timer;
```
A particular type of deferred work, very often used, are timers. They are defined by struct timer_list. They run in interrupt context and are implemented on top of softirqs.
To be used, a timer must first be initialized by calling timer_setup():
* `timer_setup(&timer, timer_handler, 0);`
```cpp
#include <linux/sched.h>
void timer_setup(struct timer_list * timer,
void (*function)(struct timer_list *),
unsigned int flags);
```
> The above function initializes the internal fields of the structure and associates function as the timer handler. Since timers are planned over softirqs, blocking calls can not be used in the code associated with the treatment function.
Scheduling a timer is done with `mod_timer()`:
* `mod_timer(&timer, jiffies + msecs_to_jiffies(delay));`
```cpp
int mod_timer(struct timer_list *timer, unsigned long expires);
```
> Where expires is the time (in the future) to run the handler function. The function can be used to schedule or reschedule a timer.
**The time unit is `jiffie`. The absolute value of a `jiffie` is dependent on the platform and it can be found using the HZ macro that defines the number of jiffies for 1 second.** To convert between jiffies (`jiffies_value`) and seconds (`seconds_value`), the following formulas are used:
```cpp
jiffies_value = seconds_value * HZ ;
seconds_value = jiffies_value / HZ ;
```
The kernel mantains a counter that contains the number of jiffies since the last boot, which can be accessed via the jiffies global variable or macro. We can use it to calculate a time in the future for timers:
```cpp
#include <linux/jiffies.h>
unsigned long current_jiffies, next_jiffies;
unsigned long seconds = 1;
current_jiffies = jiffies;
next_jiffies = jiffies + seconds * HZ;
To stop a timer, use del_timer() and del_timer_sync():
int del_timer(struct timer_list *timer);
int del_timer_sync(struct timer_list *timer);
```
:::info
**Reference**
* [The Linux Kernel 5.10.14 - Deferred work](https://linux-kernel-labs.github.io/refs/heads/master/labs/deferred_work.html)
:::
---
### file operation
```cpp
static ssize_t simrupt_read(struct file *file,
char __user *buf,
size_t count,
loff_t *ppos)
{
unsigned int read;
int ret;
pr_debug("simrupt: %s(%p, %zd, %lld)\n", __func__, buf, count, *ppos);
if (unlikely(!access_ok(buf, count)))
return -EFAULT;
if (mutex_lock_interruptible(&read_lock))
return -ERESTARTSYS;
do {
ret = kfifo_to_user(&rx_fifo, buf, count, &read);
if (unlikely(ret < 0))
break;
if (read)
break;
if (file->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));
} while (ret == 0);
pr_debug("simrupt: %s: out %u/%u bytes\n", __func__, read,
kfifo_len(&rx_fifo));
mutex_unlock(&read_lock);
return ret ? ret : read;
}
```
* [debian.org - `wait_event_interruptible`](https://manpages.debian.org/jessie/linux-manual-3.16/wait_event_interruptible.9.en.html)
> `wait_event_interruptible(wq, condition);` - sleep until a condition gets true
> - `wq` the waitqueue to wait on
> - `condition` a C expression for the event to wait for
>
> The process is put to sleep (`TASK_INTERRUPTIBLE`) until the `condition` evaluates to true or a signal is received. The `condition` is checked each time the waitqueue `wq` is woken up.
> `wake_up` has to be called after changing any variable that could change the result of the wait `condition`.
> The function will return `-ERESTARTSYS` if it was interrupted by a signal and `0` if `condition` evaluated to `true`.
```cpp
static int simrupt_open(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
return 0;
}
static int simrupt_release(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
del_timer_sync(&timer);
flush_workqueue(simrupt_workqueue);
fast_buf_clear();
return 0;
}
```
---
最後,整體流程圖:
```graphviz
digraph main {
node [shape = box]
rankdir = LR
timer [label = "timer"]
pd [label = "process_data"]
fbp [label = "fast_buf_put"]
tl [label = "tasklet"]
wq [label = "workqueue|{work\n (simrupt_work_func)|{<g> fast_buf_get|<p> produce_data}}", shape = record]
k [label = "<i>kfifo_in|kfifo|<u>kfifo_to_user", shape = record]
r [label = "simrupt_read"]
timer -> pd
pd -> fbp [label = "produce_data"]
pd -> tl [label = "tasklet schedule"]
tl -> wq [label = "queue work"]
wq:p -> k:i
k:u -> r
{rank=same r timer}
}
```
dmesg 中 `pr_info` 資訊:
```shell
[20744.840045] simrupt: [CPU#0] enter timer_handler
[20744.840144] simrupt: [CPU#0] produce data
[20744.840176] simrupt: [CPU#0] scheduling tasklet
[20744.840189] simrupt: [CPU#0] timer_handler in_irq: 43 usec
[20744.840284] simrupt: [CPU#0] simrupt_tasklet_func in_softirq: 14 usec
[20744.840290] simrupt: [CPU#0] simrupt_work_func
```
---
### 缺失與改進
`kfifo` 在宣告的時候是:
```cpp
/* Data are stored into a kfifo buffer before passing them to the userspace */
static struct kfifo rx_fifo;
```
但 API 有提供 marco 宣告,考量大小為 `PAGE_SIZE` 且程式碼也以 `kfifo_alloc` 分配記憶體空間,應為:
```cpp
static DECLARE_KFIFO_PTR(rx_fifo, unsigned char);
```
而關於 kfifo 以及 circular buffer 會遭遇到並行的問題,因為前者只提供 one reader 和 one writer 的情況;後者則只提供資料結構以及計算使用量等 API 。
* [kfifo.h](https://elixir.bootlin.com/linux/latest/source/include/linux/kfifo.h#L29) 的說明:
```cpp
/*
* Note about locking: There is no locking required until only one reader
* and one writer is using the fifo and no kfifo_reset() will be called.
* kfifo_reset_out() can be safely used, until it will be only called
* in the reader thread.
* For multiple writer and one reader there is only a need to lock the writer.
* And vice versa for only one writer and multiple reader there is only a need
* to lock the reader.
*/
```
* [/include/linux/circ_buf.h](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h)
* [lwn.net - enhanced reimplemention of the kfifo API](https://lwn.net/Articles/371485/)
關於此問題,在後續 **改寫 producer_lock 和 consumer_lock** 時會說明。
:::success
lwn.net 有一篇關於 [kfifo writer side lock-less support](https://lwn.net/Articles/391292/) ,但去查閱 [kfifo.h](https://github.com/torvalds/linux/commits/master/include/linux/kfifo.h) 沒有相關 commit 。再從這篇 [Re: \[RFC -v2\] kfifo writer side lock-less support](https://marc.info/?l=linux-kernel&m=128267726019751) 以及之後的回應,應該是沒有被採用。
:::
---
```cpp
static int simrupt_open(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
return 0;
}
static int simrupt_release(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
del_timer_sync(&timer);
flush_workqueue(simrupt_workqueue);
fast_buf_clear();
return 0;
}
```
如果同時開啟檔案兩次,當其中一個關閉時會清空 `timer` 、 `fast_buf` 等設定會導致另一個操作錯誤。因此,考量並行下的狀態,以 `atomic_t` 型態新增 `open_cnt` 變數來記錄,當為第一個開啟以及最後一個關閉的時候才會做原先的設定操作。
```cpp
static atomic_t open_cnt;
static int simrupt_open(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
if (atomic_inc_return(&open_cnt) == 1)
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
pr_info("openm current cnt: %d\n", atomic_read(&open_cnt));
return 0;
}
static int simrupt_release(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
if (atomic_dec_and_test(&open_cnt) == 0) {
del_timer_sync(&timer);
flush_workqueue(simrupt_workqueue);
fast_buf_clear();
}
pr_info("release, current cnt: %d\n", atomic_read(&open_cnt));
return 0;
}
static int __init simrupt_init(void)
{
...
atomic_set(&open_cnt, 0);
...
}
```
---
### poll 系統呼叫
* [poll(2) — Linux manual page](https://man7.org/linux/man-pages/man2/poll.2.html)
> `poll()` performs a similar task to `select(2)`: **it waits for one of a set of file descriptors to become ready to perform I/O.** The Linux-specific `epoll(7)` API performs a similar task, but offers features beyond those found in `poll()`.
>
> **POLLIN**
> There is data to read.
> **POLLHUP**
> Hang up (only returned in `revents`; ignored in `events`). Note that when reading from a channel such as a pipe or a stream socket, this event merely indicates that the peer closed its end of the channel. Subsequent reads from the channel will return `0` (end of file) only after all outstanding data in the channel has been consumed.
* [How to add poll function to the kernel module code?](https://stackoverflow.com/questions/30035776/how-to-add-poll-function-to-the-kernel-module-code)
> Add `fortune_poll()` function and add it (as `.poll` callback) to your file operations structure:
>
> ```cpp
> static unsigned int fortune_poll(struct file *file, poll_table *wait)
> {
> poll_wait(file, &fortune_wait, wait);
> if (new-data-is-ready)
> return POLLIN | POLLRDNORM;
> return 0;
> }
>
> static const struct file_operations proc_test_fops = {
> ....
> .poll = fortune_poll,
> };
> ```
> **Note that you should return `POLLIN | POLLRDNORM` if you have some new data to read, and `0` in case there is no new data to read (`poll()` call timed-out). See man 2 `poll` for details.**
>
> Notify your waitqueue once you have new data:
>
> `wake_up_interruptible(&fortune_wait);`
>
> That's the basic stuff about implementing `poll()` operation. Depending on your task, you may be needed to use some waitqueue API in your `.read` function (like `wait_event_interruptible()`).
* [Implementing poll in a Linux kernel module](https://stackoverflow.com/questions/34027366/implementing-poll-in-a-linux-kernel-module)
> Taking into the account that you haven't mentioned `write()` operation, I will assume further that your hardware is producing new data all the time. If it's so, the design you mentioned can be exactly what is confusing you:
>
>> The read call is very simple. It starts a DMA write, and then waits on a wait queue.
>
> This is exactly what prevents you from working with your driver in regular, commonly used (and probably desired for you) way. Let's think out of the box and come up with the desired user interface first (how you would want to use your driver from user-space). The next case is commonly used and sufficient here (from my point of view):
>
> 1. **poll()** your device file to wait for new data to arrive
> 2. **read()** your device file to obtain arrived data
>
> Now you can see that data requesting (to DMA) should be started not by `read()` operation. The correct solution would be to read data continuously in the driver (without any triggering from user-space) and store it internally, and when user asks your driver for the data to consume (by `read()` operation) -- provide the user with data stored internally. If there is no data stored internally in driver -- user can wait for new data to arrive using `poll()` operation.
>
> As you can see, this is well-known producer-consumer problem.You can use circular buffer to store data from your hardware in your driver (so you intentionally lost old data when buffer is full to prevent buffer overflow situation). So the producer (DMA) writes to the head of that RX ring buffer, and the consumer (user performing `read()` from user-space) reads from tail of that RX ring buffer.
* [Why do we need to call poll_wait in poll?](https://stackoverflow.com/questions/30234496/why-do-we-need-to-call-poll-wait-in-poll)
> `poll_wait` adds your device (represented by the "struct file") to the list of those that can wake the process up.
>
> The idea is that the process can use poll (or select or epoll etc) to add a bunch of file descriptors to the list on which it wishes to wait. The poll entry for each driver gets called. Each one adds itself (via `poll_wait`) to the waiter list.
>
> Then the core kernel blocks the process in one place. That way, any one of the devices can wake up the process. If you return non-zero mask bits, that means those "ready" attributes (readable/writable/etc) apply now.
> So, in pseudo-code, it's roughly like this:
>
> ```
> foreach fd:
> find device corresponding to fd
> call device poll function to setup wait queues (with poll_wait) and to collect its "ready-now" mask
>
> while time remaining in timeout and no devices are ready:
> sleep
>
> return from system call (either due to timeout or to ready devices)
> ```
>
>> Yes. When you call `poll(2)` in user space, that goes to a function called "sys_poll" inside the kernel (see fs/select.c in kernel source). Likewise, select(2) => sys_select, etc. All those functions follow more or less the pseudo-code I gave above.
* [/include/linux/poll.h](https://elixir.bootlin.com/linux/v5.7/source/include/linux/poll.h#L46)
```cpp
/*
* Do not touch the structure directly, use the access functions
* poll_does_not_wait() and poll_requested_events() instead.
*/
typedef struct poll_table_struct {
poll_queue_proc _qproc;
__poll_t _key;
} poll_table;
```
:::info
**Example - rtc**
* [/drivers/rtc/dev.c](https://elixir.bootlin.com/linux/v5.7/source/drivers/rtc/dev.c#L196)
* [/include/uapi/linux/rtc.h](https://elixir.bootlin.com/linux/v5.7/source/include/uapi/linux/rtc.h#L84)
* [/include/linux/rtc.h](https://elixir.bootlin.com/linux/v5.7/source/include/linux/rtc.h#L88)
* [/drivers/rtc/interface.c](https://elixir.bootlin.com/linux/v5.7/source/drivers/rtc/interface.c#L629)
:::
---
在 `simrupt_work_func` 函式當中已有 `wake_up_interruptible` 呼叫:
```cpp
/* Workqueue handler: executed by a kernel thread */
static void simrupt_work_func(struct work_struct *w)
{
int val, cpu;
/* This code runs from a kernel thread, so softirqs and hard-irqs must
* be enabled.
*/
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());
/* Pretend to simulate access to per-CPU data, disabling preemption
* during the pr_info().
*/
cpu = get_cpu();
pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
put_cpu();
while (1) {
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
wake_up_interruptible(&rx_wait);
}
```
因此,在 module 當中新增:
```cpp
static __poll_t simrupt_poll(struct file *file, poll_table *wait)
{
poll_wait(file, &rx_wait, wait);
return kfifo_len(&rx_fifo) ? EPOLLIN | EPOLLRDNORM : 0;
}
```
撰寫 userspace 的程式碼:
```cpp
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#define error(s) do { perror(s); exit(EXIT_FAILURE); } while(0)
#define NUM 2
int main(void)
{
struct pollfd pfds[NUM] = {0};
int nfds = NUM, nopen = NUM - 1;
for (int i = 0;i < NUM;i++) {
pfds[i].fd = open("/dev/simrupt", O_RDONLY);
if (pfds[i].fd == -1)
error("open");
printf("%d Opened on fd %d\n", i, pfds[i].fd);
pfds[i].events = POLLIN;
}
while (nopen > 0) {
int ready;
printf("Calling poll() ");
ready = poll(pfds, nfds, -1);
if (ready == -1)
error("poll");
printf("ready %d\n", ready);
for (int i = 0;i < nfds;i++) {
char buf[20];
if (pfds[i].revents != 0) {
printf("fd=%d events: %s%s%s\n", pfds[i].fd,
(pfds[i].revents & POLLIN) ? "POLLIN" : "",
(pfds[i].revents & POLLHUP) ? "POLLHUP" : "",
(pfds[i].revents & POLLERR) ? "POLLERR" : ""
);
if (pfds[i].revents & POLLIN) {
ssize_t s = read(pfds[i].fd, buf, sizeof(buf));
if (s == -1)
error("read");
printf("read %zd bytes: %.*s\n", s, (int)s, buf);
}
else {
printf("closing fd %d\n", pfds[i].fd);
if (close(pfds[i].fd) == -1)
error("close");
nopen--;
}
}
}
}
printf("All done\n");
return 0;
}
```
輸出:
```shell
0 Opened on fd 3
1 Opened on fd 4
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: !
fd=4 events: POLLIN
read 1 bytes: "
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: #
fd=4 events: POLLIN
read 1 bytes: $
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: %
fd=4 events: POLLIN
read 1 bytes: &
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: '
fd=4 events: POLLIN
read 1 bytes: (
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: )
...
```
---
### 改寫 `producer_lock` 和 `consumer_lock`
```cpp
/* Mutex to serialize kfifo writers within the workqueue handler */
static DEFINE_MUTEX(producer_lock);
/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
* run in workqueue handler (kernel thread context).
*/
static DEFINE_MUTEX(consumer_lock);
```
* `consumer_lock` 改進, `fast_buf` lock-free :
```cpp
static int fast_buf_get(void)
{
struct circ_buf *ring = &fast_buf;
unsigned long head, tail;
int ret;
try_again:
/* prevent the compiler from merging or refetching accesses for tail */
head = READ_ONCE(ring->head) & (PAGE_SIZE - 1) /*RRRR*/;
tail = READ_ONCE(ring->tail);
if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
return -ENOENT;
/* read index before reading contents at that index */
smp_read_barrier_depends();
/* extract item from the buffer */
// TTTT; ret = ring->buf[ring->tail];
ret = ring->buf[tail];
/* finish reading descriptor before incrementing tail */
smp_mb();
/* increment the tail pointer */
if (!atomic_cmpxchg((atomic_t*)&ring->tail, tail, (tail + 1) & (PAGE_SIZE - 1)))
goto try_again;
return ret;
}
static int fast_buf_put(unsigned char val)
{
struct circ_buf *ring = &fast_buf;
unsigned long head, tail;
try_again:
head = READ_ONCE(ring->head) & (PAGE_SIZE - 1);
/* prevent the compiler from merging or refetching accesses for tail */
tail = READ_ONCE(ring->tail);
/* is circular buffer full? */
if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
return -ENOMEM;
if (!atomic_cmpxchg((atomic_t *)&ring->buf[ring->head], ring->buf[head], val))
goto try_again;
/* commit the item before incrementing the head */
// MMBB;
smp_wmb();
/* update header pointer */
head = atomic_inc_return((atomic_t *)&ring->head);
if (head + 1 >= PAGE_SIZE) {
atomic_cmpxchg((atomic_t *)&ring->head, head, head & (PAGE_SIZE - 1));
}
return 0;
}
```
* `producer_lock`
> `kfifo_in`
> This macro copies the given buffer into the fifo and returns the number of copied elements.
>
> Note that with only one concurrent reader and one concurrent writer, you don't need extra locking to use these macro.
因此在單個 writer 情況下是不需要 lock ,但此處會因 workqueue 導致有可能會有多個 writer 所以保留:
```cpp
while (1) {
/* Consume data from the circular buffer */
val = fast_buf_get();
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
```
而在 `kfifo_to_user` 需要 lock ,則可以使用前面因 `fast_buf` lock-free 後捨棄的 `consumer_lock` 。
```cpp
mutex_lock(&consumer_lock);
ret = kfifo_to_user(&rx_fifo, buf, count, &read);
mutex_unlock(&consumer_lock);
```
:::info
**Reference**
* [gigatux - timers](http://books.gigatux.nl/mirror/kerneldevelopment/0672327201/ch10lev1sec7.html)
* [kernel.org - Linux generic IRQ handling](https://www.kernel.org/doc/html/latest/core-api/genericirq.html)
* [kernel.org - interrupts](https://linux-kernel-labs.github.io/refs/heads/master/lectures/interrupts.html)
* [kernel.org - deferred work](https://linux-kernel-labs.github.io/refs/heads/master/labs/deferred_work.html)
* [kernel.org - Semantics and Behavior of Atomic and Bitmask Operations
](https://www.kernel.org/doc/html/v4.12/core-api/atomic_ops.html)
:::
---
# vwifi