# Linux 核心專題: Pub/Sub 模型的設計和實作
> 執行人: ollieni
> [專題解說影片](https://youtu.be/5MmaK4FDQxQ)
### Reviewed by `aa860630`
在 Pub/Sub 運行圖中,似乎沒有一個箭頭來告知 Publisher 傳送狀況,該如何確保 Subscriber 準確接受到訊息?
>Subscriber 接收到訊息後會傳送 Ack ,告知 broker 已收到訊息,另外,若是訊息沒辦法被 Subscriber 接收,broker 會將此訊息放入 dead topic 中,避免造成阻塞,也可供之後重傳或分析、處理問題。
參考資料 : [dead_letter_topic](https://cloud.google.com/pubsub/docs/handling-failures#dead_letter_topic)
### Reviewed by `LIAO-JIAN-PENG`
說明實驗三、四及修正 bug 部份,為什麼固定 subscriber 的數量是 500,其他數值是否會造成不同結果?
>會使用 500 個 Subscriber 的原因是,一般在 Pub/Sub 系統中,可能會只有一個 Publisher,但是鮮少有一個 Subscriber 的情況,在經過幾次嘗試後,我認為 500 個 Subscriber 的數值比較能明顯展示出差異,但最後修正 bug 後,可以看到 CMWQ 處理的都很快,看不出差異,所以用 1 個 Subscriber 展示 Publisher 數量對於效能的影響。
### Reviewed by `steven523`
當訊息無法成功發佈或接收時,Pub/Sub 系統應該如何進行錯誤處理?
>有幾種處理方式,
>第一個是可以設定訊息失敗需重新傳送,重試次數和間隔時間可以根據需求進行配置,要避免太頻繁,對系統造成過大負擔。
>第二是 dead topic 機制,會將多次重試後仍無法成功處理的訊息放入 dead queue,集中後續處理。
>第三是可以設定錯誤提醒機制,一般雲端服務都有此功能,在服務發生問題時發出警示訊息提醒管理員。
## 任務簡介
在給定的 simrupt 程式碼基礎之上,針對 Linux 核心實作 Pub/Sub 模型的模組,善用 CMWQ/workqueue,提升其並行處理的能力。除了 Pub/Sub 模型的正確性,尚要進行如此 Pub/Sub 模型的效能分析,探討訂閱者數量 vs. 執行時間在不同組態的表現,並提出改進方案。
## TODO: 探討 Pub/Sub 模型
> 以第一手材料闡述 Pub/Sub 模型的原理和應用場景。
:::info
持續更新中
:::
根據 Google Cloud 的 [Pub/Sub 文件](https://cloud.google.com/pubsub/docs/overview),
>Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages.
Pub/Sub 模型是不同步更新,Publisher 只要把訊息或事件傳給 Pub/Sub 服務,就不需要管了,Pub/Sub 會把訊息或是事件傳送給相關的 Subscriber。
Pub/Sub 服務的中介平台, broker 會根據 topic 把訊息傳給訂閱 topic 的 subscriber , topic 是判斷要不要傳給 subscriber 的依據, broker 則是負責儲存訊息的角色。
以下不是第一手資料
根據 [Observer vs Pub-Sub Pattern](https://www.linkedin.com/pulse/observer-vs-pub-sub-pattern-ahmed-shamim-hassan-s0yrc)
和 Observer pattern 做比較
**Observer pattern:**
```graphviz
digraph{
rankdir=LR
nodesep = 1.5;
ranksep = 2;
Subject->Observer [dir=one, label = "message"];
Observer->Subject [dir=one, label = "subscribe"];
}
```
Observer Pattern 重視 Subject 有更新時,主動通知 Observer,且通常為一個 Subject 對多個 Observer。這是一種**同步更新的方式,實現一對多的通知**。
**Pub/Sub 運行圖:**
```graphviz
digraph{
rankdir=LR
nodesep = 1.5;
ranksep = 2;
Publisher->broker [dir=one, label = "Publish message"];
Subscriber->broker [dir=one, label = "subscribe"];
broker -> Subscriber [dir=one, label = "message"];
}
```
Pub/Sub 模式則處理資料傳遞的過程。Publisher 上傳資料到中介(broker),而不需關心資料由誰接收。此模式支持**異步更新,且具有多對多的特性**。
Pub/Sub 的好處是,即使 Publisher 或 Subscriber 的任一個 server 發生故障,也不會影響整體系統的運作。
## TODO: 建構採用 Pub/Sub 模型的核心模組
> 在給定的 simrupt 程式碼基礎之上,針對 Linux 核心實作 Pub/Sub 模型的模組,善用 CMWQ/workqueue,提升其並行處理的能力。
## 開發構想 :
Pub/Sub 模型**可以有很多 Publisher 和很多 Subscriber ,很多 topic ,** 我先以一個 publisher、一個 topic 、 不同數量的 subscriber 來做效能量測。
1. 使用一個 module ,內有 publisher 和 subscriber,publisher 發佈訊息到 broker, broker 再把訊息發出去給 subscriber。
2. 使用兩個 module,一個是 publisher,專門去寫訊息到一個文件,一個是subscriber module,可以設定 subscriber 數量去讀取訊息。(測試困難)
問題 : publisher 發布訊息時應該要將內容鎖住,但不同 subscriber 讀的時候需要鎖嗎?
我認為單純讀取的話,不需要鎖住。
後續會做實驗比較有沒有鎖的效能差距。
## 實作:
主要函數:
* `broker_fn(struct work_struct *work)`:這是工作佇列的處理函數,用來將發布者的訊息放入訊息緩衝區,並喚醒訂閱者。
* **注意 : 訊息寫入時要將 buffer 鎖起來,避免正在寫入時被 subscriber 讀取。**
```c
static void broker_fn(struct work_struct *work) {
struct pubsub_work *ps_work = container_of(work, struct pubsub_work, work);
mutex_lock(&buffer_lock);
if (message_count < BUFFER_SIZE) {
strncpy(message_buffer[write_index], ps_work->message, 256);
ktime_get_real_ts64(&publish_times[write_index]);
write_index = (write_index + 1) % BUFFER_SIZE;
message_count++;
wake_up_interruptible(&subscriber_queue);
}
mutex_unlock(&buffer_lock);
kfree(ps_work);
}
```
* `publish_message(const char *msg)`:這個函數用來建立一個新的工作結構體,並將其放入工作佇列中。
```c
static void publish_message(const char *msg) {
struct pubsub_work *ps_work;
ps_work = kmalloc(sizeof(*ps_work), GFP_KERNEL);
if (!ps_work) {
pr_err("Failed to allocate memory for pubsub_work\n");
return;
}
strncpy(ps_work->message, msg, 256);
INIT_WORK(&ps_work->work, broker_fn);
queue_work(pubsub_wq, &ps_work->work);
}
```
* `publisher_work_fn(struct work_struct *work)`:發布者的工作函數,會產生訊息並呼叫 publish_message。
```c
static void publisher_work_fn(struct work_struct *work) {
struct pubsub_id_work *id_work = container_of(work, struct pubsub_id_work, work);
int id = id_work->id;
int i;
kfree(id_work);
for (i = 0; i < 3; i++) {
char msg[32];
snprintf(msg, sizeof(msg), "Publisher %d message %d", id, i);
publish_message(msg);
}
}
```
* `subscriber_work_fn(struct work_struct *work)`:訂閱者的工作函數,會等待新的訊息並計算處理時間。
```c
static void subscriber_work_fn(struct work_struct *work) {
struct pubsub_id_work *id_work = container_of(work, struct pubsub_id_work, work);
char msg[256];
int id = id_work->id;
struct timespec64 receive_time, time_diff;
int my_read_index = 0;
kfree(id_work);
while (!exit_flag) {
wait_event_interruptible_timeout(subscriber_queue, message_count > my_read_index, msecs_to_jiffies(100));
if (message_count > my_read_index) {
int buffer_index = (my_read_index + write_index - message_count + BUFFER_SIZE) % BUFFER_SIZE;
strncpy(msg, message_buffer[buffer_index], 256);
ktime_get_real_ts64(&receive_time);
time_diff = timespec64_sub(receive_time, publish_times[buffer_index]);
total_time_ns += timespec64_to_ns(&time_diff);
pr_info("Subscriber %d received: %s, Time taken: %lld.%.9lds\n", id, msg,
(long long)time_diff.tv_sec, time_diff.tv_nsec);
my_read_index++;
}
}
}
```
* `__init pubsub_init(void)`:模組初始化函數,建立工作佇列並初始化發布者和訂閱者的工作。
```c
static int __init pubsub_init(void) {
int i;
ktime_get_real_ts64(&overall_start_time);
mutex_init(&buffer_lock);
pubsub_wq = alloc_workqueue("pubsub_wq", WQ_UNBOUND, 0);
if (!pubsub_wq) {
pr_err("Failed to create workqueue\n");
return -ENOMEM;
}
for (i = 0; i < num_publishers; i++) {
struct pubsub_id_work *id_work = kmalloc(sizeof(*id_work), GFP_KERNEL);
if (!id_work) {
pr_err("Failed to allocate memory for publisher ID %d\n", i);
destroy_workqueue(pubsub_wq);
return -ENOMEM;
}
id_work->id = i;
INIT_WORK(&id_work->work, publisher_work_fn);
queue_work(pubsub_wq, &id_work->work);
}
for (i = 0; i < num_subscribers; i++) {
struct pubsub_id_work *id_work = kmalloc(sizeof(*id_work), GFP_KERNEL);
if (!id_work) {
pr_err("Failed to allocate memory for subscriber ID %d\n", i);
destroy_workqueue(pubsub_wq);
return -ENOMEM;
}
id_work->id = i;
INIT_WORK(&id_work->work, subscriber_work_fn);
queue_work(pubsub_wq, &id_work->work);
}
pr_info("Pub-Sub module loaded with %d subscribers and %d publishers\n", num_subscribers, num_publishers);
return 0;
}
```
* `__exit pubsub_exit(void)`:模組卸載函數,銷毀工作佇列並計算總時間。
```c
static void __exit pubsub_exit(void) {
struct timespec64 time_diff;
ktime_get_real_ts64(&overall_end_time);
exit_flag = true;
flush_workqueue(pubsub_wq);
destroy_workqueue(pubsub_wq);
time_diff = timespec64_sub(overall_end_time, overall_start_time);
overall_time = timespec64_to_ns(&time_diff);
pr_info("Total time taken by all subscribers: %lld.%.9lds\n",
(long long)total_time_ns / 1000000000, (long)total_time_ns % 1000000000);
pr_info("Overall time taken by all subscribers: %lld.%.9lds\n",
(long long)overall_time / 1000000000, (long)overall_time % 1000000000);
pr_info("Pub-Sub module unloaded\n");
}
```
## 執行結果
```shell
[五 六 28 01:36:46 2024] Pub-Sub module loaded with 5 subscribers and 2 publishers
[五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 0, Time taken: 0.000002367s
[五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 0, Time taken: 0.000002641s
[五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 0, Time taken: 0.000003143s
[五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 0, Time taken: 0.000003864s
[五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 1, Time taken: 0.000003511s
[五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 1, Time taken: 0.000003704s
[五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 0, Time taken: 0.000005079s
[五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 1, Time taken: 0.000003962s
[五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 2, Time taken: 0.000004681s
[五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 1, Time taken: 0.000005636s
[五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 2, Time taken: 0.000005259s
[五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 2, Time taken: 0.000005269s
[五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 0, Time taken: 0.000005517s
[五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 1, Time taken: 0.000007413s
[五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 0, Time taken: 0.000005874s
[五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 0, Time taken: 0.000006338s
[五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 1, Time taken: 0.000006210s
[五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 1, Time taken: 0.000006860s
[五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 2, Time taken: 0.000008905s
[五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 1, Time taken: 0.000007885s
[五 六 28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 2, Time taken: 0.000007265s
[五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 0, Time taken: 0.000009909s
[五 六 28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 2, Time taken: 0.000008406s
[五 六 28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 2, Time taken: 0.000008638s
[五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 2, Time taken: 0.000012894s
[五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 1, Time taken: 0.000011922s
[五 六 28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 2, Time taken: 0.000012701s
[五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 0, Time taken: 0.000014676s
[五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 1, Time taken: 0.000015833s
[五 六 28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 2, Time taken: 0.000016599s
[五 六 28 01:42:29 2024] Total time taken by all subscribers: 0.000217702s
[五 六 28 01:42:29 2024] Overall time taken by all subscribers: 343.724282463s
```
我使用5個 subscriber 和 2 個 publisher 為例。
可以看到每個 subscriber 都有接收到每個 publisher 發出的 3 則 messsage。
## TODO: 分析 Pub/Sub 模型作為核心模組的效能表現
> 除了 Pub/Sub 模型的正確性,尚要進行如此 Pub/Sub 模型的效能分析,探討訂閱者數量 vs. 執行時間在不同組態的表現。
## 測量效能 :
使用 ktime,紀錄執行時間來評估效能,這時候遇到一個問題: 時間怎麼計算才好?
1. 每一個 subscriber 讀取每一則訊息的時間都記錄下來,用那則訊息發布的時間到 subscriber 讀取所花費的時間,全部加總。
2. 測量整體所耗費時間,從加載模型到跑完卸載模型的時間。
3. 每一則訊息發布,到最後一個 subscriber 讀取訊息的時間,為一個訊息所耗費的時間,將全部訊息所耗費的時間加總。
我使用 python 寫測試腳本。
使用 `subprocess.run` 執行 `insmod`、`rmmod`、以及 `dmesg` 這些命令。
```python
def run_command(command):
result = subprocess.run(command, shell=True, text=True, capture_output=True)
return result.stdout
```
讀取 dmesg 輸出的訊息,找到我在模組中輸出的時間資訊。
```python
def get_total_time_from_dmesg():
dmesg_output = run_command("sudo dmesg | tail -n 20")
match = re.search(r'Overall time taken by all subscribers:\s+([\d\.]+)s', dmesg_output)
if match:
return float(match.group(1))
return None
```
用 `for` 迴圈去 `insmod` 不同數量的 subscribers,並將時間記錄起來以供後續畫圖使用。
```python
subscribers_range = range(1, 1001)
times = []
for subscribers in subscribers_range:
print(f"Testing with {subscribers} subscribers...")
run_command(f"make load PUBLISHERS=1 SUBSCRIBERS={subscribers}")
run_command("sudo rmmod pubsub")
total_time = get_total_time_from_dmesg()
print(total_time)
if total_time is not None and total_time < 50:
times.append(total_time)
else:
times.append(0)
```
會做單純使用 kthread 和 CMWQ 的時間比較
### 實驗一:
只有一個 Publisher,一個topic ,有不同數量的 Subscriber ,使用第一個方法計算時間。
Publisher 固定為1,Subscriber 數量由1到1000。
* 實作時遇到的問題
**問題一:**
原本想要同時執行兩個模組去,進行效能量測,但是發現兩者時間一樣,原因是我的 python script 檔抓取時間是單純的抓 `dmesg` 最後一行,我應該要使用 `grep` ,只過濾我想要的訊息,再讀取時間,就能分辨兩個模組各自的執行時間。(但是同時執行的效能,不能做為比較的依據,因為會互相影響)
**問題二:**
一直遇到模組無法正確卸載的狀況,發現是我的 kthread_should_stop() 這個條件沒有成立,thread 一直沒有關掉,導致在卸載模型時,會有錯誤。
**問題三:**
發現我的 Kthread 版本是完全照順序的讀取,我覺得沒有 cocurrent 的工作,也是因為我讀取訊息這件事速度很快,在下一個 thread 創建好前,這個 thread 就結束了,並且在讀取時我有用 lock 鎖住,才會看起來像是沒有 cocurrent 做事。
我改變了一下順序,先將 subscriber thread 創建好,然後使用`wait_event_interruptible`
去監控有沒有訊息被寫到 buffer 中,有訊息實再讀取,這樣會提升很多效率。
CMWQ(讀取有 lock ):
![kecho_concurrent_performance](https://hackmd.io/_uploads/BktAUeuIR.png)
CMWQ(讀取沒有 lock ):
![kecho_concurrent_performance](https://hackmd.io/_uploads/r1sMYId8C.png)
Kthread(照順序):
![kecho_concurrent_performance](https://hackmd.io/_uploads/Hk21PgOLC.png)
Kthread(不照順序,讀取沒有 lock):
![kecho_concurrent_performance](https://hackmd.io/_uploads/BJIyAruUC.png)
可以看到照順序的版本所需時間遠比 CMWQ 多,因為 CMWQ 有 workqueue, worker pool 的概念,可以自動根據執行狀況去分配工具,照順序的版本是輪到了才建立 thread 並且讀取時有用 lock 鎖住,多了等待和建立 thread 的時間,所以所需時間較長。
Kthread (沒有照順序,沒有 lock ),效能比 Kthread (照順序,有 lock ),提升了很多,可以看到 subscriber 多的時候,所花費的時間大致是 1/2。
CMWQ 沒有 lock 的版本也比有 lock 版本快很多。
### 實驗二:
使用第二種方法計算時間,和版本依同樣使用一個 Publisher,一個topic ,測量不同數量的 Subscriber 所花費的時間。
因為已經實驗過得知,沒有 lock 比較好,接下來都會以沒有 lock 的版本進行測試。
Kthread:
![kecho_concurrent_performance_test](https://hackmd.io/_uploads/Bk1tfrKIR.png)
CMWQ:
![kecho_concurrent_performance_test](https://hackmd.io/_uploads/rk9uGrtIR.png)
可以看出 CMWQ 的效率還是比單純使用 Kthread 好,有個有趣的現象是 Kthread 的表現比較有規律,花費的時間隨著 subscriber 的數量逐漸上升。
而 CMWQ 花費的時間,在一開始還算規律,但是 subscriber 數量漸漸增加後,所花費的時間就會有高有低。
兩種計算時間的方法代表的意義和一個我心裡一直有的疑問一樣,以前上課老師都會說 : "你浪費了全班1分鐘,全班40個人,你就是浪費了40分鐘!",但是時間明明只過去1分鐘,第一種時間計算方式像是老師所說浪費40分鐘的計算方法,把所有 subscriber 花費的時間相加。第二種時間計算方式則是只看每一則訊息傳到所有 subscriber 所需的時間,只看實際所花費的時間。哪種計算方法才是正確的呢?
## 從實驗中發現實作錯誤
### 實驗三:
測量不同數量的 Publisher 的效能差距,使用第一種方法計算時間。
將 Subscriber 數量固定在 500,Publisher 數量由1到1000。
Kthread
![kecho_concurrent_performance2](https://hackmd.io/_uploads/rk1SuYc8A.png)
CMWQ
![kecho_concurrent_performance2](https://hackmd.io/_uploads/ByMBs3K80.png)
### 實驗四:
測量不同數量的 Publisher 的效能差距,使用第二種方法計算時間。
Kthread
![kecho_concurrent_performance_test2](https://hackmd.io/_uploads/BJMHOtcUC.png)
CMWQ
![kecho_concurrent_performance_test2](https://hackmd.io/_uploads/B1Lrj3FLC.png)
從實驗三實驗四中看出 publisher 數量,對於整體效能的影響比較小, subscriber 數量對於效能影響比較大。
我覺得不太合理,想到我的 buffer_size 設置太小,導致 **Subscriber 還沒讀到 message , Publisher 發的訊息就將之前的覆蓋掉了**,導致 message 遺失。因為 message 數量最多只能達到我設定的數值,所以大部分時間會相差不多。
我將 buffer_size 設定成 10 ,使用 5 個 publisher,每個 publisher 發 3 封 message,總訊息數 15 大於 buffer_size 10 ,可以看到有訊息遺失。
```shell
[五 六 28 01:55:11 2024] Pub-Sub module loaded with 2 subscribers and 5 publishers
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 0, Time taken: 0.000007686s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 0, Time taken: 0.000012225s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 1, Time taken: 0.000009814s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 2, Time taken: 0.000012108s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 1, Time taken: 0.000015387s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 0, Time taken: 0.000014315s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 2, Time taken: 0.000018394s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 1, Time taken: 0.000016483s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 0, Time taken: 0.000020439s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 2, Time taken: 0.000014714s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 1, Time taken: 0.000022577s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 0, Time taken: 0.000016224s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 2, Time taken: 0.000021655s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 1, Time taken: 0.000018250s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 2, Time taken: 0.000020130s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 0, Time taken: 0.000024077s
[五 六 28 01:55:11 2024] Subscriber 0 received: Publisher 3 message 0, Time taken: 0.000022043s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 1, Time taken: 0.000025915s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 2, Time taken: 0.000027926s
[五 六 28 01:55:11 2024] Subscriber 1 received: Publisher 3 message 0, Time taken: 0.000028948s
```
測試不同 subscriber 數量的效能差異時,因為 publisher 只有一個,所以不會造成此問題。
### 修正 bug
使用 circular buffer ,並把所有 subscriber 都讀取過的 message 從 buffer 中刪除。
這樣還不能解決問題, publisher 需要在 buffer 滿了以後,暫停發佈訊息到 buffer。
因為有對 buffer 做刪除以及再寫入的動作,使用 lock 和 atomic 操作。
修改部分 :
1. 使用 atomic 進行同步:
使用atomic_t來處理多線程之間的同步操作,更安全地處理訊息計數和讀取計數。
atomic_t message_count = ATOMIC_INIT(0);
atomic_t read_count[BUFFER_SIZE];
2. 使用 head 和 tail 指針:
使用 head 和 tail 指針來管理 circular buffer,取代單一的 write_index。
3. 用等待佇列處理特定情況:
為 publisher 和 subscriber 分別使用等待佇列(publisher_queue和subscriber_queue)來更有效地處理阻塞情況,比如 buffer 滿的情況。
效能測量 :
使用 CMWQ ,對不同 publisher 數量做測量,subscriber 數量固定為 500,看起來依然沒甚麼差別。
![kecho_concurrent_performance_test](https://hackmd.io/_uploads/r1fAcXsU0.png)
將 subscriber 數量定為 1 後做測試:
![kecho_concurrent_performance_test3](https://hackmd.io/_uploads/ryUXiQoUR.png)
可以看到會隨著 publisher 數量增加而運行較久。
**目前的 bug:**
不知道為什麼有時候模組掛載後,不會有任何動作,不會發布訊息,導致時間測量出是 0。
這個問題我目前不知道怎麼解決。
![kecho_concurrent_performance3](https://hackmd.io/_uploads/HyOqjQo80.png)
## 未來改進方向(持續努力中)
找出為何掛載後,模組有時候能運作、有時候不會動作的原因並修復
(我認為有 dead lock)
## TODO: 提出效能改進方案
提案:
lock-free