# Softirq
```c
struct softirq_action {
void (*action)(struct softirq_action *);
};
```
在 <linux/interrupt.h> 中 Softirq 使用上面的 function pointer 來定義要處理的函式。
並在 <kernel/softirq.c> 中定義
```c
static struct softirq_action softirq_vec[NR_SOFTIRQS];
```
每個 NR_SOFTIRQS 對應到一個要處理的類型如下

可以透過指令來查看每個 processor 的狀態
```bash
❯ cat /proc/softirqs
CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7 CPU8 CPU9 CPU10 CPU11 CPU12 CPU13 CPU14 CPU15
HI: 109721 2869711 72727 131753 72230 94170 85335 88910 80672 94838 91256 90124 78995 92006 41616 120521
TIMER: 267446 633971 241653 646459 230569 335494 205695 224188 195240 210106 221215 212685 190591 178533 145031 278274
NET_TX: 33 1 53 0 59 2 29 54 2 41 16 36 87 15 18 2
NET_RX: 14788 11263 13225 11755 7242 10869 12698 41630 8336 17306 18256 11298 19849 15652 2082 228121
BLOCK: 813 1028 551 404 1992 1537 1283 1240 1267 1297 584 783 431 192 804 2011
IRQ_POLL: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
TASKLET: 2970 437 735 419991 1766 2540 1966 2618 3555 1595 3112 2252 1938 986 1177 7678
SCHED: 2032149 1714187 1110663 1341410 1082487 1035925 1064972 1061477 1029584 1026428 1051626 1058653 1050613 987149 993305 1090070
HRTIMER: 69 32 14 32 0 2 80 18 9 381 36 160 4 30 0 0
RCU: 928887 1049765 703344 775629 673889 675989 695192 670216 657793 701563 673167 661130 654198 649485 637746 731928
```
Linux 核心使用一個 32 位元的整數作為 bitmask 來紀錄目前處於 pending 狀態的最多 32 種 SoftIRQ 類型(type)。每個 SoftIRQ 類型底下各自維護其工作佇列(如逾時定時器、封包緩衝區、tasklet 等),負責儲存等待執行的工作項目(work items)。在 SoftIRQ 被觸發時,核心會依照 bitmask 並行(邏輯上)處理所有 pending 的類型,依序執行各自註冊的處理函式,進而批次處理對應的工作佇列內容。
```c
pending = local_softirq_pending();
if (pending) {
struct softirq_action *h;
/* reset the pending bitmask */
set_softirq_pending(0);
h = softirq_vec;
do {
if (pending & 1)
h->action(h);
h++;
pending >>= 1;
} while (pending);
}
```
> 1. It sets the pending local variable to the value returned by the
local_softirq_pending() macro.This is a 32-bit mask of pending softirqs—if bit
n is set, the nth softirq is pending.
> 2. Now that the pending bitmask of softirqs is saved, it clears the actual bitmask.
> 3. The pointer h is set to the first entry in the softirq_vec.
> 4. If the first bit in pending is set, h->action(h) is called.
> 5. The pointer h is incremented by one so that it now points to the second entry in
the softirq_vec array.
> 6. The bitmask pending is right-shifted by one.This tosses the first bit away and
moves all other bits one place to the right. Consequently, the second bit is now the first (and so on).
> 7. The pointer h now points to the second entry in the array, and the pending bit mask now has the second bit as the first. Repeat the previous steps.
> 8. ontinue repeating until pending is zero, at which point there are no more pending softirqs and the work is done. Note, this check is sufficient to ensure h always
points to a valid entry in softirq_vec because pending has at most 32 set bits and
thus this loop executes at most 32 times.
對應到的執行分別為
1. 讀取目前被觸發的 softirq 類型,其中 local_softirq_pending() 會回傳一個 32 位元的 mask 代表 32 種 softirq 。
2. 把 local_softirq_pending 中數值清 0 避免重複執行相同的 softirq。此時副本仍存在,透過這個副本處理對應的 handler。
3. 將 h 設在 softirq_vec 的第一個 entry,而 softirq_vec 內每個元素對應到一種 softirq 類型的 handler。
4. 如果為 1 則觸發。
5. 接著把 h 加一處理 softirq 下一個類型。
6. 把原本的 pending 右移,便可處理逐步讀取每一個值確定是否需要執行。
7. h 現在是 softirq_vec[1] , pending 的 bit0 現在對應原來的 bit1
8. 當 pending == 0,表示已經沒有任何 softirq 類型需要處理
>The softirq handlers run with **interrupts enabled** and **cannot sleep**.While a handler
runs, softirqs on the current processor are disabled.
## 使用
以 networking 的為例,在 <net/core/dev.c> 中使用 `open_softirq(NET_TX_SOFTIRQ, net_tx_action);` 註冊,將 handler 綁定為 `net_tx_action` ,當需要執行的時候使用 `raise_softirq(NET_TX_SOFTIRQ);` 將其標注為 1 ,如此在下次呼叫 `do_softirq()` 時便會被執行到。
因此流程為 :
[1] 系統啟動 or 模組初始化階段 : open_softirq(nr, handler)
[2] 中斷處理階段(例如:網卡/定時器中斷): interrupt handler 執行 raise_softirq(nr)
[3] 當前 CPU 回到可允許 softirq context 的位置(例如中斷返回)
[4] do_softirq() 用前述八個步驟執行該類所有待處理工作
# Tasklet
基於 softirq 建立,因此同樣可被 interrupts 且不可 sleep (表示內部不可使用 semaphores 或是 blocking function) ,而 tasklet 和 softirq 不同的地方在於前者是不能並行執行的 (但不同的 tasklet 可以在不同的 processor 執行),因此如果要和其他 tasklet 或 softirq 共享資源需要 lock 。分為 `HI_SOFTIRQ` 及 `TASKLET_SOFTIRQ` , 兩者僅差在執行順序。
>當某段程式碼「不能 sleep」,代表它執行的上下文(context)不是 process context,而是 atomic context(例如中斷處理函數、tasklet、底半部等等)。而 sleep 會主動讓出 CPU 控制權切換到其他 process 。
其結構如下
```c
struct tasklet_struct {
struct tasklet_struct *next; /* next tasklet in the list */
unsigned long state; /* state of the tasklet */
atomic_t count; /* reference counter */
void (*func)(unsigned long); /* tasklet handler function */
unsigned long data; /* argument to the tasklet function */
};
```
- state : zero, **TASKLET_STATE_SCHED**, or **TASKLET_STATE_RUN** 其中 TASKLET_STATE_RUN 只會出現在 multiprocessor ,因為單一處理器的設備會知道 tasklet 有沒有在執行
- count : 計算 tasklet 的數量,如果為 0 才能執行。
tasklet 使用 linked list 串接,並使用 tasklet_vec 及 tasklet_hi_vec 來儲存。
使用 tasklet_schedule() 及 tasklet_hi_schedule() 來排程
1. 檢查 tasklet 的狀態是否為 TASKLET_STATE_SCHED。
如果是,表示該 tasklet 已經被排程執行,因此這個函式可以立刻返回,不需再重複排程。
2. 呼叫 __tasklet_schedule()。
3. 儲存目前的中斷系統狀態,並關閉本地中斷。
- 這樣做是為了確保在 tasklet_schedule() 操作 tasklet 時,不會有其他處理器上的操作干擾它。
4. 將欲排程的 tasklet 加入 tasklet_vec 或 tasklet_hi_vec 鏈結串列的開頭。
- 這兩個串列是每個處理器獨有的,用來追蹤各自應執行的 tasklet。
5. 觸發 TASKLET_SOFTIRQ 或 HI_SOFTIRQ 這類的 softirq,
- 讓 do_softirq() 在不久的將來執行這個 tasklet。
6. 恢復先前的中斷狀態並返回。
完成排程後 do_softirq() 會被喚醒,並照著前述的步驟執行。而 handler 會按照下列步驟執行
1. 關閉本地中斷的傳遞(不需要先儲存狀態),
因為這段程式碼總是作為 softirq 處理程序執行,而在此情況下中斷總是處於啟用狀態。接著,取得該處理器上對應的 tasklet_vec 或 tasklet_hi_vec 串列。
2. 清空該處理器的 tasklet 串列,
做法是將其設為 NULL,表示這些 tasklet 已經取出準備處理。
3. 重新啟用本地中斷傳遞。
同樣地,不需要還原先前的中斷狀態,因為這個函式知道中斷原本就是啟用的。
4. 對剛剛取出的 tasklet 串列中的每個 tasklet 逐一處理。
5. 如果系統是多處理器架構,
檢查該 tasklet 是否正在其他處理器上執行(透過 TASKLET_STATE_RUN 標誌判斷)。
如果它正在執行,就不要執行它,直接跳過處理下一個排程中的 tasklet。
(回憶一下:同一個類型的 tasklet 同時只允許在一個處理器上執行。)
6. 如果 tasklet 目前未在執行,則設置 TASKLET_STATE_RUN 標誌,
以防止其他處理器同時執行它。
7. 檢查 tasklet 的 count 值是否為零,
這是為了確保該 tasklet 沒有被禁用。如果已被禁用,則跳過它,處理下一個。
8. 此時可以確定:tasklet 沒有在其他地方執行,已被標記為執行中,且沒有被禁用。
因此,執行該 tasklet 的處理函式。
9. tasklet 執行完成後,清除 TASKLET_STATE_RUN 標誌。
10. 繼續處理下一個排程中的 tasklet,直到所有待處理 tasklet 都已執行完畢。
- 當一個 tasklet 被排程時會執行一次
- 當被排程的 tasklet 又被排程總共僅會執行一次
- 已經正在執行中(例如在另一個處理器上),那麼這次新的排程會讓它再次被排入執行佇列,也就是執行完後還會再執行一次
- tasklet 總是會在排程它的那個處理器上執行
## 使用
statically 的建立 tasklet 的方法如下,使用到 <linux/interrupt.h> 裡面的宏。
```c
DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data);
```
前者會建立一個 count 為 0 的 tasklet 而後者為建立 count 為 1 的 tasklet。舉例而言,下面兩種是等價的
```c
DECLARE_TASKLET(my_tasklet, my_tasklet_handler, dev);
struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),
my_tasklet_handler, dev };
```
如果想使用 indirecct rederence 的方式動態呼叫 tasklet ,則可以用下列方法
```c
/* dynamically as opposed to statically */
tasklet_init(t, tasklet_handler, dev);
```
可以使用 `tasklet_disable()` 來停用一個 tasklet,若這個 tasklet 正在執行會等到執行完成才停用。若使用 `tasklet_disable_nosync()` 則會直接停用,不會等它結束。
當你對一個 tasklet 呼叫 `tasklet_disable(&my_tasklet)` 時,它的內部 count 會遞增,表示它被暫時禁止執行。在這種狀態下,即使你呼叫 `tasklet_schedule(&my_tasklet)`,它也不會被執行。
呼叫 `tasklet_enable(&my_tasklet)`; 則會讓 count 減少(通常減 1),如果 count 變回 0,就代表 tasklet 已被重新啟用,可以再次被排程並執行。
# ksoftirq
>Softirq (and thus tasklet) processing is aided by a set of per-processor kernel threads.
- softirq(軟中斷)可能頻繁地觸發,再加上它們可以自我重新標記為活躍(active)的能力,這種組合會導致 user-space 程式得不到足夠的 CPU 時間(被餓死)
最初有兩種可行的解決方法
1. 只要有 softirq 發生,就持續處理它們,並且在結束前再次檢查是否還有待處理的 softirq,如果有就繼續處理。這樣可以確保 kernel 能及時處理 softirq,更重要的是,即使某些 softirq 在處理過程中再次被啟用(reactivate),它們也能被立即處理。
- 缺點 : 可能會有大量的 softirq 不斷地被觸發,而且它們會一直重新啟用自己(remark active)。在這種情況下,核心會陷入不停處理 softirq 的迴圈,卻幾乎做不了其他事情。
2. 不馬上處理那些重新被 reactivated 的 softirq。
當中斷(interrupt)返回時,核心只會查看目前有哪些「待處理的 softirq」,然後照常執行它們。如果其中有 softirq 在執行過程中再次將自己標記為 activate,它們不會馬上執行,而是等到下一次核心處理 softirq 時才會被執行。
- 缺點 : 這個「下一次處理 softirq」通常是要等到下一次有中斷發生,
這可能會導致這些「新的或重新 activate 的 softirq」在相當長一段時間內都無法執行
>The solution ultimately implemented in the kernel is to not immediately process reactivated softirqs.
>Instead, if the number of softirqs grows excessive, the kernel wakes up a family of kernel threads to handle the load.
當 softirq 的數量變得過多時,核心會喚醒一組 kernel thread 來處理這些工作。這些 thread 的執行優先權非常低(nice 值為 19),這樣能確保它們不會取代任何重要的工作來執行。
```c
for (;;) {
if (!softirq_pending(cpu)) //檢查該 CPU 是否有待處理的 softirq
schedule(); //如果沒有 softirq,則進入休眠(schedule() 會讓出 CPU 給其他程式)。
//將目前行程(也就是 ksoftirqd 本身)的狀態設為「正在執行」。
set_current_state(TASK_RUNNING);
//只要有 softirq 在排隊,就持續處理它們
while (softirq_pending(cpu)) {
do_softirq();
if (need_resched())
schedule();
}
set_current_state(TASK_INTERRUPTIBLE); //進入睡眠狀態(但可被中斷喚醒)
}
```
# Workqueue
> Work queues let your driver create a special worker thread to handle deferred work.
kernel threads 又叫做 worker threads , work queue 提供了簡單的介面讓不同的任務可以建立 kernel threads 。 而不同的 worker threads 則叫 `event/n` 其中 n 表示有多少 processor,每一個 processor 有一個 event。
workqueue_struct 的結構如下:
```c
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
struct cpu_workqueue_struct cpu_wq[NR_CPUS];
struct list_head list;
const char *name;
int singlethread;
int freezeable;
int rt;
};
```
其中每一個 CPU 都會有一個 cpu_workqueue_struct 的結構,而 list 則是把全部的 workqueue 串接在一起。
而 cpu_workqueue_struct 則結構如下
```c
struct cpu_workqueue_struct {
spinlock_t lock; /* lock protecting this structure */
struct list_head worklist; /* list of work */
wait_queue_head_t more_work;
struct work_struct *current_struct;
struct workqueue_struct *wq; /* associated workqueue_struct */
task_t *thread; /* associated thread */
};
```
> each type of worker thread has one workqueue_struct associated to it
所有的 work thread 都是用一般的 kernel thread 來跑的,使用 `worker_thread()` 後會進入無窮迴圈,沒有任務時候則會進入 sleep,並在有任務需要執行的時候喚醒。
work 的結構如下,每個 work_struct 會被使用 liked list 串連起來。
```c
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
};
```
因此三種結構作用如下:
- workqueue_struct(整體工作佇列管理者)
代表一個整體的工作佇列(可能是單執行緒也可能是每 CPU 一條 thread)。
- cpu_workqueue_struct(每 CPU 一個工作佇列)
維護某個 CPU 的待處理工作 list。當有 work_struct 被加入後,它的對應 worker_thread() 就會被喚醒來處理。
- work_struct(工作本體)
你寫的函式(例如 my_work_func())會包裝成一個 work_struct,裡面包含要執行的 function pointer,等待被排入佇列。
>There can be multiple types of worker threads; there is one worker thread per processor of a given type. Parts of the kernel can create worker threads as needed. By default, there is the events worker thread. Each worker thread is represented by the cpu_workqueue_struct structure.The workqueue_struct structure represents all the worker threads of a given type.

而 `work_thread()` 的實作如下
```c
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
run_workqueue(cwq);
}
```
kernel thread 會永遠保持運行,先將進來的 work 加到 wait queue 之後設置為 TASK_INTERRUPTIBLE 表示可以被中斷。接著判斷是否有 work 需要執行,若沒有則 sleep 。 若有需要執行的 work 則將 worklist 中的 work 一個一個拿出來執行。
其中的 run_workqueue 實作如下
```c
while (!list_empty(&cwq->worklist)) {
struct work_struct *work;
work_func_t f;
void *data;
work = list_entry(cwq->worklist.next, struct work_struct, entry);
f = work->func;
list_del_init(cwq->worklist.next);
work_clear_pending(work);
f(work);
}
```
當需要執行的 list 非空的時候便會一個一個 work 抓出來執行,並在它執行完成後從 pending 移除。
## 使用
使用 workqueue 時候須建立 work_handler
```c
void work_handler(void *data)
```
在執行的時候會跑在 process context 中。執行 work_handler 時候是沒有 lock 且 enable interrupts 的。
>Note that, despite running in process context, the **work handlers cannot access user-space memory** because there is no associated user-space memory map for kernel threads.The kernel can access user memory only when running on behalf of a user-space process, such as when executing a system call.
若只是要綁定在預設的 events worker thread 則需要呼叫 `schedule_work(&work)`,若想要延遲執行則呼叫 `schedule_delayed_work(&work, delay);`,當需要等待全部執行完成時便呼叫 `void flush_scheduled_work(void);`
>any work that was scheduled via schedule_delayed_work(), and whose delay is not yet up, is not flushed via flush_scheduled_work().
另一方面 可以透過 create_workqueue 來建立新的 work queue
```c
struct workqueue_struct *create_workqueue(const char *name);
```
並使用 `queue_work` 及 `queue_delayed_work` 來將 work 加入到 queue 中。