# 2020q1 Homework5 (kecho) contributed by < `OscarShiang` > ###### tags: `linux2020` ## 測試環境 ```shell $ cat /etc/os-release NAME="Ubuntu" VERSION="18.04.4 LTS (Bionic Beaver)" $ cat /proc/version Linux version 5.3.0-40-generic (buildd@lcy01-amd64-024) (gcc version 7.4.0 (Ubuntu 7.4.0-1ubuntu1~18.04.1)) $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 4 On-line CPU(s) list: 0-3 Thread(s) per core: 2 Core(s) per socket: 2 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 142 Model name: Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz Stepping: 9 CPU MHz: 3092.228 CPU max MHz: 3500.0000 CPU min MHz: 400.0000 BogoMIPS: 6199.99 Virtualization: VT-x L1d cache: 32K L1i cache: 32K L2 cache: 256K L3 cache: 4096K NUMA node0 CPU(s): 0-3 ``` ## 作業要求 - [ ] 給定的 `kecho` 已使用 CMWQ,請陳述其優勢和用法 - [ ] 核心文件 [Concurrency Managed Workqueue (cmwq)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 提到 "The original create_`*`workqueue() functions are deprecated and scheduled for removal",請參閱 Linux 核心的 git log,揣摩 Linux 核心開發者的考量 - [ ] 解釋 `user-echo-server` 運作原理,特別是 [epoll](http://man7.org/linux/man-pages/man7/epoll.7.html) 系統呼叫的使用 - [ ] 是否理解 `bench` 原理,能否比較 `kecho` 和 `user-echo-server` 表現?佐以製圖 - [ ] 解釋 `drop-tcp-socket` 核心模組運作原理。`TIME-WAIT` sockets 又是什麼? - [ ] 修正 `kecho` 的執行時期的缺失,提升效能和穩健度 (robustness) ## 使用 CMWQ 的優勢 使用 CMWQ 主要有以下這些優勢: - 對 CPU 的資源做更有效率的運用,原先的實作不能讓程式在不同的 CPU 之間進行切換,但是在 CWMQ 中提供 Unbounded thread 讓程序可以切換到 idle 的 CPU 上,增加對系統資源的使用率。 - 讓開發者不需要對核心執行緒進行管理,在 CWMQ 中會透過管理 Thread pool 來進行,避免因為 daemon 動態建立過多的執行緒而導致的問題。 - 當有任務長時間佔用系統資源時,CMWQ 會建立新的執行緒並分配任務到其他的 CPU 來執行。 ## CMWQ 的使用方式 為了在核心模組中引入 CMWQ,我們會需要使用到 `<linux/workqueue.h>` 中的這些函式: 1. `alloc_workqueue` : 在初始化模組時用來建立一個 workqueue 2. `destroy_workqueue` : 用來釋放 workqueue 3. `queue_work` : 將任務放入 workqueue 中排程 4. `INIT_WORK` : 用以初始化任務 因為考慮到如果在 workqueue 不是空的情況時將核心模組卸載時會發生 memory leak 的情況,所以需要使用 Kernel API 所提供的 linked list 來管理所有的任務實體,以便在卸載模組的時候可以將這些任務釋放掉。 ## 改以 `alloc_workqueue` 實作 workqueue 的原因 在 kernel document 中所提到 > “The original create_*workqueue() functions are deprecated and scheduled for removal” 而這個改動最主要的原因是為了簡化新增 workqueue 特徵的流程。 在 `linux/workqueue.h` 的 [commit log](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/log/include/linux/workqueue.h) 中可以看到,在新增 `alloc_workqueue` 這個函式之前,如果要增加 workqueue 實作的功能時,往往需要更動到 `create_workqueue` 相關 macro 的參數與相關實作。 如 [workqueue: introduce create_rt_workqueue](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/include/linux/workqueue.h?id=0d557dc97f4bb501f086a03d0f00b99a7855d794) 這個 commit 中,為了新增 real time prioritized workqueue,就需要將 `create_*workqueue()` 的 macro 與其連帶的參數進行修改。從這裡就可以看出為了在 workqueue 中新增新的特徵所需付出的代價。 ```diff diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h index 5c158c4..89a5a12 100644 --- a/include/linux/workqueue.h +++ b/include/linux/workqueue.h @@ -149,11 +149,11 @@ struct execute_work { extern struct workqueue_struct * __create_workqueue_key(const char *name, int singlethread, - int freezeable, struct lock_class_key *key, + int freezeable, int rt, struct lock_class_key *key, const char *lock_name); #ifdef CONFIG_LOCKDEP -#define __create_workqueue(name, singlethread, freezeable) \ +#define __create_workqueue(name, singlethread, freezeable, rt) \ ({ \ static struct lock_class_key __key; \ const char *__lock_name; \ @@ -164,17 +164,19 @@ __create_workqueue_key(const char *name, int singlethread, __lock_name = #name; \ \ __create_workqueue_key((name), (singlethread), \ - (freezeable), &__key, \ + (freezeable), (rt), &__key, \ __lock_name); \ }) #else -#define __create_workqueue(name, singlethread, freezeable) \ - __create_workqueue_key((name), (singlethread), (freezeable), NULL, NULL) +#define __create_workqueue(name, singlethread, freezeable, rt) \ + __create_workqueue_key((name), (singlethread), (freezeable), (rt), \ + NULL, NULL) #endif -#define create_workqueue(name) __create_workqueue((name), 0, 0) -#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1) -#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0) +#define create_workqueue(name) __create_workqueue((name), 0, 0, 0) +#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1) +#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0) +#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0) ``` 而當 workqueue 的功能與特徵越來越多後,這樣的修改方式就顯得不太具效率。所以他們引入新的函式 `alloc_workqueue` 來取代以往的 `create_*workqueue` 函式。 在 `alloc_workqueue` 的函式中,可以利用 `flag` 參數設定不同的特徵,並可以利用 bitwise 的方式組合多種特徵。如果要實作新的特徵,只需要透過新增 `flag` 即可。 就如同 [workqueue: implement high priority workqueue](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/include/linux/workqueue.h?id=649027d73a6309ac34dc2886362e662bd73456dc) 這個 commit,在 `workqueue.h` 的更動就只有加入一個新的 `enum`。 ```diff diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h index 0a7f797..006dcf7 100644 --- a/include/linux/workqueue.h +++ b/include/linux/workqueue.h @@ -231,6 +231,7 @@ enum { WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */ WQ_NON_REENTRANT = 1 << 2, /* guarantee non-reentrance */ WQ_RESCUER = 1 << 3, /* has an rescue worker */ + WQ_HIGHPRI = 1 << 4, /* high priority */ WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */ WQ_DFL_ACTIVE = WQ_MAX_ACTIVE / 2, ``` ## user-echo-server 運作原理 首先在開啟 server 之前,我們需要先建立 socket 與建立監聽的機制,所以我們使用 `socket` 建立一個 socket 的 file descriptor ```cpp listener = socket(PF_INET, SOCK_STREAM, 0) ``` 根據 [accept(2) - Linux manual page](http://man7.org/linux/man-pages/man2/accept.2.html) 的描述: > **If no pending connections are present on the queue, and the socket is not marked as nonblocking, accept() blocks the caller until a connection is present.** If the socket is marked nonblocking and no pending connections are present on the queue, accept() fails with the error EAGAIN or EWOULDBLOCK. 但是我們希望 server 能處理連線的請求與訊息的發送,所以 server 不應該被 block 在執行 accept 的時候。因此我們使用 `setnonblock` 函式將 server socket 的 file descriptor 利用 `fctrl` 設定為 non-blocking。 ```cpp static int setnonblock(int fd) { int fdflags; if ((fdflags = fcntl(fd, F_GETFL, 0)) == -1) return -1; fdflags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, fdflags) == -1) return -1; return 0; } ``` 接著我們需要設定 server 所使用的 port 與連線的限制,所以我們呼叫 `bind` 將 socket 與 `scoket_addr` 的設定 bind 在一起。 ```cpp bind(listener, (struct sockaddr *) &addr, sizeof(addr); ``` 再來我們讓 socket 開始監聽,並將 socket 的 file descriptor 用 `epoll_ctl` 加入 epoll 監控的列表之中,以對使用者透過 socket 建立連線時可以進行處理。 ```cpp if (listen(listener, 128) < 0) server_err("Fail to listen", &list); int epoll_fd; if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0) server_err("Fail to create epoll", &list); static struct epoll_event ev = {.events = EPOLLIN | EPOLLET}; ev.data.fd = listener; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0) server_err("Fail to control epoll", &list); printf("Listener (fd=%d) was added to epoll.\n", epoll_fd); ``` 進入 while 迴圈之後,利用 `epoll_wait` 將 server 的執行暫停,等待 epoll 所監測的列表有事件產生(如使用者建立連線等等)的時候才進行接下來的處理。 當 epoll 監測到有事件發生時,會進入到 for 迴圈的部分,首先會先檢查該事件的 file descriptor 是不是 listener,如果是 listener,就表示有新的連線要求,所以我們利用 `accept` 建立 client 的 file descriptor 並加入 epoll 的列表中追蹤其使用。 如果是以連線的 client 所觸發的事件,則呼叫 `handle_message_from_client` 函式來處理。 在 `handle_message_from_client` 中,使用 `recv` 呼叫從 client 端接收資料到 buf 中,根據從 client 端接收到的資料長度,也就是 `recv` 的回傳值判斷 client 的連接狀況: 1. 長度大於 0 : 正常取得資料。 2. 長度等於 0 : 沒有資料傳入,因為我們使用的是 non-blocking 的操作,所以需要直接切斷連線,如果不是 non-blocking 的話,會進入等待。 3. 長度小於 0 : 發生錯誤 ```cpp static int handle_message_from_client(int client, client_list_t **list) { int len; char buf[BUF_SIZE]; memset(buf, 0, BUF_SIZE); if ((len = recv(client, buf, BUF_SIZE, 0)) < 0) server_err("Fail to receive", list); if (len == 0) { if (close(client) < 0) server_err("Fail to close", list); *list = delete_client(list, client); printf("After fd=%d is closed, current numbers clients = %d\n", client, size_list(*list)); } else { printf("Client #%d :> %s", client, buf); if (send(client, buf, BUF_SIZE, 0) < 0) server_err("Fail to send", list); } return len; } ``` 如果 server 正常的接收到資料時,就將寫入 buf 的內容利用 `send` 呼叫傳送回 client 端。 若 server 端沒有收到 `SIGTERM` 等中斷的訊號的話,就會繼續提供 client 連線,直到使用者中斷程式。 ## `user-echo-server` 可能會發生的問題 在 `user-echo-server.c:handle_message_from_client` 函式中,使用到 `recv` 與 `send` 來收發資料,考慮到 client 有可能會傳送超過 `BUF_SIZE` 長度的資料,`buf` 在使用的過程中可能會出現沒有 `'\0'` 的情況。 但在這個實作中,因為 server 的用途只是單純地將收到的資料回傳給 client,而且在 `recv` 與 `send` 的使用上都有設定接收與傳送的上限,所以在不進行 `buf` 內容操作的情況下並不會有任何問題產生。 但是如果要對 `buf` 的資料進行操作,像是使用 `strlen` 計算 `buf` 長度等等,就會發生超出邊界的問題,或甚至產生 segmentation fault 而中斷 server 的運行。 ## `bench` 程式運作的原理 與 `khttpd/htstress.c` 有類似的作用,不過在 bench 中我們比較在意的是對於同時處理多個客戶端連線的效能表現。 在 bench 程式中,首先在 `bench` 函式裡面使用 `create_worker` 建立 `MAX_THREAD` 個數的執行緒,並使用 `pthread_mutex_lock` 的方式在所有執行緒準備好了之後才一起進行測試。 ```cpp static void bench(void) { for (int i = 0; i < BENCH_COUNT; i++) { ready = false; create_worker(MAX_THREAD); pthread_mutex_lock(&worker_lock); ready = true; /* all workers are ready, let's start bombing kecho */ pthread_cond_broadcast(&worker_wait); pthread_mutex_unlock(&worker_lock); /* waiting for all workers to finish the measurement */ for (int x = 0; x < MAX_THREAD; x++) pthread_join(pt[x], NULL); idx = 0; } for (int i = 0; i < MAX_THREAD; i++) fprintf(bench_fd, "%d %ld\n", i, time_res[i] /= BENCH_COUNT); } ``` 在 `bench_worker` 函式中可以看到對 `woker_lock` 進行 `pthread_mutex_lock` 的行為: ```cpp static void *bench_worker(__attribute__((unused))) { int sock_fd; char dummy[MAX_MSG_LEN]; struct timeval start, end; /* wait until all workers created */ pthread_mutex_lock(&worker_lock); while (!ready) if (pthread_cond_wait(&worker_wait, &worker_lock)) { puts("pthread_cond_wait failed"); exit(-1); } pthread_mutex_unlock(&worker_lock); ... pthread_mutex_lock(&res_lock); time_res[idx++] += time_diff_us(&start, &end); pthread_mutex_unlock(&res_lock); pthread_exit(NULL); } ``` 接著每個執行緒就會開始與 server 建立連線,並使用 `struct timeval` 記錄傳送資料與收到資料所花費的時間,將結果儲存在 `time_res` 的陣列之中。 為了避免資料競爭的情況,在寫入結果前使用 `pthread_mutex_lock` 保護資料。 使用 `pthread_join` 確認每個執行緒都已執行完畢後,利用 file stream 將測試的結果寫入 `bench.txt` 中。 ## 比較 kecho 與 user-echo-server 效能表現 ### user-echo-server 為了避免在運行 user mode 的 server 時遇到資料遷移的問題,我將 `user-echo-server` 使用 `taskset` 綁定 CPU0 進行測試(已透過 `isolcpus` 將 CPU0 從開機任務排除)。 ![](https://i.imgur.com/7sUE7ji.png) 因為 `bench` 所記錄的時間是從 client 送出資料到收到 server 端資料回傳的時間,而且因為 `user-echo-server` 並不是一個並行的 server,所以在 epoll 同時收到多個 client 的事件時,會依據 file descriptor 位於 epoll_ev 陣列的位置依序執行,這樣的作法有可能導致處理速度被拖慢,進而造成測量時間超出 10000 ns 的情況發生,但整體而言仍有超過 90% 的測量時間都小於 10000 ns。 ### kecho ![](https://i.imgur.com/vF8FMM6.png) 因為 kecho 在實作上考慮到 concurrency 的議題,使用 CMWQ 將任務有效率的分配給執行緒執行,並得益於執行在 kernel space 上的效能,所以在整體的執行時間上,是小於 `user-echo-server` 的。但仍看得出在多個連線並行的情況下,server 的回應是會受到影響的。 ## 改寫在 benchmarking 時 kecho 的行為 因為在原先的實作中, `kecho` 在接收與傳送封包之前都會使用 `printk` 來印出訊息。因為 `printk` 為了確保訊息能夠連續的被寫入在 kernel 的 ring buffer 上,會導致回應時間因此被拖慢。 所以我使用 macro 並在 `Makefile` 中加入參數,當我們以 `make BENCH=1` 的方式來編譯模組的時候,就會關閉收發訊息的 `printk` 函式。 關閉 `printk` 之後 `kecho` 的表現如下: ![](https://i.imgur.com/Z5P7Ddd.png) 可以看到在關閉 `printk` 後,因為減去對 ring buffer 作 atomic write 的時間,在處理 requests 的時間花費較原本含有 `printk` 的版本的時間花費更少。 ## 比較 kthread 實作版本與 CMWQ 實作版本的執行差異 根據 [kecho pull request #1](https://github.com/sysprog21/kecho/pull/1) 指出 CMWQ 版本的實作得益於 locality 以及即時可用的執行緒使得 server 的執行時間可以比 kthread 的版本還要好。 我利用 [這個 commit](https://github.com/sysprog21/kecho/commit/7038c2acd46044afe80ecbd97ad6547455dfd21d) 並修正其問題改寫為 [kthread-based kecho](https://github.com/OscarShiang/kecho/tree/kthread_impl) 並與目前使用 CMWQ 所實作的版本比較兩者的差異。 ### kthread-based 實作的測試結果 ![](https://i.imgur.com/AKuMw7s.png) ### CMWQ 實作的測試結果 ![](https://i.imgur.com/NVGmIk8.png) 可以看到兩者在執行上的差異非常大,主要的原因就是因為 kthread-based 的版本在收到 client 連線請求之後才會開始處理 kthread 的建立。而 CMWQ 因為讓 workqueue 可以依照當前任務執行的狀態來分配對應的執行緒,而且 workqueue 使用 thread pool 的概念,因此不需要額外再重新建立新的 kthread,在 client 連線時可以直接將 worker 函式分配給空閒的執行緒來進行。省去了建立 kthread 所耗費的時間成本,在大量的連線湧入時更能體現其好處。 ## 測量 kthread 建立成本 因為在 [kecho pull request #1](https://github.com/sysprog21/kecho/pull/1) 中有提到 kthread-based 的 kecho 在回應時間上有很大的比例是受到 kthread 建立的成本影響,為了瞭解其實際造成影響的比例,我利用 ePBF 來測量建立 `kthread_run` 的時間成本。 為了測量 kthread 建立的成本,使用 `my_thread_run` 來包裝 `kthread_run` 讓 ePBF 可以將測量的動作注入在該函式裡面: ```python from bcc import BPF code = """ #include <uapi/linux/ptrace.h> BPF_HASH(start, u64); int probe_handler(struct pt_regs *ctx) { u64 ts = bpf_ktime_get_ns(); bpf_trace_printk("in %llu\\n",ts); return 0; } int end_function(struct pt_regs *ctx) { u64 ts = bpf_ktime_get_ns(); bpf_trace_printk("out %llu\\n",ts); return 0; } """ b = BPF(text = code) b.attach_kprobe(event = 'my_thread_run', fn_name = 'probe_handler') b.attach_kretprobe(event = 'my_thread_run', fn_name = 'end_function') while True: try: res = b.trace_fields() except ValueError: continue print(res[5].decode("UTF-8")) ``` 在注入 kprobe 之後,執行 `bench` 進行測試: ![](https://i.imgur.com/b9lct0F.png) 可以發現 `kthread_run` 的執行成本大約為 200 us。 ## 參考資料 1. [workqueue: implement high priority workqueue](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/include/linux/workqueue.h?id=649027d73a6309ac34dc2886362e662bd73456dc) 2. [linux/workqueue.h - git commit log](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/log/include/linux/workqueue.h) 3. [Reimplementing printk()](https://lwn.net/Articles/780556/) 4. [kecho pull request #1](https://github.com/sysprog21/kecho/pull/1)