Try   HackMD

2021q3 Homework (simrupt)

contributed by < linD026 >

tags: linux2021

2021 年暑期 第 3 週隨堂測驗題目


simrupt

解釋本核心模組運作原理

kfifo

/* Data are stored into a kfifo buffer before passing them to the userspace */
static struct kfifo rx_fifo;

/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
 * only one concurrent reader and one concurrent writer. Writes are serialized
 * from the interrupt context, readers are serialized using this mutex.
 */
static DEFINE_MUTEX(read_lock);

/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
  • kfifo_alloc(&rx_fifo, PAGE_SIZE, GFP_KERNEL)
  • kfifo_free
/* Insert a value into the kfifo buffer */
static void produce_data(unsigned char val)
{
    /* Implement a kind of circular FIFO here (skip oldest element if kfifo
     * buffer is full).
     */
    unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
    if (unlikely(len < sizeof(val)) && printk_ratelimit())
        pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);

    pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
             kfifo_len(&rx_fifo));
}
  • kfifo_in — put data into the fifo
    • fifo address of the fifo to be used
    • buf the data to be added
    • n number of elements to be added

為何 produce_data 會需要 len 來確認有沒有完整放入,是因為原始程式碼當中有做確認,若要放入的大小大於剩餘大小,則會以剩餘大小為主:

  • lib/kfifo.c
    ​​​​unsigned int __kfifo_in(struct __kfifo *fifo,
    ​​​​                const void *buf, unsigned int len)
    ​​​​{
    ​​​​        unsigned int l;
    
    ​​​​        l = kfifo_unused(fifo);
    ​​​​        if (len > l)
    ​​​​                len = l;
    
    ​​​​        kfifo_copy_in(fifo, buf, len, fifo->in);
    ​​​​        fifo->in += len;
    ​​​​        return len;
    ​​​​}
    ​​​​
    ​​​​/*
    ​​​​ * internal helper to calculate the unused elements in a fifo
    ​​​​ */
    ​​​​static inline unsigned int kfifo_unused(struct __kfifo *fifo)
    ​​​​{
    ​​​​        return (fifo->mask + 1) - (fifo->in - fifo->out);
    ​​​​}    
    
    而在 kfifo_copy_in 當中有使用到兩次 memcpy 是因為考量到在 unused 記憶體容許下,儲存物件會跨區:
    ​​​​static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
    ​​​​                unsigned int len, unsigned int off)
    ​​​​{
    ​​​​        unsigned int size = fifo->mask + 1;
    ​​​​        unsigned int esize = fifo->esize;
    ​​​​        unsigned int l;
    
    ​​​​        off &= fifo->mask;
    ​​​​        if (esize != 1) {
    ​​​​                off *= esize;
    ​​​​                size *= esize;
    ​​​​                len *= esize;
    ​​​​        }
    ​​​​        l = min(len, size - off);
    
    ​​​​        memcpy(fifo->data + off, src, l);
    ​​​​        memcpy(fifo->data, src + l, len - l);
    ​​​​        /*
    ​​​​         * make sure that the data in the fifo is up to date before
    ​​​​         * incrementing the fifo->in index counter
    ​​​​         */
    ​​​​        smp_wmb();
    ​​​​}
    
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
simrupt_read() {
    ...
    ret = kfifo_to_user(&rx_fifo, buf, count, &read);
    ...
}
  • kfifo_to_user — copies data from the fifo into user space

    • fifo address of the fifo to be used
    • to where the data must be copied
    • len the size of the destination buffer
    • copied pointer to output variable to store the number of copied bytes
  • kfifo 單個 reader 以及單個 writer , include/linux/kfifo.h

/*
 * Note about locking: There is no locking required until only one reader
 * and one writer is using the fifo and no kfifo_reset() will be called.
 * kfifo_reset_out() can be safely used, until it will be only called
 * in the reader thread.
 * For multiple writer and one reader there is only a need to lock the writer.
 * And vice versa for only one writer and multiple reader there is only a need
 * to lock the reader.
 */

simrupt.c 也有提到:

/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
 * only one concurrent reader and one concurrent writer. Writes are serialized
 * from the interrupt context, readers are serialized using this mutex.
 */

正常:

 !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~ !"#$%&'()*+,-.^C

測試同時讀取:

ubuntu@ubuntu:~$ sudo cat /dev/simrupt > test.txt &
[1] 897
ubuntu@ubuntu:~$ sudo cat /dev/simrupt
QTWZ]`cfilorux{~"%(+.147:=@CFGJKNORUVYZ]^abefijmnqruvyz}~"#&'*+./2367:;>?BCFGJK^C
ubuntu@ubuntu:~$ cat test.txt
 !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPRSUVXY[\^_abdeghjkmnpqstvwyz|} !#$&')*,-/0235689;<>?ABDEHILMPQSTWX[\_`cdghklopstwx{| !$%(),-014589<=@ADEHILMubuntu@ubuntu:~$ 

fast circurlar buffer

static struct circ_buf fast_buf;
fast_buf.buf = vmalloc(PAGE_SIZE);

vmalloc — allocate virtually contiguous memory

  • /include/linux/circ_buf.h
    ​​​​/* SPDX-License-Identifier: GPL-2.0 */
    ​​​​/*
    ​​​​ * See Documentation/core-api/circular-buffers.rst for more information.
    ​​​​ */
    
    ​​​​#ifndef _LINUX_CIRC_BUF_H
    ​​​​#define _LINUX_CIRC_BUF_H 1
    
    ​​​​struct circ_buf {
    ​​​​        char *buf;
    ​​​​        int head;
    ​​​​        int tail;
    ​​​​};
    
    ​​​​/* Return count in buffer.  */
    ​​​​#define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1))
    
    ​​​​/* Return space available, 0..size-1.  We always leave one free char
    ​​​​   as a completely full buffer has head == tail, which is the same as
    ​​​​   empty.  */
    ​​​​#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))
    
    ​​​​/* Return count up to the end of the buffer.  Carefully avoid
    ​​​​   accessing head and tail more than once, so they can change
    ​​​​   underneath us without returning inconsistent results.  */
    ​​​​#define CIRC_CNT_TO_END(head,tail,size) \
    ​​​​        ({int end = (size) - (tail); \
    ​​​​          int n = ((head) + end) & ((size)-1); \
    ​​​​          n < end ? n : end;})
    
    ​​​​/* Return space available up to the end of the buffer.  */
    ​​​​#define CIRC_SPACE_TO_END(head,tail,size) \
    ​​​​        ({int end = (size) - 1 - (head); \
    ​​​​          int n = (end + (tail)) & ((size)-1); \
    ​​​​          n <= end ? n : end+1;})
    
    ​​​​#endif /* _LINUX_CIRC_BUF_H  */
    
/* Clear all data from the circular buffer fast_buf */
static void fast_buf_clear(void)
{
    fast_buf.head = fast_buf.tail = 0;
}
/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
 * run in workqueue handler (kernel thread context).
 */
static DEFINE_MUTEX(consumer_lock);

生產,消費

static int fast_buf_get(void)
{
    struct circ_buf *ring = &fast_buf;

    /* prevent the compiler from merging or refetching accesses for tail */
    // unsigned long head = READ_ONCE(), tail = READ_ONCE();
    unsigned long head = (unsigned long)ring->head/*RRRR*/, tail = ring->tail;
    int ret;

    if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
        return -ENOENT;

    /* read index before reading contents at that index */
    smp_read_barrier_depends();

    /* extract item from the buffer */
    // TTTT;
    ret = ring->buf[ring->tail];

    /* finish reading descriptor before incrementing tail */
    smp_mb();

    /* increment the tail pointer */
    ring->tail = (tail + 1) & (PAGE_SIZE - 1);

    return ret;
}

static int fast_buf_put(unsigned char val)
{
    struct circ_buf *ring = &fast_buf;
    unsigned long head = ring->head;

    /* prevent the compiler from merging or refetching accesses for tail */
    unsigned long tail = READ_ONCE(ring->tail);

    /* is circular buffer full? */
    if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
        return -ENOMEM;

    ring->buf[ring->head] = val;

    /* commit the item before incrementing the head */
    // MMBB;
    smp_wmb();

    /* update header pointer */
    ring->head = (ring->head + 1) & (PAGE_SIZE - 1);

    return 0;
}

workqueue

/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
  • alloc_workqueue
  • flush_workqueue
  • destroy_workqueue
/* Workqueue handler: executed by a kernel thread */
static void simrupt_work_func(struct work_struct *w)
{
    int val, cpu;

    /* This code runs from a kernel thread, so softirqs and hard-irqs must
     * be enabled.
     */
    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());

    /* Pretend to simulate access to per-CPU data, disabling preemption
     * during the pr_info().
     */
    cpu = get_cpu();
    pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
    put_cpu();

    while (1) {
        /* Consume data from the circular buffer */
        mutex_lock(&consumer_lock);
        val = fast_buf_get();
        mutex_unlock(&consumer_lock);

        if (val < 0)
            break;

        /* Store data to the kfifo buffer */
        mutex_lock(&producer_lock);
        produce_data(val);
        mutex_unlock(&producer_lock);
    }
    wake_up_interruptible(&rx_wait);
}

/* Workqueue for asynchronous bottom-half processing */
static struct workqueue_struct *simrupt_workqueue;

/* Work item: holds a pointer to the function that is going to be executed
 * asynchronously.
 */
static DECLARE_WORK(work, simrupt_work_func);

Softirqs

softirqs can not be used by device drivers, they are reserved for various kernel subsystems. Because of this there is a fixed number of softirqs defined at compile time. For the current kernel version we have the following types defined:

enum {
    HI_SOFTIRQ = 0,
    TIMER_SOFTIRQ,
    NET_TX_SOFTIRQ,
    NET_RX_SOFTIRQ,
    BLOCK_SOFTIRQ,
    IRQ_POLL_SOFTIRQ,
    TASKLET_SOFTIRQ,
    SCHED_SOFTIRQ,
    HRTIMER_SOFTIRQ,
    RCU_SOFTIRQ,
    NR_SOFTIRQS
};

Each type has a specific purpose:

  • HI_SOFTIRQ and TASKLET_SOFTIRQ - running tasklets
  • TIMER_SOFTIRQ - running timers
  • NET_TX_SOFIRQ and NET_RX_SOFTIRQ - used by the networking subsystem
  • BLOCK_SOFTIRQ - used by the IO subsystem
  • BLOCK_IOPOLL_SOFTIRQ - used by the IO subsystem to increase performance when the iopoll handler is invoked;
  • SCHED_SOFTIRQ - load balancing
  • HRTIMER_SOFTIRQ - implementation of high precision timers
  • RCU_SOFTIRQ - implementation of RCU type mechanisms

The highest priority is the HI_SOFTIRQ type softirqs, followed in order by the other softirqs defined. RCU_SOFTIRQ has the lowest priority.

Softirqs are running in interrupt context which means that they can not call blocking functions. If the sofitrq handler requires calls to such functions, work queues can be scheduled to execute these blocking calls.


tasklet

/* Tasklet handler.
 *
 * NOTE: different tasklets can run concurrently on different processors, but
 * two of the same type of tasklet cannot run simultaneously. Moreover, a
 * tasklet always runs on the same CPU that schedules it.
 */
static void simrupt_tasklet_func(unsigned long __data)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    WARN_ON_ONCE(!in_interrupt());
    WARN_ON_ONCE(!in_softirq());

    tv_start = ktime_get();
    queue_work(simrupt_workqueue, &work);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
}

/* Tasklet for asynchronous bottom-half processing in softirq context */
static DECLARE_TASKLET(simrupt_tasklet, simrupt_tasklet_func, 0);

A tasklet is a special form of deferred work that runs in interrupt context, just like softirqs. The main difference between sofirqs and tasklets is that tasklets can be allocated dynamically and thus they can be used by device drivers. A tasklet is represented by struct tasklet and as many other kernel structures it needs to be initialized before being used. A pre-initialized tasklet can defined as following:

If we want to initialize the tasklet manually we can use the following approach:

void handler(unsigned long data);

struct tasklet_struct tasklet;

tasklet_init(&tasklet, handler, data);

The data parameter will be sent to the handler when it is executed.

Programming tasklets for running is called scheduling. Tasklets are running from softirqs. Tasklets scheduling is done with:

void tasklet_schedule(struct tasklet_struct *tasklet);

void tasklet_hi_schedule(struct tasklet_struct *tasklet);

When using tasklet_schedule, a TASKLET_SOFTIRQ softirq is scheduled and all tasklets scheduled are run. For tasklet_hi_schedule, a HI_SOFTIRQ softirq is scheduled.

If a tasklet was scheduled multiple times and it did not run between schedules, it will run once. Once the tasklet has run, it can be re-scheduled, and will run again at a later timer. Tasklets can be re-scheduled from their handlers.

Tasklets can be masked and the following functions can be used:

void tasklet_enable(struct tasklet_struct * tasklet );
void tasklet_disable(struct tasklet_struct * tasklet );

Remember that since tasklets are running from softirqs, blocking calls can not be used in the handler function.

銷毀

  • tasklet_kill(&simrupt_tasklet);

timer

static void process_data(void)
{
    WARN_ON_ONCE(!irqs_disabled());

    pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
    fast_buf_put(update_simrupt_data());

    pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
    tasklet_schedule(&simrupt_tasklet);
}

static void timer_handler(struct timer_list *__timer)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__);
    /* We are using a kernel timer to simulate a hard-irq, so we must expect
     * to be in softirq context here.
     */
    WARN_ON_ONCE(!in_softirq());

    /* Disable interrupts for this CPU to simulate real interrupt context */
    local_irq_disable();

    tv_start = ktime_get();
    process_data();
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));

    local_irq_enable();
}
/* Timer to simulate a periodic IRQ */
static struct timer_list timer;

A particular type of deferred work, very often used, are timers. They are defined by struct timer_list. They run in interrupt context and are implemented on top of softirqs.

To be used, a timer must first be initialized by calling timer_setup():

  • timer_setup(&timer, timer_handler, 0);
    ​​​​#include <linux/sched.h>
    
    ​​​​void timer_setup(struct timer_list * timer,
    ​​​​                 void (*function)(struct timer_list *),
    ​​​​                 unsigned int flags);
    

    The above function initializes the internal fields of the structure and associates function as the timer handler. Since timers are planned over softirqs, blocking calls can not be used in the code associated with the treatment function.

Scheduling a timer is done with mod_timer():

  • mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
    ​​​​int mod_timer(struct timer_list *timer, unsigned long expires);
    

    Where expires is the time (in the future) to run the handler function. The function can be used to schedule or reschedule a timer.

The time unit is jiffie. The absolute value of a jiffie is dependent on the platform and it can be found using the HZ macro that defines the number of jiffies for 1 second. To convert between jiffies (jiffies_value) and seconds (seconds_value), the following formulas are used:

jiffies_value = seconds_value * HZ ;
seconds_value = jiffies_value / HZ ;

The kernel mantains a counter that contains the number of jiffies since the last boot, which can be accessed via the jiffies global variable or macro. We can use it to calculate a time in the future for timers:

#include <linux/jiffies.h>

unsigned long current_jiffies, next_jiffies;
unsigned long seconds = 1;

current_jiffies = jiffies;
next_jiffies = jiffies + seconds * HZ;
To stop a timer, use del_timer() and del_timer_sync():

int del_timer(struct timer_list *timer);
int del_timer_sync(struct timer_list *timer);

file operation

static ssize_t simrupt_read(struct file *file,
                            char __user *buf,
                            size_t count,
                            loff_t *ppos)
{
    unsigned int read;
    int ret;

    pr_debug("simrupt: %s(%p, %zd, %lld)\n", __func__, buf, count, *ppos);

    if (unlikely(!access_ok(buf, count)))
        return -EFAULT;

    if (mutex_lock_interruptible(&read_lock))
        return -ERESTARTSYS;

    do {
        ret = kfifo_to_user(&rx_fifo, buf, count, &read);
        if (unlikely(ret < 0))
            break;
        if (read)
            break;
        if (file->f_flags & O_NONBLOCK) {
            ret = -EAGAIN;
            break;
        }
        ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));
    } while (ret == 0);
    pr_debug("simrupt: %s: out %u/%u bytes\n", __func__, read,
             kfifo_len(&rx_fifo));

    mutex_unlock(&read_lock);

    return ret ? ret : read;
}
  • debian.org - wait_event_interruptible

    wait_event_interruptible(wq, condition); - sleep until a condition gets true

    • wq the waitqueue to wait on
    • condition a C expression for the event to wait for

    The process is put to sleep (TASK_INTERRUPTIBLE) until the condition evaluates to true or a signal is received. The condition is checked each time the waitqueue wq is woken up.
    wake_up has to be called after changing any variable that could change the result of the wait condition.
    The function will return -ERESTARTSYS if it was interrupted by a signal and 0 if condition evaluated to true.

static int simrupt_open(struct inode *inode, struct file *filp)
{
    pr_debug("simrupt: %s\n", __func__);
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
    return 0;
}

static int simrupt_release(struct inode *inode, struct file *filp)
{
    pr_debug("simrupt: %s\n", __func__);
    del_timer_sync(&timer);
    flush_workqueue(simrupt_workqueue);
    fast_buf_clear();

    return 0;
}

最後,整體流程圖:







main



timer

timer



pd

process_data



timer->pd





fbp

fast_buf_put



pd->fbp


produce_data



tl

tasklet



pd->tl


tasklet schedule



wq

workqueue

work
 (simrupt_work_func)

fast_buf_get

produce_data



tl->wq


queue work



k

kfifo_in

kfifo

kfifo_to_user



wq:p->k:i





r

simrupt_read



k:u->r





dmesg 中 pr_info 資訊:

[20744.840045] simrupt: [CPU#0] enter timer_handler
[20744.840144] simrupt: [CPU#0] produce data
[20744.840176] simrupt: [CPU#0] scheduling tasklet
[20744.840189] simrupt: [CPU#0] timer_handler in_irq: 43 usec
[20744.840284] simrupt: [CPU#0] simrupt_tasklet_func in_softirq: 14 usec
[20744.840290] simrupt: [CPU#0] simrupt_work_func

缺失與改進

kfifo 在宣告的時候是:

/* Data are stored into a kfifo buffer before passing them to the userspace */
static struct kfifo rx_fifo;

但 API 有提供 marco 宣告,考量大小為 PAGE_SIZE 且程式碼也以 kfifo_alloc 分配記憶體空間,應為:

static DECLARE_KFIFO_PTR(rx_fifo, unsigned char);

而關於 kfifo 以及 circular buffer 會遭遇到並行的問題,因為前者只提供 one reader 和 one writer 的情況;後者則只提供資料結構以及計算使用量等 API 。

/*
 * Note about locking: There is no locking required until only one reader
 * and one writer is using the fifo and no kfifo_reset() will be called.
 * kfifo_reset_out() can be safely used, until it will be only called
 * in the reader thread.
 * For multiple writer and one reader there is only a need to lock the writer.
 * And vice versa for only one writer and multiple reader there is only a need
 * to lock the reader.
 */

關於此問題,在後續 改寫 producer_lock 和 consumer_lock 時會說明。

lwn.net 有一篇關於 kfifo writer side lock-less support ,但去查閱 kfifo.h 沒有相關 commit 。再從這篇 Re: [RFC -v2] kfifo writer side lock-less support 以及之後的回應,應該是沒有被採用。


static int simrupt_open(struct inode *inode, struct file *filp)
{
    pr_debug("simrupt: %s\n", __func__);
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
    return 0;
}

static int simrupt_release(struct inode *inode, struct file *filp)
{
    pr_debug("simrupt: %s\n", __func__);
    del_timer_sync(&timer);
    flush_workqueue(simrupt_workqueue);
    fast_buf_clear();

    return 0;
}

如果同時開啟檔案兩次,當其中一個關閉時會清空 timerfast_buf 等設定會導致另一個操作錯誤。因此,考量並行下的狀態,以 atomic_t 型態新增 open_cnt 變數來記錄,當為第一個開啟以及最後一個關閉的時候才會做原先的設定操作。

static atomic_t open_cnt;

static int simrupt_open(struct inode *inode, struct file *filp)
{
    pr_debug("simrupt: %s\n", __func__);
    if (atomic_inc_return(&open_cnt) == 1)
        mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
    pr_info("openm current cnt: %d\n", atomic_read(&open_cnt));
    
    return 0;
}

static int simrupt_release(struct inode *inode, struct file *filp)
{
    pr_debug("simrupt: %s\n", __func__);
    if (atomic_dec_and_test(&open_cnt) == 0) {
        del_timer_sync(&timer);
        flush_workqueue(simrupt_workqueue);
        fast_buf_clear();
    }
    pr_info("release, current cnt: %d\n", atomic_read(&open_cnt));

    return 0;
}

static int __init simrupt_init(void)
{
    ...
    atomic_set(&open_cnt, 0);
    ...
}

poll 系統呼叫

  • poll(2) — Linux manual page

    poll() performs a similar task to select(2): it waits for one of a set of file descriptors to become ready to perform I/O. The Linux-specific epoll(7) API performs a similar task, but offers features beyond those found in poll().

    POLLIN
    There is data to read.
    POLLHUP
    Hang up (only returned in revents; ignored in events). Note that when reading from a channel such as a pipe or a stream socket, this event merely indicates that the peer closed its end of the channel. Subsequent reads from the channel will return 0 (end of file) only after all outstanding data in the channel has been consumed.

  • How to add poll function to the kernel module code?

    Add fortune_poll() function and add it (as .poll callback) to your file operations structure:

     static unsigned int fortune_poll(struct file *file, poll_table *wait)
     {
         poll_wait(file, &fortune_wait, wait);
         if (new-data-is-ready)
             return POLLIN | POLLRDNORM;
         return 0;
     }
    
     static const struct file_operations proc_test_fops = {
         ....
         .poll = fortune_poll,
     };
    

    Note that you should return POLLIN | POLLRDNORM if you have some new data to read, and 0 in case there is no new data to read (poll() call timed-out). See man 2 poll for details.

    Notify your waitqueue once you have new data:

    wake_up_interruptible(&fortune_wait);

    That's the basic stuff about implementing poll() operation. Depending on your task, you may be needed to use some waitqueue API in your .read function (like wait_event_interruptible()).

  • Implementing poll in a Linux kernel module

    Taking into the account that you haven't mentioned write() operation, I will assume further that your hardware is producing new data all the time. If it's so, the design you mentioned can be exactly what is confusing you:

    The read call is very simple. It starts a DMA write, and then waits on a wait queue.

    This is exactly what prevents you from working with your driver in regular, commonly used (and probably desired for you) way. Let's think out of the box and come up with the desired user interface first (how you would want to use your driver from user-space). The next case is commonly used and sufficient here (from my point of view):

    1. poll() your device file to wait for new data to arrive
    2. read() your device file to obtain arrived data

    Now you can see that data requesting (to DMA) should be started not by read() operation. The correct solution would be to read data continuously in the driver (without any triggering from user-space) and store it internally, and when user asks your driver for the data to consume (by read() operation) provide the user with data stored internally. If there is no data stored internally in driver user can wait for new data to arrive using poll() operation.

    As you can see, this is well-known producer-consumer problem.You can use circular buffer to store data from your hardware in your driver (so you intentionally lost old data when buffer is full to prevent buffer overflow situation). So the producer (DMA) writes to the head of that RX ring buffer, and the consumer (user performing read() from user-space) reads from tail of that RX ring buffer.

  • Why do we need to call poll_wait in poll?

poll_wait adds your device (represented by the "struct file") to the list of those that can wake the process up.

The idea is that the process can use poll (or select or epoll etc) to add a bunch of file descriptors to the list on which it wishes to wait. The poll entry for each driver gets called. Each one adds itself (via poll_wait) to the waiter list.

Then the core kernel blocks the process in one place. That way, any one of the devices can wake up the process. If you return non-zero mask bits, that means those "ready" attributes (readable/writable/etc) apply now.
So, in pseudo-code, it's roughly like this:

foreach fd:
    find device corresponding to fd
    call device poll function to setup wait queues (with poll_wait) and to collect its "ready-now" mask

while time remaining in timeout and no devices are ready:
    sleep

return from system call (either due to timeout or to ready devices)

Yes. When you call poll(2) in user space, that goes to a function called "sys_poll" inside the kernel (see fs/select.c in kernel source). Likewise, select(2) => sys_select, etc. All those functions follow more or less the pseudo-code I gave above.

/*
 * Do not touch the structure directly, use the access functions
 * poll_does_not_wait() and poll_requested_events() instead.
 */
typedef struct poll_table_struct {
	poll_queue_proc _qproc;
	__poll_t _key;
} poll_table;

simrupt_work_func 函式當中已有 wake_up_interruptible 呼叫:

/* Workqueue handler: executed by a kernel thread */
static void simrupt_work_func(struct work_struct *w)
{
    int val, cpu;

    /* This code runs from a kernel thread, so softirqs and hard-irqs must
     * be enabled.
     */
    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());

    /* Pretend to simulate access to per-CPU data, disabling preemption
     * during the pr_info().
     */
    cpu = get_cpu();
    pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
    put_cpu();

    while (1) {
        /* Consume data from the circular buffer */
        mutex_lock(&consumer_lock);
        val = fast_buf_get();
        mutex_unlock(&consumer_lock);

        if (val < 0)
            break;

        /* Store data to the kfifo buffer */
        mutex_lock(&producer_lock);
        produce_data(val);
        mutex_unlock(&producer_lock);
    }
    wake_up_interruptible(&rx_wait);
}

因此,在 module 當中新增:

static __poll_t simrupt_poll(struct file *file, poll_table *wait)
{
    poll_wait(file, &rx_wait, wait);

    return kfifo_len(&rx_fifo) ? EPOLLIN | EPOLLRDNORM : 0;
}

撰寫 userspace 的程式碼:

#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#define error(s)  do { perror(s); exit(EXIT_FAILURE); } while(0)
#define NUM 2

int main(void)
{
    struct pollfd pfds[NUM] = {0};
    int nfds = NUM, nopen = NUM - 1;

    for (int i = 0;i < NUM;i++) {
        pfds[i].fd = open("/dev/simrupt", O_RDONLY);
        if (pfds[i].fd == -1)
            error("open");
        printf("%d Opened on fd %d\n", i, pfds[i].fd);
        pfds[i].events = POLLIN;
    }

    while (nopen > 0) {
        int ready;

        printf("Calling poll() ");
        ready = poll(pfds, nfds, -1);
        if (ready == -1)
            error("poll");
        printf("ready %d\n", ready);

        for (int i = 0;i < nfds;i++) {
            char buf[20];
            if (pfds[i].revents != 0) {
                printf("fd=%d events: %s%s%s\n", pfds[i].fd,
                        (pfds[i].revents & POLLIN)  ? "POLLIN"  : "",
                        (pfds[i].revents & POLLHUP) ? "POLLHUP" : "",
                        (pfds[i].revents & POLLERR) ? "POLLERR" : "" 
                );
                if (pfds[i].revents & POLLIN) {
                    ssize_t s = read(pfds[i].fd, buf, sizeof(buf));
                    if (s == -1)
                        error("read");
                    printf("read %zd bytes: %.*s\n", s, (int)s, buf);
                }
                else {
                    printf("closing fd %d\n", pfds[i].fd);
                    if (close(pfds[i].fd) == -1)
                        error("close");
                    nopen--;
                }
            }
        }
    }
    printf("All done\n");
    return 0;
}

輸出:

0 Opened on fd 3
1 Opened on fd 4
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: !
fd=4 events: POLLIN
read 1 bytes: "
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: #
fd=4 events: POLLIN
read 1 bytes: $
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: %
fd=4 events: POLLIN
read 1 bytes: &
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: '
fd=4 events: POLLIN
read 1 bytes: (
Calling poll() ready 2
fd=3 events: POLLIN
read 1 bytes: )
...

改寫 producer_lockconsumer_lock

/* Mutex to serialize kfifo writers within the workqueue handler */
static DEFINE_MUTEX(producer_lock);

/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
 * run in workqueue handler (kernel thread context).
 */
static DEFINE_MUTEX(consumer_lock);
  • consumer_lock 改進, fast_buf lock-free :

    ​​​​static int fast_buf_get(void)
    ​​​​{
    ​​​​    struct circ_buf *ring = &fast_buf;
    ​​​​    unsigned long head, tail;
    ​​​​    int ret;
    
    ​​​​try_again:
    ​​​​    /* prevent the compiler from merging or refetching accesses for tail */
    ​​​​    head = READ_ONCE(ring->head) & (PAGE_SIZE - 1) /*RRRR*/;
    ​​​​    tail = READ_ONCE(ring->tail);
    
    ​​​​    if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
    ​​​​        return -ENOENT;
    
    ​​​​    /* read index before reading contents at that index */
    ​​​​    smp_read_barrier_depends();
    
    ​​​​    /* extract item from the buffer */
    ​​​​    // TTTT; ret = ring->buf[ring->tail];
    ​​​​    ret = ring->buf[tail];
    
    ​​​​    /* finish reading descriptor before incrementing tail */
    ​​​​    smp_mb();
    
    ​​​​    /* increment the tail pointer */
    ​​​​    if (!atomic_cmpxchg((atomic_t*)&ring->tail, tail, (tail + 1) & (PAGE_SIZE - 1)))
    ​​​​        goto try_again;
    
    ​​​​    return ret;
    ​​​​}
    
    ​​​​static int fast_buf_put(unsigned char val)
    ​​​​{
    ​​​​    struct circ_buf *ring = &fast_buf;
    ​​​​    unsigned long head, tail;
    
    ​​​​try_again:
    ​​​​    head = READ_ONCE(ring->head) & (PAGE_SIZE - 1);
    ​​​​    /* prevent the compiler from merging or refetching accesses for tail */
    ​​​​    tail = READ_ONCE(ring->tail);
    
    ​​​​    /* is circular buffer full? */
    ​​​​    if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
    ​​​​        return -ENOMEM;
    
    ​​​​    if (!atomic_cmpxchg((atomic_t *)&ring->buf[ring->head], ring->buf[head], val))
    ​​​​        goto try_again;
    
    ​​​​    /* commit the item before incrementing the head */
    ​​​​    // MMBB;
    ​​​​    smp_wmb();
    
    ​​​​    /* update header pointer */
    ​​​​    head = atomic_inc_return((atomic_t *)&ring->head);
    ​​​​    if (head + 1 >= PAGE_SIZE) {
    ​​​​        atomic_cmpxchg((atomic_t *)&ring->head, head, head & (PAGE_SIZE - 1));
    ​​​​    }
    
    ​​​​    return 0;
    ​​​​}
    
  • producer_lock

    kfifo_in
    This macro copies the given buffer into the fifo and returns the number of copied elements.

    Note that with only one concurrent reader and one concurrent writer, you don't need extra locking to use these macro.

    因此在單個 writer 情況下是不需要 lock ,但此處會因 workqueue 導致有可能會有多個 writer 所以保留:

    ​​​​    while (1) {
    ​​​​        /* Consume data from the circular buffer */
    ​​​​        val = fast_buf_get();
    
    ​​​​        if (val < 0)
    ​​​​            break;
    
    ​​​​        /* Store data to the kfifo buffer */
    ​​​​        mutex_lock(&producer_lock);
    ​​​​        produce_data(val);
    ​​​​        mutex_unlock(&producer_lock);    
    ​​​​    }
    

    而在 kfifo_to_user 需要 lock ,則可以使用前面因 fast_buf lock-free 後捨棄的 consumer_lock

    ​​​​mutex_lock(&consumer_lock);
    ​​​​ret = kfifo_to_user(&rx_fifo, buf, count, &read);
    ​​​​mutex_unlock(&consumer_lock);
    

vwifi