--- tags: 2022 linux kernel --- # 2022q1 Homework6 [(kecho)](https://github.com/Risheng1128/kecho) contributed by < [`Risheng1128`](https://github.com/Risheng1128) > > [作業說明](https://hackmd.io/@sysprog/linux2022-ktcp) > [作業區](https://hackmd.io/@sysprog/linux2022-homework6) ## 實驗環境 ```shell $ gcc --version gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0 Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian Address sizes: 39 bits physical, 48 bits virtual CPU(s): 4 On-line CPU(s) list: 0-3 Thread(s) per core: 2 Core(s) per socket: 2 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 142 Model name: Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz Stepping: 9 CPU MHz: 2700.000 CPU max MHz: 3100.0000 CPU min MHz: 400.0000 BogoMIPS: 5399.81 Virtualization: VT-x L1d cache: 64 KiB L1i cache: 64 KiB L2 cache: 512 KiB L3 cache: 3 MiB NUMA node0 CPU(s): 0-3 ``` ## 自我檢查清單 :::info - [x] 給定的 `kecho` 已使用 CMWQ,請陳述其優勢和用法 - [x] 核心文件 [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 提到 "The original create_`*`workqueue() functions are deprecated and scheduled for removal",請參閱 Linux 核心的 git log (不要用 Google 搜尋!),揣摩 Linux 核心開發者的考量 - [x] 解釋 `user-echo-server` 運作原理,特別是 [epoll](http://man7.org/linux/man-pages/man7/epoll.7.html) 系統呼叫的使用 - [x] 是否理解 `bench` 原理,能否比較 `kecho` 和 `user-echo-server` 表現?佐以製圖 - [ ] 解釋 `drop-tcp-socket` 核心模組運作原理。`TIME-WAIT` sockets 又是什麼? ::: ### 為什麼需要 Concurrency Managed Workqueue (CMWQ)? 在理解 CMWQ 的優勢及用法之前,先理解為什麼需要特別實作 CMWQ ,參考 [Why cmwq?](https://www.kernel.org/doc/html/latest/core-api/workqueue.html#why-cmwq) 原本的 workqueue 實作可分為兩種 — multi threaded workqueue 及 single threaded workqueue - multi threaded workqueue: 每個 CPU 都有一個 worker thread ,因此每個 multi threaded workqueue 都會有和 CPU 數相同的 worker thread - single threaded workqueue: 整個系統只有一個 worker thread 由於逐年上升的 MT wq 使用者且同時 CPU 的核數逐漸上升,導致某些系統剛啟動時可能就會將預設的 32k PID 空間用完 而需要 CMWQ 的最主要原因在於,原始的 MT wq 浪費了許多的資源,但 MT wq 的 level of concurrency 並沒有來的比較好。MT wq 建立了固定數量的執行緒,但問題出在每個 CPU 上都固定綁定一個執行緒,導致不同 CPU 的執行緒間無法相互轉移,進而降低了系統的效能 CMWQ 著重在以下的實作 - 維持和原本的 workqueue API 的相容性 - 在每個 CPU 建立統一的 worker pools 並且被所有的 workqueue 共享,為了提昇 level of concurrency 的彈性以及不浪費太多資源 - 自動調節 worker pool 以及 level concurrency ,讓 API 的使用者不用注意細節 ### 使用 CMWQ 的優勢 以下為 CMWQ 的架構,一共分成 Thread pool 及 Unbound Thread pool - Thread pool: 分成 normal thread pool 及 high priority thread ,分別管理普通的執行緒和高優先權的執行緒。其總數是固定的,如果有 n 個 CPU ,則會建立 2n 的 thread pool - Unbound Thread pool: 這種 thread pool 是動態建立且可運行在任意的 CPU 上 ![](https://i.imgur.com/NI66J4h.png) CMWQ 的優勢: - 使用者不需要管理執行緒, CMWQ 會自動管理執行緒,可以避免建立過多的執行緒,降低系統效能 - 在原本的 workqueue 執行緒是被綁定在 CPU 上,而 CMWQ 除了有綁定特定 CPU 的 thread pool ,也有可以執行在任意 CPU 的 Unbound thread pool ,可以分配執行緒給 idle 的 CPU ### CMWQ 的 API 用法 函式 `alloc_workqueue`: 分配一個 workqueue。函式 `alloc_workqueue` 一共有 3 個參數 — `@"name"`, `@flags` 及 `@max_active` - `@"name"`: workqueue 的名稱 - `@flags`: - `WQ_UNBOUND`: Work items queued to an unbound wq are served by the special worker-pools which host workers which are not bound to any specific CPU - `WQ_FREEZABLE`: A freezable wq participates in the freeze phase of the system suspend operations. Work items on the wq are drained and no new work item starts execution until thawed - `WQ_MEM_RECLAIM`: All wq which might be used in the memory reclaim paths MUST have this flag set. The wq is guaranteed to have at least one execution context regardless of memory pressure - `WQ_HIGHPRI`: Work items of a highpriority wq are queued to the highpri worker-pool of the target cpu. Highpri worker-pools are served by worker threads with elevated nice level - `WQ_CPU_INTENSIVE`: Work items of a CPU intensive wq do not contribute to the concurrency level - `@max_active`: 決定 workqueue 每個 CPU 上最多可存放的 work item ,假設 `max_active = 16` ,表示每個 workqueue 最多可以存放 16 個 work item ### 解讀函式 `__create_workqueue` 為何被移除 參考 [commit](https://github.com/torvalds/linux/commit/d320c03830b17af64e4547075003b1eeb274bc6c) 和 [OscarShiang 的開發紀錄](https://hackmd.io/@oscarshiang/linux_kecho#%E6%94%B9%E4%BB%A5-alloc_workqueue-%E5%AF%A6%E4%BD%9C-workqueue-%E7%9A%84%E5%8E%9F%E5%9B%A0) ,可以發現 `__create_workqueue` 改成 `alloc_workqueue` 的原因是為了讓新增功能的方法變得更簡潔 > This patch makes changes to make new workqueue features available to its users 參考 [[PATCH] Support for freezeable workqueues](https://github.com/torvalds/linux/commit/341a595850dac1b0503df34260257d71b4fdf72c) 和 [workqueue: introduce create_rt_workqueue](https://github.com/torvalds/linux/commit/0d557dc97f4bb501f086a03d0f00b99a7855d794) 的修改可以發現,在改名成 `alloc_workqueue` 之前,新增功能的實作常常都是要動到大量的函式宣告,並且增加函式參數,以下給兩個例子 - freezeable workqueues &rarr; 新增參數 `freezeable` ```diff extern struct workqueue_struct *__create_workqueue(const char *name, - int singlethread); - #define create_workqueue(name) __create_workqueue((name), 0) - #define create_singlethread_workqueue(name) __create_workqueue((name), 1) + int singlethread, + int freezeable); + #define create_workqueue(name) __create_workqueue((name), 0, 0) + #define create_freezeable_workqueue(name) __create_workqueue((name), 0, 1) + #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0) ``` - real time workqueues &rarr; 新增參數 `rt` ```diff - #define create_workqueue(name) __create_workqueue((name), 0, 0) - #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1) - #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0) + #define create_workqueue(name) __create_workqueue((name), 0, 0, 0) + #define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1) + #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0) + #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0) ``` 接著從 [workqueue: merge feature parameters into flags](https://github.com/torvalds/linux/commit/97e37d7b9e65a6ac939f796f91081135b7a08acc) 可以看到原本功能的實作從函式參數改成使用 `flags` 的方式 ```diff + enum { + WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */ + WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */ + }; - #define create_workqueue(name) __create_workqueue((name), 0, 0) - #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1) - #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0) + #define create_workqueue(name) \ + __create_workqueue((name), 0) + #define create_freezeable_workqueue(name) \ + __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD) + #define create_singlethread_workqueue(name) \ + __create_workqueue((name), WQ_SINGLE_THREAD) ``` 改成 `flags` 後新增功能的方法變得相對簡單許多,以 [workqueue: implement high priority workqueue](https://github.com/torvalds/linux/commit/649027d73a6309ac34dc2886362e662bd73456dc) 及 [workqueue: implement cpu intensive workqueue](https://github.com/torvalds/linux/commit/fb0e7beb5c1b6fb4da786ba709d7138373d5fb22) 為例 - high priority workqueue ```diff GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */ GCWQ_FREEZING = 1 << 3, /* freeze in progress */ + GCWQ_HIGHPRI_PENDING = 1 << 4, /* highpri works on queue */ ``` - cpu intensive workqueue ```diff WQ_NON_REENTRANT = 1 << 2, /* guarantee non-reentrance */ WQ_RESCUER = 1 << 3, /* has an rescue worker */ WQ_HIGHPRI = 1 << 4, /* high priority */ + WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */ ``` 可以觀察到在標頭檔裡,新增功能的所需要的修正單純許多,已經不需要再修改大量的函式宣告,只需要新增 `flag` 即可 ### 解釋 `user-echo-server` 運作原理 以下為 socket 的流程圖,可以很明顯看到 server 的流程為 `socket()` &rarr; `bind()` &rarr; `listen()` &rarr; `accept()` ,接著開始收發資料,以下對照檔案 `user-echo-server.c` 的程式碼 ![](https://i.imgur.com/0rG4ArH.png) - `socket()`: 建立一個 socket descriptor ```c /** * 建立一個 socket descriptor * PF_INET: 使用 32 bit IP 地址 * SOCK_STREAM: Provides sequenced, reliable, two-way, connection-based byte streams. */ if ((listener = socket(PF_INET, SOCK_STREAM, 0)) < 0) server_err("Fail to create socket", &list); ``` - `bind()`: 將 addr 裡的 socket address 和 socket descriptor 連繫起來 ```c // 將 addr 和 listener 連繫起來 if (bind(listener, (struct sockaddr *) &addr, sizeof(addr)) < 0) server_err("Fail to bind", &list); ``` - `listen()`: 將 sockfd 轉換成 listening socket ,可以接收來自客戶端的請求,而這邊可以連接 128 個客戶端 ```c if (listen(listener, 128) < 0) server_err("Fail to listen", &list); ``` - `epoll_create`: 建立一個 epoll 的 file descriptor ```c // creates a new epoll instance int epoll_fd; if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0) server_err("Fail to create epoll", &list); ``` - `epoll_ctl`: 用來新增、修改及移除所要監聽的 file descriptor 的事件類型 下面的程式碼表示 `epoll_fd` 監聽 `listener` ,並且增加的監聽類型為 `EPOLLIN` 及 `EPOLLET` ```c // EPOLLIN: The associated file is available for read operations // EPOLLET: Requests edge-triggered notification for the associated file descriptor static struct epoll_event ev = {.events = EPOLLIN | EPOLLET}; ev.data.fd = listener; /* * epoll_ctl: This system call is used to add, modify, or remove entries in the * interest list of the epoll(7) instance referred to by the file descriptor epfd * EPOLL_CTL_ADD: Add an entry to the interest list of the epoll file descriptor */ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0) server_err("Fail to control epoll", &list); ``` - `epoll_wait`: 從 user space 切換到 kernel space ,開始監聽 `timeout` 長度的時間並且回傳時間內發生的事件數,這裡的 `timeout = -1` ,因此表示 block indefinitely ```c /** * epoll_wait: mode transition 並開始監聽 * EPOLL_RUN_TIMEOUT: block indefinitely */ int epoll_events_count; if ((epoll_events_count = epoll_wait(epoll_fd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT)) < 0) ``` 發生事件後,要判斷是新的 client 還是舊的,用發生事件的 `fd` 和 `listen` 判斷,如果相等表示為新的 client ```c /* EPOLLIN event for listener (new client connection) */ if (events[i].data.fd == listener) { int client; ... } else { ... } ``` 接著分成兩種情況討論,新的 client 與舊的 client ,先討論新的連線 - `accept()`: 等待來自 client 的連線請求 ```c while ((client = accept(listener, (struct sockaddr *) &client_addr, &socklen)) > 0) ``` 接著新增 `epoll_fd` 的監聽對象 (client) ,並且將 client 加到 linked list 裡 ```c // epoll_fd 新增新的監聽對象 (client) if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client, &ev) < 0) server_err("Fail to control epoll", &list); // 將 client 加到 linked list 裡 push_back_client(&list, client, inet_ntoa(client_addr.sin_addr)); ``` 如果收到的是舊的 client 的請求,對收到資料進行處理 ```c /* EPOLLIN event for others (new incoming message from client) */ if (handle_message_from_client(events[i].data.fd, &list) < 0) server_err("Handle message from client", &list); ``` - `recv()`: 收到來自 socket 的訊息 ```c // 接收資料 if ((len = recv(client, buf, BUF_SIZE, 0)) < 0) server_err("Fail to receive", list); ``` 最後如果收到資料就使用函式 `send()` 送到 client ,如果沒有就使用函式 `close()` 關閉 client 並從 linked list 移除 ```c // 沒收到訊息 if (len == 0) { // 關閉 client if (close(client) < 0) server_err("Fail to close", list); // 將 client 從 linke list 移除 *list = delete_client(list, client); printf("After fd=%d is closed, current numbers clients = %d\n", client, size_list(*list)); } else { printf("Client #%d :> %s", client, buf); // 送出資料 if (send(client, buf, BUF_SIZE, 0) < 0) server_err("Fail to send", list); } ``` ### [EPOLL](https://man7.org/linux/man-pages/man7/epoll.7.html) 系統呼叫 參考 [Linux 核心設計: 針對事件驅動的 I/O 模型演化](https://hackmd.io/@sysprog/linux-io-model/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fevent-driven-server) 裡對於 epoll 的說明 首先 I/O 模型主要分成 Blocking I/O, Non blocking I/O, I/O multiplexing I/O 及 Asynchronous I/O 四種,而 epoll 屬於 I/O multiplexing ,以下為模型的流程圖 ![](https://i.imgur.com/Wrz4cDz.png) 比較特別的地方在於 I/O Multiplexing 在呼叫系統呼叫後,會開始計算期間內已經準備好傳送資料的連線數並回傳,再經由其他系統呼叫進行資料的接收 以 `user-echo-server` 為例,在呼叫函式 `epoll_wait` 會開始監聽且時間由參數 `timeout` 決定,並且回傳監聽期間內可以開始接收資料的事件數量,可以對應到上述 I/O 模型等待資料的部份 接著 `user-echo-server` 使用函式 `recv` 的部份也可以對應上述 I/O 模型讀取資料的部份 ### 理解 `bench` 原理 看了檔案 `bench.c` 的程式碼後,可以很清楚的了解其目的是要利用多執行緒的方式測試伺服器的效能 首先是函式 `bench` 的部份,一開始建立 `MAX_THREAD = 1000` 個執行緒,接著等到所有執行緒都準備好,使用函式 `pthread_cond_broadcast` 開始啟動所有執行緒,最後計算每個執行緒的平均處理時間 ```c static void bench(void) { for (int i = 0; i < BENCH_COUNT; i++) { ready = false; // 建立 client create_worker(MAX_THREAD); pthread_mutex_lock(&worker_lock); ready = true; /* all workers are ready, let's start bombing kecho */ pthread_cond_broadcast(&worker_wait); pthread_mutex_unlock(&worker_lock); /* waiting for all workers to finish the measurement */ for (int x = 0; x < MAX_THREAD; x++) pthread_join(pt[x], NULL); idx = 0; } // 計算平均時間 for (int i = 0; i < MAX_THREAD; i++) fprintf(bench_fd, "%d %ld\n", i, time_res[i] /= BENCH_COUNT); } ``` 函式 `pthread_cond_broadcast` 參考 [pthread_cond_broadcast(3) - Linux man page](https://linux.die.net/man/3/pthread_cond_broadcast) ,這邊的用途是開始執行所有被 `worker_wait` 鎖住的執行緒 > The pthread_cond_broadcast() function shall unblock all threads currently blocked on the specified condition variable cond. 討論函式 `create_worker` ,其用途是建立執行緒,預設建立 1000 個執行緒且每個執行緒都是執行函式 `bench_worker` ```c static void create_worker(int thread_qty) { for (int i = 0; i < thread_qty; i++) { if (pthread_create(&pt[i], NULL, bench_worker, NULL)) { puts("thread creation failed"); exit(-1); } } } ``` 開始討論最關鍵的函式 `bench_worker` ```c= static void *bench_worker(__attribute__((unused))) { int sock_fd; char dummy[MAX_MSG_LEN]; struct timeval start, end; /* wait until all workers created */ pthread_mutex_lock(&worker_lock); while (!ready) // 等待 worker_wait 被 broadcast 或是 signal if (pthread_cond_wait(&worker_wait, &worker_lock)) { puts("pthread_cond_wait failed"); exit(-1); } pthread_mutex_unlock(&worker_lock); // 建立 socket descriptor sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd == -1) { perror("socket"); exit(-1); } struct sockaddr_in info = { .sin_family = PF_INET, .sin_addr.s_addr = inet_addr(TARGET_HOST), .sin_port = htons(TARGET_PORT), }; // 建立連線 if (connect(sock_fd, (struct sockaddr *) &info, sizeof(info)) == -1) { perror("connect"); exit(-1); } gettimeofday(&start, NULL); // 送資料給 server send(sock_fd, msg_dum, strlen(msg_dum), 0); // 從 server 接收資料 recv(sock_fd, dummy, MAX_MSG_LEN, 0); gettimeofday(&end, NULL); shutdown(sock_fd, SHUT_RDWR); close(sock_fd); // 比較接受回來的資料和原本的資料是否相同 if (strncmp(msg_dum, dummy, strlen(msg_dum))) { puts("echo message validation failed"); exit(-1); } // 計算經過時間 pthread_mutex_lock(&res_lock); time_res[idx++] += time_diff_us(&start, &end); pthread_mutex_unlock(&res_lock); pthread_exit(NULL); } ``` 第 11 行使用函式 `pthread_cond_wait` ,參考 [pthread_cond_wait(3) - Linux man page](https://linux.die.net/man/3/pthread_cond_wait) ,可以得知目的是要等待 `worker_wait` 被 broadcast 或是 signal > The `pthread_cond_timedwait()` and `pthread_cond_wait()` functions shall block on a condition variable. They shall be called with mutex locked by the calling thread or undefined behavior results. 接著就是進行 socket 的建立 (第 18 行) 以及連線 (第 31 行) ,最後和 server 送收資料 (第 38 行和第 40 行) 並且開始計算經過時間 (第 54 行) ### 比較 `kecho` 和 `user-echo-server` 表現 首先測試 `user-echo-server` 的效能,輸入命令 `./user-echo-server` 並且開啟新的終端機作為 client 輸入命令 `./bench` ,最後使用 gnuplot 作圖,輸入命令 `make plot` ,結果如下 ![](https://i.imgur.com/rf9rEbh.png) 接著測試 `kecho` 的效能,首先輸入命令 `sudo insmod kecho.ko` ,接著輸入命令 `./bench` ,最後畫圖,結果如下所示 ![](https://i.imgur.com/IJcoEY1.png) 可以明顯觀察 `kecho` 的速度比 `user-echo-server` 快 嘗試使用實驗室(效能較好)的電腦測試 - `user-echo-server` ![](https://i.imgur.com/Dulx2on.png) - `kecho` ![](https://i.imgur.com/vzgbXcz.png) 可以發現效能其實差非常多 ### 解釋 `drop-tcp-socket` 核心模組運作原理 輸入命令 `sudo insmod drop-tcp-socket.ko` 載入核心模組時,會執行函式 `drop_tcp_init` ```c static int drop_tcp_init(void) { int res = register_pernet_subsys(&droptcp_pernet_ops); if (res) return res; return 0; } ``` ## 開發紀錄 :::info 作業要求 - 在 GitHub 上 fork [kecho](https://github.com/sysprog21/kecho),目標是修正 `kecho` 的執行時期的缺失,提升效能和穩健度 (robustness) - [ ] 若使用者層級的程式頻繁傳遞過長的字串給 `kecho` 核心模組,會發生什麼事? - [ ] 參照 [kecho pull request #1](https://github.com/sysprog21/kecho/pull/1),嘗試比較 kthread 為基礎的實作和 CMWQ,指出兩者效能的落差並解釋 - [ ] 如果使用者層級的程式建立與 `kecho` 核心模組的連線後,就長期等待,會導致什麼問題? - [ ] 研讀 [Linux Applications Performance: Introduction](https://unixism.net/2019/04/linux-applications-performance-introduction/),嘗試將上述實作列入考量,比較多種 TCP 伺服器實作手法的效能表現 ::: ### 掛載 `kecho` 模組 在研究 `kecho` 的行為前,先分析 `kecho` 的實作考量,首先討論掛載模組時會執行的函式 `kecho_init_module` ```c static int kecho_init_module(void) { int error = open_listen(&listen_sock); if (error < 0) { printk(KERN_ERR MODULE_NAME ": listen socket open error\n"); return error; } param.listen_sock = listen_sock; /* * Create a dedicated workqueue instead of using system_wq * since the task could be a CPU-intensive work item * if its lifetime of connection is too long, e.g., using * `telnet` to communicate with kecho. Flag WQ_UNBOUND * fits this scenario. Note that the trade-off of this * flag is cache locality. * * You can specify module parameter "bench=1" if you won't * use telnet-like program to interact with the module. * This earns you better cache locality than using default * flag, `WQ_UNBOUND`. Note that your machine may going * unstable if you use telnet-like program along with * module parameter "bench=1" to interact with the module. * Since without `WQ_UNBOUND` flag specified, a * long-running task may delay other tasks in the kernel. */ kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0); echo_server = kthread_run(echo_server_daemon, &param, MODULE_NAME); if (IS_ERR(echo_server)) { printk(KERN_ERR MODULE_NAME ": cannot start server daemon\n"); close_listen(listen_sock); } return 0; } ``` 可以簡單把函式 `kecho_init_module` 用函式分成三個部份,著重在 `alloc_workqueue` 及 `kthread_run` - `open_listen`: 建立伺服器並等待連線 - `alloc_workqueue`: 分配一個 workequeue - `kthread_run`: 用於建立一個立刻執行的執行緒 根據註解說明,可以很清楚的知道 `kecho` 利用參數 `bench` 決定 workqueue 的設定,參考 [Application Programming Interface (API)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html#application-programming-interface-api) ```c kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0); ``` 函式 `kthread_run` 的部份可以參考 [Linux核心多執行緒kthread](https://codertw.com/%E7%A8%8B%E5%BC%8F%E8%AA%9E%E8%A8%80/609895/#outline__1_2_1) 及 [include/linux/kthread.h](https://github.com/torvalds/linux/blob/master/include/linux/kthread.h) ,其功能是用來建立執行緒並直接執行,以下為原始碼的部份 ```c /** * kthread_run - create and wake a thread. * @threadfn: the function to run until signal_pending(current). * @data: data ptr for @threadfn. * @namefmt: printf-style name for the thread. * * Description: Convenient wrapper for kthread_create() followed by * wake_up_process(). Returns the kthread or ERR_PTR(-ENOMEM). */ #define kthread_run(threadfn, data, namefmt, ...) \ ({ \ struct task_struct *__k \ = kthread_create(threadfn, data, namefmt, ## __VA_ARGS__); \ if (!IS_ERR(__k)) \ wake_up_process(__k); \ __k; \ }) ``` 可以清楚知道 `kthread_run` 先使用 `kthread_create` 建立執行緒,接著使用 `IS_ERR` 判斷建立有無出現問題,程式若正常就直接使用 `wake_up_process` 啟動執行緒並且執行 根據 `kecho` 原始碼以及 Linux 核心原始碼的註解,可以得知 `kthread_run` 會做以下的動作 - 執行緒會執行函式 `echo_server_daemon` 直到 `signal_pending(current)` ,相關敘述可以在函式 `echo_server_daemon` 找到 - 變數 `param` 會作為 `echo_server_daemon` 的參數傳入 - 字串 `"kecho"` 作為執行緒的名稱 接著可以開始分析 `echo_server_daemon` 函式 ```c int echo_server_daemon(void *arg) { struct echo_server_param *param = arg; struct socket *sock; struct work_struct *work; // 登記要接收的 Signal allow_signal(SIGKILL); allow_signal(SIGTERM); INIT_LIST_HEAD(&daemon.worker); // 判斷執行緒是否該被中止 while (!kthread_should_stop()) { /* using blocking I/O */ int error = kernel_accept(param->listen_sock, &sock, 0); if (error < 0) { // 檢查當前執行緒是否有 signal 處理 if (signal_pending(current)) break; printk(KERN_ERR MODULE_NAME ": socket accept error = %d\n", error); continue; } if (unlikely(!(work = create_work(sock)))) { printk(KERN_ERR MODULE_NAME ": create work error, connection closed\n"); kernel_sock_shutdown(sock, SHUT_RDWR); sock_release(sock); continue; } /* start server worker */ queue_work(kecho_wq, work); } printk(MODULE_NAME ": daemon shutdown in progress...\n"); daemon.is_stopped = true; free_work(); return 0; } ``` 函式 `echo_server_daemon` 會做以下幾件事 1. 登記會使用到的 signal ,這裡用到 `SIGKILL` 及 `SIGTERM` ,兩者都是用來結束程式,接著參考 [SIGTERM vs SIGKILL: What's the Difference?](https://linuxhandbook.com/sigterm-vs-sigkill/) 可以了解兩者的差異 2. 使用 `kthread_should_stop` 判斷執行緒是否該被中止 3. 使用 `kernel_accept` 檢查是否有上述的 signal 發生,有的話就跳出迴圈並且結束程式 4. 和 client 建立連線後,使用函式 `create_work` 建立工作 5. 一切準備就緒,使用函式 `queue_work` 開始工作 接著查看函式 `create_work` ```c static struct work_struct *create_work(struct socket *sk) { struct kecho *work; // 分配 kecho 結構大小的空間 // GFP_KERNEL: 正常配置記憶體 if (!(work = kmalloc(sizeof(struct kecho), GFP_KERNEL))) return NULL; work->sock = sk; // 初始化已經建立的 work ,且會執行函式 echo_server_worker INIT_WORK(&work->kecho_work, echo_server_worker); list_add(&work->list, &daemon.worker); return &work->kecho_work; } ``` 很明顯函式 `create_work` 只是要把每個建立的連線資料作為 linked list 的節點儲存起來,而每個連線都會執行函式 `echo_server_worker` (後續會有更多分析) 看完 `kecho` 大致的流程,加上以下定義的結構,可以完全得知 `kecho` 是如何管理每個連線 ```c struct echo_service { bool is_stopped; struct list_head worker; }; struct kecho { struct socket *sock; struct list_head list; struct work_struct kecho_work; }; ``` ```graphviz digraph G { rankdir = LR; splines = false; node[shape = "record"] echo_service[label = "echo_service|is_stopped|<h>worker"] kecho0[label = "kecho 0|sock|<l>list|kecho_work"] kecho1[label = "kecho 1|sock|<l>list|kecho_work"] kechon[label = "kecho n|sock|<l>list|kecho_work"] block[shape = plaintext label = "..."] echo_service:h -> kecho0:l -> echo_service:h kecho0:l -> kecho1:l -> kecho0:l kecho1:l -> block:l -> kecho1:l block:l -> kechon:l -> block:l } ``` `kecho` 使用參數 `is_stopped` 判斷整個程式是否需要結束,並且使用 `worker` 作為 linked list 的頭將每個連線資料串連起來 ### 執行函式 `echo_server_worker` 在 `kecho` 裡,每個連線請求被建立後都會執行函式 `echo_server_worker` ```c static void echo_server_worker(struct work_struct *work) { struct kecho *worker = container_of(work, struct kecho, kecho_work); unsigned char *buf; // 取得 buffer 空間 buf = kzalloc(BUF_SIZE, GFP_KERNEL); if (!buf) { printk(KERN_ERR MODULE_NAME ": kmalloc error....\n"); return; } // 當程式還沒有要中斷前,執行無限迴圈 while (!daemon.is_stopped) { // 取得資料 int res = get_request(worker->sock, buf, BUF_SIZE - 1); if (res <= 0) { if (res) { printk(KERN_ERR MODULE_NAME ": get request error = %d\n", res); } break; } // 回傳資料 res = send_request(worker->sock, buf, res); if (res < 0) { printk(KERN_ERR MODULE_NAME ": send request error = %d\n", res); break; } // 重置 buffer memset(buf, 0, res); } kernel_sock_shutdown(worker->sock, SHUT_RDWR); kfree(buf); } ``` 很明顯函式 `echo_server_worker` 就是負責取得資料並且回傳相同資料的函式,使用函式 `get_request` 及 `send_request` 分別進行資料的收及送 首先觀察函式 `get_request` ,很明顯就只是用 `kernel_recvmsg` 接收來自 socket 的資料,不過有蠻多特別的結構可以深入研究 ```c static int get_request(struct socket *sock, unsigned char *buf, size_t size) { struct msghdr msg; struct kvec vec; int length; /* kvec setting */ vec.iov_len = size; vec.iov_base = buf; /* msghdr setting */ msg.msg_name = 0; msg.msg_namelen = 0; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; /* * TODO: during benchmarking, such printk() is useless and lead to worse * result. Add a specific build flag for these printk() would be good. */ printk(MODULE_NAME ": start get response\n"); /* get msg */ length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags); printk(MODULE_NAME ": get request = %s\n", buf); return length; } ``` 參考 [kernel_recvmsg](https://www.kernel.org/doc/htmldocs/networking/API-kernel-recvmsg.html) 可以得到其定義 ```c // 從 socket 取得資料 (message) int kernel_recvmsg (struct socket * sock, // The socket to receive the message from struct msghdr * msg, // Received message struct kvec * vec, // Input s/g array for message data size_t num, // Size of input s/g array size_t size, // Number of bytes to read int flags // Message flags (MSG_DONTWAIT, etc...)); ``` 接著可以分析結構 `msghdr` 及 `kvec` ,分別定義在 [include/linux/socket.h](https://github.com/torvalds/linux/blob/master/include/linux/socket.h) 及 [include/linux/uio.h](https://github.com/torvalds/linux/blob/master/include/linux/uio.h) ```c struct msghdr { void *msg_name; /* ptr to socket address structure */ int msg_namelen; /* size of socket address structure */ struct iov_iter msg_iter; /* data */ /* * Ancillary data. msg_control_user is the user buffer used for the * recv* side when msg_control_is_user is set, msg_control is the kernel * buffer used for all other cases. */ union { void *msg_control; void __user *msg_control_user; }; bool msg_control_is_user : 1; __kernel_size_t msg_controllen; /* ancillary data buffer length */ unsigned int msg_flags; /* flags on received message */ struct kiocb *msg_iocb; /* ptr to iocb for async requests */ }; struct kvec { void *iov_base; /* and that should *never* hold a userland pointer */ size_t iov_len; }; ``` 有了以上的結構後,可以對應 `get_request` 的程式碼,對變數 `msg` 來說沒有特別的設定,而對變數 `vec` 來說,很明顯可以看到資料最後會送到 `vec.iov_base` 所指向的地址 接著我很好奇函式 `kernel_recvmsg` 對參數 `vec` 的註解 > Input s/g array for message data 裡面的 s/g 的意思為 scatter/gather ,搜尋後查到 [DMA](https://zh.m.wikipedia.org/zh-tw/%E7%9B%B4%E6%8E%A5%E8%A8%98%E6%86%B6%E9%AB%94%E5%AD%98%E5%8F%96) 有相關的功能,因此可以推論 `kernel_recvmsg` 在讀取來自 socket 的資料時應該會用到 DMA (不是很確定) 接著可以開始分析函式 `send_request` ,主要邏輯都和 ```c static int send_request(struct socket *sock, unsigned char *buf, size_t size) { int length; struct kvec vec; struct msghdr msg; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; vec.iov_base = buf; vec.iov_len = strlen(buf); printk(MODULE_NAME ": start send request.\n"); length = kernel_sendmsg(sock, &msg, &vec, 1, size); printk(MODULE_NAME ": send request = %s\n", buf); return length; } ``` 參考 [kernel_sendmsg](https://www.kernel.org/doc/html/v5.6/networking/kapi.html#c.kernel_sendmsg) 以下為其定義 ```c int kernel_sendmsg(struct socket * sock, // socket struct msghdr * msg, // message header struct kvec * vec, // kernel vec size_t num, // vec array length size_t size) // total message data size ``` 和 `kernel_recvmsg` 的用法差不多,只差在這裡是使用結構 `vec` 將資料輸出 ### 改寫 benchmarking 在處理資料時, `kecho` 原本的實作會經過不少的 [printk](https://www.kernel.org/doc/html/latest/core-api/printk-basics.html) ,導致 `bench` 計算的時間會包含到這些函式,產生多餘的耗能,因此首先需要移除多餘函式的使用 參考 [kevinshieh0225 的開發紀錄](https://hackmd.io/@Masamaloka/linux2022-ktcp/https%3A%2F%2Fhackmd.io%2F%40Masamaloka%2Flinux2022-kecho#RUNTIME_STAT-flags) 裡提到的 [sysprog21/concurrent-programs](https://github.com/sysprog21/concurrent-programs/blob/master/hp_list/main.c) 進行模仿,在程式結束時再把紀錄印出 在 `echo_server.h` 新增 ```c enum { TRACE_kzalloc_err = 1, // kzalloc 失敗的次數 TRACE_get_err, // get request 失敗的次數 TRACE_send_err, // send request 失敗的次數 TRACE_recvmsg, // recvmsg 的次數 TRACE_sendmsg, // sendmsg 的次數 TRACE_accept_err, // accept 失敗的次數 TRACE_work_err // 建立 work 失敗的次數 }; struct runtime_state { atomic_t kzalloc_err, get_err; atomic_t send_err, recvmsg; atomic_t sendmsg, accept_err; atomic_t work_err; }; #define TRACE(ops) \ do { \ if (TRACE_##ops) \ atomic_add(1, &states.ops); \ } while (0) ``` 首先列舉的成員是為了讓巨集 `TRACE(ops)` 判斷是否真的有該成員,有的話就使用函式 `atomic_add` 加 `1` 在 `echo_server.c` 新增 `do_analysis` 用來印出資訊 ```c static void do_analysis(void) { __atomic_thread_fence(__ATOMIC_SEQ_CST); #define TRACE_PRINT(ops) printk(MODULE_NAME ": %s : %d\n", #ops, atomic_read(&states.ops)); TRACE_PRINT(recvmsg); TRACE_PRINT(sendmsg); TRACE_PRINT(kzalloc_err); TRACE_PRINT(get_err); TRACE_PRINT(send_err); TRACE_PRINT(accept_err); TRACE_PRINT(work_err); } ``` 最後印出結果 ```c [69825.652096] kecho: recvmsg : 40060 [69825.652098] kecho: sendmsg : 20030 [69825.652100] kecho: kzalloc_err : 0 [69825.652101] kecho: get_err : 0 [69825.652102] kecho: send_err : 0 [69825.652103] kecho: accept_err : 0 [69825.652105] kecho: work_err : 0 ``` 發現一件有趣的事, `recvmsg` 的數量居然是 `sendmsg` 的兩倍,嘗試將 `bench.c` 的執行緒數量改成 1 後並重新開啟 `printk` 再測試 ```c [69294.983057] kecho: start get response [69294.983068] kecho: get request = dummy message [69294.983073] kecho: start send request. [69294.983106] kecho: send request = dummy message [69294.983109] kecho: start get response [69294.983148] kecho: get request = ``` 參考 CS:APP 的伺服器架構,可以明顯發現,在中斷連線時, client 是會對 server 發出中斷請求,這也是為什麼實際收到資料的次數會是送出的 2 倍 ![](https://i.imgur.com/4hdltM0.png) 接著稍微修改函式 `get_request` ,主要判斷如果不是 `EOF` 的話就將接收資料的次數加 1 ```diff static int get_request(struct socket *sock, unsigned char *buf, size_t size) { struct msghdr msg; struct kvec vec; int length; /* kvec setting */ vec.iov_len = size; vec.iov_base = buf; /* msghdr setting */ msg.msg_name = 0; msg.msg_namelen = 0; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; /* get msg */ length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags); + if (likely(length)) /* Not EOF */ + TRACE(recvmsg); + return length; } ``` 另外在結構 `runtime_state` 新增計算 shutdown 總數的變數 `shutdown` ,修改程式並重新測試 ```c [602673.678105] kecho: recvmsg : 10002 [602673.678106] kecho: sendmsg : 10002 [602673.678106] kecho: shutdown : 10001 [602673.678107] kecho: kzalloc_err : 0 [602673.678107] kecho: get_err : 0 [602673.678107] kecho: send_err : 0 [602673.678108] kecho: accept_err : 0 [602673.678108] kecho: work_err : 0 ``` 如此一來,已經符合接收和發送的總數 最後和修改之前的程式比較效能 - 含有 `printk` ![](https://i.imgur.com/IJcoEY1.png) - 不含 `printk` ![](https://i.imgur.com/BeAUUAr.png) 也特別用實驗室電腦測試 - 含有 `printk` ![](https://i.imgur.com/OERrEcA.png) - 不含 `printk` ![](https://i.imgur.com/2Us5KPj.png) 很明顯的, server 的效率變得更好了 ### 建立獨特字串 為了模擬更符合現實世界的情況,每次的測試字串也應當不同,參考了 [quiz8 - 測驗 `1`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyg5nxO79) 裡提及的 [SWAR](https://en.wikipedia.org/wiki/SWAR) 手法,以下為產生隨機字串的程式碼 ```c #define GENRAND64(X) (((X) & 0x7F7F7F7F7F7F7F7F) | 0x2020202020202020) #define GENRAND8(X) (((X) & 0x7F) | 0x20) #define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080) #define DETECT_CHAR(X, MASK) (DETECT_NULL((X) ^ (MASK))) static void GenRandString(char *str) { int size = (rand() & (MAX_MSG_LEN - 1)) + 1; uint64_t *lptr = (uint64_t *) str; while (size >= 8) { uint64_t rand64 = (uint64_t) rand() << 32 | rand(); *lptr = GENRAND64(rand64); // 如果偵測到 DEL if (DETECT_CHAR(*lptr, 0x7F7F7F7F7F7F7F7F)) continue; lptr++; size -= 8; } char *cptr = (char *) lptr; while (size) { *cptr = GENRAND8(rand()); // 如果產生 DEL if (*cptr == 0x7F) continue; cptr++; size--; } *cptr = '\0'; } ``` 上述的程式主要產生長度為 `1 ~ MAX_MSG_LEN` 的字串,當字串長度大於等於 8 時,會一次計算 8 個字元直到長度小於 8 為止,而選擇的字元範圍參考 [ASCII](https://zh.wikipedia.org/wiki/ASCII) 可顯示的字元,範圍為 `32 ~ 126` 透過巨集 `GENRAND64` 產生 8 個位元組且每個位元組的範圍的數值介在 `32 ~ 127` ,接著透過巨集 `DETECT_CHAR` 去偵測是否有位元組的值為 127 (為 [Delete 字元](https://zh.wikipedia.org/wiki/Delete%E5%AD%97%E7%AC%A6)),如果存在就重新產生一次 以下為部份產生的字串樣貌 ```shell str = )v/|,<%$;kgk9?/tcrtp'o7b9rg str = ?l:2ck{g,l&&0u$h}og~ str = !h+fjmo!l-s#= )w7kv!'t4wp*dsy)y( str = x9|644 tm&vu4~zfj str = -zug!v6 1k;h:#j(xdn3y&{m!!v` str = j)zme2:-#{rs?!rm%}0, str = =&?4ex7|yvr#t+#&k>tll str = %:-;}ngs~<~sg6;6 str = =~z#(nw ``` ### 傳遞過長的字串給 `kecho` 首先使用命令 `telnet localhost 12345` 測試 `kecho` 可以回傳的長度的限制。直接講結論,當輸入的字串長度大於 4095 時,可以很明顯觀察到回傳的字串是有缺少的,可以從以下 `kecho` 的程式碼觀察 ```c length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags); ``` 函式 `kernel_recvmsg` 是 `kecho` 模組中用來接收資料的函式,其中從右數來第二個參數 `size` 是用來決定接收資料的最長長度,而這裡的大小為巨集 `BUF_SIZE` 所定義 ```c #define BUF_SIZE 4096 ``` 因此最後可以得知目前實作的限制,即接收長度不能超過 4095 ,否則會產生缺少字元的風險 :::warning TODO: 思考如何解決字串長度太大的問題中... ::: ### 參照 [kecho pull request #1](https://github.com/sysprog21/kecho/pull/1)