Try   HackMD

N04: kxo

主講人: jserv / 課程討論區: 2025 年系統軟體課程

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
返回「Linux 核心設計」課程進度表

Linux 核心的並行處理

simrupt 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件,並展示以下 Linux 核心機制的運用:

timer_list

Linux Kernel 提供兩種類型的計時器,分別是 dynamic timer 和 interval timers ,前者用在 kernel space 當中而後者用在 user space 當中。 struct timer_list 則是運用在 kernel space 當中的 dynamic timers ( 參見 include/linux/timer_types.h )。

kernel/time/timer.c 當中定義以下函式,注意到每個 cpu 都有自己的 timer 。

void __init init_timers(void)
{
	init_timer_cpus();
	posix_cputimers_init_work();
	open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
}

我們可以利用 $ sudo cat /proc/timer_list 來觀察 CPU timer_list 資訊。

Interrupt Handling

Interrupts and Interrupt Handling

在 Linux 核心當中的中斷大致上有數種特性,例如以下兩種

  • interrupt handler must execute quickly
  • sometimes an interrupt handler must do a large amount of work

有些情況是兩種特性都存在的,但它們看起來互相衝突,該如何解決呢? Linux kernel 採用一種方式叫 deferred interrupts ,將中斷處理延遲,並且將上述兩種特性分別稱為 top half 和 bottom half 。有時中斷處理會需要很大的工作量,但在一般的 interrupt handler 當中,中斷機制會被暫時取消,若在這時候做很大的工作量整個系統會全部都在等待它完成,為了避免這件事發生才把中斷處理拆成 top half 和 bottom half ,在 top half 只處理一點重要的事然後就將 bottom half 交由之後的 context 處理,在處理 bottom half 的時候是不允許中斷發生的。在 Linux 核心當中有三種 deferred interrupts 的機制

  • softirqs
  • tasklets
  • workqueues

延伸閱讀: Linux 核心設計: 中斷處理和現代架構考量

softirqs

透過稱為 ksoftirqd 的 kernel thread 來達成,每個 CPU 都有一個這樣的 thread ,可透過以下命令觀察

$ systemd-cgls -k | grep ksoft
├─  15 [ksoftirqd/0]
├─  23 [ksoftirqd/1]
├─  29 [ksoftirqd/2]
├─  35 [ksoftirqd/3]
├─  41 [ksoftirqd/4]
├─  47 [ksoftirqd/5]
├─  53 [ksoftirqd/6]
├─  59 [ksoftirqd/7]
│ │   └─6999 grep --color=auto ksoft

我們可以在 kernel/softirq.c 看到以下定義,分別有 softirq_vec, ksoftirqd, softirq_to_name ,每個 CPU 都有自己的 ksoftirqd kernel thread ,而這些 kernel thread 也有各自的 softirq_vec ,分別對應到 softirq_to_name 所對應的種類。

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

DEFINE_PER_CPU(struct task_struct *, ksoftirqd);

const char * const softirq_to_name[NR_SOFTIRQS] = {
	"HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "IRQ_POLL",
	"TASKLET", "SCHED", "HRTIMER", "RCU"
};

利用以下命令觀察

$ sudo cat /proc/softirqs
                    CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7       
          HI:         63          0          1          0          0         11          0          5
       TIMER:      91569      81926      83934     103131      74011     127987     113794     125569
      NET_TX:          4          2          0          0          0          3          0         33
      NET_RX:       4400       2811       4047       3683       2501       5472       2422     152954
       BLOCK:        351        282        679        225        358      15623      39399        486
    IRQ_POLL:          0          0          0          0          0          0          0          0
     TASKLET:     173719          0         57        524         20         42          0       1908
       SCHED:     240706     199132     152716     184508     189760     380257     337690     202108
     HRTIMER:          0          0          0          0          0          0          0          1
         RCU:     142200     142248     117051     133957     138481     245684     225400     148630

被延遲的 interrupt 會被放到對應的欄位當中,透過 raise_softirq 來觸發, wakeup_softirqd 則是會觸發當前 CPU 的 ksoftirqd kernel thread 。

tasklets

tasklets 在 Linux kernel 當中的實作位在 /include/linux/interrupt.h

struct tasklet_struct
{
	struct tasklet_struct *next;
	unsigned long state;
	atomic_t count;
	bool use_callback;
	union {
		void (*func)(unsigned long data);
		void (*callback)(struct tasklet_struct *t);
	};
	unsigned long data;
};

它是實作在 softirq 上,另一種延遲中斷處理的機制,它依賴以下兩種 softirqs

  • TASKLET_SOFTIRQ
  • HI_SOFTIRQ

同一種類型的 tasklets 不能同時在多個處理器上運作,從以上 tasklet_struct 的定義來看,可以剖析它的實作包括

  • 指向位於 scheduling queue 當中下一個 tasklet 的指標
  • tasklet 的狀態
  • 該 tasklet 的 callback 函式
  • callback 函式的參數

Linux 核心利用以下三個函式來標示 tasklet 為 ready to run

  • tasklet_schedule()
  • tasklet_hi_schedule()
  • tasklet_hi_schedule_first()

這三個函式實作相近,差別在優先權,第一個函式所標註的 tasklet 優先權最低。

workqueues

workqueue 和 tasklet 的概念類似,但依舊有差別, tasklet 透過 software interrupt context 來執行,而 worqueue 當中的 work items 則是透過 kernel process ,這代表 work item 的執行不像 tasklet 一樣是 atomic 的 (換言之,整個 tasklet 的函式只能執行在最初被分配到的 CPU 上)。
Kernel 會建立稱為 worker threads 的 kernel threads 來處理 work items ,我們可以透過以下命令來觀察這些 kernel threads 。

$ systemd-cgls -k | grep kworker
├─   7 [kworker/0:0-events]
├─   8 [kworker/0:0H-events_highpri]
├─  25 [kworker/1:0H-events_highpri]
├─  31 [kworker/2:0H-events_highpri]
├─  37 [kworker/3:0H-events_highpri]
├─  43 [kworker/4:0H-events_highpri]
├─  55 [kworker/6:0H-events_highpri]
├─  60 [kworker/7:0-events]
├─  61 [kworker/7:0H-kblockd]
├─  72 [kworker/3:1-events]
├─  85 [kworker/3:1H-kblockd]
├─  88 [kworker/6:1-events]
├─  90 [kworker/1:1-mm_percpu_wq]
├─  91 [kworker/2:1-events]
├─  92 [kworker/5:1-mm_percpu_wq]
├─ 113 [kworker/4:1H-kblockd]
├─ 121 [kworker/u17:0-i915_flip]
├─ 128 [kworker/5:1H-kblockd]
├─ 129 [kworker/7:1H-kblockd]
├─ 148 [kworker/2:1H-kblockd]
├─ 152 [kworker/0:1H-kblockd]
├─ 153 [kworker/1:1H-kblockd]
├─ 166 [kworker/6:1H-kblockd]
├─ 181 [kworker/5:2H-kblockd]
├─ 918 [kworker/0:3-cgroup_destroy]
├─1323 [kworker/6:2-events]
├─1673 [kworker/u17:1]
├─4813 [kworker/4:0-events]
├─6021 [kworker/2:0-cgroup_destroy]
├─6031 [kworker/7:1-events]
├─6086 [kworker/4:1-cgroup_destroy]
├─6123 [kworker/3:0]
├─6140 [kworker/5:0]
├─6159 [kworker/u16:1-events_power_efficient]
├─6180 [kworker/u16:3-events_freezable_power_]
├─6923 [kworker/1:2]
├─7102 [kworker/u16:2-ext4-rsv-conversion]
├─7147 [kworker/u16:4-events_unbound]
├─7191 [kworker/2:2-events]
├─7192 [kworker/u16:0]
│ │   └─7195 grep --color=auto kworker

queue_work() 函式則是可以幫我們把 work item 放置到 workqueue 當中。

kfifo

kfifo 是 Linux 核心裡頭 First-In-First-Out (FIFO) 的結構,在 Single Producer Single Consumer (SPSC) 情況中是 safe 的,即不需要額外的 lock 維護,在程式碼中註解中也有提及。

在此專案中有一個 kfifo 資料結構 rx_fifo,用來儲存即將傳到 userspace 的 data。

/* Data are stored into a kfifo buffer before passing them to the userspace */
static struct kfifo rx_fifo;

/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
 * only one concurrent reader and one concurrent writer. Writes are serialized
 * from the interrupt context, readers are serialized using this mutex.
 */
static DEFINE_MUTEX(read_lock);

將 Data 插入到 rx_fifo 中,並檢查寫入的長度與避免過度輸出日誌而影響效能,之所以對 len 進行檢查的原因在於 kfifo_in 所回傳之值,是實際成功插入的數量。

/* Insert a value into the kfifo buffer */
static void produce_data(unsigned char val)
{
    /* Implement a kind of circular FIFO here (skip oldest element if kfifo
     * buffer is full).
     */
    unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
    if (unlikely(len < sizeof(val)) && printk_ratelimit())
        pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);

    pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
             kfifo_len(&rx_fifo));
}

kfifo_in(fifo, buf, n);

  • 複製 buf 資料並放到 fifo 中,最後會回傳插入的資料大小。
  • 在程式碼中,會先確認要放入的大小是否大於剩餘的大小,若超過,則會以剩餘大小為主。
unsigned int __kfifo_in(struct __kfifo *fifo,
                        const void *buf, unsigned int len)
{
    unsigned int l;

    l = kfifo_unused(fifo);
    if (len > l)
        len = l;

    kfifo_copy_in(fifo, buf, len, fifo->in);
    fifo->in += len;
    return len;
}

kfifo_to_user(fifo, to, len, copied);

  • 將最多 len 個 bytes 資料從 fifo 移到 userspace。

kfifo_alloc(fifo, size, gfp_mask);

  • 動態配置一個 fifo buffer,配置成功會回傳 0。若要釋放 fifo 可藉由 kfifo_free(fifo); 實現。

fast circular buffer

Circular Buffer 是個固定大小的緩衝區,其中具有 2 個 indicies:

  • head index: the point at which the producer inserts items into the buffer.
  • tail index: the point at which the consumer finds the next item in the buffer.

當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。

當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。

{
    ...
    /* Allocate fast circular buffer */
    fast_buf.buf = vmalloc(PAGE_SIZE);
    ...

    /* Clear all data from the circular buffer fast_buf */
    fast_buf.head = fast_buf.tail = 0;
}

Measuring power-of-2 buffers: 讓緩衝區大小維持 2 的冪,就可以使用 bitwise 操作去計算緩衝區空間,避免使用較慢的 modulus (divide) 操作。

struct circ_buf {
    char *buf;
    int head;
    int tail;
};

/* Return count in buffer.  */
#define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1))

/* Return space available, 0..size-1.  We always leave one free char
 * as a completely full buffer has head == tail, which is the same as
 * empty.
 */
#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))

/* Return count up to the end of the buffer.  Carefully avoid
 * accessing head and tail more than once, so they can change
 * underneath us without returning inconsistent results.
 */
#define CIRC_CNT_TO_END(head,tail,size) \
        ({int end = (size) - (tail); \
          int n = ((head) + end) & ((size)-1); \
          n < end ? n : end;})

/* Return space available up to the end of the buffer.  */
#define CIRC_SPACE_TO_END(head,tail,size) \
        ({int end = (size) - 1 - (head); \
          int n = (end + (tail)) & ((size)-1); \
          n <= end ? n : end+1;})

CIRC_SPACE*() 被 producer 使用,CIRC_CNT*() 是 consumer 所用。

在 simrupt 中,一個「更快」的 circular buffer 被拿來儲存即將要放到 kfifo 的資料。

/* We use an additional "faster" circular buffer to quickly store data from
 * interrupt context, before adding them to the kfifo.
 */
static struct circ_buf fast_buf;

READ_ONCE() 是個 relaxed-ordering 且保證 atomic 的 memory operation,可以確保在多執行緒環境中,讀取到的值是正確的,並保證讀寫操作不會被編譯器最佳化所影響。

smp_rmb() 是個 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。在〈Lockless patterns: relaxed access and partial memory barriers〉提到 smp_rmb()smp_wmb() 的 barrier 效果比 smp_load_acquire()smp_store_release() 還要來的差,但是因為 load-store 之間的排序關係很少有影響,所以開發人員常以 smp_rmb()smp_wmb() 作為 memory barrier。

fast_buf_get 扮演一個 consumer 的角色,會從緩衝區中取得資料,並更新 tail index。

static int fast_buf_get(void)
{
    struct circ_buf *ring = &fast_buf;

    /* prevent the compiler from merging or refetching accesses for tail */
    unsigned long head = READ_ONCE(ring->head), tail = ring->tail;
    int ret;

    if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
        return -ENOENT;

    /* read index before reading contents at that index */
    smp_rmb();

    /* extract item from the buffer */
    ret = ring->buf[tail];

    /* finish reading descriptor before incrementing tail */
    smp_mb();

    /* increment the tail pointer */
    ring->tail = (tail + 1) & (PAGE_SIZE - 1);

    return ret;
}

fast_buf_put 扮演一個 producer 的角色,藉由 CIRC_SPACE() 判斷 buffer 中是否有剩餘空間,並更新 head index。

static int fast_buf_put(unsigned char val)
{
    struct circ_buf *ring = &fast_buf;
    unsigned long head = ring->head;

    /* prevent the compiler from merging or refetching accesses for tail */
    unsigned long tail = READ_ONCE(ring->tail);

    /* is circular buffer full? */
    if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
        return -ENOMEM;

    ring->buf[ring->head] = val;

    /* commit the item before incrementing the head */
    smp_wmb();

    /* update header pointer */
    ring->head = (ring->head + 1) & (PAGE_SIZE - 1);

    return 0;
}

process_data 函式呼叫 fast_buf_put(update_simrupt_data()); ,其中 update_simrupt_data() 會產生資料,這些資料的範圍在 0x200x7E 之間,即 ASCII 中的可顯示字元,這些資料會被放入 circular buffer 中,最後交由 tasklet_schedule 進行排程。

static void process_data(void)
{
    WARN_ON_ONCE(!irqs_disabled());

    pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
    fast_buf_put(update_simrupt_data());

    pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
    tasklet_schedule(&simrupt_tasklet);
}

tasklet

tasklet 是基於 softirq 之上建立的,但最大的差別在於 tasklet 可以動態配置且可以被用在驅動裝置上。

tasklet 可以被 workqueues, timers 或 threaded interrupts 取代,但 kernel 中尚有使用 tasklet 的情況,Linux 核心開發者已著手 API 變更,而 DECLARE_TASKLET_OLD 的存在是顧及相容性。

Modernizing the tasklet API

#define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L)

首先會先確保函式在 interrupt context 和 softirq context 中執行,使用 queue_work 將 work 放入 workqueue 中,並記錄執行時間。

/**
* queue_work - queue work on a workqueue
* @wq: workqueue to use
* @work: work to queue */
static inline bool queue_work(struct workqueue_struct *wq,
			      struct work_struct *work)
{
	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}
/* Tasklet handler.
 *
 * NOTE: different tasklets can run concurrently on different processors, but
 * two of the same type of tasklet cannot run simultaneously. Moreover, a
 * tasklet always runs on the same CPU that schedules it.
 */
static void simrupt_tasklet_func(unsigned long __data)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    WARN_ON_ONCE(!in_interrupt());
    WARN_ON_ONCE(!in_softirq());

    tv_start = ktime_get();
    queue_work(simrupt_workqueue, &work);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
}

/* Tasklet for asynchronous bottom-half processing in softirq context */
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);

藉由上述註解可以得知:

  • 不同 tasklet 可以在不同 CPU 同時執行
  • 相同 tasklet 不能同時執行
  • 一個 tasklet 只會在排程它的 CPU 上執行
softirq tasklet
多個在同一個 CPU 執行? No No
相同的可在不同 CPU 執行? Yes No
會在同個 CPU 執行? Yes Maybe

tasklet_schedule()被呼叫時,代表此 tasklet 被允許在 CPU 上執行,詳見 linux/include/linux/interrupt.h

workqueue

linux/include/linux/workqueue.h

定義兩個 mutex lock: producer_lockconsumer_lock

/* Mutex to serialize kfifo writers within the workqueue handler */
static DEFINE_MUTEX(producer_lock);

/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
 * run in workqueue handler (kernel thread context).
 */
static DEFINE_MUTEX(consumer_lock);

get_cpu() 獲取目前 CPU 編號並 disable preemption,最後需要 put_cpu() 重新 enable preemption。

24-26行使用 mutex_lock(&consumer_lock) 鎖住消費者區域,防止其它的任務取得 circular buffer 的資料。

32-34行使用 mutex_lock(&producer_lock) 鎖住生產者區域,防止其它的任務寫入 kfifo buffer。

wake_up_interruptible(&rx_wait) 會換醒 wait queue 上的行程,將其狀態設置為 TASK_RUNNING。

/* Wait queue to implement blocking I/O from userspace */ static DECLARE_WAIT_QUEUE_HEAD(rx_wait); /* Workqueue handler: executed by a kernel thread */ static void simrupt_work_func(struct work_struct *w) { int val, cpu; /* This code runs from a kernel thread, so softirqs and hard-irqs must * be enabled. */ WARN_ON_ONCE(in_softirq()); WARN_ON_ONCE(in_interrupt()); /* Pretend to simulate access to per-CPU data, disabling preemption * during the pr_info(). */ cpu = get_cpu(); pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__); put_cpu(); while (1) { /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); if (val < 0) break; /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); } wake_up_interruptible(&rx_wait); }

在 workqueue 中執行的 work,可由以下方式定義。

  • DECLARE_WORK(name, void (*func) (void *), void *data) 會在編譯時,靜態地初始化 work。
  • INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data) 在執行時,動態地初始化一個 work。
/* Workqueue for asynchronous bottom-half processing */
static struct workqueue_struct *simrupt_workqueue;

/* Work item: holds a pointer to the function that is going to be executed
 * asynchronously.
 */
static DECLARE_WORK(work, simrupt_work_func);

timer

藉由 timer_setup() 初始化 timer。

/* Setup the timer */
timer_setup(&timer, timer_handler, 0);

void timer_setup(struct timer_list * timer,
                 void (*function)(struct timer_list *),
                 unsigned int flags);

目標是模擬 hard-irq,所以必須確保目前是在 softirq context,欲模擬在 interrupt context 中處理中斷,所以針對該 CPU disable interrupts。

/* Timer to simulate a periodic IRQ */
static struct timer_list timer;

static void timer_handler(struct timer_list *__timer)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__);
    /* We are using a kernel timer to simulate a hard-irq, so we must expect
     * to be in softirq context here.
     */
    WARN_ON_ONCE(!in_softirq());

    /* Disable interrupts for this CPU to simulate real interrupt context */
    local_irq_disable();

    tv_start = ktime_get();
    process_data();
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));

    local_irq_enable();
}

使用 mod_timer 對 timer 進行排程。

Jiffy 表示不具體的非常短暫的時間段,可藉由以下公式進行轉換。

jiffies_value = seconds_value * HZ;
seconds_value = jiffies_value / HZ;

simrupt_init

該函式進行許多資料結構的初始化:

  1. 配置給 kfifo 一個 PAGE_SIZE 大小的空間
    ​​​​kfifo_alloc(&rx_fifo, PAGE_SIZE, GFP_KERNEL)
    
  2. 配置給 circular buffer 一個 PAGE_SIZE 大小的空間
    ​​​​fast_buf.buf = vmalloc(PAGE_SIZE);
    
  3. 為 simrupt 註冊一個設備號,並且加入到系統中
    ​​​​ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME);
    ​​​​...
    ​​​​cdev_init(&simrupt_cdev, &simrupt_fops);
    ​​​​ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT);
    
  4. 註冊設備到 sysfs,即可允許使用者空間運用裝置檔案 /dev/simrupt 來存取和控制該設備
    ​​​​device_create(simrupt_class, NULL, MKDEV(major, 0), NULL, DEV_NAME);
    
  5. 配置一個新的 workqueue
    ​​​​simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);
    
  6. 設定 timer
    ​​​​timer_setup(&timer, timer_handler, 0);
    

simrupt 流程圖







simrupt



timer_handler

timer_handler



process_data

process_data



timer_handler->process_data





update_simrupt_data

update_simrupt_data



process_data->update_simrupt_data





simrupt_tasklet_func

simrupt_tasklet_func



process_data->simrupt_tasklet_func


 tasklet_schedule



fast_circular_buffer

fast_circular_buffer



update_simrupt_data->fast_circular_buffer


  fast_buf_put



simrupt_work_func

simrupt_work_func



fast_circular_buffer->simrupt_work_func


fast_buf_get



simrupt_tasklet_func->simrupt_work_func


 queue_work



kfifo

kfifo



simrupt_work_func->kfifo


  produce_data



simrupt 的使用

掛載核心模組。

$ sudo insmod simrupt.ko

掛載後,會產生一個裝置檔案/dev/simrupt,藉由以下命令可見到輸出的資料。

$ sudo cat /dev/simrupt 

參考輸出: (可能會有異)

!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~

dmesg 顯示核心訊息,加入 --follow 可即時查看。

sudo dmesg --follow

參考輸出:

[882265.813265] simrupt: [CPU#3] enter timer_handler
[882265.813297] simrupt: [CPU#3] produce data
[882265.813299] simrupt: [CPU#3] scheduling tasklet
[882265.813300] simrupt: [CPU#3] timer_handler in_irq: 2 usec
[882265.813350] simrupt: [CPU#3] simrupt_tasklet_func in_softirq: 0 usec
[882265.813383] simrupt: [CPU#5] simrupt_work_func

Linux 核心的 kfifo

kfifo 是一個 Circular buffer 的資料結構,而 ring-buffer 就是參考 kfifo 所實作。

在 simrupt_init 會先配置 buffer,使其具備一個 PAGE 的空間。fast_buf.buf = vmalloc(PAGE_SIZE); 將 buffer 的虛擬記憶體位址存在 fast_buf.buf







structs



main

main thread



buffer

buffer
(1 PAGESIZE)



main->buffer


put data



worker1

worker thread



kfifo

kfifo
(1 PAGESIZE)



worker1->kfifo


put data



worker2

worker thread



worker2->kfifo


put data



worker3

worker thread



worker3->kfifo


put data



userspace

userspace



kfifo->userspace


kfifo_to_user()



buffer->worker1


get data



buffer->worker2


get data



buffer->worker3


get data



主執行緒會將更新的字元放入 buffer 中,而每個 worker thread 則是使用函式 fast_buf_get() 從 buffer 取出資料後,藉由 produce_data() 放到 kfifo。

kfifo 適合的使用情境,可以在 linux/kfifo.h 中看到:

/*
 * Note about locking: There is no locking required until only one reader
 * and one writer is using the fifo and no kfifo_reset() will be called.
 * kfifo_reset_out() can be safely used, until it will be only called
 * in the reader thread.
 * For multiple writer and one reader there is only a need to lock the writer.
 * And vice versa for only one writer and multiple reader there is only a need
 * to lock the reader.
 */

選定 linux/samples/kfifo/ 作為應用案例,並參考 kfifo-examples 進行實驗。

  • 應用案例: record-example.c
  • 定義一個大小為 100 的 char 陣列 buf,用於臨時儲存資料。
  • 定義一個 struct hello,包含一個 unsigned char buf,且該 buf 初始化為 "hello"。
  • kfifo_in(&test, &hello, sizeof(hello)) 將 struct hello 寫入 kfifo buffer,並用 kfifo_peek_len(&test) 印出 kfifo buffer 下一個 record 的大小。
  • for 迴圈裡面,每次會使用 memset 將 buf 的前 i+1 個元素設置為 'a'+i,並用 kfifo_in(&test, buf, i + 1) 寫入 kfifo buffer。
  • kfifo_skip(&test) 跳過 kfifo buffer 的第一個值,即跳過 "hello"。
  • kfifo_out_peek(&test, buf, sizeof(buf) 會在不刪除元素情況下,印出 kfifo buffer 的第一個元素。
  • kfifo_len(&test) 印出目前 kfifo buffer 已佔用的空間。
  • while 迴圈用 kfifo_out(&test, buf, sizeof(buf)) 逐一比對 kfifo buffer 中的元素是不是和 excepted_result 中的元素一樣。
static int __init testfunc(void)
{
    char		buf[100];
    unsigned int	i;
    unsigned int	ret;
    struct { unsigned char buf[6]; } hello = { "hello" };

    printk(KERN_INFO "record fifo test start\n");

    kfifo_in(&test, &hello, sizeof(hello));

    /* show the size of the next record in the fifo */
    printk(KERN_INFO "fifo peek len: %u\n", kfifo_peek_len(&test));

    /* put in variable length data */
    for (i = 0; i < 10; i++) {
        memset(buf, 'a' + i, i + 1);
        kfifo_in(&test, buf, i + 1);
    }

    /* skip first element of the fifo */
    printk(KERN_INFO "skip 1st element\n");
    kfifo_skip(&test);

    printk(KERN_INFO "fifo len: %u\n", kfifo_len(&test));

    /* show the first record without removing from the fifo */
    ret = kfifo_out_peek(&test, buf, sizeof(buf));
    if (ret)
        printk(KERN_INFO "%.*s\n", ret, buf);

    /* check the correctness of all values in the fifo */
    i = 0;
    while (!kfifo_is_empty(&test)) {
        ret = kfifo_out(&test, buf, sizeof(buf));
        buf[ret] = '\0';
        printk(KERN_INFO "item = %.*s\n", ret, buf);
        if (strcmp(buf, expected_result[i++])) {
            printk(KERN_WARNING "value mismatch: test failed\n");
            return -EIO;
        }
    }
    if (i != ARRAY_SIZE(expected_result)) {
        printk(KERN_WARNING "size mismatch: test failed\n");
        return -EIO;
    }
    printk(KERN_INFO "test passed\n");

    return 0;
}

掛載核心模組。

$ sudo insmod record-example.ko

利用 dmesg 查看核心訊息

$ sudo dmesg
[360087.628314] record fifo test start
[360087.628316] fifo peek len: 6
[360087.628317] skip 1st element
[360087.628317] fifo len: 65
[360087.628318] a
[360087.628318] item = a
[360087.628319] item = bb
[360087.628319] item = ccc
[360087.628319] item = dddd
[360087.628319] item = eeeee
[360087.628320] item = ffffff
[360087.628320] item = ggggggg
[360087.628320] item = hhhhhhhh
[360087.628321] item = iiiiiiiii
[360087.628321] item = jjjjjjjjjj
[360087.628321] test passed
  • 應用案例: bytestream-example.c
  • 分別用 kfifo_inkfifo_put 將字串 "hello" 與數字 0-9 放入 kfifo buffer。
    • kfifo_in: 可一次將 n Bytes 的 object 放到 kfifo buffer 中。
    • kfifo_put: 與 kfifo_in 相似,只是用來處理要將單一個值放入 kfifo buffer 的情境,若要插入時,buffer 已滿,則會返回 0。
  • kfifo_out 先將 kfifo buffer 前 5 個值拿出,即 "hello"。
  • kfifo_out 將 kfifo buffer 前 2 個值 (0、1) 拿出,再用 kfifo_in 重新將 0、1 放入 kfifo buffer,並用 kfifo_skip 拿出並忽略 buffer 中第一個值。
  • 值從 20 開始,逐一將大小為 32B 的 kfifo buffer 填滿。
  • 並用 kfifo_get 逐一檢查 buffer 內的值是否與 expected_result 中的值一樣,若一樣,則 test passed。
static int __init testfunc(void)
{
    unsigned char	buf[6];
    unsigned char	i, j;
    unsigned int	ret;

    printk(KERN_INFO "byte stream fifo test start\n");

    /* put string into the fifo */
    kfifo_in(&test, "hello", 5);

    /* put values into the fifo */
    for (i = 0; i != 10; i++)
        kfifo_put(&test, i);

    /* show the number of used elements */
    printk(KERN_INFO "fifo len: %u\n", kfifo_len(&test));

    /* get max of 5 bytes from the fifo */
    i = kfifo_out(&test, buf, 5);
    printk(KERN_INFO "buf: %.*s\n", i, buf);

    /* get max of 2 elements from the fifo */
    ret = kfifo_out(&test, buf, 2);
    printk(KERN_INFO "ret: %d\n", ret);
    /* and put it back to the end of the fifo */
    ret = kfifo_in(&test, buf, ret);
    printk(KERN_INFO "ret: %d\n", ret);

    /* skip first element of the fifo */
    printk(KERN_INFO "skip 1st element\n");
    kfifo_skip(&test);

    /* put values into the fifo until is full */
    for (i = 20; kfifo_put(&test, i); i++)
        ;

    printk(KERN_INFO "queue len: %u\n", kfifo_len(&test));

    /* show the first value without removing from the fifo */
    if (kfifo_peek(&test, &i))
        printk(KERN_INFO "%d\n", i);

    /* check the correctness of all values in the fifo */
    j = 0;
    while (kfifo_get(&test, &i)) {
        printk(KERN_INFO "item = %d\n", i);
        if (i != expected_result[j++]) {
            printk(KERN_WARNING "value mismatch: test failed\n");
            return -EIO;
        }
    }
    if (j != ARRAY_SIZE(expected_result)) {
        printk(KERN_WARNING "size mismatch: test failed\n");
        return -EIO;
    }
    printk(KERN_INFO "test passed\n");

    return 0;
}

掛載核心模組。

$ sudo insmod bytestream-example.ko

利用 dmesg 查看核心訊息

$ sudo dmesg
[130567.107610] byte stream fifo test start
[130567.107612] fifo len: 15
[130567.107613] buf: hello
[130567.107614] ret: 2
[130567.107614] ret: 2
[130567.107614] skip 1st element
[130567.107615] queue len: 32
[130567.107615] 3
[130567.107615] item = 3
[130567.107615] item = 4
[130567.107616] item = 5
[130567.107616] item = 6
[130567.107616] item = 7
[130567.107616] item = 8
[130567.107617] item = 9
[130567.107617] item = 0
[130567.107617] item = 1
[130567.107617] item = 20
[130567.107618] item = 21
[130567.107618] item = 22
[130567.107618] item = 23
[130567.107618] item = 24
[130567.107619] item = 25
[130567.107619] item = 26
[130567.107619] item = 27
[130567.107619] item = 28
[130567.107619] item = 29
[130567.107620] item = 30
[130567.107620] item = 31
[130567.107620] item = 32
[130567.107620] item = 33
[130567.107621] item = 34
[130567.107621] item = 35
[130567.107621] item = 36
[130567.107621] item = 37
[130567.107622] item = 38
[130567.107622] item = 39
[130567.107622] item = 40
[130567.107622] item = 41
[130567.107623] item = 42
[130567.107623] test passed
  • 應用案例: 生產者與消費者

producer-consumer.c

設計一個 kfifo 的生產者與消費者實驗,包含一個 producer 與一個 consumer,producer 函式每 1 秒會將一個值放入 kfifo 中,並從 1 遞增到 10,而consumer 函式每 2 秒會消耗一個 kfifo 的值。

static int producer(void *data)
{
    unsigned char i;

    for (i = 1; i <= 10; i++) {
        kfifo_put(&test, i);
        pr_info("Producer inserted value: %d\n", i);
        msleep(1000);
    }

    kthread_stop(producer_thread);
    return 0;
}

static int consumer(void *data)
{
    unsigned char j;
    unsigned char buf[10];
    unsigned int ret;
    
    for (j = 1; j <= 5; j++) {
        ret = kfifo_out(&test, buf, 1);
        if (ret) {
            pr_info("Consumer removed value: %d\n", j);
        } else {
            pr_info("Consumer failed to remove value from kfifo\n");
        }
        msleep(2000);
    }

    kthread_stop(consumer_thread);
    return 0;
}

在 example_init 中,使用 kthread_run 建立兩個 kernel thread,分別是 producer_threadconsumer_thread

producer_thread = kthread_run(producer, NULL, "producer_thread");
...
consumer_thread = kthread_run(consumer, NULL, "consumer_thread"); 

example_exit 中,會用 kfifo_get 逐一檢查 kfifo 剩餘的值是否與 expected_result 相同。

static void __exit example_exit(void)
{
    unsigned char i, j;

    /* check the correctness of all values in the fifo */
    j = 0;
    while (kfifo_get(&test, &i)) {
        pr_info("kfifo item = %d\n", i);
        if (i != expected_result[j++]) {
            pr_warn("value mismatch: test failed\n");
            goto error_EIO;
        }
    }
    pr_info("test passed\n");
    kfifo_free(&test);
    pr_info("kfifo Example Exit\n");

error_EIO:
    kfifo_free(&test);
}
$ make check
$ sudo dmesg 
[96656.871161] kfifo Example Init
[96656.871280] Producer inserted value: 1
[96656.871364] Consumer removed value: 1
[96657.890006] Producer inserted value: 2
[96658.882042] Consumer removed value: 2
[96658.914025] Producer inserted value: 3
[96659.937999] Producer inserted value: 4
[96660.897975] Consumer removed value: 3
[96660.961976] Producer inserted value: 5
[96661.985950] Producer inserted value: 6
[96662.917915] Consumer removed value: 4
[96663.009917] Producer inserted value: 7
[96664.033895] Producer inserted value: 8
[96664.929874] Consumer removed value: 5
[96665.057866] Producer inserted value: 9
[96666.081860] Producer inserted value: 10
[96801.846529] kfifo item = 6
[96801.846536] kfifo item = 7
[96801.846539] kfifo item = 8
[96801.846540] kfifo item = 9
[96801.846542] kfifo item = 10
[96801.846544] test passed
[96801.846546] kfifo Example Exit