# Softirq ```c struct softirq_action { void (*action)(struct softirq_action *); }; ``` 在 <linux/interrupt.h> 中 Softirq 使用上面的 function pointer 來定義要處理的函式。 並在 <kernel/softirq.c> 中定義 ```c static struct softirq_action softirq_vec[NR_SOFTIRQS]; ``` 每個 NR_SOFTIRQS 對應到一個要處理的類型如下 ![image](https://hackmd.io/_uploads/HJrmJ1FRJe.png =75%x) 可以透過指令來查看每個 processor 的狀態 ```bash ❯ cat /proc/softirqs CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7 CPU8 CPU9 CPU10 CPU11 CPU12 CPU13 CPU14 CPU15 HI: 109721 2869711 72727 131753 72230 94170 85335 88910 80672 94838 91256 90124 78995 92006 41616 120521 TIMER: 267446 633971 241653 646459 230569 335494 205695 224188 195240 210106 221215 212685 190591 178533 145031 278274 NET_TX: 33 1 53 0 59 2 29 54 2 41 16 36 87 15 18 2 NET_RX: 14788 11263 13225 11755 7242 10869 12698 41630 8336 17306 18256 11298 19849 15652 2082 228121 BLOCK: 813 1028 551 404 1992 1537 1283 1240 1267 1297 584 783 431 192 804 2011 IRQ_POLL: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 TASKLET: 2970 437 735 419991 1766 2540 1966 2618 3555 1595 3112 2252 1938 986 1177 7678 SCHED: 2032149 1714187 1110663 1341410 1082487 1035925 1064972 1061477 1029584 1026428 1051626 1058653 1050613 987149 993305 1090070 HRTIMER: 69 32 14 32 0 2 80 18 9 381 36 160 4 30 0 0 RCU: 928887 1049765 703344 775629 673889 675989 695192 670216 657793 701563 673167 661130 654198 649485 637746 731928 ``` Linux 核心使用一個 32 位元的整數作為 bitmask 來紀錄目前處於 pending 狀態的最多 32 種 SoftIRQ 類型(type)。每個 SoftIRQ 類型底下各自維護其工作佇列(如逾時定時器、封包緩衝區、tasklet 等),負責儲存等待執行的工作項目(work items)。在 SoftIRQ 被觸發時,核心會依照 bitmask 並行(邏輯上)處理所有 pending 的類型,依序執行各自註冊的處理函式,進而批次處理對應的工作佇列內容。 ```c pending = local_softirq_pending(); if (pending) { struct softirq_action *h; /* reset the pending bitmask */ set_softirq_pending(0); h = softirq_vec; do { if (pending & 1) h->action(h); h++; pending >>= 1; } while (pending); } ``` > 1. It sets the pending local variable to the value returned by the local_softirq_pending() macro.This is a 32-bit mask of pending softirqs—if bit n is set, the nth softirq is pending. > 2. Now that the pending bitmask of softirqs is saved, it clears the actual bitmask. > 3. The pointer h is set to the first entry in the softirq_vec. > 4. If the first bit in pending is set, h->action(h) is called. > 5. The pointer h is incremented by one so that it now points to the second entry in the softirq_vec array. > 6. The bitmask pending is right-shifted by one.This tosses the first bit away and moves all other bits one place to the right. Consequently, the second bit is now the first (and so on). > 7. The pointer h now points to the second entry in the array, and the pending bit mask now has the second bit as the first. Repeat the previous steps. > 8. ontinue repeating until pending is zero, at which point there are no more pending softirqs and the work is done. Note, this check is sufficient to ensure h always points to a valid entry in softirq_vec because pending has at most 32 set bits and thus this loop executes at most 32 times. 對應到的執行分別為 1. 讀取目前被觸發的 softirq 類型,其中 local_softirq_pending() 會回傳一個 32 位元的 mask 代表 32 種 softirq 。 2. 把 local_softirq_pending 中數值清 0 避免重複執行相同的 softirq。此時副本仍存在,透過這個副本處理對應的 handler。 3. 將 h 設在 softirq_vec 的第一個 entry,而 softirq_vec 內每個元素對應到一種 softirq 類型的 handler。 4. 如果為 1 則觸發。 5. 接著把 h 加一處理 softirq 下一個類型。 6. 把原本的 pending 右移,便可處理逐步讀取每一個值確定是否需要執行。 7. h 現在是 softirq_vec[1] , pending 的 bit0 現在對應原來的 bit1 8. 當 pending == 0,表示已經沒有任何 softirq 類型需要處理 >The softirq handlers run with **interrupts enabled** and **cannot sleep**.While a handler runs, softirqs on the current processor are disabled. ## 使用 以 networking 的為例,在 <net/core/dev.c> 中使用 `open_softirq(NET_TX_SOFTIRQ, net_tx_action);` 註冊,將 handler 綁定為 `net_tx_action` ,當需要執行的時候使用 `raise_softirq(NET_TX_SOFTIRQ);` 將其標注為 1 ,如此在下次呼叫 `do_softirq()` 時便會被執行到。 因此流程為 : [1] 系統啟動 or 模組初始化階段 : open_softirq(nr, handler) [2] 中斷處理階段(例如:網卡/定時器中斷): interrupt handler 執行 raise_softirq(nr) [3] 當前 CPU 回到可允許 softirq context 的位置(例如中斷返回) [4] do_softirq() 用前述八個步驟執行該類所有待處理工作 # Tasklet 基於 softirq 建立,因此同樣可被 interrupts 且不可 sleep (表示內部不可使用 semaphores 或是 blocking function) ,而 tasklet 和 softirq 不同的地方在於前者是不能並行執行的 (但不同的 tasklet 可以在不同的 processor 執行),因此如果要和其他 tasklet 或 softirq 共享資源需要 lock 。分為 `HI_SOFTIRQ` 及 `TASKLET_SOFTIRQ` , 兩者僅差在執行順序。 >當某段程式碼「不能 sleep」,代表它執行的上下文(context)不是 process context,而是 atomic context(例如中斷處理函數、tasklet、底半部等等)。而 sleep 會主動讓出 CPU 控制權切換到其他 process 。 其結構如下 ```c struct tasklet_struct { struct tasklet_struct *next; /* next tasklet in the list */ unsigned long state; /* state of the tasklet */ atomic_t count; /* reference counter */ void (*func)(unsigned long); /* tasklet handler function */ unsigned long data; /* argument to the tasklet function */ }; ``` - state : zero, **TASKLET_STATE_SCHED**, or **TASKLET_STATE_RUN** 其中 TASKLET_STATE_RUN 只會出現在 multiprocessor ,因為單一處理器的設備會知道 tasklet 有沒有在執行 - count : 計算 tasklet 的數量,如果為 0 才能執行。 tasklet 使用 linked list 串接,並使用 tasklet_vec 及 tasklet_hi_vec 來儲存。 使用 tasklet_schedule() 及 tasklet_hi_schedule() 來排程 1. 檢查 tasklet 的狀態是否為 TASKLET_STATE_SCHED。 如果是,表示該 tasklet 已經被排程執行,因此這個函式可以立刻返回,不需再重複排程。 2. 呼叫 __tasklet_schedule()。 3. 儲存目前的中斷系統狀態,並關閉本地中斷。 - 這樣做是為了確保在 tasklet_schedule() 操作 tasklet 時,不會有其他處理器上的操作干擾它。 4. 將欲排程的 tasklet 加入 tasklet_vec 或 tasklet_hi_vec 鏈結串列的開頭。 - 這兩個串列是每個處理器獨有的,用來追蹤各自應執行的 tasklet。 5. 觸發 TASKLET_SOFTIRQ 或 HI_SOFTIRQ 這類的 softirq, - 讓 do_softirq() 在不久的將來執行這個 tasklet。 6. 恢復先前的中斷狀態並返回。 完成排程後 do_softirq() 會被喚醒,並照著前述的步驟執行。而 handler 會按照下列步驟執行 1. 關閉本地中斷的傳遞(不需要先儲存狀態), 因為這段程式碼總是作為 softirq 處理程序執行,而在此情況下中斷總是處於啟用狀態。接著,取得該處理器上對應的 tasklet_vec 或 tasklet_hi_vec 串列。 2. 清空該處理器的 tasklet 串列, 做法是將其設為 NULL,表示這些 tasklet 已經取出準備處理。 3. 重新啟用本地中斷傳遞。 同樣地,不需要還原先前的中斷狀態,因為這個函式知道中斷原本就是啟用的。 4. 對剛剛取出的 tasklet 串列中的每個 tasklet 逐一處理。 5. 如果系統是多處理器架構, 檢查該 tasklet 是否正在其他處理器上執行(透過 TASKLET_STATE_RUN 標誌判斷)。 如果它正在執行,就不要執行它,直接跳過處理下一個排程中的 tasklet。 (回憶一下:同一個類型的 tasklet 同時只允許在一個處理器上執行。) 6. 如果 tasklet 目前未在執行,則設置 TASKLET_STATE_RUN 標誌, 以防止其他處理器同時執行它。 7. 檢查 tasklet 的 count 值是否為零, 這是為了確保該 tasklet 沒有被禁用。如果已被禁用,則跳過它,處理下一個。 8. 此時可以確定:tasklet 沒有在其他地方執行,已被標記為執行中,且沒有被禁用。 因此,執行該 tasklet 的處理函式。 9. tasklet 執行完成後,清除 TASKLET_STATE_RUN 標誌。 10. 繼續處理下一個排程中的 tasklet,直到所有待處理 tasklet 都已執行完畢。 - 當一個 tasklet 被排程時會執行一次 - 當被排程的 tasklet 又被排程總共僅會執行一次 - 已經正在執行中(例如在另一個處理器上),那麼這次新的排程會讓它再次被排入執行佇列,也就是執行完後還會再執行一次 - tasklet 總是會在排程它的那個處理器上執行 ## 使用 statically 的建立 tasklet 的方法如下,使用到 <linux/interrupt.h> 裡面的宏。 ```c DECLARE_TASKLET(name, func, data) DECLARE_TASKLET_DISABLED(name, func, data); ``` 前者會建立一個 count 為 0 的 tasklet 而後者為建立 count 為 1 的 tasklet。舉例而言,下面兩種是等價的 ```c DECLARE_TASKLET(my_tasklet, my_tasklet_handler, dev); struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0), my_tasklet_handler, dev }; ``` 如果想使用 indirecct rederence 的方式動態呼叫 tasklet ,則可以用下列方法 ```c /* dynamically as opposed to statically */ tasklet_init(t, tasklet_handler, dev); ``` 可以使用 `tasklet_disable()` 來停用一個 tasklet,若這個 tasklet 正在執行會等到執行完成才停用。若使用 `tasklet_disable_nosync()` 則會直接停用,不會等它結束。 當你對一個 tasklet 呼叫 `tasklet_disable(&my_tasklet)` 時,它的內部 count 會遞增,表示它被暫時禁止執行。在這種狀態下,即使你呼叫 `tasklet_schedule(&my_tasklet)`,它也不會被執行。 呼叫 `tasklet_enable(&my_tasklet)`; 則會讓 count 減少(通常減 1),如果 count 變回 0,就代表 tasklet 已被重新啟用,可以再次被排程並執行。 # ksoftirq >Softirq (and thus tasklet) processing is aided by a set of per-processor kernel threads. - softirq(軟中斷)可能頻繁地觸發,再加上它們可以自我重新標記為活躍(active)的能力,這種組合會導致 user-space 程式得不到足夠的 CPU 時間(被餓死) 最初有兩種可行的解決方法 1. 只要有 softirq 發生,就持續處理它們,並且在結束前再次檢查是否還有待處理的 softirq,如果有就繼續處理。這樣可以確保 kernel 能及時處理 softirq,更重要的是,即使某些 softirq 在處理過程中再次被啟用(reactivate),它們也能被立即處理。 - 缺點 : 可能會有大量的 softirq 不斷地被觸發,而且它們會一直重新啟用自己(remark active)。在這種情況下,核心會陷入不停處理 softirq 的迴圈,卻幾乎做不了其他事情。 2. 不馬上處理那些重新被 reactivated 的 softirq。 當中斷(interrupt)返回時,核心只會查看目前有哪些「待處理的 softirq」,然後照常執行它們。如果其中有 softirq 在執行過程中再次將自己標記為 activate,它們不會馬上執行,而是等到下一次核心處理 softirq 時才會被執行。 - 缺點 : 這個「下一次處理 softirq」通常是要等到下一次有中斷發生, 這可能會導致這些「新的或重新 activate 的 softirq」在相當長一段時間內都無法執行 >The solution ultimately implemented in the kernel is to not immediately process reactivated softirqs. >Instead, if the number of softirqs grows excessive, the kernel wakes up a family of kernel threads to handle the load. 當 softirq 的數量變得過多時,核心會喚醒一組 kernel thread 來處理這些工作。這些 thread 的執行優先權非常低(nice 值為 19),這樣能確保它們不會取代任何重要的工作來執行。 ```c for (;;) { if (!softirq_pending(cpu)) //檢查該 CPU 是否有待處理的 softirq schedule(); //如果沒有 softirq,則進入休眠(schedule() 會讓出 CPU 給其他程式)。 //將目前行程(也就是 ksoftirqd 本身)的狀態設為「正在執行」。 set_current_state(TASK_RUNNING); //只要有 softirq 在排隊,就持續處理它們 while (softirq_pending(cpu)) { do_softirq(); if (need_resched()) schedule(); } set_current_state(TASK_INTERRUPTIBLE); //進入睡眠狀態(但可被中斷喚醒) } ``` # Workqueue > Work queues let your driver create a special worker thread to handle deferred work. kernel threads 又叫做 worker threads , work queue 提供了簡單的介面讓不同的任務可以建立 kernel threads 。 而不同的 worker threads 則叫 `event/n` 其中 n 表示有多少 processor,每一個 processor 有一個 event。 workqueue_struct 的結構如下: ```c /* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */ struct workqueue_struct { struct cpu_workqueue_struct cpu_wq[NR_CPUS]; struct list_head list; const char *name; int singlethread; int freezeable; int rt; }; ``` 其中每一個 CPU 都會有一個 cpu_workqueue_struct 的結構,而 list 則是把全部的 workqueue 串接在一起。 而 cpu_workqueue_struct 則結構如下 ```c struct cpu_workqueue_struct { spinlock_t lock; /* lock protecting this structure */ struct list_head worklist; /* list of work */ wait_queue_head_t more_work; struct work_struct *current_struct; struct workqueue_struct *wq; /* associated workqueue_struct */ task_t *thread; /* associated thread */ }; ``` > each type of worker thread has one workqueue_struct associated to it 所有的 work thread 都是用一般的 kernel thread 來跑的,使用 `worker_thread()` 後會進入無窮迴圈,沒有任務時候則會進入 sleep,並在有任務需要執行的時候喚醒。 work 的結構如下,每個 work_struct 會被使用 liked list 串連起來。 ```c struct work_struct { atomic_long_t data; struct list_head entry; work_func_t func; }; ``` 因此三種結構作用如下: - workqueue_struct(整體工作佇列管理者) 代表一個整體的工作佇列(可能是單執行緒也可能是每 CPU 一條 thread)。 - cpu_workqueue_struct(每 CPU 一個工作佇列) 維護某個 CPU 的待處理工作 list。當有 work_struct 被加入後,它的對應 worker_thread() 就會被喚醒來處理。 - work_struct(工作本體) 你寫的函式(例如 my_work_func())會包裝成一個 work_struct,裡面包含要執行的 function pointer,等待被排入佇列。 >There can be multiple types of worker threads; there is one worker thread per processor of a given type. Parts of the kernel can create worker threads as needed. By default, there is the events worker thread. Each worker thread is represented by the cpu_workqueue_struct structure.The workqueue_struct structure represents all the worker threads of a given type. ![image](https://hackmd.io/_uploads/ByYpQKACke.png =70%x) 而 `work_thread()` 的實作如下 ```c for (;;) { prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); if (list_empty(&cwq->worklist)) schedule(); finish_wait(&cwq->more_work, &wait); run_workqueue(cwq); } ``` kernel thread 會永遠保持運行,先將進來的 work 加到 wait queue 之後設置為 TASK_INTERRUPTIBLE 表示可以被中斷。接著判斷是否有 work 需要執行,若沒有則 sleep 。 若有需要執行的 work 則將 worklist 中的 work 一個一個拿出來執行。 其中的 run_workqueue 實作如下 ```c while (!list_empty(&cwq->worklist)) { struct work_struct *work; work_func_t f; void *data; work = list_entry(cwq->worklist.next, struct work_struct, entry); f = work->func; list_del_init(cwq->worklist.next); work_clear_pending(work); f(work); } ``` 當需要執行的 list 非空的時候便會一個一個 work 抓出來執行,並在它執行完成後從 pending 移除。 ## 使用 使用 workqueue 時候須建立 work_handler ```c void work_handler(void *data) ``` 在執行的時候會跑在 process context 中。執行 work_handler 時候是沒有 lock 且 enable interrupts 的。 >Note that, despite running in process context, the **work handlers cannot access user-space memory** because there is no associated user-space memory map for kernel threads.The kernel can access user memory only when running on behalf of a user-space process, such as when executing a system call. 若只是要綁定在預設的 events worker thread 則需要呼叫 `schedule_work(&work)`,若想要延遲執行則呼叫 `schedule_delayed_work(&work, delay);`,當需要等待全部執行完成時便呼叫 `void flush_scheduled_work(void);` >any work that was scheduled via schedule_delayed_work(), and whose delay is not yet up, is not flushed via flush_scheduled_work(). 另一方面 可以透過 create_workqueue 來建立新的 work queue ```c struct workqueue_struct *create_workqueue(const char *name); ``` 並使用 `queue_work` 及 `queue_delayed_work` 來將 work 加入到 queue 中。