TODO: https://hackmd.io/@sysprog/linux2024-integration # 以 simrupt 為基礎,建構 Quiz 13 提及的 Pub/Sub 模型,要用到 CMWQ/workqueue
TODO: 除了 Pub/Sub 模型的正確性,擴充 simrupt 進行效能分析 (訂閱者數量 vs. 執行時間)
熟悉 ktime, CMWQ, list_api
simrupt :
使用 CMWQ/work queue ,我在實做 pub/sub 時可以沿用,定義好我的 work_func 即可。
使用 kfifo ,kfifo 是 linux kernel 中的一個結構,是 First-In-First-Out 的,這個結構在 Single Producer Single Consumer 情況中不需要額外的 lock 維護。
我單純使用一個陣列去儲存 Publisher 發佈的訊息。
CMWQ :
CMWQ 通過多個 work queue 來管理任務,每個 work queue 可以分配給不同的處理器核心,從而充分利用多核處理器的並行計算能力。
CMWQ 提供了一個統一的接口來提交和處理任務,簡化了應用程序的開發和維護。並且系統自動管理 work queue 的創建、銷毀和調度,開發者只需關注任務的提交和執行邏輯,減少了操作上的複雜度。
Pub/Sub :
發布者與訂閱者分離: Pub/Sub 模型將發布者和訂閱者完全解耦合,發布者將消息發布到主題而不需要知道誰會接收這些消息。同樣,訂閱者訂閱主題以接收消息而不需要知道誰發布了消息。
輕鬆擴展: Pub/Sub 模型支持輕鬆擴展,可以在不影響現有系統的情況下添加新的發布者或訂閱者。
熟讀 Pub/Sub 教材,以理解 Pub/Sub 模型 :
根據系統設計入門:Pub/Sub Pattern,Pub/Sub 和觀察者模式(Observer Pattern),最大的不同就在 Pub/Sub 有一個中介平台(broker),存放 message 並識別 Publisher, Subscriber。
根據 Google Cloud 的 Pub/Sub 文件,
Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages.
Pub/Sub 模型是不同步更新,Publisher 只要把訊息或事件傳給 Pub/Sub 服務,就不需要管了,Pub/Sub 會把訊息或是事件傳送給相關的 Subscriber。
Pub/Sub 服務的中介平台, broker 會根據 topic 把訊息傳給訂閱 topic 的 subscriber , topic 是判斷要不要傳給 subscriber 的依據, broker 則是負責儲存訊息的角色。
和 Observer pattern 做比較
Observer pattern:
Observer Pattern 重視 Subject 有更新時,主動通知 Observer,且通常為一個 Subject 對多個 Observer。這是一種同步更新的方式,實現一對多的通知。
Pub/Sub 運行圖:
Pub/Sub 模式則處理資料傳遞的過程。Publisher 上傳資料到中介(broker),而不需關心資料由誰接收。此模式支持異步更新,且具有多對多的特性。
Pub/Sub 的好處是,即使 Publisher 或 Subscriber 的任一個 server 發生故障,也不會影響整體系統的運作。
Pub/Sub 模型可以有很多 Publisher 和很多 Subscriber ,很多 topic , 我先以一個 publisher、一個 topic 、 不同數量的 subscriber 來做效能量測。
使用一個 module ,內有 publisher 和 subscriber,publisher 發佈訊息到 broker, broker 再把訊息發出去給 subscriber。
使用兩個 module,一個是 publisher,專門去寫訊息到一個文件,一個是subscriber module,可以設定 subscriber 數量去讀取訊息。(測試困難)
問題 : publisher 發布訊息時應該要將內容鎖住,但不同 subscriber 讀的時候需要鎖嗎?
我認為單純讀取的話,不需要鎖住。
後續會做實驗比較有沒有鎖的效能差距。
主要函數:
broker_fn(struct work_struct *work)
:這是工作佇列的處理函數,用來將發布者的訊息放入訊息緩衝區,並喚醒訂閱者。publish_message(const char *msg)
:這個函數用來建立一個新的工作結構體,並將其放入工作佇列中。publisher_work_fn(struct work_struct *work)
:發布者的工作函數,會產生訊息並呼叫 publish_message。subscriber_work_fn(struct work_struct *work)
:訂閱者的工作函數,會等待新的訊息並計算處理時間。__init pubsub_init(void)
:模組初始化函數,建立工作佇列並初始化發布者和訂閱者的工作。__exit pubsub_exit(void)
:模組卸載函數,銷毀工作佇列並計算總時間。我使用5個 subscriber 和 2 個 publisher 為例。
可以看到每個 subscriber 都有接收到每個 publisher 發出的 3 則 messsage。
使用 ktime,紀錄執行時間來評估效能,這時候遇到一個問題: 時間怎麼計算才好?
我使用 python 寫測試腳本。
使用 subprocess.run
執行 insmod
、rmmod
、以及 dmesg
這些命令。
讀取 dmesg 輸出的訊息,找到我在模組中輸出的時間資訊。
用 for
迴圈去 insmod
不同數量的 subscribers,並將時間記錄起來以供後續畫圖使用。
會做單純使用 kthread 和 CMWQ 的時間比較
只有一個 Publisher,一個topic ,有不同數量的 Subscriber ,使用第一個方法計算時間。
Publisher 固定為1,Subscriber 數量由1到1000。
dmesg
最後一行,我應該要使用 grep
,只過濾我想要的訊息,再讀取時間,就能分辨兩個模組各自的執行時間。(但是同時執行的效能,不能做為比較的依據,因為會互相影響)wait_event_interruptible
CMWQ(讀取有 lock ):
CMWQ(讀取沒有 lock ):
Kthread(照順序):
Kthread(不照順序,讀取沒有 lock):
可以看到照順序的版本所需時間遠比 CMWQ 多,因為 CMWQ 有 workqueue, worker pool 的概念,可以自動根據執行狀況去分配工具,照順序的版本是輪到了才建立 thread 並且讀取時有用 lock 鎖住,多了等待和建立 thread 的時間,所以所需時間較長。
Kthread (沒有照順序,沒有 lock ),效能比 Kthread (照順序,有 lock ),提升了很多,可以看到 subscriber 多的時候,所花費的時間大致是 1/2。
CMWQ 沒有 lock 的版本也比有 lock 版本快很多。
使用第二種方法計算時間,和版本依同樣使用一個 Publisher,一個topic ,測量不同數量的 Subscriber 所花費的時間。
因為已經實驗過得知,沒有 lock 比較好,接下來都會以沒有 lock 的版本進行測試。
Kthread:
CMWQ:
可以看出 CMWQ 的效率還是比單純使用 Kthread 好,有個有趣的現象是 Kthread 的表現比較有規律,花費的時間隨著 subscriber 的數量逐漸上升。
而 CMWQ 花費的時間,在一開始還算規律,但是 subscriber 數量漸漸增加後,所花費的時間就會有高有低。
兩種計算時間的方法代表的意義和一個我心裡一直有的疑問一樣,以前上課老師都會說 : "你浪費了全班1分鐘,全班40個人,你就是浪費了40分鐘!",但是時間明明只過去1分鐘,第一種時間計算方式像是老師所說浪費40分鐘的計算方法,把所有 subscriber 花費的時間相加。第二種時間計算方式則是只看每一則訊息傳到所有 subscriber 所需的時間,只看實際所花費的時間。哪種計算方法才是正確的呢?
測量不同數量的 Publisher 的效能差距,使用第一種方法計算時間。
將 Subscriber 數量固定在 500,Publisher 數量由1到1000。
Kthread
CMWQ
測量不同數量的 Publisher 的效能差距,使用第二種方法計算時間。
Kthread
CMWQ
從實驗三實驗四中看出 publisher 數量,對於整體效能的影響比較小, subscriber 數量對於效能影響比較大。
我覺得不太合理,想到我的 buffer_size 設置太小,導致 Subscriber 還沒讀到 message , Publisher 發的訊息就將之前的覆蓋掉了,導致 message 遺失。因為 message 數量最多只能達到我設定的數值,所以大部分時間會相差不多。
我將 buffer_size 設定成 10 ,使用 5 個 publisher,每個 publisher 發 3 封 message,總訊息數 15 大於 buffer_size 10 ,可以看到有訊息遺失。
測試不同 subscriber 數量的效能差異時,因為 publisher 只有一個,所以不會造成此問題。
使用 circular buffer ,並把所有 subscriber 都讀取過的 message 從 buffer 中刪除。
這樣還不能解決問題, publisher 需要在 buffer 滿了以後,暫停發佈訊息到 buffer。
因為有對 buffer 做刪除以及再寫入的動作,使用 lock 和 atomic 操作。
修改部分 :
使用 atomic 進行同步:
使用atomic_t來處理多線程之間的同步操作,更安全地處理訊息計數和讀取計數。
atomic_t message_count = ATOMIC_INIT(0);
atomic_t read_count[BUFFER_SIZE];
使用 head 和 tail 指針:
使用 head 和 tail 指針來管理 circular buffer,取代單一的 write_index。
用等待佇列處理特定情況:
為 publisher 和 subscriber 分別使用等待佇列(publisher_queue和subscriber_queue)來更有效地處理阻塞情況,比如 buffer 滿的情況。
效能測量 :
使用 CMWQ ,對不同 publisher 數量做測量,subscriber 數量固定為 500,看起來依然沒甚麼差別。
將 subscriber 數量定為 1 後做測試:
可以看到會隨著 publisher 數量增加而運行較久。
目前的 bug:
不知道為什麼有時候模組掛載後,不會有任何動作,不會發布訊息,導致時間測量出是 0。
這個問題我目前不知道怎麼解決。
找出為何掛載後,模組有時候能運作、有時候不會動作的原因並修復