Try   HackMD

Linux 核心專題: simrupt 研究和應用

執行人: pao0626
專題解說錄影

Reviewed by yu-hsiennn

workqueue 實驗設計的程式碼重複張貼?

感謝提醒,已更新。by pao0626

Reviewed by marvin0102

我只擷取 simrupt 專案中有使用到的 HI, TASKLET 和 TASKLET 這三種 softirq 的使用數量。

TASKLET 似乎重複出現

感謝提醒,已更新。by pao0626

Reviewed by ChengChaoChun

在 simrupt 專案中定時器設置每 100ms 產生一次中斷。我看了你的實驗數據,一次中斷處理過程(top half 加上 bottom half)的時間比定時器中斷產生一次的時間快非常多,因此不會有兩個中斷處理過程同時進行。我想請問在 simrupt_work_func 函式中 val = fast_buf_get() 和 produce_data(val) 如果不使用 mutex lock 是否會影響?

[255961.544677] simrupt: [CPU#4] enter timer_handler
[255961.544695] simrupt: [CPU#4] produce data
[255961.544696] simrupt: [CPU#4] scheduling tasklet
[255961.544698] simrupt: [CPU#4] timer_handler in_irq: 3 usec
[255961.544756] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 0 usec
[255961.544758] tasklet run 54 usec after irq
[255961.544758] simrupt: [CPU#0] simrupt_work_func
[255961.544763] simrupt_work_func execution time: 4989 ns

任務簡介

simrupt 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件,並展示以下 Linux 核心機制的運用:

TODO: 解釋 simrupt 運作原理

搭配 Linux Kernel Module Programming Guide 和課程教材,逐一解釋 simrupt 所展示的核心機制,設計對應的實驗來解說。







main



timer

timer_handler



pcd

process_data



timer->pcd





fbp

fast_buf_put



pcd->fbp





stf

simrupt_tasklet_func



pcd->stf


tasklet schedule



usd

update_simrupt_data



fbp->usd





fcb

fast circular buffer



fbp->fcb





fbg

fast_buf_get



fcb->fbg





wq

workqueue

work
 (simrupt_work_func)



stf->wq


queue work



wq->fbg





pdd

produce_data



wq->pdd





fbg->pdd





k

kfifo



pdd->k


kfifo_in



r

simrupt_read



k->r


kfifo_to_user



上面是 simrupt 流程圖,實線表達函式執行順序,虛線表達資料的傳遞。
整體流程透過 timer_handler 模擬硬體中斷並呼叫 process_data 函式,該函式會做兩件事。

  1. 透過 fast_buf_put(update_simrupt_data()) 將產生的資料寫入 fast circular buffer。
  2. 呼叫 tasklet_schedule 排程。

其目的是將中斷處理分為 top half 和 bottom half。 top half 快速響應了中斷呼叫,將資料放入 fast circular buffer,而需要花費大量時間將資料放入 kfifo 的過程則為 bottom half,將 bottom half 的任務交給 tasklet 排程,放入 workqueue 等待處理。當 workqueue 中的 work 被喚醒後透過 simrupt_work_func 處理時,會使用 fast_buf_get 函式取得 fast circular buffer 裡的資料,再透過 produce_data 函式將資料放入 kfifo 中。後續當使用者讀取該模組的設備 裝置檔案時會呼叫 simrupt_read 取出資料。

注意用語:

  • device 是「裝置」

已全部更改

以下逐一解釋 simrupt 所展示的各項核心機制:

irq

不該用「軟中斷」,保留原文 softirq

已改進

在 Linux 操作系統中,irq 和 softirq 是核心用於處理異步事件的兩種機制。irq 是由硬體裝置發起的信號,告知處理器需要立即處理某些事件,像是使用者敲擊鍵盤。處理程序 應盡可能避免執行過長時間的操作。
softirq 則用於 interrupt context 中延遲執行某些不太急迫的工作。處理那些不能在硬體中斷處理程序中立即完成的任務。可以被其他中斷 (包括其他 softirq 或 irq) 中斷。
補充:延遲執行的中斷稱為 deferred interrupts,也是上面提到的 bottom half。其中包含了 softirq, tasklet 和 workqueue 三種。

此專案中使用定時器 (timer) 模擬硬體中斷 (hard-irq)。

timer_setup(&timer, timer_handler, 0);

注意用語:

  • macro 是「巨集」,不是「宏」

已改進

timer_setup 巨集初始化一個核心定時器 timer。設定了定時器的到期處理函數timer_handler,並接收一個標誌參數。

static struct timer_list timer;
static void timer_handler(struct timer_list *__timer)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__);
    /* We are using a kernel timer to simulate a hard-irq, so we must expect
     * to be in softirq context here.
     */
    WARN_ON_ONCE(!in_softirq());

    /* Disable interrupts for this CPU to simulate real interrupt context */
    local_irq_disable();

    tv_start = ktime_get();
    process_data();
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));

    local_irq_enable();
}

由於是使用核心計時器模擬硬體中斷,所以必須檢查目前程式碼是否在 softirq context 中執行。使用 local_irq_disable 函式停用目前 CPU 的中斷來模擬硬體中斷環境。接著呼叫先前提到的 process_data 函式將資料放入 fast_buf,並一樣透過 ktime_get 計算操作時間。並透過 mod_timer 更新定時器(可參考 LKMPG 第十三章)。最後使用 local_irq_enable 重新啟用中斷。
補充: 全域變數 jiffies 保存自系統啟動以來發生的 ticks 數。tick 是一個基本時間單位,通常對應於系統定時器中斷的一次觸發。而定時器中斷是以固定頻率 HZ 發生的。啟動時,核心將變數初始化為零,並在每次定時器中斷期間加一,用於度量時間的流逝。因為一秒內有 HZ 次定時器中斷,所以每秒增加 HZjiffies,也是 jiffies_value = seconds_value * HZ 計算式的由來。

softirq

注意用語:

  • thread 是「執行緒」
  • command 是「命令」,而非「指令」

了解

每個 CPU 上會初始化一個 ksoftirqd 核心線程 執行緒,負責處理各種類型的 softirq 中斷事件。

閱讀許多文件後會產生疑問: softirqs 和 tasklets 是在 interrupt context 運行的,但 ksoftirqd 卻又是一個在 process context 中運行的核心執行緒,兩者產生矛盾。參考網路上相關問答,ksoftirqd 是在中斷處理過載時被呼叫,也就是當 irq 結束並呼叫 do_IrQ 後卻沒有馬上處理 softirq 時,就會被放至 ksoftirqd 等待 cpu 排程處理,雖然 ksoftirqd 是在 process context 執行。但它會啟用 local interrupts enabled 和 bottom halves disabled,與 interrupt context 環境相同。

可以透過以下指令 命令觀察我的電腦中 16 個 cpu 分別負責的 softirq 執行緒編號。

$ systemd-cgls -k | grep softirq
├─   15 [ksoftirqd/0]
├─   23 [ksoftirqd/1]
├─   29 [ksoftirqd/2]
├─   35 [ksoftirqd/3]
├─   41 [ksoftirqd/4]
├─   47 [ksoftirqd/5]
├─   53 [ksoftirqd/6]
├─   59 [ksoftirqd/7]
├─   65 [ksoftirqd/8]
├─   71 [ksoftirqd/9]
├─   77 [ksoftirqd/10]
├─   83 [ksoftirqd/11]
├─   89 [ksoftirqd/12]
├─   95 [ksoftirqd/13]
├─  101 [ksoftirqd/14]
├─  107 [ksoftirqd/15]

Linux 中定義了 10 種 softirq 如下:

const char * const softirq_to_name[NR_SOFTIRQS] = {
        "HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "BLOCK_IOPOLL",
        "TASKLET", "SCHED", "HRTIMER", "RCU"
};

在 simrupt 專案中使用了其中的 HI_SOFTIRQ 和 TASKLET_SOFTIRQ 用於 tasklet 的運作(通常不特別初始化時會使用優先權較低的後者),以及 TIMER_SOFTIRQ 用於處理前面提到的 timer_handler

tasklet

Tasklet 是一種建立在 softirq 之上的機制,其提供了更簡單的介面,並且具有動態配置的能力(可參考 LKMPG 第十四章)。
兩者差異可粗略分成 softirq 傾向效能,tasklet 傾向易用性。原因在於同類型的 softirq 可能在不同的 CPU 上並行執行,因此要引入同步機制。但同類型的 tasklet 同時只能在其中一個 cpu 上執行。
在 simrupt 中相關程式碼如下:

static struct workqueue_struct *simrupt_workqueue;
static void simrupt_tasklet_func(unsigned long __data)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    WARN_ON_ONCE(!in_interrupt());
    WARN_ON_ONCE(!in_softirq());

    tv_start = ktime_get();
    queue_work(simrupt_workqueue, &work);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
}
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);

「中斷上下文」很難會意,保留原文 interrupt context

收到

simrupt_tasklet_func 函式使用 WARN_ON_ONCE 檢查 in_interruptin_softirq (兩者皆透過檢查 preempt_count 來判斷),但卻不用檢查 irqs_disabled。是因為只需確保程式碼是在 interrupt context (執行環境由 ISR 控制),但仍需保持可搶佔性供優先級更高的中斷打斷。使用 ktime_get 來計算 queue_work 函式執行時間。整段程式碼透過 Tasklet 來快速響應中斷,並交由 Workqueue 在進程 process context 中實際完成任務,以優化 改善整個系統的回應性和處理效率。__func__ 會被替換成函式名稱。

不要濫用「優化」一詞,參見: https://hackmd.io/@sysprog/it-vocabulary

process 是「行程」,而非「進程」。儘管簡體中文的「進程」寓意是「進行中的程序」,但在中華人民共和國官方的文宣中,也常見「進程」一詞,顯然語境跟作業系統無關。在中國文宣中,「進程」指英語 progress,後者可代表進步、過程、進展,甚至依詞性變化還帶有「步驟」的意思。

已更改

不同 tasklet 可以在不同 CPU 同時執行,相同 tasklet 不能同時執行,一個 tasklet 只會在排程它的 CPU 上執行。

workqueue

workqueue 和 softirq、tasklet 本質上的差異在於前者運行在 process context,滿足有睡眠需求的任務,後者們運行在 interrupt context。觀察以下程式碼檢查 WARN_ON_ONCE(in_softirq())WARN_ON_ONCE(in_interrupt()) 即可得證。

static DEFINE_MUTEX(producer_lock);
static DEFINE_MUTEX(consumer_lock);
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
static void simrupt_work_func(struct work_struct *w)
{
    int val, cpu;
    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());
    cpu = get_cpu();
    pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
    put_cpu();
    while (1) {
        mutex_lock(&consumer_lock);
        val = fast_buf_get();
        mutex_unlock(&consumer_lock);

        if (val < 0)
            break;
        mutex_lock(&producer_lock);
        produce_data(val);
        mutex_unlock(&producer_lock);
    }
    wake_up_interruptible(&rx_wait);
}
static DECLARE_WORK(work, simrupt_work_func);

simrupt_work_func 函式使用 get_cpuput_cpu 來停用和重新啟用搶佔來取得整確的 cpu id,因為此函式運行在 process context 中,默認允許搶佔。使用 consumer_lock 來安全地從循環緩衝區中獲取資料,使用 producer_lock 來安全地將數據 資料寫入 kfifo 緩衝區。wake_up_interruptible(&rx_wait) 用於喚醒在 rx_wait 等待隊列 佇列上的任何進程 行程。
最後透過 DECLARE_WORK 靜態初始化一個工作項目,將 simrupt_work_func 函數綁定到這個工作項目上(如果使用 INIT_WORK 則是動態初始化)。在未來這個 work 會透過先前提到的 queue_work 來將其添加至 simrupt_workqueue

注意用語:

  • data 是「資料」
  • queue 是「佇列」,而非「隊列」

已更改

kfifo

kfifo 是 Linux 核心裡頭提供了 First-In-First-Out 的 Circular buffer 資料結構。在 Single Producer Single Consumer (SPSC) 情況中是 safe 的,即不需要額外的 lock 維護。多生產者需鎖定寫入作業,多消費者需鎖定讀取操作。

在 simrupt 專案中用來儲存即將傳到 userspace 的資料,相關程式碼如下:

static DECLARE_KFIFO_PTR(rx_fifo, unsigned char);
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);

宣告了一個 rx_fifo 指標指向一個 Kfifo 結構。以及一個為了防止讀取共同資料造成 race condition 的鎖 rx_wait

static void produce_data(unsigned char val)
{
    /* Implement a kind of circular FIFO here (skip oldest element if kfifo
     * buffer is full).
     */
    unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
    if (unlikely(len < sizeof(val)) && printk_ratelimit())
        pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);

    pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
             kfifo_len(&rx_fifo));
}

produce_data 函式中透過 kfifo_in(fifo, buf, n) 函式,複製 buf 資料並放到 fifo 中,並回傳插入的資料大小。透過這個回傳值可以知道寫入的空間不足,打印資訊警告使用者丟棄了某些資料。

static ssize_t simrupt_read(struct file *file,
                            char __user *buf,
                            size_t count,
                            loff_t *ppos)
{
    unsigned int read;
    int ret;

    pr_debug("simrupt: %s(%p, %zd, %lld)\n", __func__, buf, count, *ppos);

    if (unlikely(!access_ok(buf, count)))
        return -EFAULT;

    if (mutex_lock_interruptible(&read_lock))
        return -ERESTARTSYS;

    do {
        ret = kfifo_to_user(&rx_fifo, buf, count, &read);
        if (unlikely(ret < 0))
            break;
        if (read)
            break;
        if (file->f_flags & O_NONBLOCK) {
            ret = -EAGAIN;
            break;
        }
        ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));
    } while (ret == 0);
    pr_debug("simrupt: %s: out %u/%u bytes\n", __func__, read,
             kfifo_len(&rx_fifo));

    mutex_unlock(&read_lock);

    return ret ? ret : read;
}

注意用語:

  • character 是「字元」,而非「字符」,繁體中文的「符」出現較少,而共製漢語則濫用該字
  • call 是「呼叫」,而非「調用」

字符設備 字元裝置註冊後,simrupt_read 函式使使用者可以使用標準的系统调用 呼叫來操作裝置,該函式透過 kfifo_to_user(fifo, to, len, copied) 函式將 len 個 bytes 資料從 fifo 移到 userspace。
這段程式碼先是使用 mutex_lock_interruptible 嘗試取得互斥鎖 read_lock,在等待鎖的過程中可以被訊號打斷。接著使用 do-while 迴圈從 kfifo 嘗試讀取資料並將其複製到使用者空間,其中包含了多個中止條件判定:

  • if (unlikely(ret < 0)) 判斷 kfifo_to_user 返回一個負值,表示資料複製過程中發生了錯誤。
  • if (read) 判斷 read 變數大於 0,表示已有資料被成功複製到使用者空間,這時將跳出迴圈。
  • if (file->f_flags & O_NONBLOCK) 則是用於處理當文件是以非阻塞模式 (O_NONBLOCK) 打開,且 kfifo 中沒有足夠的資料可供立即讀取(read 為 0),此時不會進入阻塞等待,而是返回 -EAGAIN 表達現在沒有資料可讀。

注意用語!避免看低劣的簡體中文殘句。
避免使用 ChatGPT 的中文翻譯,其品質堪憂。

之後會注意並改進

當核心模組無法立即處理一個進程 行程的請求時,wait_event_interruptible進程 行程加入等待 隊列佇列。wait_event_interruptible 在等待事件 (在這個例子中為 kfifo_len(&rx_fifo) 大於 0) 時,允許進程 行程對信號 (如 Ctrl+C) 做出響應,這與 wait_event 不同,後者會導致進程 行程完全阻塞,即使收到信號也不會喚醒(這段內容皆可參考 LKMPG 第十一章)。
所以 simrupt_read 函式有兩種情況可能導致行程進入睡眠:沒搶到互斥鎖以及 kfifo 為空的時候,後者在 kfifo 資料更新時會透過 wake_up_interruptible 觸發等待行程重新檢查條件。

最後在 simrupt_init 函式初始化模組時透過 kfifo_alloc(fifo, size, gfp_mask) 配置 kfifo buffer 一塊 PAGE_SIZE 大小的記憶體空間,並在 simrupt_exit 函式中止模型時使用 kfifo_free(fifo) 函式釋放記憶體空間。

memory barrier

simrupt 中在 fast_buf (fast circular buffer) 中使用了 memory barrier。用於快速在 interrupt context 中儲存要放到 kfifo 的資料。
Circular Buffer 是個固定大小的緩衝區,透過 2 個 indicies: head 紀錄 producer 寫入資料的位置, tail 紀錄 consumer 讀出資料的位置,因此 tail 不會超過 head。兩者初始值一致,表示空緩衝區。
緩衝區大小維持 2 的冪,才可以使用 bitwise 操作去計算緩衝區空間,取代較慢的 modulus (divide) 操作。觀察以下於 Linux 定義的巨集:

#define CIRC_CNT(head, tail, size) (((head) - (tail)) & ((size)-1))

當 size 是 2 的冪時,可以使用 & (size-1) 操作取代對 size 進行模運算。因此 CIRC_CNT 巨集會返回緩衝區中目前的資料數量,供 consumer 所用。

#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))

CIRC_SPACE 巨集其實是對 CIRC_CNT 巨集的複用,將頭尾參數順序顛倒,計算從 headtail 的空間,返回緩衝區中剩餘的可用空間,供 producer 所用。
需注意的是 (head)+1 因為剩餘的可用空間最大只有 size - 1,保留一個字元來達到不用來區分滿和空的狀態。

smp_rmbsmp_wmb 作為 memory barrier,防止記憶體讀取指令的重排,確保先讀寫索引值後再讀取寫入內容。觀察以下程式碼:

static int fast_buf_get(void)
{
    struct circ_buf *ring = &fast_buf;

    /* prevent the compiler from merging or refetching accesses for tail */
    unsigned long head = READ_ONCE(ring->head), tail = ring->tail;
    int ret;

    if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
        return -ENOENT;

    /* read index before reading contents at that index */
    smp_rmb();

    /* extract item from the buffer */
    ret = ring->buf[tail];

    /* finish reading descriptor before incrementing tail */
    smp_mb();

    /* increment the tail pointer */
    ring->tail = (tail + 1) & (PAGE_SIZE - 1);

    return ret;
}

這個函式 fast_buf_get 是供 consumer 使用,從緩衝區中取得資料,並更新 tail index。因此需要使用 READ_ONCE 巨集讀取由 producer 更動的 ring->head 變數以確保讀取最新的正確值。透過 smp_rmb() 設置 read memory barrier 確保在這之前程式碼已讀取 headtail 索引值,才讀取該索引值的內容。接著使用 smp_mb 設置 full memory barrier 確保先前的讀寫操作完成後,再進行寫入操作更新 ring->tail 索引值。

static int fast_buf_put(unsigned char val)
{
    struct circ_buf *ring = &fast_buf;
    unsigned long head = ring->head;

    /* prevent the compiler from merging or refetching accesses for tail */
    unsigned long tail = READ_ONCE(ring->tail);

    /* is circular buffer full? */
    if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
        return -ENOMEM;

    ring->buf[ring->head] = val;

    /* commit the item before incrementing the head */
    smp_wmb();

    /* update header pointer */
    ring->head = (ring->head + 1) & (PAGE_SIZE - 1);

    return 0;
}

這個函式 fast_buf_put 是供 producer 使用,與上個函式相似,只是READ_ONCE 改成確保 ring->tail 索引值,以及改成使用 CIRC_SPACE 確保是否還有可寫入空間,最後透過 smp_wmb 設置 write memory barrier 確保 val 寫入緩衝區後才進行寫入操作更新 ring->head 索引值。

注意用語!
schedule 是「排程」

了解

以下程式碼描述在核心中處理資料和調度 排程任務的流程

static void process_data(void)
{
    WARN_ON_ONCE(!irqs_disabled());

    pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
    fast_buf_put(update_simrupt_data());

    pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
    tasklet_schedule(&simrupt_tasklet);
}

WARN_ON_ONCE 會在其參數表達式為真時發出警告。在這段程式碼中用於檢查 irqs_disabled() 是否為回傳 0,以確保在 interrupt context 中呼叫 process_data 函數。
update_simrupt_data 函式產生一段 ASCII 可顯示字元資料,並透過 fast_buf_put 函式將其放入環形緩衝區。
最後透過 tasklet_schedule 調度 排程核心 tasklet 來非同步處理先前放入環形緩衝區的資料。

注意用語:

  • schedule 是「排程」(針對 process 的「行程」),而非「調度」。

摘自教育部新編國語詞典的「調度」條目:

安排配置。如:「調度有方」。《花月痕》第一四回:「走進垂花門,見堂中正亂騰騰的擺設,謖如卻坐在炕上調度。」

「調度」一詞不足以反映作業系統核心的行為,而「排程」則更精準。

kernel module

我認為也是 simrupt 所展示的核心機制之一,simrupt 本身就是一個模組,在程式碼中透過 simrupt_initsimrupt_exit 進行註冊和註銷。simrupt_init 除了將專案中所用到的資料結構初始化,例如使用 vmalloc 分配 配置空間給 fast_buf,vmalloc 與 kmalloc 差異在於前者保證連續的虛擬位址,後者則保證連續的實際位址,還透過 cdev 介面為 simrupt 註冊一個裝置號,並且加入到系統中。並使用 class_createdevice_create 將裝置註冊到 sysfs (可參閱 LKMPG 第六章), class_create 函式用於建立一個裝置類別 (device class) 的抽象概念。建立後當成參數傳入 device_create 函式,在該裝置類別中建立一個在 /dev 目錄下具體的裝置節點。
cdev_adddevice_create 的差異在於 cdev_add 專注於核心如何處理對裝置的操作,確保當使用者空間的應用程式嘗試存取這個裝置時,核心能夠管理和排程這些操作。而 device_create 負責在檔案系統中建立一個使用者可以直接存取的節點,讓使用者能夠直接使用標準文件操作來與裝置互動。

實驗設計

在執行該專案時先紀錄系統目前資源開銷。

$ sudo cat /proc/softirqs                         
                    CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7       CPU8       CPU9       CPU10      CPU11      CPU12      CPU13      CPU14      CPU15     
          HI:         24      60395          0    1188439          0     278793      58535    1120762          0          0          0       2930         14      55972          0       5523
       TIMER:     787923     572887     453747     458307     463694     621511     463080     499769     223574     109631     123459      81149      73009      74061      69896    1101358
     TASKLET:         25       1654       2490      14302       2357       5019      19934       8830          1          0         69       1032         79          0          0        103

此命令用來查看 Linux 系統中 softirqs 的目前統計資訊,proc 檔案系統最初被設計用來方便存取有關行程的資訊,因此得名 proc。隨時間推移,它被廣泛應用於報告核心的各種資訊,(可參考 LKMPG 第七章)
我只擷取 simrupt 專案中有使用到的 HI, TASKLET 和 TIMER 這三種 softirq 的使用數量。

開始實驗,掛載核心模組。

$ sudo insmod simrupt.ko

掛載後,會產生一個裝置檔案/dev/simrupt (原因可見上面筆記),藉由以下命令可見到輸出的資料。

$ sudo cat /dev/simrupt

參考輸出: (可能會有異)

 !"#$%&'()*+,-./0123456789:;<=>?@AB

dmesg 顯示核心訊息,加入 -t | tail -12 限制顯示數量

$ sudo dmesg -t | tail -12
simrupt: [CPU#4] enter timer_handler
simrupt: [CPU#4] produce data
simrupt: [CPU#4] scheduling tasklet
simrupt: [CPU#4] timer_handler in_irq: 4 usec
simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 2 usec
simrupt: [CPU#7] simrupt_work_func
simrupt: [CPU#4] enter timer_handler
simrupt: [CPU#4] produce data
simrupt: [CPU#4] scheduling tasklet
simrupt: [CPU#4] timer_handler in_irq: 4 usec
simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 2 usec
simrupt: [CPU#2] simrupt_work_func

可以觀察出 cpu4 處理了計時器的中斷和 tasklet 的排程,而後續 work 則可能在不同的 cpu 上被喚醒執行 simrupt_work_func 的處理。

再次觀察實驗後 Linux 系統中 softirqs 的目前統計資訊,可以明顯看出只有 cpu4 的 TASKLET 數量增加,且 HI 都維持不變,印證了我前面介紹 softirq 時的推論。而 Timer 在 Linux 核心中被廣泛使用,較難從這種方法中觀察出明顯使用痕跡。

$ sudo cat /proc/softirqs
                    CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7       CPU8       CPU9       CPU10      CPU11      CPU12      CPU13      CPU14      CPU15  
          HI:         24      60395          0    1188439          0     278793      58974    1134704          0          0          0       2930         14      55972          0       5523
       TIMER:     789977     574245     454952     460083     466050     622689     464596     500847     223574     109631     123459      81149      73009      74061      69896    1101358
     TASKLET:         25       1654       2490      14302       3447       5019      20181       8830          1          0         69       1032         79          0          0        103

我們還可以透過更改 delay 變數大小來控制我們模擬中斷的頻率,如同前面所說,simrupt 專案會在中斷時產生資料必依序傳入 fast_buf 和 kfifo,因此將 delay 變數改成 30000 ms 時重新執行上面的實驗即可觀察到資料輸出速度變成約 30 秒一個字元。

static int delay = 30000;

再者,已知 simrupt 專案會定期發生中斷並產生資料,而 cat 命令又會不斷循環的呼叫 read 系統呼叫,再結合前面對於 simrupt 模組的裝置操作中 simrupt_read 的介紹,即可解釋為何 cat 會無止盡的輸出資料。因此,我們可以寫一個簡單的程式碼來改變使用系統呼叫 read 的方式來觀察結果。測試程式碼如下。

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main()
{
    char buf[1024];
    int fd = open("/dev/simrupt", O_RDWR);
    if (fd < 0) {
        perror("Failed to open device");
        return -1;
    }

    ssize_t bytes = read(fd, buf, sizeof(buf));
    if (bytes < 0) {
        perror("Failed to read data");
    } else {
        printf("Read %zd bytes: %s\n", bytes, buf);
    }

    close(fd);
    return 0;
}

參考輸出如下(會隨時間長短有所差異)

Read 9 bytes: 23456789:

原因是這段測試程式碼相比 cat,只做了一次 read 系統呼叫,因此只會讀取當下 kfifo 的所有資料,如果為空則會進入睡眠直到讀取至少一個字元,使用者可以結合對 delay 變數的設定來控制資料產生的速度進而方便觀察。
而且由於只做了一次讀取,因此只要時間夠長或是 delay 變數設定的比較小,導致 simrupt 在模擬中斷期間產生的資料量超過 kfifo 的大小時就無法繼續寫入更新。可以透過以下命令觀察。

$ sudo dmesg
[91733.270703] simrupt: [CPU#4] enter timer_handler
[91733.270720] simrupt: [CPU#4] produce data
[91733.270723] simrupt: [CPU#4] scheduling tasklet
[91733.270726] simrupt: [CPU#4] timer_handler in_irq: 5 usec
[91733.270746] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 3 usec
[91733.270780] simrupt: [CPU#2] simrupt_work_func
[91733.270784] produce_data: 39 callbacks suppressed
[91733.270787] produce_data: 1 bytes dropped

其中 produce_data: 39 callbacks suppressed 是由於 Linux 核心日誌系統在報告大量相似訊息時保護核心免受過多的日誌記錄負載影響的一種機制導致。這個機制在核心中稱為 rate limit,用於減少日誌輸出的數量,避免因大量重複日誌而造成的效能問題或讀取困難。這則資訊說明,在先前的日誌記錄中,produce_data 函數相關的日誌輸出被抑制了39次。而抑制的輸出其實就是 produce_data: 1 bytes dropped,觀察 simrupt 中 produce_data 函式程式碼即可發現是因為 kfifo 緩衝區已滿導致無法寫入。

最後,在進行完所有實驗後記得卸載模組

$ sudo rmmod simrupt

補充: 其實仔細觀察 simrupt 專案程式碼後,可以發現 tasklet 除了紀錄排程 workqueue 的時間以外沒有做任何事情,所以移除 tasklet 相關操作並沒有任何問題。

TODO: 設計實驗來理解 workqueue

workqueue 可以將 work 延後執行,並交由一個 kernel thread 去執行,相對於 tasklet 要在 interrupt context 下執行,workqueue 能在 process context 下執行,且允許重新排程與 sleep,所以 workqueue 可以取代 tasklet。

參考 LKMPG workqueue 實驗,自行設計一個 workqueue 實驗,將 work 與 delayed_work 放到 workqueue 中,並記錄時間,之後,先執行 work,並再延遲 5 秒後,執行 delayed_work

實驗設計

參考 Linux doc

#include <linux/init.h>
#include <linux/ktime.h>
#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *queue = NULL;
static struct work_struct my_work;
static struct delayed_work my_delayed_work;

static ktime_t work_time, delayWork_time;

static void work_handler(struct work_struct *work)
{
    work_time = ktime_get();
    pr_info("Immediate work handler function at: %lld ms.\n",
            ktime_to_ms(work_time));
}

static void delayed_work_handler(struct work_struct *work)
{
    delayWork_time = ktime_get();
    pr_info("Delayed work handler function at: %lld ms.\n",
            ktime_to_ms(delayWork_time));
    if (work_time) {
        s64 msecs = ktime_to_ms(ktime_sub(delayWork_time, work_time));
        pr_info("Time difference between work and delayed work: %lld s.\n",
                msecs >> 10);
    }
}

static int __init my_init(void)
{
    queue = alloc_workqueue("ExampleWQ", WQ_UNBOUND, 1);
    INIT_WORK(&my_work, work_handler);
    INIT_DELAYED_WORK(&my_delayed_work, delayed_work_handler);
    queue_work(queue, &my_work);
    queue_delayed_work(queue, &my_delayed_work, msecs_to_jiffies(5000));
    return 0;
}

static void __exit my_exit(void)
{
    cancel_delayed_work_sync(&my_delayed_work);
    destroy_workqueue(queue);
    pr_info("Workqueue module exiting.\n");
}

module_init(my_init);
module_exit(my_exit);

MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Workqueue with immediate and delayed work example");

參考輸出如下:

$ sudo dmesg
[218579.766188] Immediate work handler function at: 218578457 ms.
[218584.929022] Delayed work handler function at: 218583620 ms.
[218584.929028] Time difference between work and delayed work: 5 s.

使用了參考資料中的 queue_delayed_work 函式指定一個延遲時間。在延遲時間過後,工作項目將會被排程執行。

此外可以使用以下命令觀察目前核心中用來執行 workqueue 任務的 kworker 行程。在 CMWQ 架構下這些 kworker 執行序被所有 workqueue 共享。而且 CWMQ 會保留一個 idle 的 kworker 一段時間。這樣若隨即有 work 要處理則可以直接沿用,不必再重新建立。

$ systemd-cgls -k | grep kworker
├─     8 [kworker/0:0H-events_highpri]
├─    25 [kworker/1:0H-events_highpri]
├─    31 [kworker/2:0H-events_highpri]
├─    37 [kworker/3:0H-events_highpri]
...

表示方法為 kworker/N:M{flag},其中 N 代表 CPU 的編號; M 是用來區別同一 CPU 上不同 kworker 的編號; flag 提供額外資訊關於 kworker 的角色或處理的任務類型。

最後補充介紹一下 alloc_workqueue 函式,這函式用於分配一個 workqueue,已取代舊的 create_workqueue 函式,其包含三個參數:

  • @name: 工作佇列的名稱。
  • @flags: 控制工作佇列行為的標誌。
  • @max_active: 控制 workqueue 在每個 CPU 上可同時執行的 work 最大數量。

其中 flags 有以下幾種:

  • WQ_UNBOUND: 不綁定任何特定 CPU worker。提高靈活性但效能較低,因為稍微犧牲了 locality 導致快取效率較低。
  • WQ_FREEZABLE
  • WQ_MEM_RECLAIM
  • WQ_HIGHPRI:在特定 CPU 的高優先級 worker pool 中處理,適用於需要快速響應的任務,需注意其和位於同個 cpu 裡普通優先級 worker pool 是相互獨立的。
  • WQ_CPU_INTENSIVE

CMWQ 架構圖:

CMWQ

透過這張架構圖,對照上面的介紹與實驗結果,可以透過 systemd-cgls -k | grep kworker 命令觀察到每個 cpu 都有屬於自己的 highpri 高優先級 worker,以及一些 unbound 的 worker。

這個實驗與 simrupt 專案中皆使用 WQ_UNBOUND@max_active 設為 1 的組合來實現嚴格的執行排序,確保在任何時候只有一個任務可以激活,從而實現與 ST 工作佇列相同的排序屬性。
(以上介紹可對照參考 Linux doc2024 年 Linux 核心設計/實作課程作業)

TODO: 在 Linux 核心選定 kfifo 應用案例

在 Linux 核心中,kfifo 用來管理 FIFO 環狀緩衝區。經常用於實現高效的緩衝和資料流處理,特別是在需要快速且線性地存取資料的情況下。以下是幾個常見的 kfifo 應用案例:

  1. 字元裝置驅動程式,如 /drivers/char/sonypi.c
  2. 網路驅動程式,如 /drivers/net/ieee802154/ca8210.c

TODO: Memory Barrier 的使用和必要性

搭配實驗解說

simrupt 專案中分別在 fast_buf_getfast_buf_put 中使用了 smp_rmb, smp_mbsmp_wmb 三個 memory barrier,防止記憶體讀取指令的重排。
其分別的使用場景和必要性可見我上面的筆記介紹

實驗設計

為了方便觀察資料的讀取順序是否因為少了 memory barrier 而改變,我將 simrupt 專案產生的資料範圍改成 A 到 Z 區間:

 static inline int update_simrupt_data(void)
 {
-    simrupt_data = max((simrupt_data + 1) % 0x7f, 0x20);
+    simrupt_data = max((simrupt_data + 1) % 0x5b, 0x41);
     return simrupt_data;
 }

並將 delay 時間縮短使中斷發生的頻率增加,進而增加負載,接著移除三個 memory barrier ,並在 userspace 呼叫使用兩個 termianl 同時呼叫 read 觀察輸出結果:

在一開始只有一個 termianl 時輸出保持順序。

$ sudo cat /dev/simrupt 
ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ

然而加入第二個 termianl 後兩者的輸出順序皆稍微亂序。

$ sudo cat /dev/simrupt 
ACEHILMORUXADFIKNQTWZCFILORSUVXYABDEGHJKMNPQSTVWYZBCEFHIKLNOQRUVXY

由此可知雙方讀取的索引值在沒有 memory barrier 的保護下已經互相影響。

TODO: 探討 Linux 中斷處理的流程

以 simrupt 作為解說對象,模仿科技公司面試對答的形式

我先簡單介紹大致概念。在 Linux 中,中斷是 CPU 對事件做出反應的機制。由於一個 cpu 在任意時刻只能處理一個任務,中斷允許 CPU 暫時停止目前行程的執行,立即去處理更緊急的任務。

中斷可以用許多角度分類,以下採用硬體還是軟體觸發來分類:

  • Hardware Interrupts: 由硬體裝置產生。
  • Software Interrupts: 系統呼叫。以及程式執行非法操作 (除零) 時觸發,稱為異常 (Exceptions)。

針對硬體中斷進行討論

中斷類型:

  1. I/O interrupts: 由外部裝置(如鍵盤、鼠標、網卡等)引發的中斷,當這些裝置需要 CPU 處理某些事件時(如資料準備好、讀取完成等),它們會發送中斷信號給 CPU。我之前參與過一個 simrupt 專案就是一個模擬這種情況的驅動模組。
  2. Timer interrupts: 由系統定時器定期引發的中斷,如 Round-Robin Scheduling。
  3. Interprocessor interrupts: 在多處理器系統中,一個處理器向另一個處理器發送的中斷訊號,用於協調多處理器間的操作。

中斷發生流程:

  1. 硬體裝置發送物理信號到中斷控制器 (PIC/APIC)。
  2. 中斷控制器將物理信號轉成中斷向量交給 cpu 執行。
  3. cpu 執行以下處理:
    • 關閉中斷和保存狀態: 作業系統保存目前行程的狀態以便中斷處理完成後能恢復執行,與 context switch 的狀況不同,較為輕量避免佔用太多時間。
    • 執行 ISR (Interrupt Service Routine): 每種中斷都有一個對應的 ISR,透過查尋 Interrupt Vector Table 取得。ISR 是位於核心特定位址的程式碼。
    • 任務排程和恢復: ISR 執行完畢後,作業系統根據目前的任務優先順序調整 CPU 資源分配,恢復或開始新的行程。

然而 IRQ handler (ISR) 既要盡量縮短執行時間,又可能要處理許多事情,這存在矛盾,也因而誕生了 deferred interrupts 用於解決這種情況,包含了 softirq, tasklet, workqueue。

以我參與過的 simrupt 專案為例,模擬了網卡接收封包時觸發中斷的情況。
透過核心定時器 (timer_list, base on Timer softirq) 定期產生資料來模擬網卡接收資料,過程中使用 local_irq_disable 關閉中斷來達到與真實 I/O interrupts 進入 ISR 一樣的情形。在這段期間會將資料插入到快速環形緩衝區中,同時排程 tasklet 來處理後續更多的資料處理工作,達到迅速退出 ISR 避免長時間關閉中斷的效果。
接著 tasklet 會進行排程並透過 ksoftirqd 核心執行序處理後續操作,在 simrupt 這個專案中只進行了排程 workqueue 一個操作,因為 tasklet 需在 interrupt context 中執行,所以不能執行阻塞或睡眠等複雜操作。
最後 workqueue 會排程並透過 kworker 在 process context 中處理最後耗時的複雜操作,從 kfifo 中取出資料並將其提供給使用者空間。

可能遇到的提問:

  1. fast_buf 和 kfifo 的差異?

Ans: fast_buf 是一個簡單的環形緩衝區,設計目的是在 interrupt context 中快速存儲資料。使用簡單的指針操作(head 和 tail)來管理資料的插入和取出。並透過維持 2 的冪大小達到用 bitwise 取代 modulus (divide) 操作。且由於在 interrupt context 所以不用考慮用鎖防止競爭問題。
kfifo 則是核心中一種通用的環形 FIFO 緩衝區,支持多讀者多寫者場景所以設計更加複雜。

  1. 為何此驅動模組中選擇使用 Tasklet ,不使用 softirq?

Ans: 與我上面筆記內容重複。

  1. irq 和 softirq 差異?

Ans: 兩者雖皆在 interrupt context 中執行,但後者是開放中斷的。

  1. softirq 是可以 reschduling 導致可能 starve user-level processes,如何解決?

Ans: 核心可以通過限制每個 softirq 處理的次數,或是限制每次 softirq 處理所花費的時間來防止。

補充相關實驗展示如下(只產生三個字元並讀取的結果):

[255925.006593] simrupt: registered new simrupt device: 511,0
[255930.342137] openm current cnt: 1
[255961.544677] simrupt: [CPU#4] enter timer_handler
[255961.544695] simrupt: [CPU#4] produce data
[255961.544696] simrupt: [CPU#4] scheduling tasklet
[255961.544698] simrupt: [CPU#4] timer_handler in_irq: 3 usec
[255961.544756] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 0 usec
[255961.544758] tasklet run 54 usec after irq
[255961.544758] simrupt: [CPU#0] simrupt_work_func
[255961.544763] simrupt_work_func execution time: 4989 ns
[255992.264505] simrupt: [CPU#4] enter timer_handler
[255992.264528] simrupt: [CPU#4] produce data
[255992.264530] simrupt: [CPU#4] scheduling tasklet
[255992.264532] simrupt: [CPU#4] timer_handler in_irq: 4 usec
[255992.264608] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 1 usec
[255992.264611] tasklet run 70 usec after irq
[255992.264612] simrupt: [CPU#2] simrupt_work_func
[255992.264619] simrupt_work_func execution time: 6843 ns
[256022.984602] simrupt: [CPU#4] enter timer_handler
[256022.984619] simrupt: [CPU#4] produce data
[256022.984621] simrupt: [CPU#4] scheduling tasklet
[256022.984623] simrupt: [CPU#4] timer_handler in_irq: 4 usec
[256022.984694] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 0 usec
[256022.984697] tasklet run 65 usec after irq
[256022.984792] simrupt: [CPU#5] simrupt_work_func
[256022.984798] simrupt_work_func execution time: 6652 ns
[256027.531242] release, current cnt: 0
[256032.428760] simrupt: unloaded

可以觀察到 irq 和 tasklet 的執行速度都非常快,而 tasklet 排程後並不一定會馬上執行,而在 worker 中執行的任務時間相較之下確實非常久。

使用課程指定的程式碼風格進行縮排,確保風格一致。

收到,本意是想精簡篇幅,現已改進