contributed by < charliechiou
>
Linux 核心中關於 Bottom half 有以下三種機制:
閱讀 Linux Kernel Development 以理解其機制並整理於筆記中
Softirq, Tasklet, Workqueue 理解
Simrupt 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件。
參考 Linux 核心的並行處理及 simrupt 專案以理解 Linux 中核心模組的使用及並行處理。
核心模組使用以下指令編譯
並透過程式碼中 module_init()
及 module_exit()
當作核心模組的進出口,而關於核心模組的敘述可以設定如下
編譯後也可以透過 modinfo simrupt.ko
來查看模組相關資訊。
由於核心模組運行在 Kernel mode 下,因此我們需要先引入核心相關的標頭檔。
程式碼透過 simrupt_init
來初始化模組,使用 dev_id
來註冊裝置及 ret
來紀錄回傳數值。並透過 kfifo_alloc
來分配 PAGE_SIZE 大小的記憶體空間,且由 alloc_chrdev_region 來註冊一個裝置。 alloc_chrdev_region 會將分配的裝置編號儲存在 &dev_id
中,並使用 MAJOR 的宏來取得裝置編號。
接著透過 cdev_init
來初始化一個 character device 並使用 cdev_add
綁定。
而其中初始化過程中所設定的操作則定義在 &simrupt_fops
中,如下
將 character device 不同的操作對應到不同的操作函式中。由於 linux 核心中並沒有類似模擬中斷類型的裝置,因此使用 class_create 來創造一個新的類別。
最後透過 device_create
創建一個裝置節點 /dev/simrupt
讓使用者可以透過先前定義的 open
, read
來操作我們所建立的裝置。由於我們前面在 alloc_chrdev_region 時將裝置的起始值設定為 0 ,因此這邊使用 MKDEV(major, 0)
將主及次裝置編號合成對應到前面設定的裝置。
接著再分別使用 vmalloc
及 alloc_workqueue
宣告會使用到的 buffer 及 workqueue。
其中 PAGE_SIZE
是我們要分配的 buffer 大小,而 WQ_UNBOUND
及 WQ_MAX_ACTIVE
則分別表示不綁定特定 CPU 及同時啟用的工作數量上限。
最後再使用 timer_setup
初始化 timer ,並用 atomic_set
來統計裝置被打開的次數,以確保只有在裝置打開後才會執行。
其中設定當 timer 觸發時所要執行的程式 timer_handler
。其中, simrupt_open
會在使用者 open
裝置的時候觸發,並透過 atomic_inc_return
將 open_cnt 加 1 並且回傳,若回傳值為 1 則表示裝置是第一次被開啟,因此需要啟動 timer 。
其中 jiffies 在 linux 核心中表示現在,在經過 delay 秒後觸發 timer。
參考 linux 核心中的 timer.c 檔案及翻閱 Linux Kernel Development 中的敘述 , Linux 在啟動的過程中 (init/main.c) 會呼叫 init_timers()
。
建立 timer 的時候會先註冊對應的 softirq (i.e., open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
) 在這邊將 TIMER_SOFTIRQ 類型的 softirq 其 handler 函式設為 run_timer_softirq,其中 open_softirq 實現如下
會把對應的 action 函式(在此為 run_timer_softirq )設定到全域陣列 softirq_vec[] 中對應 nr (在此為 TIMER_SOFTIRQ) 的位置。由此可知,timer 基於 softirq 建立,時間到觸發後會在 softirq context 中執行綁定的 timer_handler
。
在 timer_handler
中,先藉由 in_softirq
來確認我們目前是在 sortirq 中,並且使用 local_irq_disable
將原始的 irq 關閉。
接著使用 ktime_get()
來取得目前的時間,並在 process_data()
前後計算,以便分析執行時間,並以 ns 的單位計算。
最後再重新設定下一次中斷的觸發時間點,並重新開啟原始的 irq 。
當計時器觸發時會執行 process_data()
,首先先確認上一段是否有正確的將原始的 irq 取消
interrupts 可以分為
在 simrupt 中,透過將資料寫入環形的快取緩衝區 fast_buf 中,即為呼叫 fast_buf_put()
將 update_simrupt_data()
傳入 fast_buf
來模擬 top half 的操作。
接著, 先宣告一個 tasklet 的型態,指定其 callback 的函式為simrupt_tasklet_func()
。
再使用 tasklet_schedule
將剩餘操作排程等待執行,將 fast_buffer 搬移至 kfifo 的工作排程。
為了增加延遲處理速度,我們會將資料先放入快取緩衝區 fast_buf 中,其中對於 fast buffer 的操作又分為 fast_buf_put
, fast_buf_get
, 及 fast_buf_clear
,分別對應到寫入,讀出及移除,而其中的 fast buffer 則使用 Linux 的 Circular Buffer 來儲存。
Circular Buffer 是個固定大小的緩衝區,其中具有 2 個 indicies:
- head index: the point at which the producer inserts items into the buffer.
- tail index: the point at which the consumer finds the next item in the buffer.
當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。
寫入前先將目前 fast_buf 的地址儲存在 ring 的指標變數中,並儲存 head
接著用核心提供的 MARCO 來確認使否還有剩餘空間
若有空間則寫入 ring 的 head 位置
寫入完成後將 head 移動到下一個位置,由於緩衝區大小維持 2 的冪,因此可以使用 bitwise 操作來計算緩衝區空間。以 PAGE_SIZE 大小 8 為例,PAGE_SIZE-1 = 0b0111
,遮掉超過 PAGE_SIZE 的高位相當於只留下除完後的餘數,透過位元計算快速求得模除的結果。
在 fast_buf_put
的操作中,為了避免編譯器造成的錯誤,因此在讀取 tail 時候限制僅讀取一次
也使用 smp_rmb()
作為 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。避免因為 cpu 的優化造成先執行 ring->head = ...
再執行 ing->buf[ring->head] = val;
造成錯誤,此即為 memory barrier 的用途。
讀取 fast buffer 中的資料方式和插入資料方式類似,同樣先讀取 head 及 tail 的地址,為了避免資料被其他 CPU 改動到,因此使用 READ_ONCE
。
接著便讀取 tail 的資料並更新 tail 將資料移出,而過程中也同樣使用 memory barrier 來防止錯誤。
在 fast_buf_clear
則直接將 head 及 tail 設置為 0 來清空 fast buffer。
在 top half 則透過上述的 fast_buf_put 將update_simrupt_data
產生的資料儲存在 fast buffer 中。
其中將 simrupt_data 限制在 0x20以內以防止小於ASCII 的空白字元 ' '
,並且透過 % 0x7f
讓資料循環在 0x20~0x7f
之間。
指定 tasklet 的 callback 的函式為 simrupt_tasklet_func()
。
並使用 tasklet_schedule
將任務排程
任務排程後,simrupt_tasklet_func
使用 queue_work
將工作項目 work(work item)加入工作佇列 simrupt_workqueue 中排程執行並計算執行時間。
其中工作佇列及每個 work 所綁定的 function 如下
simrupt_workqueue 是模組中的工作佇列,當 work 被排入該佇列並被 kernel thread 執行時,將會執行綁定的處理函式 simrupt_work_func()。
當 simrupt_work_func 執行時,會避免目前是執行 softirq 或是 interrupt 的情況,使用到 WARN_ON_ONCE
來警告。
透過 get_cpu()
來取得目前 CPU 編號並 disable preemption,最後需要 put_cpu() 重新 enable preemption。
在接下來的迴圈中,會不斷取出 circular buffer 中的資料 (consumer) 並且放入 kfifo buffer 中 (producer)。其中,為了避免不同的任務在執行過程中同時取得 circular buffer 的資料,或不同任務同時寫入 kfifo buffer 中,因此要使用 mutex_lock 來鎖住 critical section 。
其中 fast_buf_get
從 circular buffer 中取得資料,並使用 produce_data
放入 kfifo buffer 中。而 producer_lock 及 consumer_lock 則定義為 mutex lock。
最後的 wake_up_interruptible
則對應到 simrupt_read
中的 wait_event_interruptible
。當資料已經被放入 kfifo buffer 中,通知 simrupt_read
已經可以讀取。
資料使用 fast_buf_get
取出後,藉由 produce_data
存入 kfifo_buffer 中
當使用指使用 cat /dev/simrupt
開啟裝置時,會依序使用 open()
開啟檔案並使用 read()
來讀取檔案。對應到程式碼中 simrupt_open
及 simrupt_read
。
當程式碼第一次被開啟時,會將記數 open_cnt 加一,以開啟 timer 。
先使用 access_ok
來確認使用者提供的指標位置是合法的,並且使用 mutex_lock_interruptible
來確認目前是可以讀取的狀態。
接著使用 kfifo_to_user
來將資料從 fifo 中搬移到使用者空間的 buf 中。如果回傳錯誤碼則退出,若成功搬移也退出。若目前 kfifo 中沒有資料則使用 wait_event_interruptible
睡眠直到 wake_up_interruptible
將其喚醒後再確認 kfifo_len(&rx_fifo) 是否大於 0 若是則回傳 0 。
最後再釋放 read_lock。
使用 bpftool 分析