Try   HackMD

2024 Linux 核心實作期末專題 :Pub/Sub 模擬

TODO: https://hackmd.io/@sysprog/linux2024-integration # 以 simrupt 為基礎,建構 Quiz 13 提及的 Pub/Sub 模型,要用到 CMWQ/workqueue
TODO: 除了 Pub/Sub 模型的正確性,擴充 simrupt 進行效能分析 (訂閱者數量 vs. 執行時間)

熟悉 ktime, CMWQ, list_api

解說影片連結

Prerequisite

simrupt :
使用 CMWQ/work queue ,我在實做 pub/sub 時可以沿用,定義好我的 work_func 即可。
使用 kfifo ,kfifo 是 linux kernel 中的一個結構,是 First-In-First-Out 的,這個結構在 Single Producer Single Consumer 情況中不需要額外的 lock 維護。
我單純使用一個陣列去儲存 Publisher 發佈的訊息。
CMWQ :
CMWQ 通過多個 work queue 來管理任務,每個 work queue 可以分配給不同的處理器核心,從而充分利用多核處理器的並行計算能力。
CMWQ 提供了一個統一的接口來提交和處理任務,簡化了應用程序的開發和維護。並且系統自動管理 work queue 的創建、銷毀和調度,開發者只需關注任務的提交和執行邏輯,減少了操作上的複雜度。

Pub/Sub :
發布者與訂閱者分離: Pub/Sub 模型將發布者和訂閱者完全解耦合,發布者將消息發布到主題而不需要知道誰會接收這些消息。同樣,訂閱者訂閱主題以接收消息而不需要知道誰發布了消息。
輕鬆擴展: Pub/Sub 模型支持輕鬆擴展,可以在不影響現有系統的情況下添加新的發布者或訂閱者。

熟讀 Pub/Sub 教材,以理解 Pub/Sub 模型 :
根據系統設計入門:Pub/Sub Pattern,Pub/Sub 和觀察者模式(Observer Pattern),最大的不同就在 Pub/Sub 有一個中介平台(broker),存放 message 並識別 Publisher, Subscriber。

根據 Google Cloud 的 Pub/Sub 文件

Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages.

Pub/Sub 模型是不同步更新,Publisher 只要把訊息或事件傳給 Pub/Sub 服務,就不需要管了,Pub/Sub 會把訊息或是事件傳送給相關的 Subscriber。

Pub/Sub 服務的中介平台, broker 會根據 topic 把訊息傳給訂閱 topic 的 subscriber , topic 是判斷要不要傳給 subscriber 的依據, broker 則是負責儲存訊息的角色。

Pub/Sub 模型的特別之處

和 Observer pattern 做比較

Observer pattern:







%0



Subject

Subject



Observer

Observer



Subject->Observer


message



Observer->Subject


subscribe



Observer Pattern 重視 Subject 有更新時,主動通知 Observer,且通常為一個 Subject 對多個 Observer。這是一種同步更新的方式,實現一對多的通知

Pub/Sub 運行圖:







%0



Publisher

Publisher



broker

broker



Publisher->broker


Publish message



Subscriber

Subscriber



broker->Subscriber


message



Subscriber->broker


subscribe



Pub/Sub 模式則處理資料傳遞的過程。Publisher 上傳資料到中介(broker),而不需關心資料由誰接收。此模式支持異步更新,且具有多對多的特性
Pub/Sub 的好處是,即使 Publisher 或 Subscriber 的任一個 server 發生故障,也不會影響整體系統的運作。

開發構想 :

Pub/Sub 模型可以有很多 Publisher 和很多 Subscriber ,很多 topic , 我先以一個 publisher、一個 topic 、 不同數量的 subscriber 來做效能量測。

  1. 使用一個 module ,內有 publisher 和 subscriber,publisher 發佈訊息到 broker, broker 再把訊息發出去給 subscriber。

  2. 使用兩個 module,一個是 publisher,專門去寫訊息到一個文件,一個是subscriber module,可以設定 subscriber 數量去讀取訊息。(測試困難)

問題 : publisher 發布訊息時應該要將內容鎖住,但不同 subscriber 讀的時候需要鎖嗎?
我認為單純讀取的話,不需要鎖住。
後續會做實驗比較有沒有鎖的效能差距。

實作:

主要函數:

  • broker_fn(struct work_struct *work):這是工作佇列的處理函數,用來將發布者的訊息放入訊息緩衝區,並喚醒訂閱者。
  • 注意 : 訊息寫入時要將 buffer 鎖起來,避免正在寫入時被 subscriber 讀取。
static void broker_fn(struct work_struct *work) {
    struct pubsub_work *ps_work = container_of(work, struct pubsub_work, work);

    mutex_lock(&buffer_lock);
    if (message_count < BUFFER_SIZE) {
        strncpy(message_buffer[write_index], ps_work->message, 256);
        ktime_get_real_ts64(&publish_times[write_index]);
        write_index = (write_index + 1) % BUFFER_SIZE;
        message_count++;
        wake_up_interruptible(&subscriber_queue);
    }
    mutex_unlock(&buffer_lock);

    kfree(ps_work);
}
  • publish_message(const char *msg):這個函數用來建立一個新的工作結構體,並將其放入工作佇列中。
static void publish_message(const char *msg) {
    struct pubsub_work *ps_work;

    ps_work = kmalloc(sizeof(*ps_work), GFP_KERNEL);
    if (!ps_work) {
        pr_err("Failed to allocate memory for pubsub_work\n");
        return;
    }

    strncpy(ps_work->message, msg, 256);
    INIT_WORK(&ps_work->work, broker_fn);
    queue_work(pubsub_wq, &ps_work->work);
}
  • publisher_work_fn(struct work_struct *work):發布者的工作函數,會產生訊息並呼叫 publish_message。
static void publisher_work_fn(struct work_struct *work) {
    struct pubsub_id_work *id_work = container_of(work, struct pubsub_id_work, work);
    int id = id_work->id;
    int i;

    kfree(id_work);

    for (i = 0; i < 3; i++) {
        char msg[32];
        snprintf(msg, sizeof(msg), "Publisher %d message %d", id, i);
        publish_message(msg);
    }
}
  • subscriber_work_fn(struct work_struct *work):訂閱者的工作函數,會等待新的訊息並計算處理時間。
static void subscriber_work_fn(struct work_struct *work) {
    struct pubsub_id_work *id_work = container_of(work, struct pubsub_id_work, work);
    char msg[256];
    int id = id_work->id;
    struct timespec64 receive_time, time_diff;
    int my_read_index = 0;

    kfree(id_work);
    while (!exit_flag) {
        wait_event_interruptible_timeout(subscriber_queue, message_count > my_read_index, msecs_to_jiffies(100));

        if (message_count > my_read_index) {
            int buffer_index = (my_read_index + write_index - message_count + BUFFER_SIZE) % BUFFER_SIZE;
            strncpy(msg, message_buffer[buffer_index], 256);
            ktime_get_real_ts64(&receive_time);
            time_diff = timespec64_sub(receive_time, publish_times[buffer_index]);
            total_time_ns += timespec64_to_ns(&time_diff);
            pr_info("Subscriber %d received: %s, Time taken: %lld.%.9lds\n", id, msg,
                    (long long)time_diff.tv_sec, time_diff.tv_nsec);
            my_read_index++;
        }
    }
}
  • __init pubsub_init(void):模組初始化函數,建立工作佇列並初始化發布者和訂閱者的工作。
static int __init pubsub_init(void) {
    int i;
    ktime_get_real_ts64(&overall_start_time);
    mutex_init(&buffer_lock);
    pubsub_wq = alloc_workqueue("pubsub_wq", WQ_UNBOUND, 0);
    if (!pubsub_wq) {
        pr_err("Failed to create workqueue\n");
        return -ENOMEM;
    }

    for (i = 0; i < num_publishers; i++) {
        struct pubsub_id_work *id_work = kmalloc(sizeof(*id_work), GFP_KERNEL);
        if (!id_work) {
            pr_err("Failed to allocate memory for publisher ID %d\n", i);
            destroy_workqueue(pubsub_wq);
            return -ENOMEM;
        }
        id_work->id = i;
        INIT_WORK(&id_work->work, publisher_work_fn);
        queue_work(pubsub_wq, &id_work->work);
    }

    for (i = 0; i < num_subscribers; i++) {
        struct pubsub_id_work *id_work = kmalloc(sizeof(*id_work), GFP_KERNEL);
        if (!id_work) {
            pr_err("Failed to allocate memory for subscriber ID %d\n", i);
            destroy_workqueue(pubsub_wq);
            return -ENOMEM;
        }
        id_work->id = i;
        INIT_WORK(&id_work->work, subscriber_work_fn);
        queue_work(pubsub_wq, &id_work->work);
    }

    pr_info("Pub-Sub module loaded with %d subscribers and %d publishers\n", num_subscribers, num_publishers);
    return 0;
}
  • __exit pubsub_exit(void):模組卸載函數,銷毀工作佇列並計算總時間。
static void __exit pubsub_exit(void) {
    struct timespec64 time_diff;
    ktime_get_real_ts64(&overall_end_time);
    exit_flag = true;
    flush_workqueue(pubsub_wq);
    destroy_workqueue(pubsub_wq);
    time_diff = timespec64_sub(overall_end_time, overall_start_time);
    overall_time = timespec64_to_ns(&time_diff);
    pr_info("Total time taken by all subscribers: %lld.%.9lds\n",
            (long long)total_time_ns / 1000000000, (long)total_time_ns % 1000000000);
    pr_info("Overall time taken by all subscribers: %lld.%.9lds\n",
            (long long)overall_time / 1000000000, (long)overall_time % 1000000000);
    pr_info("Pub-Sub module unloaded\n");
}

執行結果

[五  六  28 01:36:46 2024] Pub-Sub module loaded with 5 subscribers and 2 publishers
[五  六  28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 0, Time taken: 0.000002367s
[五  六  28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 0, Time taken: 0.000002641s
[五  六  28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 0, Time taken: 0.000003143s
[五  六  28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 0, Time taken: 0.000003864s
[五  六  28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 1, Time taken: 0.000003511s
[五  六  28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 1, Time taken: 0.000003704s
[五  六  28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 0, Time taken: 0.000005079s
[五  六  28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 1, Time taken: 0.000003962s
[五  六  28 01:36:46 2024] Subscriber 1 received: Publisher 0 message 2, Time taken: 0.000004681s
[五  六  28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 1, Time taken: 0.000005636s
[五  六  28 01:36:46 2024] Subscriber 4 received: Publisher 0 message 2, Time taken: 0.000005259s
[五  六  28 01:36:46 2024] Subscriber 2 received: Publisher 0 message 2, Time taken: 0.000005269s
[五  六  28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 0, Time taken: 0.000005517s
[五  六  28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 1, Time taken: 0.000007413s
[五  六  28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 0, Time taken: 0.000005874s
[五  六  28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 0, Time taken: 0.000006338s
[五  六  28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 1, Time taken: 0.000006210s
[五  六  28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 1, Time taken: 0.000006860s
[五  六  28 01:36:46 2024] Subscriber 0 received: Publisher 0 message 2, Time taken: 0.000008905s
[五  六  28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 1, Time taken: 0.000007885s
[五  六  28 01:36:46 2024] Subscriber 1 received: Publisher 1 message 2, Time taken: 0.000007265s
[五  六  28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 0, Time taken: 0.000009909s
[五  六  28 01:36:46 2024] Subscriber 4 received: Publisher 1 message 2, Time taken: 0.000008406s
[五  六  28 01:36:46 2024] Subscriber 2 received: Publisher 1 message 2, Time taken: 0.000008638s
[五  六  28 01:36:46 2024] Subscriber 3 received: Publisher 0 message 2, Time taken: 0.000012894s
[五  六  28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 1, Time taken: 0.000011922s
[五  六  28 01:36:46 2024] Subscriber 0 received: Publisher 1 message 2, Time taken: 0.000012701s
[五  六  28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 0, Time taken: 0.000014676s
[五  六  28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 1, Time taken: 0.000015833s
[五  六  28 01:36:46 2024] Subscriber 3 received: Publisher 1 message 2, Time taken: 0.000016599s
[五  六  28 01:42:29 2024] Total time taken by all subscribers: 0.000217702s
[五  六  28 01:42:29 2024] Overall time taken by all subscribers: 343.724282463s

我使用5個 subscriber 和 2 個 publisher 為例。
可以看到每個 subscriber 都有接收到每個 publisher 發出的 3 則 messsage。

測量效能 :

使用 ktime,紀錄執行時間來評估效能,這時候遇到一個問題: 時間怎麼計算才好?

  1. 每一個 subscriber 讀取每一則訊息的時間都記錄下來,用那則訊息發布的時間到 subscriber 讀取所花費的時間,全部加總。
  2. 測量整體所耗費時間,從加載模型到跑完卸載模型的時間。
  3. 每一則訊息發布,到最後一個 subscriber 讀取訊息的時間,為一個訊息所耗費的時間,將全部訊息所耗費的時間加總。

我使用 python 寫測試腳本。
使用 subprocess.run 執行 insmodrmmod、以及 dmesg 這些命令。

def run_command(command):
    result = subprocess.run(command, shell=True, text=True, capture_output=True)
    return result.stdout

讀取 dmesg 輸出的訊息,找到我在模組中輸出的時間資訊。

def get_total_time_from_dmesg():
    dmesg_output = run_command("sudo dmesg | tail -n 20")
    match = re.search(r'Overall time taken by all subscribers:\s+([\d\.]+)s', dmesg_output)
    if match:
        return float(match.group(1))
    return None

for 迴圈去 insmod 不同數量的 subscribers,並將時間記錄起來以供後續畫圖使用。

subscribers_range = range(1, 1001)
times = []

for subscribers in subscribers_range:
    print(f"Testing with {subscribers} subscribers...")
    run_command(f"make load PUBLISHERS=1 SUBSCRIBERS={subscribers}")
    run_command("sudo rmmod pubsub")
    total_time = get_total_time_from_dmesg()
    print(total_time)
    if total_time is not None and total_time < 50:
        times.append(total_time)
    else:
        times.append(0)

會做單純使用 kthread 和 CMWQ 的時間比較

實驗一:

只有一個 Publisher,一個topic ,有不同數量的 Subscriber ,使用第一個方法計算時間。
Publisher 固定為1,Subscriber 數量由1到1000。

  • 實作時遇到的問題
    問題一:
    原本想要同時執行兩個模組去,進行效能量測,但是發現兩者時間一樣,原因是我的 python script 檔抓取時間是單純的抓 dmesg 最後一行,我應該要使用 grep ,只過濾我想要的訊息,再讀取時間,就能分辨兩個模組各自的執行時間。(但是同時執行的效能,不能做為比較的依據,因為會互相影響)
    問題二:
    一直遇到模組無法正確卸載的狀況,發現是我的 kthread_should_stop() 這個條件沒有成立,thread 一直沒有關掉,導致在卸載模型時,會有錯誤。
    問題三:
    發現我的 Kthread 版本是完全照順序的讀取,我覺得沒有 cocurrent 的工作,也是因為我讀取訊息這件事速度很快,在下一個 thread 創建好前,這個 thread 就結束了,並且在讀取時我有用 lock 鎖住,才會看起來像是沒有 cocurrent 做事。
    我改變了一下順序,先將 subscriber thread 創建好,然後使用wait_event_interruptible
    去監控有沒有訊息被寫到 buffer 中,有訊息實再讀取,這樣會提升很多效率。

CMWQ(讀取有 lock ):

kecho_concurrent_performance

CMWQ(讀取沒有 lock ):

kecho_concurrent_performance

Kthread(照順序):

kecho_concurrent_performance

Kthread(不照順序,讀取沒有 lock):

kecho_concurrent_performance

可以看到照順序的版本所需時間遠比 CMWQ 多,因為 CMWQ 有 workqueue, worker pool 的概念,可以自動根據執行狀況去分配工具,照順序的版本是輪到了才建立 thread 並且讀取時有用 lock 鎖住,多了等待和建立 thread 的時間,所以所需時間較長。

Kthread (沒有照順序,沒有 lock ),效能比 Kthread (照順序,有 lock ),提升了很多,可以看到 subscriber 多的時候,所花費的時間大致是 1/2。

CMWQ 沒有 lock 的版本也比有 lock 版本快很多。

實驗二:

使用第二種方法計算時間,和版本依同樣使用一個 Publisher,一個topic ,測量不同數量的 Subscriber 所花費的時間。
因為已經實驗過得知,沒有 lock 比較好,接下來都會以沒有 lock 的版本進行測試。

Kthread:

kecho_concurrent_performance_test

CMWQ:

kecho_concurrent_performance_test

可以看出 CMWQ 的效率還是比單純使用 Kthread 好,有個有趣的現象是 Kthread 的表現比較有規律,花費的時間隨著 subscriber 的數量逐漸上升。
而 CMWQ 花費的時間,在一開始還算規律,但是 subscriber 數量漸漸增加後,所花費的時間就會有高有低。

兩種計算時間的方法代表的意義和一個我心裡一直有的疑問一樣,以前上課老師都會說 : "你浪費了全班1分鐘,全班40個人,你就是浪費了40分鐘!",但是時間明明只過去1分鐘,第一種時間計算方式像是老師所說浪費40分鐘的計算方法,把所有 subscriber 花費的時間相加。第二種時間計算方式則是只看每一則訊息傳到所有 subscriber 所需的時間,只看實際所花費的時間。哪種計算方法才是正確的呢?

從實驗中發現實作錯誤

實驗三:

測量不同數量的 Publisher 的效能差距,使用第一種方法計算時間。
將 Subscriber 數量固定在 500,Publisher 數量由1到1000。

Kthread

kecho_concurrent_performance2

CMWQ

kecho_concurrent_performance2

實驗四:

測量不同數量的 Publisher 的效能差距,使用第二種方法計算時間。

Kthread

kecho_concurrent_performance_test2

CMWQ

kecho_concurrent_performance_test2

從實驗三實驗四中看出 publisher 數量,對於整體效能的影響比較小, subscriber 數量對於效能影響比較大。

我覺得不太合理,想到我的 buffer_size 設置太小,導致 Subscriber 還沒讀到 message , Publisher 發的訊息就將之前的覆蓋掉了,導致 message 遺失。因為 message 數量最多只能達到我設定的數值,所以大部分時間會相差不多。
我將 buffer_size 設定成 10 ,使用 5 個 publisher,每個 publisher 發 3 封 message,總訊息數 15 大於 buffer_size 10 ,可以看到有訊息遺失。

[五  六  28 01:55:11 2024] Pub-Sub module loaded with 2 subscribers and 5 publishers
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 0, Time taken: 0.000007686s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 0, Time taken: 0.000012225s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 1, Time taken: 0.000009814s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 0 message 2, Time taken: 0.000012108s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 1, Time taken: 0.000015387s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 0, Time taken: 0.000014315s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 0 message 2, Time taken: 0.000018394s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 1, Time taken: 0.000016483s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 0, Time taken: 0.000020439s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 1 message 2, Time taken: 0.000014714s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 1, Time taken: 0.000022577s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 0, Time taken: 0.000016224s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 1 message 2, Time taken: 0.000021655s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 1, Time taken: 0.000018250s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 2 message 2, Time taken: 0.000020130s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 0, Time taken: 0.000024077s
[五  六  28 01:55:11 2024] Subscriber 0 received: Publisher 3 message 0, Time taken: 0.000022043s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 1, Time taken: 0.000025915s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 2 message 2, Time taken: 0.000027926s
[五  六  28 01:55:11 2024] Subscriber 1 received: Publisher 3 message 0, Time taken: 0.000028948s

測試不同 subscriber 數量的效能差異時,因為 publisher 只有一個,所以不會造成此問題。

修正 bug

使用 circular buffer ,並把所有 subscriber 都讀取過的 message 從 buffer 中刪除。
這樣還不能解決問題, publisher 需要在 buffer 滿了以後,暫停發佈訊息到 buffer。
因為有對 buffer 做刪除以及再寫入的動作,使用 lock 和 atomic 操作。

修改部分 :

  1. 使用 atomic 進行同步:
    使用atomic_t來處理多線程之間的同步操作,更安全地處理訊息計數和讀取計數。
    atomic_t message_count = ATOMIC_INIT(0);
    atomic_t read_count[BUFFER_SIZE];

  2. 使用 head 和 tail 指針:
    使用 head 和 tail 指針來管理 circular buffer,取代單一的 write_index。

  3. 用等待佇列處理特定情況:
    為 publisher 和 subscriber 分別使用等待佇列(publisher_queue和subscriber_queue)來更有效地處理阻塞情況,比如 buffer 滿的情況。

效能測量 :
使用 CMWQ ,對不同 publisher 數量做測量,subscriber 數量固定為 500,看起來依然沒甚麼差別。

kecho_concurrent_performance_test
將 subscriber 數量定為 1 後做測試:
kecho_concurrent_performance_test3

可以看到會隨著 publisher 數量增加而運行較久。

目前的 bug:
不知道為什麼有時候模組掛載後,不會有任何動作,不會發布訊息,導致時間測量出是 0。
這個問題我目前不知道怎麼解決。

kecho_concurrent_performance3

未來改進方向

找出為何掛載後,模組有時候能運作、有時候不會動作的原因並修復