# Linux 核心專題: Pub/Sub 模型的設計和實作 > 執行人: ollieni > [專題解說影片](https://youtu.be/5MmaK4FDQxQ) ### Reviewed by `aa860630` 在 Pub/Sub 運行圖中,似乎沒有一個箭頭來告知 Publisher 傳送狀況,該如何確保 Subscriber 準確接受到訊息? >Subscriber 接收到訊息後會傳送 Ack ,告知 broker 已收到訊息,另外,若是訊息沒辦法被 Subscriber 接收,broker 會將此訊息放入 dead topic 中,避免造成阻塞,也可供之後重傳或分析、處理問題。 參考資料 : [dead_letter_topic](https://cloud.google.com/pubsub/docs/handling-failures#dead_letter_topic) ### Reviewed by `LIAO-JIAN-PENG` 說明實驗三、四及修正 bug 部份,為什麼固定 subscriber 的數量是 500,其他數值是否會造成不同結果? >會使用 500 個 Subscriber 的原因是,一般在 Pub/Sub 系統中,可能會只有一個 Publisher,但是鮮少有一個 Subscriber 的情況,在經過幾次嘗試後,我認為 500 個 Subscriber 的數值比較能明顯展示出差異,但最後修正 bug 後,可以看到 CMWQ 處理的都很快,看不出差異,所以用 1 個 Subscriber 展示 Publisher 數量對於效能的影響。 ### Reviewed by `steven523` 當訊息無法成功發佈或接收時,Pub/Sub 系統應該如何進行錯誤處理? >有幾種處理方式, >第一個是可以設定訊息失敗需重新傳送,重試次數和間隔時間可以根據需求進行配置,要避免太頻繁,對系統造成過大負擔。 >第二是 dead topic 機制,會將多次重試後仍無法成功處理的訊息放入 dead queue,集中後續處理。 >第三是可以設定錯誤提醒機制,一般雲端服務都有此功能,在服務發生問題時發出警示訊息提醒管理員。 ## 任務簡介 在給定的 simrupt 程式碼基礎之上,針對 Linux 核心實作 Pub/Sub 模型的模組,善用 CMWQ/workqueue,提升其並行處理的能力。除了 Pub/Sub 模型的正確性,尚要進行如此 Pub/Sub 模型的效能分析,探討訂閱者數量 vs. 執行時間在不同組態的表現,並提出改進方案。 ## TODO: 探討 Pub/Sub 模型 > 以第一手材料闡述 Pub/Sub 模型的原理和應用場景。 :::info 持續更新中 ::: 根據 Google Cloud 的 [Pub/Sub 文件](https://cloud.google.com/pubsub/docs/overview), >Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages. Pub/Sub 模型是不同步更新,Publisher 只要把訊息或事件傳給 Pub/Sub 服務,就不需要管了,Pub/Sub 會把訊息或是事件傳送給相關的 Subscriber。 Pub/Sub 服務的中介平台, broker 會根據 topic 把訊息傳給訂閱 topic 的 subscriber , topic 是判斷要不要傳給 subscriber 的依據, broker 則是負責儲存訊息的角色。 以下不是第一手資料 根據 [Observer vs Pub-Sub Pattern](https://www.linkedin.com/pulse/observer-vs-pub-sub-pattern-ahmed-shamim-hassan-s0yrc) 和 Observer pattern 做比較 **Observer pattern:** ```graphviz digraph{ rankdir=LR nodesep = 1.5; ranksep = 2; Subject->Observer [dir=one, label = "message"]; Observer->Subject [dir=one, label = "subscribe"]; } ``` Observer Pattern 重視 Subject 有更新時,主動通知 Observer,且通常為一個 Subject 對多個 Observer。這是一種**同步更新的方式,實現一對多的通知**。 **Pub/Sub 運行圖:** ```graphviz digraph{ rankdir=LR nodesep = 1.5; ranksep = 2; Publisher->broker [dir=one, label = "Publish message"]; Subscriber->broker [dir=one, label = "subscribe"]; broker -> Subscriber [dir=one, label = "message"]; } ``` Pub/Sub 模式則處理資料傳遞的過程。Publisher 上傳資料到中介(broker),而不需關心資料由誰接收。此模式支持**異步更新,且具有多對多的特性**。 Pub/Sub 的好處是,即使 Publisher 或 Subscriber 的任一個 server 發生故障,也不會影響整體系統的運作。 ## TODO: 建構採用 Pub/Sub 模型的核心模組 > 在給定的 simrupt 程式碼基礎之上,針對 Linux 核心實作 Pub/Sub 模型的模組,善用 CMWQ/workqueue,提升其並行處理的能力。 ## 開發構想 : Pub/Sub 模型**可以有很多 Publisher 和很多 Subscriber ,很多 topic ,** 我先以一個 publisher、一個 topic 、 不同數量的 subscriber 來做效能量測。 1. 使用一個 module ,內有 publisher 和 subscriber,publisher 發佈訊息到 broker, broker 再把訊息發出去給 subscriber。 2. 使用兩個 module,一個是 publisher,專門去寫訊息到一個文件,一個是subscriber module,可以設定 subscriber 數量去讀取訊息。(測試困難) 問題 : publisher 發布訊息時應該要將內容鎖住,但不同 subscriber 讀的時候需要鎖嗎? 我認為單純讀取的話,不需要鎖住。 後續會做實驗比較有沒有鎖的效能差距。 ## 實作: 主要函數: * `broker_fn(struct work_struct *work)`:這是工作佇列的處理函數,用來將發布者的訊息放入訊息緩衝區,並喚醒訂閱者。 * **注意 : 訊息寫入時要將 buffer 鎖起來,避免正在寫入時被 subscriber 讀取。** ```c static void broker_fn(struct work_struct *work) { struct pubsub_work *ps_work = container_of(work, struct pubsub_work, work); mutex_lock(&buffer_lock); if (message_count < BUFFER_SIZE) { strncpy(message_buffer[write_index], ps_work->message, 256); ktime_get_real_ts64(&publish_times[write_index]); write_index = (write_index + 1) % BUFFER_SIZE; message_count++; wake_up_interruptible(&subscriber_queue); } mutex_unlock(&buffer_lock); kfree(ps_work); } ``` * `publish_message(const char *msg)`:這個函數用來建立一個新的工作結構體,並將其放入工作佇列中。 ```c static void publish_message(const char *msg) { struct pubsub_work *ps_work; ps_work = kmalloc(sizeof(*ps_work), GFP_KERNEL); if (!ps_work) { pr_err("Failed to allocate memory for pubsub_work\n"); return; } strncpy(ps_work->message, msg, 256); INIT_WORK(&ps_work->work, broker_fn); queue_work(pubsub_wq, &ps_work->work); } ``` * `publisher_work_fn(struct work_struct *work)`:發布者的工作函數,會產生訊息並呼叫 publish_message。 ```c static void publisher_work_fn(struct work_struct *work) { struct pubsub_id_work *id_work = container_of(work, struct pubsub_id_work, work); int id = id_work->id; int i; kfree(id_work); for (i = 0; i < 3; i++) { char msg[32]; snprintf(msg, sizeof(msg), "Publisher %d message %d", id, i); publish_message(msg); } } ``` * `subscriber_work_fn(struct work_struct *work)`:訂閱者的工作函數,會等待新的訊息並計算處理時間。 ```c static void subscriber_work_fn(struct work_struct *work) { struct pubsub_id_work *id_work = container_of(work, struct pubsub_id_work, work); char msg[256]; int id = id_work->id; struct timespec64 receive_time, time_diff; int my_read_index = 0; kfree(id_work); while (!exit_flag) { wait_event_interruptible_timeout(subscriber_queue, message_count > my_read_index, msecs_to_jiffies(100)); if (message_count > my_read_index) { int buffer_index = (my_read_index + write_index - message_count + BUFFER_SIZE) % BUFFER_SIZE; strncpy(msg, message_buffer[buffer_index], 256); ktime_get_real_ts64(&receive_time); time_diff = timespec64_sub(receive_time, publish_times[buffer_index]); total_time_ns += timespec64_to_ns(&time_diff); pr_info("Subscriber %d received: %s, Time taken: %lld.%.9lds\n", id, msg, (long long)time_diff.tv_sec, time_diff.tv_nsec); my_read_index++; } } } ``` * `__init pubsub_init(void)`:模組初始化函數,建立工作佇列並初始化發布者和訂閱者的工作。 ```c static int __init pubsub_init(void) { int i; ktime_get_real_ts64(&overall_start_time); mutex_init(&buffer_lock); pubsub_wq = alloc_workqueue("pubsub_wq", WQ_UNBOUND, 0); if (!pubsub_wq) { pr_err("Failed to create workqueue\n"); return -ENOMEM; } for (i = 0; i < num_publishers; i++) { struct pubsub_id_work *id_work = kmalloc(sizeof(*id_work), GFP_KERNEL); if (!id_work) { pr_err("Failed to allocate memory for publisher ID %d\n", i); destroy_workqueue(pubsub_wq); return -ENOMEM; } id_work->id = i; INIT_WORK(&id_work->work, publisher_work_fn); queue_work(pubsub_wq, &id_work->work); } for (i = 0; i < num_subscribers; i++) { struct pubsub_id_work *id_work = kmalloc(sizeof(*id_work), GFP_KERNEL); if (!id_work) { pr_err("Failed to allocate memory for subscriber ID %d\n", i); destroy_workqueue(pubsub_wq); return -ENOMEM; } id_work->id = i; INIT_WORK(&id_work->work, subscriber_work_fn); queue_work(pubsub_wq, &id_work->work); } pr_info("Pub-Sub module loaded with %d subscribers and %d publishers\n", num_subscribers, num_publishers); return 0; } ``` * `__exit pubsub_exit(void)`:模組卸載函數,銷毀工作佇列並計算總時間。 ```c static void __exit pubsub_exit(void) { struct timespec64 time_diff; ktime_get_real_ts64(&overall_end_time); exit_flag = true; flush_workqueue(pubsub_wq); destroy_workqueue(pubsub_wq); time_diff = timespec64_sub(overall_end_time, overall_start_time); overall_time = timespec64_to_ns(&time_diff); pr_info("Total time taken by all subscribers: %lld.%.9lds\n", (long long)total_time_ns / 1000000000, (long)total_time_ns % 1000000000); pr_info("Overall time taken by all subscribers: %lld.%.9lds\n", (long long)overall_time / 1000000000, (long)overall_time % 1000000000); pr_info("Pub-Sub module unloaded\n"); } ``` ## 執行結果 ```shell [五 六 28 01:36:46 2024] Pub-Sub module loaded with 5 subscribers and 2 publishers [五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 0, Time taken: 0.000002367s [五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 0, Time taken: 0.000002641s [五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 0, Time taken: 0.000003143s [五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 0, Time taken: 0.000003864s [五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 1, Time taken: 0.000003511s [五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 1, Time taken: 0.000003704s [五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 0, Time taken: 0.000005079s [五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 1, Time taken: 0.000003962s [五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 2, Time taken: 0.000004681s [五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 1, Time taken: 0.000005636s [五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 2, Time taken: 0.000005259s [五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 2, Time taken: 0.000005269s [五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 0, Time taken: 0.000005517s [五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 1, Time taken: 0.000007413s [五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 0, Time taken: 0.000005874s [五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 0, Time taken: 0.000006338s [五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 1, Time taken: 0.000006210s [五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 1, Time taken: 0.000006860s [五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 2, Time taken: 0.000008905s [五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 1, Time taken: 0.000007885s [五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 2, Time taken: 0.000007265s [五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 0, Time taken: 0.000009909s [五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 2, Time taken: 0.000008406s [五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 2, Time taken: 0.000008638s [五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 2, Time taken: 0.000012894s [五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 1, Time taken: 0.000011922s [五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 2, Time taken: 0.000012701s [五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 0, Time taken: 0.000014676s [五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 1, Time taken: 0.000015833s [五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 2, Time taken: 0.000016599s [五 六 28 01:42:29 2024] Total time taken by all subscribers: 0.000217702s [五 六 28 01:42:29 2024] Overall time taken by all subscribers: 343.724282463s ``` 我使用5個 subscriber 和 2 個 publisher 為例。 可以看到每個 subscriber 都有接收到每個 publisher 發出的 3 則 messsage。 ## TODO: 分析 Pub/Sub 模型作為核心模組的效能表現 > 除了 Pub/Sub 模型的正確性,尚要進行如此 Pub/Sub 模型的效能分析,探討訂閱者數量 vs. 執行時間在不同組態的表現。 ## 測量效能 : 使用 ktime,紀錄執行時間來評估效能,這時候遇到一個問題: 時間怎麼計算才好? 1. 每一個 subscriber 讀取每一則訊息的時間都記錄下來,用那則訊息發布的時間到 subscriber 讀取所花費的時間,全部加總。 2. 測量整體所耗費時間,從加載模型到跑完卸載模型的時間。 3. 每一則訊息發布,到最後一個 subscriber 讀取訊息的時間,為一個訊息所耗費的時間,將全部訊息所耗費的時間加總。 我使用 python 寫測試腳本。 使用 `subprocess.run` 執行 `insmod`、`rmmod`、以及 `dmesg` 這些命令。 ```python def run_command(command): result = subprocess.run(command, shell=True, text=True, capture_output=True) return result.stdout ``` 讀取 dmesg 輸出的訊息,找到我在模組中輸出的時間資訊。 ```python def get_total_time_from_dmesg(): dmesg_output = run_command("sudo dmesg | tail -n 20") match = re.search(r'Overall time taken by all subscribers:\s+([\d\.]+)s', dmesg_output) if match: return float(match.group(1)) return None ``` 用 `for` 迴圈去 `insmod` 不同數量的 subscribers,並將時間記錄起來以供後續畫圖使用。 ```python subscribers_range = range(1, 1001) times = [] for subscribers in subscribers_range: print(f"Testing with {subscribers} subscribers...") run_command(f"make load PUBLISHERS=1 SUBSCRIBERS={subscribers}") run_command("sudo rmmod pubsub") total_time = get_total_time_from_dmesg() print(total_time) if total_time is not None and total_time < 50: times.append(total_time) else: times.append(0) ``` 會做單純使用 kthread 和 CMWQ 的時間比較 ### 實驗一: 只有一個 Publisher,一個topic ,有不同數量的 Subscriber ,使用第一個方法計算時間。 Publisher 固定為1,Subscriber 數量由1到1000。 * 實作時遇到的問題 **問題一:** 原本想要同時執行兩個模組去,進行效能量測,但是發現兩者時間一樣,原因是我的 python script 檔抓取時間是單純的抓 `dmesg` 最後一行,我應該要使用 `grep` ,只過濾我想要的訊息,再讀取時間,就能分辨兩個模組各自的執行時間。(但是同時執行的效能,不能做為比較的依據,因為會互相影響) **問題二:** 一直遇到模組無法正確卸載的狀況,發現是我的 kthread_should_stop() 這個條件沒有成立,thread 一直沒有關掉,導致在卸載模型時,會有錯誤。 **問題三:** 發現我的 Kthread 版本是完全照順序的讀取,我覺得沒有 cocurrent 的工作,也是因為我讀取訊息這件事速度很快,在下一個 thread 創建好前,這個 thread 就結束了,並且在讀取時我有用 lock 鎖住,才會看起來像是沒有 cocurrent 做事。 我改變了一下順序,先將 subscriber thread 創建好,然後使用`wait_event_interruptible` 去監控有沒有訊息被寫到 buffer 中,有訊息實再讀取,這樣會提升很多效率。 CMWQ(讀取有 lock ): ![kecho_concurrent_performance](https://hackmd.io/_uploads/BktAUeuIR.png) CMWQ(讀取沒有 lock ): ![kecho_concurrent_performance](https://hackmd.io/_uploads/r1sMYId8C.png) Kthread(照順序): ![kecho_concurrent_performance](https://hackmd.io/_uploads/Hk21PgOLC.png) Kthread(不照順序,讀取沒有 lock): ![kecho_concurrent_performance](https://hackmd.io/_uploads/BJIyAruUC.png) 可以看到照順序的版本所需時間遠比 CMWQ 多,因為 CMWQ 有 workqueue, worker pool 的概念,可以自動根據執行狀況去分配工具,照順序的版本是輪到了才建立 thread 並且讀取時有用 lock 鎖住,多了等待和建立 thread 的時間,所以所需時間較長。 Kthread (沒有照順序,沒有 lock ),效能比 Kthread (照順序,有 lock ),提升了很多,可以看到 subscriber 多的時候,所花費的時間大致是 1/2。 CMWQ 沒有 lock 的版本也比有 lock 版本快很多。 ### 實驗二: 使用第二種方法計算時間,和版本依同樣使用一個 Publisher,一個topic ,測量不同數量的 Subscriber 所花費的時間。 因為已經實驗過得知,沒有 lock 比較好,接下來都會以沒有 lock 的版本進行測試。 Kthread: ![kecho_concurrent_performance_test](https://hackmd.io/_uploads/Bk1tfrKIR.png) CMWQ: ![kecho_concurrent_performance_test](https://hackmd.io/_uploads/rk9uGrtIR.png) 可以看出 CMWQ 的效率還是比單純使用 Kthread 好,有個有趣的現象是 Kthread 的表現比較有規律,花費的時間隨著 subscriber 的數量逐漸上升。 而 CMWQ 花費的時間,在一開始還算規律,但是 subscriber 數量漸漸增加後,所花費的時間就會有高有低。 兩種計算時間的方法代表的意義和一個我心裡一直有的疑問一樣,以前上課老師都會說 : "你浪費了全班1分鐘,全班40個人,你就是浪費了40分鐘!",但是時間明明只過去1分鐘,第一種時間計算方式像是老師所說浪費40分鐘的計算方法,把所有 subscriber 花費的時間相加。第二種時間計算方式則是只看每一則訊息傳到所有 subscriber 所需的時間,只看實際所花費的時間。哪種計算方法才是正確的呢? ## 從實驗中發現實作錯誤 ### 實驗三: 測量不同數量的 Publisher 的效能差距,使用第一種方法計算時間。 將 Subscriber 數量固定在 500,Publisher 數量由1到1000。 Kthread ![kecho_concurrent_performance2](https://hackmd.io/_uploads/rk1SuYc8A.png) CMWQ ![kecho_concurrent_performance2](https://hackmd.io/_uploads/ByMBs3K80.png) ### 實驗四: 測量不同數量的 Publisher 的效能差距,使用第二種方法計算時間。 Kthread ![kecho_concurrent_performance_test2](https://hackmd.io/_uploads/BJMHOtcUC.png) CMWQ ![kecho_concurrent_performance_test2](https://hackmd.io/_uploads/B1Lrj3FLC.png) 從實驗三實驗四中看出 publisher 數量,對於整體效能的影響比較小, subscriber 數量對於效能影響比較大。 我覺得不太合理,想到我的 buffer_size 設置太小,導致 **Subscriber 還沒讀到 message , Publisher 發的訊息就將之前的覆蓋掉了**,導致 message 遺失。因為 message 數量最多只能達到我設定的數值,所以大部分時間會相差不多。 我將 buffer_size 設定成 10 ,使用 5 個 publisher,每個 publisher 發 3 封 message,總訊息數 15 大於 buffer_size 10 ,可以看到有訊息遺失。 ```shell [五 六 28 01:55:11 2024] Pub-Sub module loaded with 2 subscribers and 5 publishers [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 0, Time taken: 0.000007686s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 0, Time taken: 0.000012225s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 1, Time taken: 0.000009814s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 2, Time taken: 0.000012108s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 1, Time taken: 0.000015387s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 0, Time taken: 0.000014315s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 2, Time taken: 0.000018394s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 1, Time taken: 0.000016483s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 0, Time taken: 0.000020439s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 2, Time taken: 0.000014714s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 1, Time taken: 0.000022577s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 0, Time taken: 0.000016224s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 2, Time taken: 0.000021655s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 1, Time taken: 0.000018250s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 2, Time taken: 0.000020130s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 0, Time taken: 0.000024077s [五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 3 message 0, Time taken: 0.000022043s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 1, Time taken: 0.000025915s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 2, Time taken: 0.000027926s [五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 3 message 0, Time taken: 0.000028948s ``` 測試不同 subscriber 數量的效能差異時,因為 publisher 只有一個,所以不會造成此問題。 ### 修正 bug 使用 circular buffer ,並把所有 subscriber 都讀取過的 message 從 buffer 中刪除。 這樣還不能解決問題, publisher 需要在 buffer 滿了以後,暫停發佈訊息到 buffer。 因為有對 buffer 做刪除以及再寫入的動作,使用 lock 和 atomic 操作。 修改部分 : 1. 使用 atomic 進行同步: 使用atomic_t來處理多線程之間的同步操作,更安全地處理訊息計數和讀取計數。 atomic_t message_count = ATOMIC_INIT(0); atomic_t read_count[BUFFER_SIZE]; 2. 使用 head 和 tail 指針: 使用 head 和 tail 指針來管理 circular buffer,取代單一的 write_index。 3. 用等待佇列處理特定情況: 為 publisher 和 subscriber 分別使用等待佇列(publisher_queue和subscriber_queue)來更有效地處理阻塞情況,比如 buffer 滿的情況。 效能測量 : 使用 CMWQ ,對不同 publisher 數量做測量,subscriber 數量固定為 500,看起來依然沒甚麼差別。 ![kecho_concurrent_performance_test](https://hackmd.io/_uploads/r1fAcXsU0.png) 將 subscriber 數量定為 1 後做測試: ![kecho_concurrent_performance_test3](https://hackmd.io/_uploads/ryUXiQoUR.png) 可以看到會隨著 publisher 數量增加而運行較久。 **目前的 bug:** 不知道為什麼有時候模組掛載後,不會有任何動作,不會發布訊息,導致時間測量出是 0。 這個問題我目前不知道怎麼解決。 ![kecho_concurrent_performance3](https://hackmd.io/_uploads/HyOqjQo80.png) ## 未來改進方向(持續努力中) 找出為何掛載後,模組有時候能運作、有時候不會動作的原因並修復 (我認為有 dead lock) ## TODO: 提出效能改進方案 提案: lock-free