# Linux 同步處理機制 ## 概念 最大處理吞吐量自然需要並行並充分利用可用的處理資源。在高效能的並行應用中,僅允許單一執行緒在任何時刻執行的關鍵區段 (critical section,以下簡稱 CS) 相當常見。現代作業系統提供簡單的鎖定機制,以強制執行單一執行緒的限制。兩種常見的鎖類型為 spinlock 與 mutex。每種鎖都能對 CS 形成屏障,從而維護程式的正確性。然而,spinlock 與 mutex 在內部的實作方式截然不同,這在多種常見的工作負載下導致顯著的效能差異。 ![image](https://hackmd.io/_uploads/SkmJbzt3yl.png) > 取自 [Optimizing for Workloads: Linux Spinlocks vs Mutexes](https://nathanpetersen.com/2019/02/17/optimizing-for-workloads-linux-spinlocks-vs-mutexes/) spinlock 在單處理器且不支援搶佔式排程的作業系統上,其實沒作用,只有在多處理器環境下才有其價值。最基本的 spinlock 是所有執行緒無序搶奪同一鎖的那種,但由於所有處理器上的執行緒都在等待同一個變數,這會導致所有爭鎖處理器的快取 (cache) 被頻繁刷新。為了解決這個問題,設計者引入新的 spinlock 機制:讓每個執行緒在本地處理器的變數上自旋,避免快取的廣泛刷新。這類 spinlock 的第一個實作是陣列鎖,但陣列鎖需靜態分配固定大小,若爭鎖的執行緒較少,便造成空間浪費。於是出現第二種實作 —— 佇列鎖。佇列可依需求動態擴展,解決陣列鎖空間利用率低的問題。 雖然這類新型 spinlock 避免快取刷新,同時也減少空間浪費,但卻引發飢餓問題。排在佇列後方的執行緒,因等待時間過長,可能導致反應能力下降。為此,引入了具有超時機制的鎖,以避免執行緒長時間自旋導致系統效能降低。總的來說,只要 spinlock 的佇列過長,就有可能產生飢餓現象。那麼,有沒有辦法限制佇列長度呢?答案是肯定的,這就是「複合鎖」。複合鎖綜合超時鎖與佇列鎖的優勢,並引入新的競爭機制 —— 類似 access token 的概念。也就是說,只有獲得 access token 的執行緒才可進入佇列爭奪 spinlock。access token 數量有限,因此成功限制佇列長度。若執行緒拿到 access token 後,在佇列中等待超時,將自動放棄 access token 並拋出例外,由呼叫者決定後續處理。 然而,這樣的設計帶來一個問題:本來是爭奪 spinlock,現在變成爭奪 access token,這是否失去原意?實際上,爭奪 access token 是為了讓 spinlock 的競爭過程更公平,進而避免飢餓。既然目標是避免飢餓,那麼打破先來先服務的順序是必要的。access token 的競爭過程實際上就是一種打破先來先服務的策略,並藉由簡單的退避演算法來控制爭奪 access token 的開銷,避免對系統造成過大負擔。若單純為了解決飢餓問題,timedlock 已相當實用;但複合鎖進一步強化公平性,這種公平性正是透過打破先來先服務所實作的。 ## 超市排隊與多執行緒系統的聯想 在服務台排隊問題上,超市的處理實不理想,常常發生隊頭擁塞的情況。試想,目前方結帳發生糾紛,導致後方的我們只能乾等,看著其他結帳隊伍快速前進。這種排隊方式雖然簡單,卻將「排隊排程」的任務交給顧客自己。顧客需自行判斷排哪條佇列較快,如觀察購物量、有無衣物 (卡片糾紛風險) 、是否有需稱重商品、有無打折商品、顧客年齡、收銀員熟練度等,這完全是一場賭博。 相比之下,銀行或餐廳的排隊就合理許多,顧客抽號碼,排入單一佇列,由空閒的服務台叫號,屬於「單一佇列、多服務台」的排程系統,有效避免隊頭擁塞。顧客只需持票等待,無需實體排隊,票號形成虛擬佇列,沒叫號前可自由活動。 ### 可自由活動?不代表可離場 尤其在業務處理快速的情況下,一旦離開場館,剛踏出門就被叫號,只得匆忙返回,反倒不如不離開。若等待時間較長,則可完成其他任務,若收益高於體力成本,離場即有意義。 ## semaphore 的聯想與實作 聯想到 semaphore。它就像是「單一佇列、多服務台」的系統,初始值即服務台數。當一執行緒獲服務,相當於服務台減一 (`down`) ,而服務完成後則服務台數加一 (`up`) 。「服務」即進入臨界區。 ### semaphore的 sleep-wait 模式與性能瓶頸 問題是,Linux semaphore 採用 sleep-wait,對我的場景來說切換開銷過高,`perf` 顯示大量時間耗在 `schedule` 與 `wake_up`。頻繁切換導致 cache 失效,性能下滑。雖然 CFS 排程機制可調整 jitter,但損耗仍不可忽視。 ### ticket lock 的 cache 問題與改良 Linux 的 ticket lock,所有爭鎖者與持鎖者共用同一變數,導致多處理器 cache coherence 開銷巨大。嘗試設計「本地接力 spinlock」,讓每個爭鎖者只觸碰自己專屬的變數,且避免快取行 (cache line) 衝突,釋放鎖時僅寫入下一個爭鎖者的本地變數,降低 cache coherence 需求。 ### 多持鎖者的構想與 semaphore 的聯想 結合 spinlock 與超市排隊經驗,萌生出一個可讓多 CPU 同時持有鎖的 自旋佇列構想。後來發現,這不正是 semaphore?但 Linux 的 sleep-wait semaphore 不足以滿足需求,這裡需要 spin-wait 版本,因為資料包傳送非常快速。semaphore 理應適用,但不願 sleep,故採用多 spinlock 機制。結果顯示,spin-wait semaphore效果優異。 ## Linux 原生 semaphore 實作簡析 為凸顯主體概念,以下簡化 semaphore 實作,忽略細節: ```c struct semaphore { raw_spinlock_t lock; unsigned int count; struct list_head wait_list; }; struct semaphore_waiter { struct list_head list; struct task_struct *task; bool up; /* 區域 CPU 變數,避免多核 cache 一致的成本 */ }; static int down(struct semaphore *sem) { if (likely(sem->count > 0)) { sem->count--; } else { struct task_struct *task = current; struct semaphore_waiter waiter; list_add_tail(&waiter.list, &sem->wait_list); waiter.task = task; waiter.up = false; for (;;) { __set_task_state(task, TASK_UNINTERRUPTIBLE); schedule(); if (waiter.up) { return 0; } } } } void up(struct semaphore *sem) { if (likely(list_empty(&sem->wait_list))) { sem->count++; } else { struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; wake_up_process(waiter->task); } } ``` 註:上述簡化程式碼忽略 semaphore 內部的 spinlock,實際情況中需防止多服務台同時操作同一等待者。 ## 自旋 semaphore 實作 (spin-wait) 以下為改造版,自旋等待 semaphore: ```c struct spin_semaphore { unsigned int count; struct list_head wait_list; }; struct spin_semaphore_waiter { struct list_head list; struct task_struct *task; bool up; }; #define BEGIN_ATOMIC /* 假設由 lock 指令保護 */ #define END_ATOMIC static int spin_down(struct spin_semaphore *sem) { if (likely(sem->count > 0)) { sem->count--; } else { struct task_struct *task = current; struct spin_semaphore_waiter waiter; BEGIN_ATOMIC list_add_tail(&waiter.list, &sem->wait_list); waiter.task = task; waiter.up = false; END_ATOMIC for (;;) { cpu_relax(); /* 自旋等待 */ if (waiter.up) return 0; } } } void spin_up(struct spin_semaphore *sem) { BEGIN_ATOMIC if (likely(list_empty(&sem->wait_list))) { sem->count++; END_ATOMIC } else { struct spin_semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct spin_semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; END_ATOMIC } } ``` 所有函式與結構體均加上 `spin_` 前綴,標示自旋版本的 semaphore。 以上程式碼實作出通用自旋等待佇列,而 spinlock 則是其中「單服務台」的特例。 ## 鎖的開銷 鎖的開銷是巨大的,尤其是在多核處理器環境下更為明顯。 引入多處理的目的是提升並行處理能力,以增強系統效能。然而,由於共享臨界區 (critical section) 的存在,同一時間內只能允許一個執行緒存取 (特別是寫操作) ,導致本應並行執行的工作流在這裡被序列化 (serialization) 。形象地來看,這就像是一條寬闊的馬路上出現了一個瓶頸,而這種序列化是無法避免的,因為它是鎖機制的本質。問題在於,執行緒該如何應對這個瓶頸?顯然,誰都無法繞開它,關鍵在於當它們同時到達瓶頸時該怎麼辦。 一種最直接但粗暴的做法是「鬥毆式搶鎖」,這也是最簡單的自旋鎖 (spinlock) 設計方式。然而,一種更為溫和的策略則是:既然無法立即取得鎖,那就主動讓出 CPU (yield),進入睡眠狀態 (sleep) ,等待鎖釋放後再恢復執行。這就是睡眠等待 (sleep-wait) 機制,而相對應的方式則是持續忙碌等待 (spin-wait) 。問題是,執行緒本身該如何在這兩種方式之間做出最佳選擇?事實上,這種選擇的權限不在執行中的執行緒,而是在設計程式時的開發者手中。 不應簡單比較 sleep-wait 和 spin-wait 哪種效能更好,因為鎖本身就是一種效能損耗,無論哪種方式都會影響系統效能,只是影響方式不同: - sleep-wait 的開銷來自於執行緒切換 (context switch) ,涉及暫存器、堆疊、刷新快取、MMU TLB 刷新等作業。 - spin-wait 則會浪費 CPU 週期,雖然沒有切換開銷,但在等待鎖的這段時間內無法執行任何有效工作。 為何仍然要引入 spin-wait?因為若始終使用 sleep-wait,當執行緒 A 進入睡眠,系統切換到執行緒 B,而此時鎖被釋放,系統無法保證執行緒 A 立即獲得 CPU,即使它被喚醒,仍需付出額外的切換開銷。而若鎖的釋放時間極短,那麼這樣的切換代價可能得不償失。因此,在某些情況下,持續忙碌等待 (spin-wait) 可能更為合理。 與中斷 (interrupt) 機制相比,鎖的開銷更大。若能夠透過關閉中斷 (disable interrupts) 來實作無鎖操作,那麼這是值得考慮的選項。但是,關閉中斷也會帶來副作用,例如增加系統延遲,因此需要權衡關閉中斷與鎖的開銷,評估其可行性。 ### 關於自旋鎖 (Spinlock) 自 Linux 核心初始版本開始,就已經引入自旋鎖。自旋鎖的行為模式是在等待鎖的過程中「原地打轉」,這導致 CPU 週期的浪費,但這是一種必要的取捨。系統設計本質上是一場權衡 (trade-off) ,雖然不是零和博弈,但始終沒有完美方案,只能努力尋找折衷點。 若不使用自旋鎖,應該怎麼做?很顯然,選擇的方法就是將執行緒切換至其他工作,等待持鎖者釋放鎖後再喚醒。然而,這樣會帶來額外開銷: 1. 若有多個執行緒競爭同一把鎖,喚醒所有執行緒後再進行競爭,這本質上等於「鬥毆搶鎖」; 2. 即使只喚醒隊列中的第一個執行緒,執行緒切換 (context switch) 的開銷仍然存在。 因此,若忙碌等待所浪費的 CPU 週期小於兩次執行緒切換的開銷,那麼自旋等待就是合理的。這也是為什麼自旋鎖適用於短時間持有鎖的臨界區,因為: - 自旋時間過長會導致 CPU 週期浪費過多,反而得不償失。 - 自旋鎖適用於 CPU 數量較少的環境,因為隨著 CPU 數量增加,自旋延遲會成倍增加。 ### Linux 自旋鎖的歷史發展 Linux 的自旋鎖經歷二個世代的演化: 1. 第一代:無序自旋鎖 (無公平性。多個 CPU 競爭同一把鎖時,獲得鎖的機率是隨機的,受 cache 等因素影響,不一定是先嘗試獲取鎖的 CPU 先獲得鎖。 2. 第二代:Ticket 自旋鎖 (公平性機制)。透過票券機制 (ticketing) ,讓 CPU 依照排隊順序獲得鎖,避免某些 CPU 因為長期競爭失敗而導致資源飢餓 (starvation) 。 Ticket 自旋鎖的設計非常巧妙,它將一個 32 位變數拆分為高 16 位 (next ticket) 和低 16 位 (current ticket) : - 當執行緒請求鎖時,會對 next ticket 進行原子性加 1 操作。 - 取得鎖的條件是:next ticket == current ticket,否則持續自旋等待。 - 釋放鎖時,對 current ticket 加 1,即可讓下一個排隊的執行緒獲取鎖。 這樣的設計確保了 FIFO (先進先出) 公平性,避免了第一代自旋鎖中存在的隨機爭搶問題。 ### 自旋鎖的改進建議 目前在使用自旋鎖時,一般會先進行一次 try-lock 嘗試: ```c if (spin_trylock(&lock)) { /* 成功獲取鎖,執行臨界區 */ } else { /* 鎖被佔用,進入等待機制 */ } ``` 然而,這種 try-lock 只會返回成功或失敗,但沒有提供額外資訊。例如,它可以返回: - 預估等待時間 - 目前等待隊列長度 這樣,呼叫者就能根據這些資訊決定是否應該: 1. 繼續自旋等待 2. 轉去執行其他任務 此外,這些統計資訊還可以用於動態改進自旋鎖,例如: - 根據隊列長度調整 pause 指令,改進 CPU 管線 (pipeline) - 智能適應不同負載場景,動態選擇最佳的同步策略 雖然 Linux 核心的自旋鎖機制已經相當成熟,但仍有許多可以改進的空間,尤其是在大規模多核心系統下,如何更有效率地管理 CPU 資源,將是未來鎖機制設計的關鍵方向。 ## 自適應自旋鎖 很多時候,當一個行程剛進入睡眠狀態等待 mutex 時,mutex 可能已經被釋放。若能夠即時感知到 mutex 被釋放,將是最理想的情況。解決這個問題的方式之一,是使用自旋忙碌等待 (spin-waiting) 而非阻塞等待 (blocking wait),這樣的理解是否正確? ### 競態條件與自旋鎖的演進 最初,自旋鎖 (spinlock) 是為了解決頻繁睡眠與喚醒帶來的效能問題。然而,在即時系統 (real-time systems) 中,自旋鎖可能導致其他行程長時間延遲,進而影響整體吞吐量 (throughput) 。因此,為了避免這種情況,即時系統又回到了睡眠/喚醒 (sleep/wakeup) 機制,因為即時系統的首要目標不是節省開銷,而是確保執行穩定性。睡眠/喚醒機制確保了爭搶鎖的行程不會影響到其他行程,從而維持系統吞吐量。然而,以 mutex 實作的自旋鎖仍然存在一定的代價,因為它必須支援優先權繼承 (priority inheritance protocol, PIP) 或優先權提升 (priority boosting),以避免優先權反轉 (priority inversion) 與死鎖 (deadlock)。 在可搶占 (preemptive) 核心環境下,若高優先權的行程準備執行,它將會搶占低優先權行程。若低優先權行程持有某個鎖,而此鎖正被高優先權行程等待,則會導致高優先權行程被阻塞,尤其是在單 CPU 環境或行程與 CPU 綁定的情況下。此外,若此時一個中等優先權的行程介入並搶占了低優先權行程,則可能導致高優先權行程的阻塞時間過長,這在即時系統中是不可接受的。因此,Linux 核心引入優先權繼承協定,但該協定的實作需要額外的資料結構與演算法管理,這會增加系統開銷,抵消部分因睡眠/喚醒機制提升吞吐量而獲得的效益。因此,在不同方案之間取得平衡成為關鍵問題。 ### Linux 核心的自適應鎖實作 Linux 核心的自適應自旋鎖設計確保: 1. 持有鎖的行程應儘速釋放鎖 2. 自旋鎖可以被搶占,但不應主動進入睡眠** 3. 當持有鎖的行程可能被搶占時,爭鎖行程應避免自旋 這段程式碼展示 Linux 核心如何判斷是否應該繼續自旋: ```c while (waiter->spin && !lock->waiters) { struct thread_info *owner; owner = ACCESS_ONCE(lock->owner); /* 在此 owner 上自旋 */ if (owner && !mutex_spin_on_owner(waiter, owner)) break; cpu_relax(); /* 讓出 CPU */ } ``` `mutex_spin_on_owner` 的邏輯如下: ```c while (waiter->spin && !lock->waiters) { if (lock->owner != owner) /* owner 變更,重新開始自旋 */ break; if (task_thread_info(rq->curr) != owner) { /* owner 未運行在原 CPU,不再自旋 */ ret = 0; break; } if (need_resched()) { /* 目前 CPU 上有更高優先權的行程準備執行,不再自旋 */ ret = 0; break; } cpu_relax(); /* 自旋 */ } ``` 這段程式碼說明 Linux 核心如何透過兩層自旋來改進鎖爭搶的過程。 ## spinlock 前世今生 > 以下改寫自 [spinlock 前世今生](https://zhuanlan.zhihu.com/p/133445693) 軟體通常會極致地配合硬體,盡最大努力最佳化效能。圍繞著快取所做的最佳化非常多,而我們就以一個點作為切入,探究 spinlock 的前世今生。`spinlock` 是 Linux 核心中常見的互斥操作,適用於不可睡眠的場景,可以說是最基礎的同步操作。不知你是否研究過 spinlock 的進化史?從最初的 `wild spinlock` 到 `ticket spinlock`,再到今天的 `qspinlock`,可謂是一波三折,大起大落!那麼,`spinlock` 究竟是怎麼一步一步發生蛻變的?又是為什麼要發生這些變化?背後究竟是什麼力量推動著自身的變革呢? Linux 核心原始程式碼就像是一部歷史,以下藉由探討快取切入來 spinlock 的前世今生,嘗試探索 spinlock 和快取之間的恩怨情仇。 ### wild spinlock 首先來介紹一下這位主角,spinlock (自旋鎖) 是一種互斥的操作,用在不能進行睡眠的行程間對共享資料進行互斥的存取。任一時刻只能有一個行程能夠取得鎖,其它無法取得 spinlock 的行程則會在原地自旋,持續等待直到獲得鎖。若讓你實作一個 spinlock,會不會覺得這件事看起來好像挺簡單的? ```c struct spinlock { int locked; }; void spin_lock(struct spinlock *lock) { while (lock->locked); lock->locked = 1; } void spin_unlock(struct spinlock *lock) { lock->locked = 0; } ``` 看起來是不是很簡單。上面的例子中 spinlock 的 `locked` 成員是 1 代表 locked。若是 0 代表鎖是釋放狀態。`spin_lock()` 保證在不能獲得鎖的情況下一直 spin 等待。不過這裡有個小問題需要修改下,在 `spin_lock()` 中判斷 `locked` 是 0 的同時需要將 `locked` 成 1,這整個過程必須保證是 atomic 的操作。所以我們需要一道 atomic 指令。我們假設 `test_and_set()` 就是一道 atomic 指令,測試一個變數的值,然後置為 1。返回值是變數的舊值。這一系列操作是由硬體提供的 atomic 指令實作的。 ```c void spin_lock(struct spinlock *lock) { while (test_and_set(&lock->locked)); } ``` 我們假設系統有 8 個 CPU。假設 8 個 CPU 同時申請 `spinlock`,CPU0 成功持有鎖。想一下 CPU1–CPU7 在做什麼?由於 `test_and_set` 是無條件寫入 1 的操作。所以 CPU1–CPU7 一直在寫變數。根據多核 cache coherency protocol 中的 MESI protocol,我們知道修改一個變數時,這個變數必須在快取行中,且狀態是 E 或 M。所以當 8 個 CPU 同時申請 spinlock 時會發生以下狀況。 * CPU1 寫變數時會透過 MESI 協定發送 `invalid` 訊息給其他 CPU,然後修改變數為 1。 * CPU2 修改變數前發送 `invalid` 訊息給 CPU1,然後修改變數。 * CPU3 發送 `invalid` 訊息給 CPU2,然後修改變數。 * CPU4–7 也在重複這些事情。 這就無形之中增加了頻寬壓力,使效能下降。我們既要 spin 又想避免 cache thrashing 導致的頻寬壓力和效能損失。我們知道快取行有一種狀態是 `shared`,可以在多個 CPU 之間共享,而不用發送 `invalid` 訊息,而且 CPU1–CPU7 一直在 spin,修改變數也沒有意義,只有 CPU0 `unlock` 的時候,修改才有價值。所以我們對 `spin_lock()` 修改如下。 ```c void spin_lock(struct spinlock *lock) { while (lock->locked || test_and_set(&lock->locked)); } ``` 這樣 CPU1–CPU7 會一直保持 `Shared` 狀態,沒有 cache thrashing。這種最佳化在 Linux 核心程式碼中經常可見,例如[這裡](https://elixir.bootlin.com/linux/v5.4.33/source/include/linux/bit_spinlock.h#L16),當你看到這種奇怪的寫法時,不要覺得奇怪。當 `spin_unlock()` 時,這種 `Shared` 狀態會被打破。但是這也足以在一定程度上提升效能。 好景不常,我們發現了新的問題。某些等待的 CPU 可能會有 starvation 現象。例如,CPU1–CPU7 在原地 spin,CPU0 釋放鎖的時候,CPU1–CPU7 哪個 CPU 的 cache 先看到 `locked` 的值,就會先獲得鎖。所以不排隊的機制可能導致部分 CPU "餓死"。為了解決這個問題,我們從 `wild spinlock` 跨度到 `ticket spinlock`。 ### ticket spinlock 歷史推進,我們導入 FIFO 排隊機制,按申請順序取得鎖,確保公平性。 ```c struct spinlock { unsigned short owner; unsigned short next; }; void spin_lock(struct spinlock *lock) { unsigned short next = xadd(&lock->next, 1); while (lock->owner != next); } void spin_unlock(struct spinlock *lock) { lock->owner++; } ``` `spin_lock()` 中的 `xadd()` 是一個會將變數加 1 的 atomic 操作,並返回變數之前的值。在這一版實作中,我們可以確定當 `owner` 等於 `next` 時,代表鎖是釋放狀態。否則,代表鎖被其他人持有。`next` 就像是排隊拿票的機制,每來一個申請者,`next` 就加 1,代表票號;`owner` 代表目前持鎖的票號。 看起來一切都完美了,但是現實往往是殘酷的。接下來又遇到什麼問題了呢? ### ticket spinlock 的問題 我們依然以上面 8 個 CPU 的系統為例說明。CPU0–CPU7 依次申請 `spinlock`。假設初始狀態,`spinlock` 變數的結構體沒有快取到任何 CPU 的快取內。CPU0 獲取 `spinlock`,此時的 cache 狀態如下。 ![97e07e22-8438-4120-a1f4-5a7039a115c1](https://hackmd.io/_uploads/B1Bk9lKTyx.jpg) 然後 CPU1 申請鎖,並更新 spinlock 變數。所以會 `invalid` CPU0 的 cache。接著 spinlock 變數被快取到 CPU1 的 cache,然後 CPU1 開始一直 spin,持續讀取 spinlock 的值。 ![cff4164d-3289-48d4-9baf-2110ede21528](https://hackmd.io/_uploads/SyukcxKp1e.jpg) 接著 CPU2 申請鎖,並更新 spinlock 變數。所以會 `invalid` CPU1 的 cache,然後更新 `next` 的值。而 CPU1 又會讀取 spinlock 的值,所以 spinlock 變數對應的快取行的狀態最終是 `Shared`,並且同時快取在 CPU1 和 CPU2 的 cache 中。 ![ecca4160-d8b0-4fbb-bc96-9ccd66bd00e2](https://hackmd.io/_uploads/BJ2k5xtpyl.jpg) 後面 CPU3–CPU7 依次申請 `spinlock`,就是重複上面的操作:在更新 `next` 之前,先 `invalid` 其他 CPU 的 cache,然後其他 CPU 再從目前 CPU 獲取更新後的 spinlock 值。cache line 的狀態更新為 `Shared`,然後繼續 spin。 ![0cf5ceb8-1ec7-499d-ad01-7e9965e66c6d](https://hackmd.io/_uploads/SJylcxYTke.jpg) 當 CPU0 `spin_unlock()` 的時候,會先 `invalid` CPU1–CPU7 對應的 cache line,然後更新 `owner` 的值。 ![48aef43c-baff-4daf-b8ca-0265f26812bf](https://hackmd.io/_uploads/BkMecgFpyg.jpg) CPU1–CPU7 讀取 `owner` 的值時,又會從 CPU0 獲取最新資料,並快取到各自的 cache 中。 ![3dfa7832-cb60-4758-8f4f-26d27056f12f](https://hackmd.io/_uploads/rkLl5gFayg.jpg) 不知道你是否發現了問題:隨著 CPU 數量的增加,總線頻寬壓力變得非常大。而且延遲也會隨之上升,效能逐漸下降。更重要的是,當 CPU0 釋放鎖後,CPU1–CPU7 中其實只有一個 CPU 能獲得鎖 —— 根本沒必要影響所有 CPU 的快取,只需要影響接下來按 FIFO 順序該獲得鎖的那個 CPU 就行。 這說明 `ticket spinlock` 並不是 scalable 的 (最初的 `wild spinlock` 也有相同問題) 。於是,歷史又往前邁出了一步。 ### qspinlock 我們來到了 `qspinlock` 的時代,`qspinlock` 的出現正是為了解決 `ticket spinlock` 上述的問題。 我們先來思考一下造成這個問題的根本原因,就是每個 CPU 都在 spin 同一個共享變數 spinlock 上,導致頻寬壓力大、延遲高、效能差。所以,只要每個 CPU spin 的不是同一個變數,而是各自不同的變數,就可以有效避免這種情況。 這就需要我們換一種排隊方式,例如使用單向鏈結串列。單向鏈結串列一樣可以保證 FIFO 順序,每次解鎖時,也只需要通知鏈結串列中頭部的 CPU 即可。這,其實就是 MCS lock 的實作原理。而 `qspinlock` 的實作,就是建立在 MCS lock 的理論基礎上。 接下來,我們先探究一下 MCS lock 是如何實作的。 ```c struct mcs_spinlock { struct mcs_spinlock *next; int locked; }; ``` `mcs_spinlock` 中 `next` 成員就是構建單向鏈結串列的基礎,所有 `spin` 自旋的 CPU 都可以藉助 `mcs_spinlock` 結構體構建立單向鏈結串列關係。`mcs_spinlock` 結構體需要幾個呢?我們知道 `spin_lock()` 期間,搶佔是關閉的,所以最多只可能存在 CPU 數量減 1 個 CPU 自旋等待,所以我們只需要 CPU 個數的 `mcs_spinlock` 結構體 (`spinlock` 可能會出現巢狀,例如 `softirq` 可以中斷行程,`hardirq` 可以中斷 `softirq`,`NMI` 可以中斷 `hardirq`。這意味著一個 CPU 可能出現 4 個不同的 spinlock 自旋等待,預期 `mcs_spinlock` 結構體需要 `4 * CPU` 數量。後面為了簡化問題不考慮單核 spinlock 巢狀情況) 這就很適合使用 `percpu` 變數。`spin` 等鎖的操作只需要將所屬自己 CPU 的 `mcs_spinlock` 結構體加入單向鏈結串列尾部,然後 `spin`,直到自己的 `mcs_spinlock` 的 `locked` 成員置 1 (`locked` 初始值是 0) 。`unlock` 的操作也很簡單,只需要將解鎖的 CPU 對應的 `mcs_spinlock` 結構體的 `next` 指標的 `locked` 成員設 1,相當於通知下一個 CPU 退出迴圈。 我們繼續以上面的 8 個 CPU 的系統為例說明。首先 CPU0 申請 spinlock 時,發現鏈結串列是空的,且鎖是釋放狀態。所以 CPU0 獲得鎖。 ```graphviz digraph MCSLock{ rankdir=LR; node[shape=record] subgraph cluster_cpu0{ penwidth=0 label="CPU0" node[shape=record]; lock [label="<n> next|<l> locked = 1"] } MCS_lock [label="MCS Lock" shape=plaintext] NULL [shape=plaintext] MCS_lock ->lock:n lock:n -> NULL } ``` CPU1 繼續申請 `spinlock`,需要 `spin` 等待。所以將 CPU1 對應的 `mcs_spinlock` 結構體加入鏈結串列尾部。然後 `spin` 等待 CPU1 對應的 `mcs_spinlock` 結構體 `locked` 成員被置 1。 ```graphviz digraph MCSLock{ rankdir=LR; node[shape=record] subgraph cluster_cpu0{ penwidth=0 label="CPU0" node[shape=record]; lock0 [label="<n> next|<l> locked = 1"] } subgraph cluster_cpu1{ penwidth=0 label="CPU1" node[shape=record]; lock1 [label="<n> next|<l> locked = 0"] } MCS_lock [label="MCS Lock" shape=plaintext] NULL [shape=plaintext] MCS_lock ->lock0:n lock0:n -> lock1:n lock1:n -> NULL } ``` 當 CPU2 繼續申請鎖時,發現鏈結串列不為空,說明有 CPU 在等待鎖。所以也將 CPU2 對應的 `mcs_spinlock` 結構體加入鏈表尾部。 ```graphviz digraph MCSLock{ rankdir=LR; node[shape=record] subgraph cluster_cpu0{ penwidth=0 label="CPU0" node[shape=record]; lock0 [label="<n> next|<l> locked = 1"] } subgraph cluster_cpu1{ penwidth=0 label="CPU1" node[shape=record]; lock1 [label="<n> next|<l> locked = 0"] } subgraph cluster_cpu2{ penwidth=0 label="CPU2" node[shape=record]; lock2 [label="<n> next|<l> locked = 0"] } MCS_lock [label="MCS Lock" shape=plaintext] NULL [shape=plaintext] MCS_lock ->lock0:n lock0:n -> lock1:n lock1:n -> lock2:n lock2:n -> NULL } ``` 當 CPU0 釋放鎖的時候,發現 CPU0 對應的 `mcs_spinlock` 結構體的 `next` 域不為 `NULL`,說明有等待的 CPU。然後將 `next` 域指向的 `mcs_spinlock` 結構體的 `locked` 成員置 1,通知下個獲得鎖的 CPU 退出自旋。`MCS lock` 頭指標可以選擇不更新,等到 CPU2 釋放鎖時再更新為 `NULL`。 ```graphviz digraph MCSLock{ rankdir=LR; node[shape=record] subgraph cluster_cpu0{ penwidth=0 label="CPU0" node[shape=record]; lock0 [label="<n> next|<l> locked = 1"] } subgraph cluster_cpu1{ penwidth=0 label="CPU1" node[shape=record]; lock1 [label="<n> next|<l> locked = 1"] } subgraph cluster_cpu2{ penwidth=0 label="CPU2" node[shape=record]; lock2 [label="<n> next|<l> locked = 0"] } MCS_lock [label="MCS Lock" shape=plaintext] NULL [shape=plaintext] MCS_lock ->lock0:n lock0:n -> lock1:n lock1:n -> lock2:n lock2:n -> NULL } ``` 以上只是一個簡單的參考,MCS lock 的具體程式碼細節可參考 [kernel/locking/mcs_spinlock.h](https://elixir.bootlin.com/linux/v5.4.33/source/kernel/locking/mcs_spinlock.h)。如何將 `mcs_spinlock` 頭指標塞進 spinlock 4 個位元組的身軀,成為實作 `qspinlock` 的關鍵。不過這部分可以參考 Linux 核心原始程式碼,掌握了其原理,看程式碼實作應該易如反掌。透過以上步驟,我們可以看到每個 CPU 都 `spin` 在自己的私有變數上,因此不會存在 `ticket spinlock` 的問題。 ### 總結 從 `wild spinlock` 到 `qspinlock`,程式碼的複雜度增加的不是一點半點,而是很多。`qspinlock` 的實作檔案足足超過 500 行,而 `wild spinlock` 僅僅幾十行。 ## 使用 Ftrace 獲取系統中的 spinlock 快照 > 以下改寫自 [如何使用 ftrace 即時取得系統中的 spinlock 快照](https://blog.csdn.net/dog250/article/details/108376624) 在這篇文章中,我給出了一個拯救 panic 的方法,其目的更多事惡作劇性質。但仍有不足,請看下面這段程式碼中的註解 ```c void stub_panic(const char *fmt, ...) { ... local_irq_enable(); // 這個時候若 current 持有自旋鎖那該怎麼辦? printk("rq:%d %d\n", preempt_count(), irqs_disabled()) __set_current_state(TASK_UNINTERRUPTIBLE); schedule(); ``` 若持有自旋鎖的 task 被 schedule 出去,由於該 task 不會再回來了,那麼只要另一個 task 在搶鎖,系統馬上就會發生 deadlock。 需求自然就出來了,我能不能在呼叫 schedule 之前便利系統目前所有自旋鎖的持有情況,將自己持有的自旋鎖給 unlock 呢? 有需求就有方案。當然可以。 大秀 hook 手藝的時候來了。雖然作為手藝人用手工的方式拼接指令可以實作任何功能,但現在的目標已經不僅僅是秀手藝了,而是實作上述的需求,所以我盡量先使用 stap/kprobe,ftrace 這些場面還不是太宏大的工具。 需求如圖所示: ```graphviz digraph { rankdir=LR; node[shape=record] spin_lock [label="spin_lock"] data [label="<1>header|<2>spin_lock|<3>tailer"] spin_lock -> data; info1 [label=統計並記錄搶鎖訊息 shape=plaintext] info2 [label=統計並記錄持鎖訊息 shape=plaintext] data:1 -> info1 [style=invis]; data:3 -> info2 [style=invis] } ``` ```graphviz digraph { rankdir=LR; node[shape=record] spin_lock [label="spin_lock"] data [label="<1>spin_lock|<2>tailer"] spin_lock -> data; info2 [label=統計並記錄釋鎖訊息 shape=plaintext] data:2 -> info2 [style=invis] } ``` 很顯然,直接的思路就是使用 kretprobe 了,撰寫以下程式碼: ```c #include <linux/module.h> #include <linux/kallsyms.h> #include <linux/kprobes.h> struct lock_owner { struct raw_spinlock *lock; struct task_struct *owner; struct task_struct *lock_owner; int idx; }; struct lock_owner plane[256]; unsigned long bitmap[4]; int lock_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner, **pdata; struct raw_spinlock *lock = (struct raw_spinlock *)regs->di; int i = -1, retry = 10; pdata = (struct lock_owner **)&ri->data; *pdata = NULL; again: i = find_first_zero_bit(&bitmap[0], 256); if (i > 256 || test_and_set_bit(i, bitmap)) { if (retry --) goto again; goto end; } owner->idx = i; owner = &plane[i]; owner->lock = lock; owner->owner = current; owner->lock_owner = NULL; *pdata = owner; end: return 0; } int lock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner = (struct lock_owner *)&ri->data[0]; if (owner == NULL) return 0; owner->lock_owner = current; return 0; } static struct kretprobe lock_kretprobe = { .handler = lock_ret_handler, .entry_handler = lock_handler, .data_size = sizeof(struct lock_owner *), .maxactive = 20, }; int unlock_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner, **pdata; struct raw_spinlock *lock = (struct raw_spinlock *)regs->di; int i = -1; pdata = (struct lock_owner **)&ri->data; *pdata = NULL; for (i = 0; i < 256; i++) { owner = &plane[i]; if (test_bit(owner->idx, &bitmap[0]) && owner->owner == current && owner->lock_owner == current && owner->lock == lock && owner->idx == i) { *pdata = owner; break; } } return 0; } int unlock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs) { struct lock_owner *owner = (struct lock_owner *)&ri->data[0]; if (owner == NULL) return 0; owner->lock = NULL; owner->owner = NULL; owner->lock_owner = NULL; owner->idx = -1; clear_bit(owner->idx, &bitmap[0]); return 0; } static struct kretprobe unlock_kretprobe = { .handler = unlock_ret_handler, .entry_handler = unlock_handler, .data_size = sizeof(struct lock_owner *), .maxactive = 20, }; static int __init lock_detect_init(void) { memset((char *)&plane[0], 0, sizeof(plane)); memset((char *)&bitmap[0], 0, sizeof(plane)); lock_kretprobe.kp.symbol_name = "_raw_spin_lock"; register_kretprobe(&lock_kretprobe); unlock_kretprobe.kp.symbol_name = "_raw_spin_unlock"; register_kretprobe(&unlock_kretprobe); return 0; } static void __exit lock_detect_exit(void) { unregister_kretprobe(&lock_kretprobe); unregister_kretprobe(&unlock_kretprobe); } module_init(lock_detect_init); module_exit(lock_detect_exit); ``` 請忽略具體的 spinlock 統計邏輯,現在僅僅關注框架,我敢說,這個玩法對於一般的 `wrap` 函數,簡直就是模板: * 這個框架可以實作幾乎一切 `wrap` 函數。 所謂的 wrap 函式其實很簡單: ```c my_function(void *param) { pre_process(param); orig_function(param); post_process(param); } ``` `kretprobe` 的處理流程如下: ```graphviz digraph kretprobe{ rankdir=LR; node[shape=record] call_orig_func [label="call orig_function"] subgraph cluster_orig_function{ penwidth=0 clusterrank=global label="orig_function" node[shape=record]; blk_orig [label="<1> jmp/call entry_handler|<2> \n\n\n\n\n original logic\n\n\n\n\n\n"] } subgraph cluster_Fix{ penwidth=0 subgraph cluster_entry_handler{ penwidth=0 clusterrank=global label="entry_handler" node[shape=record]; blk_ent [label="<1> save return address to A|<2> modify return address to int3 |<3> new self-defined header logic"] } subgraph cluster_return_handler{ clusterrank=global penwidth=0 label="\n\n\n\nreturn_handler" node[shape=record]; blk_ret [label="<1> new self-defined tailer logic|<2> return to A"] } } subgraph cluster_INT3_handler{ penwidth=0 clusterrank=global label="INT3 handler" node[shape=record]; blk_int3 [label="<1> restore return address to A|<2> call return_handler"] } call_orig_func -> blk_orig:1:nw call_orig_func:se -> blk_ret:2 [label="return to orig_function's caller", weight=4, dir=back] blk_orig:2 -> blk_int3:1:nw [label=return] blk_int3:2 -> blk_ret:1:nw [label=call] blk_orig:1 -> blk_ent:1:nw [label="jmp/call" weight=4] blk_orig:1:se -> blk_ent:3:sw [label="jmp/call to orig", dir=back] } ``` 然而,它偏偏不能用於 `_raw_spin_lock`/`_raw_spin_unlock` 的 `wrap`!因為會死鎖! * `kprobe`/`kretprobe` 框架內部使用 spinlock 來進行同步,在 `ret_handler` 執行的時候,它已經持有了該 `spinlock`,而屬於 `kretprobe` 的 `ret_handler` 本身同樣也需要該 `spinlock`,因此會 deadlock。 啦啦啦,這就是為什麼我喜歡純手工活兒的原因了,因為它可控啊! 來來來,試試 `ftrace`,相比於 `kretprobe` 而言,它更簡單,我覺得它應該沒問題。若再不行,就只能上純手工藝了。 先來試試框架,下面是一個什麼都不做的框架程式碼: ```c #include <linux/module.h> #include <linux/kallsyms.h> #include <linux/ftrace.h> struct wrap_hook { char *name; struct ftrace_ops ops; unsigned long wrap_func; unsigned long entry; }; void (*real_raw_spin_lock)(spinlock_t *lock); // _raw_spin_lock 的 wrap 函式! void my_raw_spin_lock(spinlock_t *lock) { real_raw_spin_lock(lock); } void (*real_raw_spin_unlock)(spinlock_t *lock); // _raw_spin_unlock 的 wrap 函式! void my_raw_spin_unlock(spinlock_t *lock) { real_raw_spin_unlock(lock); } struct wrap_hook spinlock_hooks[2] = { { .name = "_raw_spin_lock", .wrap_func = (unsigned long)my_raw_spin_lock, }, { .name = "_raw_spin_unlock", .wrap_func = (unsigned long)my_raw_spin_unlock, } }; // 該函式是一個匯聚器,起到將邏輯 return 到 new function 的目的。 // 採用匯聚器可以避免為兩個函式編寫兩個獨立的 hook 函式,這是設計模式應用的例子。 void stub_func(unsigned long ip, unsigned long parent_ip, struct ftrace_ops *ops, struct pt_regs *regs) { struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops); // 替換 return 位址 regs->ip = hook->wrap_func; } static int wrap_spinlock_init(void) { int i; for (i = 0; i < 2; i++) { spinlock_hooks[i].ops.func = stub_func; spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS | FTRACE_OPS_FL_RECURSION_SAFE; spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name); if (i == 0) real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE); else real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0); register_ftrace_function(&spinlock_hooks[i].ops); } return 0; } static void wrap_spinlock_exit(void) { int i; for (i = 0; i < 2; i++) { unregister_ftrace_function(&spinlock_hooks[i].ops); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0); } } module_init(wrap_spinlock_init); module_exit(wrap_spinlock_exit); MODULE_LICENSE("GPL"); ``` 當我加載這個模組的時候,系統像什麼都沒有發生一樣平靜,而我用 `crash` 看 `_raw_spin_lock`/`_raw_spin_unlock` 的時候,顯然它們已經被 hook 了,這意味著,`wrap` 起作用了。 下圖展示了 `ftrace` 實作 `wrap` 的原理: ```graphviz digraph ftrace{ rankdir=LR; node[shape=record] call_orig_func [label="call orig_function"] subgraph cluster_orig_function{ penwidth=0 label="orig_function" node[shape=record]; orig_func [label="<1> call entry_handler|<2> \n\n\n\n original logic\n\n\n\n\n\n"] } subgraph cluster_ft_stub{ penwidth=0 label="ftrace_stub" node[shape=record]; ft_stub [label="1. save return address to A\n2. rebalance the stack"] } subgraph cluster_ft_hook{ penwidth=0 label="ftraece_hook" node[shape=record]; ft_hook [label="<1> modify return address|<2> return to wrap_func"] } subgraph cluster_wrap_func{ penwidth=0 clusterrank=global label="wrap_function" node[shape=record]; wrap_func [label="<1> \nnew self-defined header logic\n\n|<2> \ncall orig_function+5\n\n|<3> \nnew self-defined tailer logic\n\n"] } call_orig_func -> orig_func:1:nw orig_func:1 -> ft_stub:w ft_stub -> ft_hook:1:nw wrap_func:1 -> ft_hook:2 [dir=back] orig_func:1:se -> wrap_func:2 [label="call", dir=back] orig_func:2:se -> wrap_func:2:sw[label="return", weight=4] call_orig_func:se -> wrap_func:3:sw [label="return to orig_function's caller", dir=back] } ``` 這個已經和手工做法幾乎無異了。可以拿這個 `ftrace` 和上面的 `kretprobe` 對比一下,感受一下雷同和差異: * `kretprobe` 一般也是用來 probe 函式而不是指令,這一點和 `ftrace` 一致。 * `ftrace` 機制上更加直接,而 `kretprobe` 則更加 trick 一點。 * 若要完成一個業務需求,我選擇用 `ftrace`;若要表演,我會照抄一套 `kretprobe` 的機制;若純自己玩,我選擇純手工藝。 好了,現在把 spinlock 統計邏輯放進去,程式碼簡單,自己感受: ```c #include <linux/module.h> #include <linux/kallsyms.h> #include <linux/ftrace.h> struct wrap_hook { char *name; struct ftrace_ops ops; unsigned long wrap_func; unsigned long entry; }; struct lock_owner { int idx; spinlock_t *lock; struct task_struct *owner; struct task_struct *lock_owner; }; struct lock_owner plane[32768]; unsigned long bitmap[512]; void (*real_raw_spin_lock)(spinlock_t *lock); void (*real_raw_spin_unlock)(spinlock_t *lock); #pragma GCC optimize ("O0") void my_raw_spin_lock(spinlock_t *lock) { struct lock_owner *owner = NULL; int i = -1, retry = 0; // 通篇的 header 和 tailer 不要有 spinlock,因此即便是 printk 也不能使用,否则会造成嵌套 deadlock(畢竟把 spin_lock/unlock 给 hook 了)。 // 因此,下面實作了一個簡單的 lock 機制,使用 atomic 的 test_and_set 操作。 again: i = find_first_zero_bit(bitmap, 32768); if (test_and_set_bit(i, bitmap)) { if (retry ++ > 10) goto real; goto again; } owner = &plane[i]; owner->idx = i; owner->lock = lock; owner->owner = current; owner->lock_owner = NULL; set_bit(i, bitmap); real: barrier(); real_raw_spin_lock(lock); barrier(); if (owner) owner->lock_owner = current; } #if 0 void test() { struct lock_owner *owner = NULL; int i = -1; int cnt = 4; again: spin_lock(&maplock); i = find_first_zero_bit(bitmap, 32768); set_bit(i, bitmap); spin_unlock(&maplock); owner = &plane[i]; printk("i is:%d at :%p\n", i, owner); if (cnt --) goto again; } #endif #pragma GCC optimize ("O0") void my_raw_spin_unlock(spinlock_t *lock) { struct lock_owner *owner = NULL; int i = -1; real_raw_spin_unlock(lock); barrier(); for (i = 0; i < 32768; i++) { // 注意,這裡沒有保證 atomic,因此我的程式無法 cover 100% 要求 atomic 的場景,統計可能不準。 if (test_bit(i, bitmap) && plane[i].owner == current && plane[i].lock_owner == current && plane[i].lock == lock) { owner = &plane[i]; owner->lock = NULL; owner->owner = NULL; owner->lock_owner = NULL; owner->idx = -1; barrier(); clear_bit(i, bitmap); break; } } } #pragma GCC optimize ("O0") struct wrap_hook spinlock_hooks[2] = { { .name = "_raw_spin_lock", .wrap_func = (unsigned long)my_raw_spin_lock, }, { .name = "_raw_spin_unlock", .wrap_func = (unsigned long)my_raw_spin_unlock, } }; void stub_func(unsigned long ip, unsigned long parent_ip, struct ftrace_ops *ops, struct pt_regs *regs) { struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops); regs->ip = hook->wrap_func; } #pragma GCC optimize ("O0") static int wrap_spinlock_init(void) { int i; memset((char *)plane, 0, sizeof(plane)); memset((char *)bitmap, 0, sizeof(bitmap)); // 列印出 bitmap 的地址,以便 panic_resched 模組使用! printk("plane:%p bitmap:%p\n", &plane[0], &bitmap[0]); #if 0 test(); return 0; #endif for (i = 0; i < 2; i++) { spinlock_hooks[i].ops.func = stub_func; spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE; spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name); if (i == 0) real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + 5); else real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + 5); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0); register_ftrace_function(&spinlock_hooks[i].ops); } return 0; } static void wrap_spinlock_exit(void) { int i; struct lock_owner *owner; #if 0 for (i = 0; i < 32768; i++) { owner = &plane[i]; if (test_bit(i, bitmap)) { if (owner->lock == NULL || owner->idx != i) continue; printk("[%d] lock:%p owner:%s[%d] lock_owner:%s[%d]\n", owner->idx, owner->lock, owner->owner?owner->owner->comm:"noone", owner->owner?owner->owner->pid:-2, owner->lock_owner?owner->lock_owner->comm:"null", owner->lock_owner?owner->lock_owner->pid:-1); } } #endif for (i = 0; i < 2; i++) { unregister_ftrace_function(&spinlock_hooks[i].ops); ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0); } } module_init(wrap_spinlock_init); module_exit(wrap_spinlock_exit); MODULE_LICENSE("GPL"); ``` 值得注意的是,`wrap` 函式中不能再呼叫任何會使用 spinlock 的函式 (避免循環巢狀) ,因此一開始用 atomics 實作一個自己的自旋鎖: ```c void lock(unsigned long *lock) { while (test_and_set_bit(0, lock)); } void unlock(unsigned long *lock) { *lock = 0; // 比 clear_bit 更帥 } ``` 然而系統很快就卡死了: * 把系統所有的 spinlock 操作全部在此處唯一的自旋鎖上串行化,不卡死才怪! 於是,只能退而求其次,採用寬鬆的約束了: * 實在沒有 slot 了,就不統計該次記錄了。 所以,我這個 spinlock 快照記錄機制,`它是不準的。` 以上就是一個簡單的 spinlock 快照機制的程式碼和說明,它可以展示: * 目前系統中有哪些 `task` 在爭搶哪一把 `spinlock`。 * 目前系統中某個 spinlock 被哪一個 `task` 所持有。 這個機制有什麼用呢? 回到本文的開頭,若想讓 `panic` 被 `schedule` 出去而不是當機,我在擔心 `current` 持有鎖怎麼辦,我希望有一個辦法讓我知道 `current` 是否持有 `spinlock`,以及持有了哪些 `spinlock`,然後將它們 `unlock`。 現在有辦法了: ```c // 藉由 lock_detect 模組 init 函式中 printk 出來的 bitmap 地址來設置。 unsigned long pbitmap; module_param(pbitmap, ulong, 0644); // 藉由 lock_detect 模組 init 函式中 printk 出來的 plane 地址來設置。 unsigned long pplane; module_param(pplane, ulong, 0644); void stub_panic(const char *fmt, ...) { int i; unsigned long *bitmap = (unsigned long *)pbitmap; struct lock_owner *plane = (struct lock_owner *)pplane; // 循環遍歷所有目前系统當事的 spinlock,解鎖 current 所持有的 spinlock。 for (i = 0; i < 32768; i++) { if (test_bit(i, bitmap) && plane[i].owner == current && plane[i].lock_owner == current && plane[i].lock && plane[i].idx == i) { printk("lock hold:%p %s %d\n", plane[i].lock, plane[i].lock_owner ? plane[i].lock_owner->comm : "aabb", plane[i].lock_owner ? plane[i].lock_owner->pid : -1); spin_unlock(plane[i].lock); } } if (preempt_count()) return; local_irq_enable(); // 安全地退場!! __set_current_state(TASK_UNINTERRUPTIBLE); schedule(); } ... ``` 強調一點,本文介紹的把戲無法揪出所有的 spinlock 狀態,因為 Linux 核心中 spinlock 的 lock/unlock 操作並不全是透過 `_raw_spin_lock`/`_raw_spin_unlock` 入口的,比如 `spin_lock_irqsave`/`spin_unlock_irqrestore` 就不是,它們有自己的入口。因此你需要把所有這些入口都用 `ftrace` hook 起來,才能全都捕捉到。