執行人: ollieni
專題解說影片
aa860630
在 Pub/Sub 運行圖中,似乎沒有一個箭頭來告知 Publisher 傳送狀況,該如何確保 Subscriber 準確接受到訊息?
Subscriber 接收到訊息後會傳送 Ack ,告知 broker 已收到訊息,另外,若是訊息沒辦法被 Subscriber 接收,broker 會將此訊息放入 dead topic 中,避免造成阻塞,也可供之後重傳或分析、處理問題。
參考資料 : dead_letter_topic
LIAO-JIAN-PENG
說明實驗三、四及修正 bug 部份,為什麼固定 subscriber 的數量是 500,其他數值是否會造成不同結果?
會使用 500 個 Subscriber 的原因是,一般在 Pub/Sub 系統中,可能會只有一個 Publisher,但是鮮少有一個 Subscriber 的情況,在經過幾次嘗試後,我認為 500 個 Subscriber 的數值比較能明顯展示出差異,但最後修正 bug 後,可以看到 CMWQ 處理的都很快,看不出差異,所以用 1 個 Subscriber 展示 Publisher 數量對於效能的影響。
steven523
當訊息無法成功發佈或接收時,Pub/Sub 系統應該如何進行錯誤處理?
有幾種處理方式,
第一個是可以設定訊息失敗需重新傳送,重試次數和間隔時間可以根據需求進行配置,要避免太頻繁,對系統造成過大負擔。
第二是 dead topic 機制,會將多次重試後仍無法成功處理的訊息放入 dead queue,集中後續處理。
第三是可以設定錯誤提醒機制,一般雲端服務都有此功能,在服務發生問題時發出警示訊息提醒管理員。
在給定的 simrupt 程式碼基礎之上,針對 Linux 核心實作 Pub/Sub 模型的模組,善用 CMWQ/workqueue,提升其並行處理的能力。除了 Pub/Sub 模型的正確性,尚要進行如此 Pub/Sub 模型的效能分析,探討訂閱者數量 vs. 執行時間在不同組態的表現,並提出改進方案。
以第一手材料闡述 Pub/Sub 模型的原理和應用場景。
持續更新中
根據 Google Cloud 的 Pub/Sub 文件,
Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages.
Pub/Sub 模型是不同步更新,Publisher 只要把訊息或事件傳給 Pub/Sub 服務,就不需要管了,Pub/Sub 會把訊息或是事件傳送給相關的 Subscriber。
Pub/Sub 服務的中介平台, broker 會根據 topic 把訊息傳給訂閱 topic 的 subscriber , topic 是判斷要不要傳給 subscriber 的依據, broker 則是負責儲存訊息的角色。
以下不是第一手資料
根據 Observer vs Pub-Sub Pattern
和 Observer pattern 做比較
Observer pattern:
Observer Pattern 重視 Subject 有更新時,主動通知 Observer,且通常為一個 Subject 對多個 Observer。這是一種同步更新的方式,實現一對多的通知。
Pub/Sub 運行圖:
Pub/Sub 模式則處理資料傳遞的過程。Publisher 上傳資料到中介(broker),而不需關心資料由誰接收。此模式支持異步更新,且具有多對多的特性。
Pub/Sub 的好處是,即使 Publisher 或 Subscriber 的任一個 server 發生故障,也不會影響整體系統的運作。
在給定的 simrupt 程式碼基礎之上,針對 Linux 核心實作 Pub/Sub 模型的模組,善用 CMWQ/workqueue,提升其並行處理的能力。
Pub/Sub 模型可以有很多 Publisher 和很多 Subscriber ,很多 topic , 我先以一個 publisher、一個 topic 、 不同數量的 subscriber 來做效能量測。
使用一個 module ,內有 publisher 和 subscriber,publisher 發佈訊息到 broker, broker 再把訊息發出去給 subscriber。
使用兩個 module,一個是 publisher,專門去寫訊息到一個文件,一個是subscriber module,可以設定 subscriber 數量去讀取訊息。(測試困難)
問題 : publisher 發布訊息時應該要將內容鎖住,但不同 subscriber 讀的時候需要鎖嗎?
我認為單純讀取的話,不需要鎖住。
後續會做實驗比較有沒有鎖的效能差距。
主要函數:
broker_fn(struct work_struct *work)
:這是工作佇列的處理函數,用來將發布者的訊息放入訊息緩衝區,並喚醒訂閱者。publish_message(const char *msg)
:這個函數用來建立一個新的工作結構體,並將其放入工作佇列中。publisher_work_fn(struct work_struct *work)
:發布者的工作函數,會產生訊息並呼叫 publish_message。subscriber_work_fn(struct work_struct *work)
:訂閱者的工作函數,會等待新的訊息並計算處理時間。__init pubsub_init(void)
:模組初始化函數,建立工作佇列並初始化發布者和訂閱者的工作。__exit pubsub_exit(void)
:模組卸載函數,銷毀工作佇列並計算總時間。我使用5個 subscriber 和 2 個 publisher 為例。
可以看到每個 subscriber 都有接收到每個 publisher 發出的 3 則 messsage。
除了 Pub/Sub 模型的正確性,尚要進行如此 Pub/Sub 模型的效能分析,探討訂閱者數量 vs. 執行時間在不同組態的表現。
使用 ktime,紀錄執行時間來評估效能,這時候遇到一個問題: 時間怎麼計算才好?
我使用 python 寫測試腳本。
使用 subprocess.run
執行 insmod
、rmmod
、以及 dmesg
這些命令。
讀取 dmesg 輸出的訊息,找到我在模組中輸出的時間資訊。
用 for
迴圈去 insmod
不同數量的 subscribers,並將時間記錄起來以供後續畫圖使用。
會做單純使用 kthread 和 CMWQ 的時間比較
只有一個 Publisher,一個topic ,有不同數量的 Subscriber ,使用第一個方法計算時間。
Publisher 固定為1,Subscriber 數量由1到1000。
dmesg
最後一行,我應該要使用 grep
,只過濾我想要的訊息,再讀取時間,就能分辨兩個模組各自的執行時間。(但是同時執行的效能,不能做為比較的依據,因為會互相影響)wait_event_interruptible
CMWQ(讀取有 lock ):
CMWQ(讀取沒有 lock ):
Kthread(照順序):
Kthread(不照順序,讀取沒有 lock):
可以看到照順序的版本所需時間遠比 CMWQ 多,因為 CMWQ 有 workqueue, worker pool 的概念,可以自動根據執行狀況去分配工具,照順序的版本是輪到了才建立 thread 並且讀取時有用 lock 鎖住,多了等待和建立 thread 的時間,所以所需時間較長。
Kthread (沒有照順序,沒有 lock ),效能比 Kthread (照順序,有 lock ),提升了很多,可以看到 subscriber 多的時候,所花費的時間大致是 1/2。
CMWQ 沒有 lock 的版本也比有 lock 版本快很多。
使用第二種方法計算時間,和版本依同樣使用一個 Publisher,一個topic ,測量不同數量的 Subscriber 所花費的時間。
因為已經實驗過得知,沒有 lock 比較好,接下來都會以沒有 lock 的版本進行測試。
Kthread:
CMWQ:
可以看出 CMWQ 的效率還是比單純使用 Kthread 好,有個有趣的現象是 Kthread 的表現比較有規律,花費的時間隨著 subscriber 的數量逐漸上升。
而 CMWQ 花費的時間,在一開始還算規律,但是 subscriber 數量漸漸增加後,所花費的時間就會有高有低。
兩種計算時間的方法代表的意義和一個我心裡一直有的疑問一樣,以前上課老師都會說 : "你浪費了全班1分鐘,全班40個人,你就是浪費了40分鐘!",但是時間明明只過去1分鐘,第一種時間計算方式像是老師所說浪費40分鐘的計算方法,把所有 subscriber 花費的時間相加。第二種時間計算方式則是只看每一則訊息傳到所有 subscriber 所需的時間,只看實際所花費的時間。哪種計算方法才是正確的呢?
測量不同數量的 Publisher 的效能差距,使用第一種方法計算時間。
將 Subscriber 數量固定在 500,Publisher 數量由1到1000。
Kthread
CMWQ
測量不同數量的 Publisher 的效能差距,使用第二種方法計算時間。
Kthread
CMWQ
從實驗三實驗四中看出 publisher 數量,對於整體效能的影響比較小, subscriber 數量對於效能影響比較大。
我覺得不太合理,想到我的 buffer_size 設置太小,導致 Subscriber 還沒讀到 message , Publisher 發的訊息就將之前的覆蓋掉了,導致 message 遺失。因為 message 數量最多只能達到我設定的數值,所以大部分時間會相差不多。
我將 buffer_size 設定成 10 ,使用 5 個 publisher,每個 publisher 發 3 封 message,總訊息數 15 大於 buffer_size 10 ,可以看到有訊息遺失。
測試不同 subscriber 數量的效能差異時,因為 publisher 只有一個,所以不會造成此問題。
使用 circular buffer ,並把所有 subscriber 都讀取過的 message 從 buffer 中刪除。
這樣還不能解決問題, publisher 需要在 buffer 滿了以後,暫停發佈訊息到 buffer。
因為有對 buffer 做刪除以及再寫入的動作,使用 lock 和 atomic 操作。
修改部分 :
使用 atomic 進行同步:
使用atomic_t來處理多線程之間的同步操作,更安全地處理訊息計數和讀取計數。
atomic_t message_count = ATOMIC_INIT(0);
atomic_t read_count[BUFFER_SIZE];
使用 head 和 tail 指針:
使用 head 和 tail 指針來管理 circular buffer,取代單一的 write_index。
用等待佇列處理特定情況:
為 publisher 和 subscriber 分別使用等待佇列(publisher_queue和subscriber_queue)來更有效地處理阻塞情況,比如 buffer 滿的情況。
效能測量 :
使用 CMWQ ,對不同 publisher 數量做測量,subscriber 數量固定為 500,看起來依然沒甚麼差別。
將 subscriber 數量定為 1 後做測試:
可以看到會隨著 publisher 數量增加而運行較久。
目前的 bug:
不知道為什麼有時候模組掛載後,不會有任何動作,不會發布訊息,導致時間測量出是 0。
這個問題我目前不知道怎麼解決。
找出為何掛載後,模組有時候能運作、有時候不會動作的原因並修復
(我認為有 dead lock)
提案:
lock-free