2025q1 Homerwork3

contributed by < charliechiou >

作業書寫規範:

  • 無論標題和內文中,中文和英文字元之間要有空白字元 (對排版和文字搜尋有利)
  • 文字訊息 (尤其是程式執行結果) 請避免用圖片來表示,否則不好搜尋和分類
  • 共筆書寫請考慮到日後協作,避免過多的個人色彩,用詞儘量中性
  • 不要在筆記內加入 [TOC] : 筆記左上方已有 Table of Contents (TOC) 功能,不需要畫蛇添足
  • 不要變更預設的 CSS 也不要加入任何佈景主題: 這是「開發紀錄」,用於評分和接受同儕的檢閱
  • 在筆記中貼入程式碼時,避免非必要的行號,也就是該手動將 c=cpp= 變更為 ccpp。行號只在後續討論明確需要行號時,才要出現,否則維持精簡的展現。可留意「你所不知道的 C 語言: linked list 和非連續記憶體」裡頭程式碼展現的方式
  • HackMD 不是讓你張貼完整程式碼的地方,GitHub 才是!因此你在開發紀錄只該列出關鍵程式碼 (善用 diff 標示),可附上對應 GitHub commit 的超連結,列出程式碼是為了「檢討」和「便於他人參與討論」
  • 留意科技詞彙的使用,請參見「資訊科技詞彙翻譯」及「詞彙對照表
  • 不要濫用 :::info, :::success, :::warning 等標示,儘量用清晰的文字書寫。:::danger 則僅限授課教師作為批注使用
  • 避免過多的中英文混用,已有明確翻譯詞彙者,例如「鏈結串列」(linked list) 和「佇列」(queue),就使用該中文詞彙,英文則留給變數名稱、人名,或者缺乏通用翻譯詞彙的場景
  • 在中文敘述中,使用全形標點符號,例如該用「,」,而非 ","。注意書名號的使用,即 ,非「小於」和「大於」符號
  • 避免使用不必要的 emoji 字元

Softirq, Tasklet, Workqueue

Linux 核心中關於 Bottom half 有以下三種機制:

  • softirqs
  • tasklets
  • workqueues

閱讀 Linux Kernel Development 以理解其機制並整理於筆記中

Softirq, Tasklet, Workqueue 理解

Simrupt

Simrupt 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件。

參考 Linux 核心的並行處理simrupt 專案以理解 Linux 中核心模組的使用及並行處理。

核心模組使用以下指令編譯

$ make -C /lib/modules/`uname -r`/build M=`pwd` modules

並透過程式碼中 module_init()module_exit() 當作核心模組的進出口,而關於核心模組的敘述可以設定如下

MODULE_LICENSE("Dual MIT/GPL");
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_DESCRIPTION("A device that simulates interrupts");

編譯後也可以透過 modinfo simrupt.ko 來查看模組相關資訊。

由於核心模組運行在 Kernel mode 下,因此我們需要先引入核心相關的標頭檔。

#include <linux/cdev.h>
...
#include <linux/workqueue.h>

初始化

程式碼透過 simrupt_init 來初始化模組,使用 dev_id 來註冊裝置及 ret 來紀錄回傳數值。並透過 kfifo_alloc 來分配 PAGE_SIZE 大小的記憶體空間,且由 alloc_chrdev_region 來註冊一個裝置。 alloc_chrdev_region 會將分配的裝置編號儲存在 &dev_id 中,並使用 MAJOR 的宏來取得裝置編號。

ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME);

接著透過 cdev_init 來初始化一個 character device 並使用 cdev_add 綁定。

cdev_init(&simrupt_cdev, &simrupt_fops);
ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT);

而其中初始化過程中所設定的操作則定義在 &simrupt_fops 中,如下

static const struct file_operations simrupt_fops = {
    .read = simrupt_read,
    .llseek = no_llseek,
    .open = simrupt_open,
    .release = simrupt_release,
    .owner = THIS_MODULE,
};

將 character device 不同的操作對應到不同的操作函式中。由於 linux 核心中並沒有類似模擬中斷類型的裝置,因此使用 class_create 來創造一個新的類別。

simrupt_class = class_create(DEV_NAME);

最後透過 device_create 創建一個裝置節點 /dev/simrupt 讓使用者可以透過先前定義的 open, read 來操作我們所建立的裝置。由於我們前面在 alloc_chrdev_region 時將裝置的起始值設定為 0 ,因此這邊使用 MKDEV(major, 0) 將主及次裝置編號合成對應到前面設定的裝置。

device_create(simrupt_class, NULL, MKDEV(major, 0), NULL, DEV_NAME);

接著再分別使用 vmallocalloc_workqueue 宣告會使用到的 buffer 及 workqueue。

fast_buf.buf = vmalloc(PAGE_SIZE);
simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);

其中 PAGE_SIZE 是我們要分配的 buffer 大小,而 WQ_UNBOUNDWQ_MAX_ACTIVE 則分別表示不綁定特定 CPU 及同時啟用的工作數量上限。

最後再使用 timer_setup 初始化 timer ,並用 atomic_set 來統計裝置被打開的次數,以確保只有在裝置打開後才會執行。

timer_setup(&timer, timer_handler, 0);
atomic_set(&open_cnt, 0);

其中設定當 timer 觸發時所要執行的程式 timer_handler。其中, simrupt_open 會在使用者 open 裝置的時候觸發,並透過 atomic_inc_return 將 open_cnt 加 1 並且回傳,若回傳值為 1 則表示裝置是第一次被開啟,因此需要啟動 timer 。

if (atomic_inc_return(&open_cnt) == 1)
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));

其中 jiffies 在 linux 核心中表示現在,在經過 delay 秒後觸發 timer。

觸發計時器

參考 linux 核心中的 timer.c 檔案及翻閱 Linux Kernel Development 中的敘述 , Linux 在啟動的過程中 (init/main.c) 會呼叫 init_timers()

void __init init_timers(void)
{
	init_timer_cpus();
	posix_cputimers_init_work();
	open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
}

建立 timer 的時候會先註冊對應的 softirq (i.e., open_softirq(TIMER_SOFTIRQ, run_timer_softirq);) 在這邊將 TIMER_SOFTIRQ 類型的 softirq 其 handler 函式設為 run_timer_softirq,其中 open_softirq 實現如下

void open_softirq(int nr, void (*action)(void))
{
    softirq_vec[nr].action = action;
}

/*
 * This function runs timers and the timer-tq in **bottom half context.**
 */
static __latent_entropy void run_timer_softirq(void)
{
    ...
}

會把對應的 action 函式(在此為 run_timer_softirq )設定到全域陣列 softirq_vec[] 中對應 nr (在此為 TIMER_SOFTIRQ) 的位置。由此可知,timer 基於 softirq 建立,時間到觸發後會在 softirq context 中執行綁定的 timer_handler

timer_handler 中,先藉由 in_softirq 來確認我們目前是在 sortirq 中,並且使用 local_irq_disable 將原始的 irq 關閉。

WARN_ON_ONCE(!in_softirq());

/* Disable interrupts for this CPU to simulate real interrupt context */
local_irq_disable();

接著使用 ktime_get() 來取得目前的時間,並在 process_data() 前後計算,以便分析執行時間,並以 ns 的單位計算。

tv_start = ktime_get();
process_data();
tv_end = ktime_get();

nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
        __func__, (unsigned long long) nsecs >> 10);

最後再重新設定下一次中斷的觸發時間點,並重新開啟原始的 irq 。

mod_timer(&timer, jiffies + msecs_to_jiffies(delay));

local_irq_enable();

中斷執行程式

當計時器觸發時會執行 process_data(),首先先確認上一段是否有正確的將原始的 irq 取消

WARN_ON_ONCE(!irqs_disabled());

interrupts 可以分為

  • interrupt handler must execute quickly
  • sometimes an interrupt handler must do a large amount of work

在 simrupt 中,透過將資料寫入環形的快取緩衝區 fast_buf 中,即為呼叫 fast_buf_put()update_simrupt_data() 傳入 fast_buf 來模擬 top half 的操作。

fast_buf_put(update_simrupt_data());

接著, 先宣告一個 tasklet 的型態,指定其 callback 的函式為simrupt_tasklet_func()

static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);

再使用 tasklet_schedule 將剩餘操作排程等待執行,將 fast_buffer 搬移至 kfifo 的工作排程。

tasklet_schedule(&simrupt_tasklet);

Fast buffer

為了增加延遲處理速度,我們會將資料先放入快取緩衝區 fast_buf 中,其中對於 fast buffer 的操作又分為 fast_buf_put , fast_buf_get, 及 fast_buf_clear,分別對應到寫入,讀出及移除,而其中的 fast buffer 則使用 Linux 的 Circular Buffer 來儲存。

Circular Buffer 是個固定大小的緩衝區,其中具有 2 個 indicies:

  • head index: the point at which the producer inserts items into the buffer.
  • tail index: the point at which the consumer finds the next item in the buffer.

當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。

fast_buf_put

寫入前先將目前 fast_buf 的地址儲存在 ring 的指標變數中,並儲存 head

struct circ_buf *ring = &fast_buf;
unsigned long head = ring->head;

接著用核心提供的 MARCO 來確認使否還有剩餘空間

if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
    return -ENOMEM;

若有空間則寫入 ring 的 head 位置

ring->buf[ring->head] = val;

寫入完成後將 head 移動到下一個位置,由於緩衝區大小維持 2 的冪,因此可以使用 bitwise 操作來計算緩衝區空間。以 PAGE_SIZE 大小 8 為例,PAGE_SIZE-1 = 0b0111,遮掉超過 PAGE_SIZE 的高位相當於只留下除完後的餘數,透過位元計算快速求得模除的結果。

ring->head = (ring->head + 1) & (PAGE_SIZE - 1);

fast_buf_put 的操作中,為了避免編譯器造成的錯誤,因此在讀取 tail 時候限制僅讀取一次

unsigned long tail = READ_ONCE(ring->tail);

也使用 smp_rmb() 作為 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。避免因為 cpu 的優化造成先執行 ring->head = ... 再執行 ing->buf[ring->head] = val; 造成錯誤,此即為 memory barrier 的用途。

fast_buf_get

讀取 fast buffer 中的資料方式和插入資料方式類似,同樣先讀取 head 及 tail 的地址,為了避免資料被其他 CPU 改動到,因此使用 READ_ONCE

unsigned long head = READ_ONCE(ring->head), tail = ring->tail;

接著便讀取 tail 的資料並更新 tail 將資料移出,而過程中也同樣使用 memory barrier 來防止錯誤。

/* read index before reading contents at that index */
smp_rmb();

/* extract item from the buffer */
ret = ring->buf[tail];

/* finish reading descriptor before incrementing tail */
smp_mb();

/* increment the tail pointer */
ring->tail = (tail + 1) & (PAGE_SIZE - 1);

fast_buf_clear

fast_buf_clear 則直接將 head 及 tail 設置為 0 來清空 fast buffer。

fast_buf.head = fast_buf.tail = 0;

在 top half 則透過上述的 fast_buf_put 將update_simrupt_data 產生的資料儲存在 fast buffer 中。

/* Generate new data from the simulated device */
static inline int update_simrupt_data(void)
{
    simrupt_data = max((simrupt_data + 1) % 0x7f, 0x20);
    return simrupt_data;
}

其中將 simrupt_data 限制在 0x20以內以防止小於ASCII 的空白字元 ' ',並且透過 % 0x7f 讓資料循環在 0x20~0x7f 之間。

Tasklet & Workqueue

指定 tasklet 的 callback 的函式為 simrupt_tasklet_func()

static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);

並使用 tasklet_schedule 將任務排程

tasklet_schedule(&simrupt_tasklet);

simrupt_tasklet_func

任務排程後,simrupt_tasklet_func 使用 queue_work 將工作項目 work(work item)加入工作佇列 simrupt_workqueue 中排程執行並計算執行時間。

queue_work(simrupt_workqueue, &work);

其中工作佇列及每個 work 所綁定的 function 如下

/* Workqueue for asynchronous bottom-half processing */
static struct workqueue_struct *simrupt_workqueue;

/* Work item: holds a pointer to the function that is going to be executed
 * asynchronously.
 */
static DECLARE_WORK(work, simrupt_work_func);

simrupt_workqueue 是模組中的工作佇列,當 work 被排入該佇列並被 kernel thread 執行時,將會執行綁定的處理函式 simrupt_work_func()。

simrupt_work_func

當 simrupt_work_func 執行時,會避免目前是執行 softirq 或是 interrupt 的情況,使用到 WARN_ON_ONCE 來警告。

/* This code runs from a kernel thread, so softirqs and hard-irqs must
* be enabled.
*/
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());

透過 get_cpu() 來取得目前 CPU 編號並 disable preemption,最後需要 put_cpu() 重新 enable preemption。

/* Pretend to simulate access to per-CPU data, disabling preemption
* during the pr_info().
*/
cpu = get_cpu();
pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
put_cpu();

在接下來的迴圈中,會不斷取出 circular buffer 中的資料 (consumer) 並且放入 kfifo buffer 中 (producer)。其中,為了避免不同的任務在執行過程中同時取得 circular buffer 的資料,或不同任務同時寫入 kfifo buffer 中,因此要使用 mutex_lock 來鎖住 critical section 。

while (1) {
    /* Consume data from the circular buffer */
    mutex_lock(&consumer_lock);
    val = fast_buf_get();
    mutex_unlock(&consumer_lock);

    if (val < 0)
        break;

    /* Store data to the kfifo buffer */
    mutex_lock(&producer_lock);
    produce_data(val);
    mutex_unlock(&producer_lock);
}

其中 fast_buf_get 從 circular buffer 中取得資料,並使用 produce_data 放入 kfifo buffer 中。而 producer_lock 及 consumer_lock 則定義為 mutex lock。

static DEFINE_MUTEX(producer_lock);
static DEFINE_MUTEX(consumer_lock);

最後的 wake_up_interruptible 則對應到 simrupt_read 中的 wait_event_interruptible。當資料已經被放入 kfifo buffer 中,通知 simrupt_read 已經可以讀取。

wake_up_interruptible(&rx_wait);

produce_data

資料使用 fast_buf_get 取出後,藉由 produce_data 存入 kfifo_buffer 中

unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));

simrupt_read

當使用指使用 cat /dev/simrupt 開啟裝置時,會依序使用 open() 開啟檔案並使用 read() 來讀取檔案。對應到程式碼中 simrupt_opensimrupt_read

simrupt_open

當程式碼第一次被開啟時,會將記數 open_cnt 加一,以開啟 timer 。

if (atomic_inc_return(&open_cnt) == 1)
    mod_timer(&timer, jiffies + msecs_to_jiffies(delay));

simrupt_read

先使用 access_ok 來確認使用者提供的指標位置是合法的,並且使用 mutex_lock_interruptible 來確認目前是可以讀取的狀態。

if (unlikely(!access_ok(buf, count)))
    return -EFAULT;

if (mutex_lock_interruptible(&read_lock))
    return -ERESTARTSYS;

接著使用 kfifo_to_user 來將資料從 fifo 中搬移到使用者空間的 buf 中。如果回傳錯誤碼則退出,若成功搬移也退出。若目前 kfifo 中沒有資料則使用 wait_event_interruptible 睡眠直到 wake_up_interruptible 將其喚醒後再確認 kfifo_len(&rx_fifo) 是否大於 0 若是則回傳 0 。

do {
    ret = kfifo_to_user(&rx_fifo, buf, count, &read);
    if (unlikely(ret < 0))
        break;
    if (read)
        break;
    if (file->f_flags & O_NONBLOCK) {
        ret = -EAGAIN;
        break;
    }
    ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));
} while (ret == 0);

最後再釋放 read_lock。

mutex_unlock(&read_lock);

使用 bpftool 分析