contributed by < OscarShiang
>
linux2020
$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.4 LTS (Bionic Beaver)"
$ cat /proc/version
Linux version 5.3.0-40-generic (buildd@lcy01-amd64-024)
(gcc version 7.4.0 (Ubuntu 7.4.0-1ubuntu1~18.04.1))
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 2
Core(s) per socket: 2
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 142
Model name: Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz
Stepping: 9
CPU MHz: 3092.228
CPU max MHz: 3500.0000
CPU min MHz: 400.0000
BogoMIPS: 6199.99
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 4096K
NUMA node0 CPU(s): 0-3
kecho
已使用 CMWQ,請陳述其優勢和用法*
workqueue() functions are deprecated and scheduled for removal",請參閱 Linux 核心的 git log,揣摩 Linux 核心開發者的考量user-echo-server
運作原理,特別是 epoll 系統呼叫的使用bench
原理,能否比較 kecho
和 user-echo-server
表現?佐以製圖drop-tcp-socket
核心模組運作原理。TIME-WAIT
sockets 又是什麼?kecho
的執行時期的缺失,提升效能和穩健度 (robustness)使用 CMWQ 主要有以下這些優勢:
為了在核心模組中引入 CMWQ,我們會需要使用到 <linux/workqueue.h>
中的這些函式:
alloc_workqueue
: 在初始化模組時用來建立一個 workqueuedestroy_workqueue
: 用來釋放 workqueuequeue_work
: 將任務放入 workqueue 中排程INIT_WORK
: 用以初始化任務因為考慮到如果在 workqueue 不是空的情況時將核心模組卸載時會發生 memory leak 的情況,所以需要使用 Kernel API 所提供的 linked list 來管理所有的任務實體,以便在卸載模組的時候可以將這些任務釋放掉。
alloc_workqueue
實作 workqueue 的原因在 kernel document 中所提到
“The original create_*workqueue() functions are deprecated and scheduled for removal”
而這個改動最主要的原因是為了簡化新增 workqueue 特徵的流程。
在 linux/workqueue.h
的 commit log 中可以看到,在新增 alloc_workqueue
這個函式之前,如果要增加 workqueue 實作的功能時,往往需要更動到 create_workqueue
相關 macro 的參數與相關實作。
如 workqueue: introduce create_rt_workqueue 這個 commit 中,為了新增 real time prioritized workqueue,就需要將 create_*workqueue()
的 macro 與其連帶的參數進行修改。從這裡就可以看出為了在 workqueue 中新增新的特徵所需付出的代價。
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 5c158c4..89a5a12 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -149,11 +149,11 @@ struct execute_work {
extern struct workqueue_struct *
__create_workqueue_key(const char *name, int singlethread,
- int freezeable, struct lock_class_key *key,
+ int freezeable, int rt, struct lock_class_key *key,
const char *lock_name);
#ifdef CONFIG_LOCKDEP
-#define __create_workqueue(name, singlethread, freezeable) \
+#define __create_workqueue(name, singlethread, freezeable, rt) \
({ \
static struct lock_class_key __key; \
const char *__lock_name; \
@@ -164,17 +164,19 @@ __create_workqueue_key(const char *name, int singlethread,
__lock_name = #name; \
\
__create_workqueue_key((name), (singlethread), \
- (freezeable), &__key, \
+ (freezeable), (rt), &__key, \
__lock_name); \
})
#else
-#define __create_workqueue(name, singlethread, freezeable) \
- __create_workqueue_key((name), (singlethread), (freezeable), NULL, NULL)
+#define __create_workqueue(name, singlethread, freezeable, rt) \
+ __create_workqueue_key((name), (singlethread), (freezeable), (rt), \
+ NULL, NULL)
#endif
-#define create_workqueue(name) __create_workqueue((name), 0, 0)
-#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
-#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
+#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
+#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
+#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
+#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
而當 workqueue 的功能與特徵越來越多後,這樣的修改方式就顯得不太具效率。所以他們引入新的函式 alloc_workqueue
來取代以往的 create_*workqueue
函式。
在 alloc_workqueue
的函式中,可以利用 flag
參數設定不同的特徵,並可以利用 bitwise 的方式組合多種特徵。如果要實作新的特徵,只需要透過新增 flag
即可。
就如同 workqueue: implement high priority workqueue 這個 commit,在 workqueue.h
的更動就只有加入一個新的 enum
。
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 0a7f797..006dcf7 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -231,6 +231,7 @@ enum {
WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
WQ_NON_REENTRANT = 1 << 2, /* guarantee non-reentrance */
WQ_RESCUER = 1 << 3, /* has an rescue worker */
+ WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
WQ_DFL_ACTIVE = WQ_MAX_ACTIVE / 2,
首先在開啟 server 之前,我們需要先建立 socket 與建立監聽的機制,所以我們使用 socket
建立一個 socket 的 file descriptor
listener = socket(PF_INET, SOCK_STREAM, 0)
根據 accept(2) - Linux manual page 的描述:
If no pending connections are present on the queue, and the socket is not marked as nonblocking, accept() blocks the caller until a connection is present. If the socket is marked nonblocking and no pending connections are present on the queue, accept() fails with the error EAGAIN or EWOULDBLOCK.
但是我們希望 server 能處理連線的請求與訊息的發送,所以 server 不應該被 block 在執行 accept 的時候。因此我們使用 setnonblock
函式將 server socket 的 file descriptor 利用 fctrl
設定為 non-blocking。
static int setnonblock(int fd)
{
int fdflags;
if ((fdflags = fcntl(fd, F_GETFL, 0)) == -1)
return -1;
fdflags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, fdflags) == -1)
return -1;
return 0;
}
接著我們需要設定 server 所使用的 port 與連線的限制,所以我們呼叫 bind
將 socket 與 scoket_addr
的設定 bind 在一起。
bind(listener, (struct sockaddr *) &addr, sizeof(addr);
再來我們讓 socket 開始監聽,並將 socket 的 file descriptor 用 epoll_ctl
加入 epoll 監控的列表之中,以對使用者透過 socket 建立連線時可以進行處理。
if (listen(listener, 128) < 0)
server_err("Fail to listen", &list);
int epoll_fd;
if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0)
server_err("Fail to create epoll", &list);
static struct epoll_event ev = {.events = EPOLLIN | EPOLLET};
ev.data.fd = listener;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0)
server_err("Fail to control epoll", &list);
printf("Listener (fd=%d) was added to epoll.\n", epoll_fd);
進入 while 迴圈之後,利用 epoll_wait
將 server 的執行暫停,等待 epoll 所監測的列表有事件產生(如使用者建立連線等等)的時候才進行接下來的處理。
當 epoll 監測到有事件發生時,會進入到 for 迴圈的部分,首先會先檢查該事件的 file descriptor 是不是 listener,如果是 listener,就表示有新的連線要求,所以我們利用 accept
建立 client 的 file descriptor 並加入 epoll 的列表中追蹤其使用。
如果是以連線的 client 所觸發的事件,則呼叫 handle_message_from_client
函式來處理。
在 handle_message_from_client
中,使用 recv
呼叫從 client 端接收資料到 buf 中,根據從 client 端接收到的資料長度,也就是 recv
的回傳值判斷 client 的連接狀況:
static int handle_message_from_client(int client, client_list_t **list)
{
int len;
char buf[BUF_SIZE];
memset(buf, 0, BUF_SIZE);
if ((len = recv(client, buf, BUF_SIZE, 0)) < 0)
server_err("Fail to receive", list);
if (len == 0) {
if (close(client) < 0)
server_err("Fail to close", list);
*list = delete_client(list, client);
printf("After fd=%d is closed, current numbers clients = %d\n", client,
size_list(*list));
} else {
printf("Client #%d :> %s", client, buf);
if (send(client, buf, BUF_SIZE, 0) < 0)
server_err("Fail to send", list);
}
return len;
}
如果 server 正常的接收到資料時,就將寫入 buf 的內容利用 send
呼叫傳送回 client 端。
若 server 端沒有收到 SIGTERM
等中斷的訊號的話,就會繼續提供 client 連線,直到使用者中斷程式。
user-echo-server
可能會發生的問題在 user-echo-server.c:handle_message_from_client
函式中,使用到 recv
與 send
來收發資料,考慮到 client 有可能會傳送超過 BUF_SIZE
長度的資料,buf
在使用的過程中可能會出現沒有 '\0'
的情況。
但在這個實作中,因為 server 的用途只是單純地將收到的資料回傳給 client,而且在 recv
與 send
的使用上都有設定接收與傳送的上限,所以在不進行 buf
內容操作的情況下並不會有任何問題產生。
但是如果要對 buf
的資料進行操作,像是使用 strlen
計算 buf
長度等等,就會發生超出邊界的問題,或甚至產生 segmentation fault 而中斷 server 的運行。
bench
程式運作的原理與 khttpd/htstress.c
有類似的作用,不過在 bench 中我們比較在意的是對於同時處理多個客戶端連線的效能表現。
在 bench 程式中,首先在 bench
函式裡面使用 create_worker
建立 MAX_THREAD
個數的執行緒,並使用 pthread_mutex_lock
的方式在所有執行緒準備好了之後才一起進行測試。
static void bench(void)
{
for (int i = 0; i < BENCH_COUNT; i++) {
ready = false;
create_worker(MAX_THREAD);
pthread_mutex_lock(&worker_lock);
ready = true;
/* all workers are ready, let's start bombing kecho */
pthread_cond_broadcast(&worker_wait);
pthread_mutex_unlock(&worker_lock);
/* waiting for all workers to finish the measurement */
for (int x = 0; x < MAX_THREAD; x++)
pthread_join(pt[x], NULL);
idx = 0;
}
for (int i = 0; i < MAX_THREAD; i++)
fprintf(bench_fd, "%d %ld\n", i, time_res[i] /= BENCH_COUNT);
}
在 bench_worker
函式中可以看到對 woker_lock
進行 pthread_mutex_lock
的行為:
static void *bench_worker(__attribute__((unused)))
{
int sock_fd;
char dummy[MAX_MSG_LEN];
struct timeval start, end;
/* wait until all workers created */
pthread_mutex_lock(&worker_lock);
while (!ready)
if (pthread_cond_wait(&worker_wait, &worker_lock)) {
puts("pthread_cond_wait failed");
exit(-1);
}
pthread_mutex_unlock(&worker_lock);
...
pthread_mutex_lock(&res_lock);
time_res[idx++] += time_diff_us(&start, &end);
pthread_mutex_unlock(&res_lock);
pthread_exit(NULL);
}
接著每個執行緒就會開始與 server 建立連線,並使用 struct timeval
記錄傳送資料與收到資料所花費的時間,將結果儲存在 time_res
的陣列之中。
為了避免資料競爭的情況,在寫入結果前使用 pthread_mutex_lock
保護資料。
使用 pthread_join
確認每個執行緒都已執行完畢後,利用 file stream 將測試的結果寫入 bench.txt
中。
為了避免在運行 user mode 的 server 時遇到資料遷移的問題,我將 user-echo-server
使用 taskset
綁定 CPU0 進行測試(已透過 isolcpus
將 CPU0 從開機任務排除)。
因為 bench
所記錄的時間是從 client 送出資料到收到 server 端資料回傳的時間,而且因為 user-echo-server
並不是一個並行的 server,所以在 epoll 同時收到多個 client 的事件時,會依據 file descriptor 位於 epoll_ev 陣列的位置依序執行,這樣的作法有可能導致處理速度被拖慢,進而造成測量時間超出 10000 ns 的情況發生,但整體而言仍有超過 90% 的測量時間都小於 10000 ns。
因為 kecho 在實作上考慮到 concurrency 的議題,使用 CMWQ 將任務有效率的分配給執行緒執行,並得益於執行在 kernel space 上的效能,所以在整體的執行時間上,是小於 user-echo-server
的。但仍看得出在多個連線並行的情況下,server 的回應是會受到影響的。
因為在原先的實作中, kecho
在接收與傳送封包之前都會使用 printk
來印出訊息。因為 printk
為了確保訊息能夠連續的被寫入在 kernel 的 ring buffer 上,會導致回應時間因此被拖慢。
所以我使用 macro 並在 Makefile
中加入參數,當我們以 make BENCH=1
的方式來編譯模組的時候,就會關閉收發訊息的 printk
函式。
關閉 printk
之後 kecho
的表現如下:
可以看到在關閉 printk
後,因為減去對 ring buffer 作 atomic write 的時間,在處理 requests 的時間花費較原本含有 printk
的版本的時間花費更少。
根據 kecho pull request #1 指出 CMWQ 版本的實作得益於 locality 以及即時可用的執行緒使得 server 的執行時間可以比 kthread 的版本還要好。
我利用 這個 commit 並修正其問題改寫為 kthread-based kecho 並與目前使用 CMWQ 所實作的版本比較兩者的差異。
可以看到兩者在執行上的差異非常大,主要的原因就是因為 kthread-based 的版本在收到 client 連線請求之後才會開始處理 kthread 的建立。而 CMWQ 因為讓 workqueue 可以依照當前任務執行的狀態來分配對應的執行緒,而且 workqueue 使用 thread pool 的概念,因此不需要額外再重新建立新的 kthread,在 client 連線時可以直接將 worker 函式分配給空閒的執行緒來進行。省去了建立 kthread 所耗費的時間成本,在大量的連線湧入時更能體現其好處。
因為在 kecho pull request #1 中有提到 kthread-based 的 kecho 在回應時間上有很大的比例是受到 kthread 建立的成本影響,為了瞭解其實際造成影響的比例,我利用 ePBF 來測量建立 kthread_run
的時間成本。
為了測量 kthread 建立的成本,使用 my_thread_run
來包裝 kthread_run
讓 ePBF 可以將測量的動作注入在該函式裡面:
from bcc import BPF
code = """
#include <uapi/linux/ptrace.h>
BPF_HASH(start, u64);
int probe_handler(struct pt_regs *ctx)
{
u64 ts = bpf_ktime_get_ns();
bpf_trace_printk("in %llu\\n",ts);
return 0;
}
int end_function(struct pt_regs *ctx)
{
u64 ts = bpf_ktime_get_ns();
bpf_trace_printk("out %llu\\n",ts);
return 0;
}
"""
b = BPF(text = code)
b.attach_kprobe(event = 'my_thread_run', fn_name = 'probe_handler')
b.attach_kretprobe(event = 'my_thread_run', fn_name = 'end_function')
while True:
try:
res = b.trace_fields()
except ValueError:
continue
print(res[5].decode("UTF-8"))
在注入 kprobe 之後,執行 bench
進行測試:
可以發現 kthread_run
的執行成本大約為 200 us。