# Linux 同步處理機制
## 概念
最大處理吞吐量自然需要並行並充分利用可用的處理資源。在高效能的並行應用中,僅允許單一執行緒在任何時刻執行的關鍵區段 (critical section,以下簡稱 CS) 相當常見。現代作業系統提供簡單的鎖定機制,以強制執行單一執行緒的限制。兩種常見的鎖類型為 spinlock 與 mutex。每種鎖都能對 CS 形成屏障,從而維護程式的正確性。然而,spinlock 與 mutex 在內部的實作方式截然不同,這在多種常見的工作負載下導致顯著的效能差異。

> 取自 [Optimizing for Workloads: Linux Spinlocks vs Mutexes](https://nathanpetersen.com/2019/02/17/optimizing-for-workloads-linux-spinlocks-vs-mutexes/)
spinlock 在單處理器且不支援搶佔式排程的作業系統上,其實沒作用,只有在多處理器環境下才有其價值。最基本的 spinlock 是所有執行緒無序搶奪同一鎖的那種,但由於所有處理器上的執行緒都在等待同一個變數,這會導致所有爭鎖處理器的快取 (cache) 被頻繁刷新。為了解決這個問題,設計者引入新的 spinlock 機制:讓每個執行緒在本地處理器的變數上自旋,避免快取的廣泛刷新。這類 spinlock 的第一個實作是陣列鎖,但陣列鎖需靜態分配固定大小,若爭鎖的執行緒較少,便造成空間浪費。於是出現第二種實作 —— 佇列鎖。佇列可依需求動態擴展,解決陣列鎖空間利用率低的問題。
雖然這類新型 spinlock 避免快取刷新,同時也減少空間浪費,但卻引發飢餓問題。排在佇列後方的執行緒,因等待時間過長,可能導致反應能力下降。為此,引入了具有超時機制的鎖,以避免執行緒長時間自旋導致系統效能降低。總的來說,只要 spinlock 的佇列過長,就有可能產生飢餓現象。那麼,有沒有辦法限制佇列長度呢?答案是肯定的,這就是「複合鎖」。複合鎖綜合超時鎖與佇列鎖的優勢,並引入新的競爭機制 —— 類似 access token 的概念。也就是說,只有獲得 access token 的執行緒才可進入佇列爭奪 spinlock。access token 數量有限,因此成功限制佇列長度。若執行緒拿到 access token 後,在佇列中等待超時,將自動放棄 access token 並拋出例外,由呼叫者決定後續處理。
然而,這樣的設計帶來一個問題:本來是爭奪 spinlock,現在變成爭奪 access token,這是否失去原意?實際上,爭奪 access token 是為了讓 spinlock 的競爭過程更公平,進而避免飢餓。既然目標是避免飢餓,那麼打破先來先服務的順序是必要的。access token 的競爭過程實際上就是一種打破先來先服務的策略,並藉由簡單的退避演算法來控制爭奪 access token 的開銷,避免對系統造成過大負擔。若單純為了解決飢餓問題,timedlock 已相當實用;但複合鎖進一步強化公平性,這種公平性正是透過打破先來先服務所實作的。
## 超市排隊與多執行緒系統的聯想
在服務台排隊問題上,超市的處理實不理想,常常發生隊頭擁塞的情況。試想,目前方結帳發生糾紛,導致後方的我們只能乾等,看著其他結帳隊伍快速前進。這種排隊方式雖然簡單,卻將「排隊排程」的任務交給顧客自己。顧客需自行判斷排哪條佇列較快,如觀察購物量、有無衣物 (卡片糾紛風險) 、是否有需稱重商品、有無打折商品、顧客年齡、收銀員熟練度等,這完全是一場賭博。
相比之下,銀行或餐廳的排隊就合理許多,顧客抽號碼,排入單一佇列,由空閒的服務台叫號,屬於「單一佇列、多服務台」的排程系統,有效避免隊頭擁塞。顧客只需持票等待,無需實體排隊,票號形成虛擬佇列,沒叫號前可自由活動。
### 可自由活動?不代表可離場
尤其在業務處理快速的情況下,一旦離開場館,剛踏出門就被叫號,只得匆忙返回,反倒不如不離開。若等待時間較長,則可完成其他任務,若收益高於體力成本,離場即有意義。
## semaphore 的聯想與實作
聯想到 semaphore。它就像是「單一佇列、多服務台」的系統,初始值即服務台數。當一執行緒獲服務,相當於服務台減一 (`down`) ,而服務完成後則服務台數加一 (`up`) 。「服務」即進入臨界區。
### semaphore的 sleep-wait 模式與性能瓶頸
問題是,Linux semaphore 採用 sleep-wait,對我的場景來說切換開銷過高,`perf` 顯示大量時間耗在 `schedule` 與 `wake_up`。頻繁切換導致 cache 失效,性能下滑。雖然 CFS 排程機制可調整 jitter,但損耗仍不可忽視。
### ticket lock 的 cache 問題與改良
Linux 的 ticket lock,所有爭鎖者與持鎖者共用同一變數,導致多處理器 cache coherence 開銷巨大。嘗試設計「本地接力 spinlock」,讓每個爭鎖者只觸碰自己專屬的變數,且避免快取行 (cache line) 衝突,釋放鎖時僅寫入下一個爭鎖者的本地變數,降低 cache coherence 需求。
### 多持鎖者的構想與 semaphore 的聯想
結合 spinlock 與超市排隊經驗,萌生出一個可讓多 CPU 同時持有鎖的 自旋佇列構想。後來發現,這不正是 semaphore?但 Linux 的 sleep-wait semaphore 不足以滿足需求,這裡需要 spin-wait 版本,因為資料包傳送非常快速。semaphore 理應適用,但不願 sleep,故採用多 spinlock 機制。結果顯示,spin-wait semaphore效果優異。
## Linux 原生 semaphore 實作簡析
為凸顯主體概念,以下簡化 semaphore 實作,忽略細節:
```c
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
struct semaphore_waiter {
struct list_head list;
struct task_struct *task;
bool up; /* 區域 CPU 變數,避免多核 cache 一致的成本 */
};
static int down(struct semaphore *sem)
{
if (likely(sem->count > 0)) {
sem->count--;
} else {
struct task_struct *task = current;
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = false;
for (;;) {
__set_task_state(task, TASK_UNINTERRUPTIBLE);
schedule();
if (waiter.up) {
return 0;
}
}
}
}
void up(struct semaphore *sem)
{
if (likely(list_empty(&sem->wait_list))) {
sem->count++;
} else {
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);
}
}
```
註:上述簡化程式碼忽略 semaphore 內部的 spinlock,實際情況中需防止多服務台同時操作同一等待者。
## 自旋 semaphore 實作 (spin-wait)
以下為改造版,自旋等待 semaphore:
```c
struct spin_semaphore {
unsigned int count;
struct list_head wait_list;
};
struct spin_semaphore_waiter {
struct list_head list;
struct task_struct *task;
bool up;
};
#define BEGIN_ATOMIC /* 假設由 lock 指令保護 */
#define END_ATOMIC
static int spin_down(struct spin_semaphore *sem) {
if (likely(sem->count > 0)) {
sem->count--;
} else {
struct task_struct *task = current;
struct spin_semaphore_waiter waiter;
BEGIN_ATOMIC
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = false;
END_ATOMIC
for (;;) {
cpu_relax(); /* 自旋等待 */
if (waiter.up)
return 0;
}
}
}
void spin_up(struct spin_semaphore *sem) {
BEGIN_ATOMIC
if (likely(list_empty(&sem->wait_list))) {
sem->count++;
END_ATOMIC
} else {
struct spin_semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct spin_semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
END_ATOMIC
}
}
```
所有函式與結構體均加上 `spin_` 前綴,標示自旋版本的 semaphore。
以上程式碼實作出通用自旋等待佇列,而 spinlock 則是其中「單服務台」的特例。
## 鎖的開銷
鎖的開銷是巨大的,尤其是在多核處理器環境下更為明顯。
引入多處理的目的是提升並行處理能力,以增強系統效能。然而,由於共享臨界區 (critical section) 的存在,同一時間內只能允許一個執行緒存取 (特別是寫操作) ,導致本應並行執行的工作流在這裡被序列化 (serialization) 。形象地來看,這就像是一條寬闊的馬路上出現了一個瓶頸,而這種序列化是無法避免的,因為它是鎖機制的本質。問題在於,執行緒該如何應對這個瓶頸?顯然,誰都無法繞開它,關鍵在於當它們同時到達瓶頸時該怎麼辦。
一種最直接但粗暴的做法是「鬥毆式搶鎖」,這也是最簡單的自旋鎖 (spinlock) 設計方式。然而,一種更為溫和的策略則是:既然無法立即取得鎖,那就主動讓出 CPU (yield),進入睡眠狀態 (sleep) ,等待鎖釋放後再恢復執行。這就是睡眠等待 (sleep-wait) 機制,而相對應的方式則是持續忙碌等待 (spin-wait) 。問題是,執行緒本身該如何在這兩種方式之間做出最佳選擇?事實上,這種選擇的權限不在執行中的執行緒,而是在設計程式時的開發者手中。
不應簡單比較 sleep-wait 和 spin-wait 哪種效能更好,因為鎖本身就是一種效能損耗,無論哪種方式都會影響系統效能,只是影響方式不同:
- sleep-wait 的開銷來自於執行緒切換 (context switch) ,涉及暫存器、堆疊、刷新快取、MMU TLB 刷新等作業。
- spin-wait 則會浪費 CPU 週期,雖然沒有切換開銷,但在等待鎖的這段時間內無法執行任何有效工作。
為何仍然要引入 spin-wait?因為若始終使用 sleep-wait,當執行緒 A 進入睡眠,系統切換到執行緒 B,而此時鎖被釋放,系統無法保證執行緒 A 立即獲得 CPU,即使它被喚醒,仍需付出額外的切換開銷。而若鎖的釋放時間極短,那麼這樣的切換代價可能得不償失。因此,在某些情況下,持續忙碌等待 (spin-wait) 可能更為合理。
與中斷 (interrupt) 機制相比,鎖的開銷更大。若能夠透過關閉中斷 (disable interrupts) 來實作無鎖操作,那麼這是值得考慮的選項。但是,關閉中斷也會帶來副作用,例如增加系統延遲,因此需要權衡關閉中斷與鎖的開銷,評估其可行性。
### 關於自旋鎖 (Spinlock)
自 Linux 核心初始版本開始,就已經引入自旋鎖。自旋鎖的行為模式是在等待鎖的過程中「原地打轉」,這導致 CPU 週期的浪費,但這是一種必要的取捨。系統設計本質上是一場權衡 (trade-off) ,雖然不是零和博弈,但始終沒有完美方案,只能努力尋找折衷點。
若不使用自旋鎖,應該怎麼做?很顯然,選擇的方法就是將執行緒切換至其他工作,等待持鎖者釋放鎖後再喚醒。然而,這樣會帶來額外開銷:
1. 若有多個執行緒競爭同一把鎖,喚醒所有執行緒後再進行競爭,這本質上等於「鬥毆搶鎖」;
2. 即使只喚醒隊列中的第一個執行緒,執行緒切換 (context switch) 的開銷仍然存在。
因此,若忙碌等待所浪費的 CPU 週期小於兩次執行緒切換的開銷,那麼自旋等待就是合理的。這也是為什麼自旋鎖適用於短時間持有鎖的臨界區,因為:
- 自旋時間過長會導致 CPU 週期浪費過多,反而得不償失。
- 自旋鎖適用於 CPU 數量較少的環境,因為隨著 CPU 數量增加,自旋延遲會成倍增加。
### Linux 自旋鎖的歷史發展
Linux 的自旋鎖經歷二個世代的演化:
1. 第一代:無序自旋鎖 (無公平性。多個 CPU 競爭同一把鎖時,獲得鎖的機率是隨機的,受 cache 等因素影響,不一定是先嘗試獲取鎖的 CPU 先獲得鎖。
2. 第二代:Ticket 自旋鎖 (公平性機制)。透過票券機制 (ticketing) ,讓 CPU 依照排隊順序獲得鎖,避免某些 CPU 因為長期競爭失敗而導致資源飢餓 (starvation) 。
Ticket 自旋鎖的設計非常巧妙,它將一個 32 位變數拆分為高 16 位 (next ticket) 和低 16 位 (current ticket) :
- 當執行緒請求鎖時,會對 next ticket 進行原子性加 1 操作。
- 取得鎖的條件是:next ticket == current ticket,否則持續自旋等待。
- 釋放鎖時,對 current ticket 加 1,即可讓下一個排隊的執行緒獲取鎖。
這樣的設計確保了 FIFO (先進先出) 公平性,避免了第一代自旋鎖中存在的隨機爭搶問題。
### 自旋鎖的改進建議
目前在使用自旋鎖時,一般會先進行一次 try-lock 嘗試:
```c
if (spin_trylock(&lock)) {
/* 成功獲取鎖,執行臨界區 */
} else {
/* 鎖被佔用,進入等待機制 */
}
```
然而,這種 try-lock 只會返回成功或失敗,但沒有提供額外資訊。例如,它可以返回:
- 預估等待時間
- 目前等待隊列長度
這樣,呼叫者就能根據這些資訊決定是否應該:
1. 繼續自旋等待
2. 轉去執行其他任務
此外,這些統計資訊還可以用於動態改進自旋鎖,例如:
- 根據隊列長度調整 pause 指令,改進 CPU 管線 (pipeline)
- 智能適應不同負載場景,動態選擇最佳的同步策略
雖然 Linux 核心的自旋鎖機制已經相當成熟,但仍有許多可以改進的空間,尤其是在大規模多核心系統下,如何更有效率地管理 CPU 資源,將是未來鎖機制設計的關鍵方向。
## 自適應自旋鎖
很多時候,當一個行程剛進入睡眠狀態等待 mutex 時,mutex 可能已經被釋放。若能夠即時感知到 mutex 被釋放,將是最理想的情況。解決這個問題的方式之一,是使用自旋忙碌等待 (spin-waiting) 而非阻塞等待 (blocking wait),這樣的理解是否正確?
### 競態條件與自旋鎖的演進
最初,自旋鎖 (spinlock) 是為了解決頻繁睡眠與喚醒帶來的效能問題。然而,在即時系統 (real-time systems) 中,自旋鎖可能導致其他行程長時間延遲,進而影響整體吞吐量 (throughput) 。因此,為了避免這種情況,即時系統又回到了睡眠/喚醒 (sleep/wakeup) 機制,因為即時系統的首要目標不是節省開銷,而是確保執行穩定性。睡眠/喚醒機制確保了爭搶鎖的行程不會影響到其他行程,從而維持系統吞吐量。然而,以 mutex 實作的自旋鎖仍然存在一定的代價,因為它必須支援優先權繼承 (priority inheritance protocol, PIP) 或優先權提升 (priority boosting),以避免優先權反轉 (priority inversion) 與死鎖 (deadlock)。
在可搶占 (preemptive) 核心環境下,若高優先權的行程準備執行,它將會搶占低優先權行程。若低優先權行程持有某個鎖,而此鎖正被高優先權行程等待,則會導致高優先權行程被阻塞,尤其是在單 CPU 環境或行程與 CPU 綁定的情況下。此外,若此時一個中等優先權的行程介入並搶占了低優先權行程,則可能導致高優先權行程的阻塞時間過長,這在即時系統中是不可接受的。因此,Linux 核心引入優先權繼承協定,但該協定的實作需要額外的資料結構與演算法管理,這會增加系統開銷,抵消部分因睡眠/喚醒機制提升吞吐量而獲得的效益。因此,在不同方案之間取得平衡成為關鍵問題。
### Linux 核心的自適應鎖實作
Linux 核心的自適應自旋鎖設計確保:
1. 持有鎖的行程應儘速釋放鎖
2. 自旋鎖可以被搶占,但不應主動進入睡眠**
3. 當持有鎖的行程可能被搶占時,爭鎖行程應避免自旋
這段程式碼展示 Linux 核心如何判斷是否應該繼續自旋:
```c
while (waiter->spin && !lock->waiters) {
struct thread_info *owner;
owner = ACCESS_ONCE(lock->owner); /* 在此 owner 上自旋 */
if (owner && !mutex_spin_on_owner(waiter, owner))
break;
cpu_relax(); /* 讓出 CPU */
}
```
`mutex_spin_on_owner` 的邏輯如下:
```c
while (waiter->spin && !lock->waiters) {
if (lock->owner != owner) /* owner 變更,重新開始自旋 */
break;
if (task_thread_info(rq->curr) != owner) { /* owner 未運行在原 CPU,不再自旋 */
ret = 0;
break;
}
if (need_resched()) { /* 目前 CPU 上有更高優先權的行程準備執行,不再自旋 */
ret = 0;
break;
}
cpu_relax(); /* 自旋 */
}
```
這段程式碼說明 Linux 核心如何透過兩層自旋來改進鎖爭搶的過程。
## spinlock 前世今生
> 以下改寫自 [spinlock 前世今生](https://zhuanlan.zhihu.com/p/133445693)
軟體通常會極致地配合硬體,盡最大努力最佳化效能。圍繞著快取所做的最佳化非常多,而我們就以一個點作為切入,探究 spinlock 的前世今生。`spinlock` 是 Linux 核心中常見的互斥操作,適用於不可睡眠的場景,可以說是最基礎的同步操作。不知你是否研究過 spinlock 的進化史?從最初的 `wild spinlock` 到 `ticket spinlock`,再到今天的 `qspinlock`,可謂是一波三折,大起大落!那麼,`spinlock` 究竟是怎麼一步一步發生蛻變的?又是為什麼要發生這些變化?背後究竟是什麼力量推動著自身的變革呢?
Linux 核心原始程式碼就像是一部歷史,以下藉由探討快取切入來 spinlock 的前世今生,嘗試探索 spinlock 和快取之間的恩怨情仇。
### wild spinlock
首先來介紹一下這位主角,spinlock (自旋鎖) 是一種互斥的操作,用在不能進行睡眠的行程間對共享資料進行互斥的存取。任一時刻只能有一個行程能夠取得鎖,其它無法取得 spinlock 的行程則會在原地自旋,持續等待直到獲得鎖。若讓你實作一個 spinlock,會不會覺得這件事看起來好像挺簡單的?
```c
struct spinlock {
int locked;
};
void spin_lock(struct spinlock *lock)
{
while (lock->locked);
lock->locked = 1;
}
void spin_unlock(struct spinlock *lock)
{
lock->locked = 0;
}
```
看起來是不是很簡單。上面的例子中 spinlock 的 `locked` 成員是 1 代表 locked。若是 0 代表鎖是釋放狀態。`spin_lock()` 保證在不能獲得鎖的情況下一直 spin 等待。不過這裡有個小問題需要修改下,在 `spin_lock()` 中判斷 `locked` 是 0 的同時需要將 `locked` 成 1,這整個過程必須保證是 atomic 的操作。所以我們需要一道 atomic 指令。我們假設 `test_and_set()` 就是一道 atomic 指令,測試一個變數的值,然後置為 1。返回值是變數的舊值。這一系列操作是由硬體提供的 atomic 指令實作的。
```c
void spin_lock(struct spinlock *lock)
{
while (test_and_set(&lock->locked));
}
```
我們假設系統有 8 個 CPU。假設 8 個 CPU 同時申請 `spinlock`,CPU0 成功持有鎖。想一下 CPU1–CPU7 在做什麼?由於 `test_and_set` 是無條件寫入 1 的操作。所以 CPU1–CPU7 一直在寫變數。根據多核 cache coherency protocol 中的 MESI protocol,我們知道修改一個變數時,這個變數必須在快取行中,且狀態是 E 或 M。所以當 8 個 CPU 同時申請 spinlock 時會發生以下狀況。
* CPU1 寫變數時會透過 MESI 協定發送 `invalid` 訊息給其他 CPU,然後修改變數為 1。
* CPU2 修改變數前發送 `invalid` 訊息給 CPU1,然後修改變數。
* CPU3 發送 `invalid` 訊息給 CPU2,然後修改變數。
* CPU4–7 也在重複這些事情。
這就無形之中增加了頻寬壓力,使效能下降。我們既要 spin 又想避免 cache thrashing 導致的頻寬壓力和效能損失。我們知道快取行有一種狀態是 `shared`,可以在多個 CPU 之間共享,而不用發送 `invalid` 訊息,而且 CPU1–CPU7 一直在 spin,修改變數也沒有意義,只有 CPU0 `unlock` 的時候,修改才有價值。所以我們對 `spin_lock()` 修改如下。
```c
void spin_lock(struct spinlock *lock)
{
while (lock->locked || test_and_set(&lock->locked));
}
```
這樣 CPU1–CPU7 會一直保持 `Shared` 狀態,沒有 cache thrashing。這種最佳化在 Linux 核心程式碼中經常可見,例如[這裡](https://elixir.bootlin.com/linux/v5.4.33/source/include/linux/bit_spinlock.h#L16),當你看到這種奇怪的寫法時,不要覺得奇怪。當 `spin_unlock()` 時,這種 `Shared` 狀態會被打破。但是這也足以在一定程度上提升效能。
好景不常,我們發現了新的問題。某些等待的 CPU 可能會有 starvation 現象。例如,CPU1–CPU7 在原地 spin,CPU0 釋放鎖的時候,CPU1–CPU7 哪個 CPU 的 cache 先看到 `locked` 的值,就會先獲得鎖。所以不排隊的機制可能導致部分 CPU "餓死"。為了解決這個問題,我們從 `wild spinlock` 跨度到 `ticket spinlock`。
### ticket spinlock
歷史推進,我們導入 FIFO 排隊機制,按申請順序取得鎖,確保公平性。
```c
struct spinlock {
unsigned short owner;
unsigned short next;
};
void spin_lock(struct spinlock *lock)
{
unsigned short next = xadd(&lock->next, 1);
while (lock->owner != next);
}
void spin_unlock(struct spinlock *lock)
{
lock->owner++;
}
```
`spin_lock()` 中的 `xadd()` 是一個會將變數加 1 的 atomic 操作,並返回變數之前的值。在這一版實作中,我們可以確定當 `owner` 等於 `next` 時,代表鎖是釋放狀態。否則,代表鎖被其他人持有。`next` 就像是排隊拿票的機制,每來一個申請者,`next` 就加 1,代表票號;`owner` 代表目前持鎖的票號。
看起來一切都完美了,但是現實往往是殘酷的。接下來又遇到什麼問題了呢?
### ticket spinlock 的問題
我們依然以上面 8 個 CPU 的系統為例說明。CPU0–CPU7 依次申請 `spinlock`。假設初始狀態,`spinlock` 變數的結構體沒有快取到任何 CPU 的快取內。CPU0 獲取 `spinlock`,此時的 cache 狀態如下。

然後 CPU1 申請鎖,並更新 spinlock 變數。所以會 `invalid` CPU0 的 cache。接著 spinlock 變數被快取到 CPU1 的 cache,然後 CPU1 開始一直 spin,持續讀取 spinlock 的值。

接著 CPU2 申請鎖,並更新 spinlock 變數。所以會 `invalid` CPU1 的 cache,然後更新 `next` 的值。而 CPU1 又會讀取 spinlock 的值,所以 spinlock 變數對應的快取行的狀態最終是 `Shared`,並且同時快取在 CPU1 和 CPU2 的 cache 中。

後面 CPU3–CPU7 依次申請 `spinlock`,就是重複上面的操作:在更新 `next` 之前,先 `invalid` 其他 CPU 的 cache,然後其他 CPU 再從目前 CPU 獲取更新後的 spinlock 值。cache line 的狀態更新為 `Shared`,然後繼續 spin。

當 CPU0 `spin_unlock()` 的時候,會先 `invalid` CPU1–CPU7 對應的 cache line,然後更新 `owner` 的值。

CPU1–CPU7 讀取 `owner` 的值時,又會從 CPU0 獲取最新資料,並快取到各自的 cache 中。

不知道你是否發現了問題:隨著 CPU 數量的增加,總線頻寬壓力變得非常大。而且延遲也會隨之上升,效能逐漸下降。更重要的是,當 CPU0 釋放鎖後,CPU1–CPU7 中其實只有一個 CPU 能獲得鎖 —— 根本沒必要影響所有 CPU 的快取,只需要影響接下來按 FIFO 順序該獲得鎖的那個 CPU 就行。
這說明 `ticket spinlock` 並不是 scalable 的 (最初的 `wild spinlock` 也有相同問題) 。於是,歷史又往前邁出了一步。
### qspinlock
我們來到了 `qspinlock` 的時代,`qspinlock` 的出現正是為了解決 `ticket spinlock` 上述的問題。
我們先來思考一下造成這個問題的根本原因,就是每個 CPU 都在 spin 同一個共享變數 spinlock 上,導致頻寬壓力大、延遲高、效能差。所以,只要每個 CPU spin 的不是同一個變數,而是各自不同的變數,就可以有效避免這種情況。
這就需要我們換一種排隊方式,例如使用單向鏈結串列。單向鏈結串列一樣可以保證 FIFO 順序,每次解鎖時,也只需要通知鏈結串列中頭部的 CPU 即可。這,其實就是 MCS lock 的實作原理。而 `qspinlock` 的實作,就是建立在 MCS lock 的理論基礎上。
接下來,我們先探究一下 MCS lock 是如何實作的。
```c
struct mcs_spinlock {
struct mcs_spinlock *next;
int locked;
};
```
`mcs_spinlock` 中 `next` 成員就是構建單向鏈結串列的基礎,所有 `spin` 自旋的 CPU 都可以藉助 `mcs_spinlock` 結構體構建立單向鏈結串列關係。`mcs_spinlock` 結構體需要幾個呢?我們知道 `spin_lock()` 期間,搶佔是關閉的,所以最多只可能存在 CPU 數量減 1 個 CPU 自旋等待,所以我們只需要 CPU 個數的 `mcs_spinlock` 結構體 (`spinlock` 可能會出現巢狀,例如 `softirq` 可以中斷行程,`hardirq` 可以中斷 `softirq`,`NMI` 可以中斷 `hardirq`。這意味著一個 CPU 可能出現 4 個不同的 spinlock 自旋等待,預期 `mcs_spinlock` 結構體需要 `4 * CPU` 數量。後面為了簡化問題不考慮單核 spinlock 巢狀情況)
這就很適合使用 `percpu` 變數。`spin` 等鎖的操作只需要將所屬自己 CPU 的 `mcs_spinlock` 結構體加入單向鏈結串列尾部,然後 `spin`,直到自己的 `mcs_spinlock` 的 `locked` 成員置 1 (`locked` 初始值是 0) 。`unlock` 的操作也很簡單,只需要將解鎖的 CPU 對應的 `mcs_spinlock` 結構體的 `next` 指標的 `locked` 成員設 1,相當於通知下一個 CPU 退出迴圈。
我們繼續以上面的 8 個 CPU 的系統為例說明。首先 CPU0 申請 spinlock 時,發現鏈結串列是空的,且鎖是釋放狀態。所以 CPU0 獲得鎖。
```graphviz
digraph MCSLock{
rankdir=LR;
node[shape=record]
subgraph cluster_cpu0{
penwidth=0
label="CPU0"
node[shape=record];
lock [label="<n> next|<l> locked = 1"]
}
MCS_lock [label="MCS Lock" shape=plaintext]
NULL [shape=plaintext]
MCS_lock ->lock:n
lock:n -> NULL
}
```
CPU1 繼續申請 `spinlock`,需要 `spin` 等待。所以將 CPU1 對應的 `mcs_spinlock` 結構體加入鏈結串列尾部。然後 `spin` 等待 CPU1 對應的 `mcs_spinlock` 結構體 `locked` 成員被置 1。
```graphviz
digraph MCSLock{
rankdir=LR;
node[shape=record]
subgraph cluster_cpu0{
penwidth=0
label="CPU0"
node[shape=record];
lock0 [label="<n> next|<l> locked = 1"]
}
subgraph cluster_cpu1{
penwidth=0
label="CPU1"
node[shape=record];
lock1 [label="<n> next|<l> locked = 0"]
}
MCS_lock [label="MCS Lock" shape=plaintext]
NULL [shape=plaintext]
MCS_lock ->lock0:n
lock0:n -> lock1:n
lock1:n -> NULL
}
```
當 CPU2 繼續申請鎖時,發現鏈結串列不為空,說明有 CPU 在等待鎖。所以也將 CPU2 對應的 `mcs_spinlock` 結構體加入鏈表尾部。
```graphviz
digraph MCSLock{
rankdir=LR;
node[shape=record]
subgraph cluster_cpu0{
penwidth=0
label="CPU0"
node[shape=record];
lock0 [label="<n> next|<l> locked = 1"]
}
subgraph cluster_cpu1{
penwidth=0
label="CPU1"
node[shape=record];
lock1 [label="<n> next|<l> locked = 0"]
}
subgraph cluster_cpu2{
penwidth=0
label="CPU2"
node[shape=record];
lock2 [label="<n> next|<l> locked = 0"]
}
MCS_lock [label="MCS Lock" shape=plaintext]
NULL [shape=plaintext]
MCS_lock ->lock0:n
lock0:n -> lock1:n
lock1:n -> lock2:n
lock2:n -> NULL
}
```
當 CPU0 釋放鎖的時候,發現 CPU0 對應的 `mcs_spinlock` 結構體的 `next` 域不為 `NULL`,說明有等待的 CPU。然後將 `next` 域指向的 `mcs_spinlock` 結構體的 `locked` 成員置 1,通知下個獲得鎖的 CPU 退出自旋。`MCS lock` 頭指標可以選擇不更新,等到 CPU2 釋放鎖時再更新為 `NULL`。
```graphviz
digraph MCSLock{
rankdir=LR;
node[shape=record]
subgraph cluster_cpu0{
penwidth=0
label="CPU0"
node[shape=record];
lock0 [label="<n> next|<l> locked = 1"]
}
subgraph cluster_cpu1{
penwidth=0
label="CPU1"
node[shape=record];
lock1 [label="<n> next|<l> locked = 1"]
}
subgraph cluster_cpu2{
penwidth=0
label="CPU2"
node[shape=record];
lock2 [label="<n> next|<l> locked = 0"]
}
MCS_lock [label="MCS Lock" shape=plaintext]
NULL [shape=plaintext]
MCS_lock ->lock0:n
lock0:n -> lock1:n
lock1:n -> lock2:n
lock2:n -> NULL
}
```
以上只是一個簡單的參考,MCS lock 的具體程式碼細節可參考 [kernel/locking/mcs_spinlock.h](https://elixir.bootlin.com/linux/v5.4.33/source/kernel/locking/mcs_spinlock.h)。如何將 `mcs_spinlock` 頭指標塞進 spinlock 4 個位元組的身軀,成為實作 `qspinlock` 的關鍵。不過這部分可以參考 Linux 核心原始程式碼,掌握了其原理,看程式碼實作應該易如反掌。透過以上步驟,我們可以看到每個 CPU 都 `spin` 在自己的私有變數上,因此不會存在 `ticket spinlock` 的問題。
### 總結
從 `wild spinlock` 到 `qspinlock`,程式碼的複雜度增加的不是一點半點,而是很多。`qspinlock` 的實作檔案足足超過 500 行,而 `wild spinlock` 僅僅幾十行。
## 使用 Ftrace 獲取系統中的 spinlock 快照
> 以下改寫自 [如何使用 ftrace 即時取得系統中的 spinlock 快照](https://blog.csdn.net/dog250/article/details/108376624)
在這篇文章中,我給出了一個拯救 panic 的方法,其目的更多事惡作劇性質。但仍有不足,請看下面這段程式碼中的註解
```c
void stub_panic(const char *fmt, ...)
{
...
local_irq_enable();
// 這個時候若 current 持有自旋鎖那該怎麼辦?
printk("rq:%d %d\n", preempt_count(), irqs_disabled())
__set_current_state(TASK_UNINTERRUPTIBLE);
schedule();
```
若持有自旋鎖的 task 被 schedule 出去,由於該 task 不會再回來了,那麼只要另一個 task 在搶鎖,系統馬上就會發生 deadlock。
需求自然就出來了,我能不能在呼叫 schedule 之前便利系統目前所有自旋鎖的持有情況,將自己持有的自旋鎖給 unlock 呢?
有需求就有方案。當然可以。
大秀 hook 手藝的時候來了。雖然作為手藝人用手工的方式拼接指令可以實作任何功能,但現在的目標已經不僅僅是秀手藝了,而是實作上述的需求,所以我盡量先使用 stap/kprobe,ftrace 這些場面還不是太宏大的工具。
需求如圖所示:
```graphviz
digraph {
rankdir=LR;
node[shape=record]
spin_lock [label="spin_lock"]
data [label="<1>header|<2>spin_lock|<3>tailer"]
spin_lock -> data;
info1 [label=統計並記錄搶鎖訊息 shape=plaintext]
info2 [label=統計並記錄持鎖訊息 shape=plaintext]
data:1 -> info1 [style=invis];
data:3 -> info2 [style=invis]
}
```
```graphviz
digraph {
rankdir=LR;
node[shape=record]
spin_lock [label="spin_lock"]
data [label="<1>spin_lock|<2>tailer"]
spin_lock -> data;
info2 [label=統計並記錄釋鎖訊息 shape=plaintext]
data:2 -> info2 [style=invis]
}
```
很顯然,直接的思路就是使用 kretprobe 了,撰寫以下程式碼:
```c
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/kprobes.h>
struct lock_owner {
struct raw_spinlock *lock;
struct task_struct *owner;
struct task_struct *lock_owner;
int idx;
};
struct lock_owner plane[256];
unsigned long bitmap[4];
int lock_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
struct lock_owner *owner, **pdata;
struct raw_spinlock *lock = (struct raw_spinlock *)regs->di;
int i = -1, retry = 10;
pdata = (struct lock_owner **)&ri->data;
*pdata = NULL;
again:
i = find_first_zero_bit(&bitmap[0], 256);
if (i > 256 || test_and_set_bit(i, bitmap)) {
if (retry --)
goto again;
goto end;
}
owner->idx = i;
owner = &plane[i];
owner->lock = lock;
owner->owner = current;
owner->lock_owner = NULL;
*pdata = owner;
end:
return 0;
}
int lock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
struct lock_owner *owner = (struct lock_owner *)&ri->data[0];
if (owner == NULL)
return 0;
owner->lock_owner = current;
return 0;
}
static struct kretprobe lock_kretprobe = {
.handler = lock_ret_handler,
.entry_handler = lock_handler,
.data_size = sizeof(struct lock_owner *),
.maxactive = 20,
};
int unlock_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
struct lock_owner *owner, **pdata;
struct raw_spinlock *lock = (struct raw_spinlock *)regs->di;
int i = -1;
pdata = (struct lock_owner **)&ri->data;
*pdata = NULL;
for (i = 0; i < 256; i++) {
owner = &plane[i];
if (test_bit(owner->idx, &bitmap[0]) &&
owner->owner == current &&
owner->lock_owner == current &&
owner->lock == lock &&
owner->idx == i) {
*pdata = owner;
break;
}
}
return 0;
}
int unlock_ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs)
{
struct lock_owner *owner = (struct lock_owner *)&ri->data[0];
if (owner == NULL)
return 0;
owner->lock = NULL;
owner->owner = NULL;
owner->lock_owner = NULL;
owner->idx = -1;
clear_bit(owner->idx, &bitmap[0]);
return 0;
}
static struct kretprobe unlock_kretprobe = {
.handler = unlock_ret_handler,
.entry_handler = unlock_handler,
.data_size = sizeof(struct lock_owner *),
.maxactive = 20,
};
static int __init lock_detect_init(void)
{
memset((char *)&plane[0], 0, sizeof(plane));
memset((char *)&bitmap[0], 0, sizeof(plane));
lock_kretprobe.kp.symbol_name = "_raw_spin_lock";
register_kretprobe(&lock_kretprobe);
unlock_kretprobe.kp.symbol_name = "_raw_spin_unlock";
register_kretprobe(&unlock_kretprobe);
return 0;
}
static void __exit lock_detect_exit(void)
{
unregister_kretprobe(&lock_kretprobe);
unregister_kretprobe(&unlock_kretprobe);
}
module_init(lock_detect_init);
module_exit(lock_detect_exit);
```
請忽略具體的 spinlock 統計邏輯,現在僅僅關注框架,我敢說,這個玩法對於一般的 `wrap` 函數,簡直就是模板:
* 這個框架可以實作幾乎一切 `wrap` 函數。
所謂的 wrap 函式其實很簡單:
```c
my_function(void *param)
{
pre_process(param);
orig_function(param);
post_process(param);
}
```
`kretprobe` 的處理流程如下:
```graphviz
digraph kretprobe{
rankdir=LR;
node[shape=record]
call_orig_func [label="call orig_function"]
subgraph cluster_orig_function{
penwidth=0
clusterrank=global
label="orig_function"
node[shape=record];
blk_orig [label="<1> jmp/call entry_handler|<2> \n\n\n\n\n original logic\n\n\n\n\n\n"]
}
subgraph cluster_Fix{
penwidth=0
subgraph cluster_entry_handler{
penwidth=0
clusterrank=global
label="entry_handler"
node[shape=record];
blk_ent [label="<1> save return address to A|<2> modify return address to int3 |<3> new self-defined header logic"]
}
subgraph cluster_return_handler{
clusterrank=global
penwidth=0
label="\n\n\n\nreturn_handler"
node[shape=record];
blk_ret [label="<1> new self-defined tailer logic|<2> return to A"]
}
}
subgraph cluster_INT3_handler{
penwidth=0
clusterrank=global
label="INT3 handler"
node[shape=record];
blk_int3 [label="<1> restore return address to A|<2> call return_handler"]
}
call_orig_func -> blk_orig:1:nw
call_orig_func:se -> blk_ret:2 [label="return to orig_function's caller", weight=4, dir=back]
blk_orig:2 -> blk_int3:1:nw [label=return]
blk_int3:2 -> blk_ret:1:nw [label=call]
blk_orig:1 -> blk_ent:1:nw [label="jmp/call" weight=4]
blk_orig:1:se -> blk_ent:3:sw [label="jmp/call to orig", dir=back]
}
```
然而,它偏偏不能用於 `_raw_spin_lock`/`_raw_spin_unlock` 的 `wrap`!因為會死鎖!
* `kprobe`/`kretprobe` 框架內部使用 spinlock 來進行同步,在 `ret_handler` 執行的時候,它已經持有了該 `spinlock`,而屬於 `kretprobe` 的 `ret_handler` 本身同樣也需要該 `spinlock`,因此會 deadlock。
啦啦啦,這就是為什麼我喜歡純手工活兒的原因了,因為它可控啊!
來來來,試試 `ftrace`,相比於 `kretprobe` 而言,它更簡單,我覺得它應該沒問題。若再不行,就只能上純手工藝了。
先來試試框架,下面是一個什麼都不做的框架程式碼:
```c
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/ftrace.h>
struct wrap_hook {
char *name;
struct ftrace_ops ops;
unsigned long wrap_func;
unsigned long entry;
};
void (*real_raw_spin_lock)(spinlock_t *lock);
// _raw_spin_lock 的 wrap 函式!
void my_raw_spin_lock(spinlock_t *lock)
{
real_raw_spin_lock(lock);
}
void (*real_raw_spin_unlock)(spinlock_t *lock);
// _raw_spin_unlock 的 wrap 函式!
void my_raw_spin_unlock(spinlock_t *lock)
{
real_raw_spin_unlock(lock);
}
struct wrap_hook spinlock_hooks[2] = {
{
.name = "_raw_spin_lock",
.wrap_func = (unsigned long)my_raw_spin_lock,
},
{
.name = "_raw_spin_unlock",
.wrap_func = (unsigned long)my_raw_spin_unlock,
}
};
// 該函式是一個匯聚器,起到將邏輯 return 到 new function 的目的。
// 採用匯聚器可以避免為兩個函式編寫兩個獨立的 hook 函式,這是設計模式應用的例子。
void stub_func(unsigned long ip, unsigned long parent_ip,
struct ftrace_ops *ops, struct pt_regs *regs)
{
struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops);
// 替換 return 位址
regs->ip = hook->wrap_func;
}
static int wrap_spinlock_init(void)
{
int i;
for (i = 0; i < 2; i++) {
spinlock_hooks[i].ops.func = stub_func;
spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS | FTRACE_OPS_FL_RECURSION_SAFE;
spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name);
if (i == 0)
real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE);
else
real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + MCOUNT_INSN_SIZE);
ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0);
register_ftrace_function(&spinlock_hooks[i].ops);
}
return 0;
}
static void wrap_spinlock_exit(void)
{
int i;
for (i = 0; i < 2; i++) {
unregister_ftrace_function(&spinlock_hooks[i].ops);
ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0);
}
}
module_init(wrap_spinlock_init);
module_exit(wrap_spinlock_exit);
MODULE_LICENSE("GPL");
```
當我加載這個模組的時候,系統像什麼都沒有發生一樣平靜,而我用 `crash` 看 `_raw_spin_lock`/`_raw_spin_unlock` 的時候,顯然它們已經被 hook 了,這意味著,`wrap` 起作用了。
下圖展示了 `ftrace` 實作 `wrap` 的原理:
```graphviz
digraph ftrace{
rankdir=LR;
node[shape=record]
call_orig_func [label="call orig_function"]
subgraph cluster_orig_function{
penwidth=0
label="orig_function"
node[shape=record];
orig_func [label="<1> call entry_handler|<2> \n\n\n\n original logic\n\n\n\n\n\n"]
}
subgraph cluster_ft_stub{
penwidth=0
label="ftrace_stub"
node[shape=record];
ft_stub [label="1. save return address to A\n2. rebalance the stack"]
}
subgraph cluster_ft_hook{
penwidth=0
label="ftraece_hook"
node[shape=record];
ft_hook [label="<1> modify return address|<2> return to wrap_func"]
}
subgraph cluster_wrap_func{
penwidth=0
clusterrank=global
label="wrap_function"
node[shape=record];
wrap_func [label="<1> \nnew self-defined header logic\n\n|<2> \ncall orig_function+5\n\n|<3> \nnew self-defined tailer logic\n\n"]
}
call_orig_func -> orig_func:1:nw
orig_func:1 -> ft_stub:w
ft_stub -> ft_hook:1:nw
wrap_func:1 -> ft_hook:2 [dir=back]
orig_func:1:se -> wrap_func:2 [label="call", dir=back]
orig_func:2:se -> wrap_func:2:sw[label="return", weight=4]
call_orig_func:se -> wrap_func:3:sw [label="return to orig_function's caller", dir=back]
}
```
這個已經和手工做法幾乎無異了。可以拿這個 `ftrace` 和上面的 `kretprobe` 對比一下,感受一下雷同和差異:
* `kretprobe` 一般也是用來 probe 函式而不是指令,這一點和 `ftrace` 一致。
* `ftrace` 機制上更加直接,而 `kretprobe` 則更加 trick 一點。
* 若要完成一個業務需求,我選擇用 `ftrace`;若要表演,我會照抄一套 `kretprobe` 的機制;若純自己玩,我選擇純手工藝。
好了,現在把 spinlock 統計邏輯放進去,程式碼簡單,自己感受:
```c
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/ftrace.h>
struct wrap_hook {
char *name;
struct ftrace_ops ops;
unsigned long wrap_func;
unsigned long entry;
};
struct lock_owner {
int idx;
spinlock_t *lock;
struct task_struct *owner;
struct task_struct *lock_owner;
};
struct lock_owner plane[32768];
unsigned long bitmap[512];
void (*real_raw_spin_lock)(spinlock_t *lock);
void (*real_raw_spin_unlock)(spinlock_t *lock);
#pragma GCC optimize ("O0")
void my_raw_spin_lock(spinlock_t *lock)
{
struct lock_owner *owner = NULL;
int i = -1, retry = 0;
// 通篇的 header 和 tailer 不要有 spinlock,因此即便是 printk 也不能使用,否则会造成嵌套 deadlock(畢竟把 spin_lock/unlock 给 hook 了)。
// 因此,下面實作了一個簡單的 lock 機制,使用 atomic 的 test_and_set 操作。
again:
i = find_first_zero_bit(bitmap, 32768);
if (test_and_set_bit(i, bitmap)) {
if (retry ++ > 10)
goto real;
goto again;
}
owner = &plane[i];
owner->idx = i;
owner->lock = lock;
owner->owner = current;
owner->lock_owner = NULL;
set_bit(i, bitmap);
real:
barrier();
real_raw_spin_lock(lock);
barrier();
if (owner)
owner->lock_owner = current;
}
#if 0
void test()
{
struct lock_owner *owner = NULL;
int i = -1;
int cnt = 4;
again:
spin_lock(&maplock);
i = find_first_zero_bit(bitmap, 32768);
set_bit(i, bitmap);
spin_unlock(&maplock);
owner = &plane[i];
printk("i is:%d at :%p\n", i, owner);
if (cnt --)
goto again;
}
#endif
#pragma GCC optimize ("O0")
void my_raw_spin_unlock(spinlock_t *lock)
{
struct lock_owner *owner = NULL;
int i = -1;
real_raw_spin_unlock(lock);
barrier();
for (i = 0; i < 32768; i++) {
// 注意,這裡沒有保證 atomic,因此我的程式無法 cover 100% 要求 atomic 的場景,統計可能不準。
if (test_bit(i, bitmap) &&
plane[i].owner == current &&
plane[i].lock_owner == current &&
plane[i].lock == lock) {
owner = &plane[i];
owner->lock = NULL;
owner->owner = NULL;
owner->lock_owner = NULL;
owner->idx = -1;
barrier();
clear_bit(i, bitmap);
break;
}
}
}
#pragma GCC optimize ("O0")
struct wrap_hook spinlock_hooks[2] = {
{
.name = "_raw_spin_lock",
.wrap_func = (unsigned long)my_raw_spin_lock,
},
{
.name = "_raw_spin_unlock",
.wrap_func = (unsigned long)my_raw_spin_unlock,
}
};
void stub_func(unsigned long ip, unsigned long parent_ip,
struct ftrace_ops *ops, struct pt_regs *regs)
{
struct wrap_hook *hook = container_of(ops, struct wrap_hook, ops);
regs->ip = hook->wrap_func;
}
#pragma GCC optimize ("O0")
static int wrap_spinlock_init(void)
{
int i;
memset((char *)plane, 0, sizeof(plane));
memset((char *)bitmap, 0, sizeof(bitmap));
// 列印出 bitmap 的地址,以便 panic_resched 模組使用!
printk("plane:%p bitmap:%p\n", &plane[0], &bitmap[0]);
#if 0
test();
return 0;
#endif
for (i = 0; i < 2; i++) {
spinlock_hooks[i].ops.func = stub_func;
spinlock_hooks[i].ops.flags = FTRACE_OPS_FL_SAVE_REGS|FTRACE_OPS_FL_RECURSION_SAFE;
spinlock_hooks[i].entry = kallsyms_lookup_name(spinlock_hooks[i].name);
if (i == 0)
real_raw_spin_lock = (void *)(spinlock_hooks[i].entry + 5);
else
real_raw_spin_unlock = (void *)(spinlock_hooks[i].entry + 5);
ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 0, 0);
register_ftrace_function(&spinlock_hooks[i].ops);
}
return 0;
}
static void wrap_spinlock_exit(void)
{
int i;
struct lock_owner *owner;
#if 0
for (i = 0; i < 32768; i++) {
owner = &plane[i];
if (test_bit(i, bitmap)) {
if (owner->lock == NULL || owner->idx != i)
continue;
printk("[%d] lock:%p owner:%s[%d] lock_owner:%s[%d]\n",
owner->idx,
owner->lock,
owner->owner?owner->owner->comm:"noone",
owner->owner?owner->owner->pid:-2,
owner->lock_owner?owner->lock_owner->comm:"null",
owner->lock_owner?owner->lock_owner->pid:-1);
}
}
#endif
for (i = 0; i < 2; i++) {
unregister_ftrace_function(&spinlock_hooks[i].ops);
ftrace_set_filter_ip(&spinlock_hooks[i].ops, spinlock_hooks[i].entry, 1, 0);
}
}
module_init(wrap_spinlock_init);
module_exit(wrap_spinlock_exit);
MODULE_LICENSE("GPL");
```
值得注意的是,`wrap` 函式中不能再呼叫任何會使用 spinlock 的函式 (避免循環巢狀) ,因此一開始用 atomics 實作一個自己的自旋鎖:
```c
void lock(unsigned long *lock)
{
while (test_and_set_bit(0, lock));
}
void unlock(unsigned long *lock)
{
*lock = 0; // 比 clear_bit 更帥
}
```
然而系統很快就卡死了:
* 把系統所有的 spinlock 操作全部在此處唯一的自旋鎖上串行化,不卡死才怪!
於是,只能退而求其次,採用寬鬆的約束了:
* 實在沒有 slot 了,就不統計該次記錄了。
所以,我這個 spinlock 快照記錄機制,`它是不準的。`
以上就是一個簡單的 spinlock 快照機制的程式碼和說明,它可以展示:
* 目前系統中有哪些 `task` 在爭搶哪一把 `spinlock`。
* 目前系統中某個 spinlock 被哪一個 `task` 所持有。
這個機制有什麼用呢?
回到本文的開頭,若想讓 `panic` 被 `schedule` 出去而不是當機,我在擔心 `current` 持有鎖怎麼辦,我希望有一個辦法讓我知道 `current` 是否持有 `spinlock`,以及持有了哪些 `spinlock`,然後將它們 `unlock`。
現在有辦法了:
```c
// 藉由 lock_detect 模組 init 函式中 printk 出來的 bitmap 地址來設置。
unsigned long pbitmap;
module_param(pbitmap, ulong, 0644);
// 藉由 lock_detect 模組 init 函式中 printk 出來的 plane 地址來設置。
unsigned long pplane;
module_param(pplane, ulong, 0644);
void stub_panic(const char *fmt, ...)
{
int i;
unsigned long *bitmap = (unsigned long *)pbitmap;
struct lock_owner *plane = (struct lock_owner *)pplane;
// 循環遍歷所有目前系统當事的 spinlock,解鎖 current 所持有的 spinlock。
for (i = 0; i < 32768; i++) {
if (test_bit(i, bitmap) &&
plane[i].owner == current &&
plane[i].lock_owner == current &&
plane[i].lock &&
plane[i].idx == i) {
printk("lock hold:%p %s %d\n",
plane[i].lock,
plane[i].lock_owner ? plane[i].lock_owner->comm : "aabb",
plane[i].lock_owner ? plane[i].lock_owner->pid : -1);
spin_unlock(plane[i].lock);
}
}
if (preempt_count())
return;
local_irq_enable();
// 安全地退場!!
__set_current_state(TASK_UNINTERRUPTIBLE);
schedule();
}
...
```
強調一點,本文介紹的把戲無法揪出所有的 spinlock 狀態,因為 Linux 核心中 spinlock 的 lock/unlock 操作並不全是透過 `_raw_spin_lock`/`_raw_spin_unlock` 入口的,比如 `spin_lock_irqsave`/`spin_unlock_irqrestore` 就不是,它們有自己的入口。因此你需要把所有這些入口都用 `ftrace` hook 起來,才能全都捕捉到。