# 2025q1 Homework6 (ktcp)
contributed by <`leowu0411`>
{%hackmd NrmQUGbRQWemgwPfhzXj6g %}
## kecho
### Socket setup
首先看到 `kecho` 初始化 socket 的方式與 cs:app 所使用之系統呼叫 `socket()`相異:不同於使用者模式將 socket 視為描述符,在核心模式下,需針對直接指向 socket 結構之的指標,呼叫 kernel API 進行初始化設定
```c
struct socket *sock;
struct sockaddr_in addr;
// PF_INET: IPv4, SOCK_STREAM: TCP, IPPROTO_TCP: Protocol = TCP
sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
// TCP_NODELAY : Disables Nagle's algorithm
kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *) &opt,
sizeof(opt));
// SO_REUSEPORT: Allows multiple sockets to bind to the same port
kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, (char *) &opt,
sizeof(opt));
```
接著呼叫 `kernel_bind()` 將地址資訊與此 `socket` 結構進行綁定,也可以對照到使用者模式中的系統呼叫 `bind()`
```c
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port);
kernel_bind(sock, (struct sockaddr *) &addr, sizeof(addr));
```
最後呼叫 `kernel_listen`(對應於系統呼叫 `listen()`) 將 `socket` 自 active socket 轉換為 listening socket,表示此 socket 能夠用於接收請求,而 `backlog` 參數設定在核心拒絕連線請求之前,隊伍中未完成連接請求的數量,這裡設為 default 128。
```c
kernel_listen(sock, backlog);
```
### workqueue setup
```c
kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);
```
`kecho` 核心模組能夠透過參數 `bench` 決定 workqueue 為 per-CPU bound workqueue 或者 unbound workqueue:
* `bench == 1` :work item 被排入呼叫 `queue_work()` 時所在 CPU 的 bound-pool。
* 這樣做的優點為可以享有 cache locality,與 system wq 行為相近
* 但當伺服器請求量起伏很大,此模式可能在尖峰期間在每個 CPU 的 worker pool 生成多個 kworker;負載回落後,這些 kworker 變成 idle,可能會造成暫時的資源浪費 (閒置 kworker 會被釋放)
* `bench == 0` : 此設定下 work item 會排進共享的 unbound-pool 由此 pool 管理之 worker 不綁定於任何 CPU,cmwq 能夠依據 CPU 負載選擇於哪一個處理器新增 worker
### echo_server_daemon
`echo_server_daemon` 執行於核心執行緒,由 `kecho` 模組由 `kthread_run()` 發起。
首先為了能夠使此執行緒能夠接收 signal 必須先呼叫以下:
```c
allow_signal(SIGKILL);
allow_signal(SIGTERM);
```
kecho 模組或者其他 context 即能夠透過 `send_sig()`,通知此執行緒
(In kecho `kecho_cleanup_module` --> `send_sig(SIGTERM, echo_server, 1);`)
#### 伺服器主迴圈
```c=
while (!kthread_should_stop()) {
/* using blocking I/O */
int error = kernel_accept(param->listen_sock, &sock, 0);
/*
* error handle ....
*/
if (unlikely(!(work = create_work(sock))))
/*
* skip
*/
/* start server worker */
queue_work(kecho_wq, work);
}
```
* 首先(第 3 行)呼叫 `kernel_accept()` 監聽 `param->listen_sock` 是否有 client 請求連線,此步為 blocking。
(注意此 `listen_sock` 即是模組剛剛初始化的 listen socket,對應於使用者模式下的 `listen_fd`,而 `sock` 對應於使用者模式下的 `conn_fd`)
* 當上一步成功初始化 `sock` 代表此 socket 已經與客戶端完成連線,即將此 socket 當作參數,創建一個 work item (line 9)並放入 `work_queue()` (line 16)用於與此 client 進行互動。
* 迴圈繼續回到 `kernel_accept()` 等待新連線。
#### echo_server_worker
不同於 `kthread` 能夠在呼叫 `kthread_run()` 時透過 `void *data` 傳遞參數:
```c
struct kecho {
struct socket *sock;
struct list_head list;
struct work_struct kecho_work;
};
static void echo_server_worker(struct work_struct *work)
{
struct kecho *worker = container_of(work, struct kecho, kecho_work);
...
}
```
針對要排進去 `work_queue` 內的 `work_item`,能夠透過將 `work_stuct` 鑲嵌於 `struct kecho`,並利用 `container_of` 取得參數,完成針對 `work_strcut` 的參數傳遞。
* get_request():讀取來自 `sock` 的資訊並存入 `vec.iov_base`
```c
vec.iov_base = buf; // target buffer to store received data
vec.iov_len = size; // max length to read
msg.msg_name = 0; // no address info needed (TCP is connection-based)
msg.msg_namelen = 0;
msg.msg_control = NULL; // no ancillary data (no control messages)
msg.msg_controllen = 0;
msg.msg_flags = 0; // default behavior
// first size means num of kvec
// second size means max len can read
length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);
```
* send_response() 類似但呼叫以下傳送回覆:
```c
length = kernel_sendmsg(sock, &msg, &vec, 1, size);
```
### user-echo-server
在使用者層級的 echo 伺服器,使用 epoll 實現單執行續多工:
```c
int epoll_fd;
if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0)
server_err("Fail to create epoll", &list);
static struct epoll_event ev = {.events = EPOLLIN | EPOLLET};
ev.data.fd = listener;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0)
server_err("Fail to control epoll", &list);
printf("Listener (fd=%d) was added to epoll.\n", epoll_fd);
```
listener 已經是經過 bind() 以及 listen() 系統呼叫之 listen socket 描述符,將其加入 epoll 交給核心監控。
#### 主要迴圈
```c=
while (1) {
struct sockaddr_in client_addr;
int epoll_events_count;
if ((epoll_events_count = epoll_wait(epoll_fd, events, EPOLL_SIZE,
EPOLL_RUN_TIMEOUT)) < 0)
server_err("Fail to wait epoll", &list);
for (int i = 0; i < epoll_events_count; i++) {
/* EPOLLIN event for listener (new client connection) */
if (events[i].data.fd == listener) {
int client;
while (
(client = accept(listener, (struct sockaddr *) &client_addr,
&socklen)) > 0) {
setnonblock(client);
ev.data.fd = client;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client, &ev) < 0)
server_err("Fail to control epoll", &list);
push_back_client(&list, client,
inet_ntoa(client_addr.sin_addr));
}
if (errno != EWOULDBLOCK)
server_err("Fail to accept", &list);
} else {
/* EPOLLIN event for others (new incoming message from client)
*/
if (handle_message_from_client(events[i].data.fd, &list) < 0)
server_err("Handle message from client", &list);
}
}
}
```
此迴圈利用 epoll 同時監聽 listener 及所有已建立連線的描述符是否具有可讀事件(EPOLLIN):
* `listener`:當 `epoll` 回傳事件中包含 `listener` 描述符,代表有新的連線請求到達。此時進入內部 while 迴圈(第 11 行),利用 non-blocking `accept()` 依序取出所有 `backlog` 佇列中的待處理連線,直到 `accept()` 回傳 `EAGAIN/EWOULDBLOCK` 為止,表示佇列已清空。每個成功建立的 client 連線會透過 `push_back_client()` 儲存其描述符與連線資訊,並將此已連線描述符加入 epoll。
* 已連線的 client 描述符:代表該用戶端已有訊息送達且可以被讀取,程式會呼叫 `handle_message_from_client()` 處理該用戶的請求。
* 注意不管是 listener 或者已連線的 client 描述符皆被設置為 non-blocking,這麼做能夠在 I/O 阻塞時,先返回事件迴圈處理其他已 ready 的事件,而 epoll 會負責輪巡所有事件並將準備好的事件放入佇列等待下一輪迴圈執行 -- 即 Reactor Pattern。
* 使用 ntohs()、htonl() 等函式,這類函式的目的是為了避免不同主機之間因位元組順序(Endianness)不同而產生溝通上的誤差。由於網路通訊協定統一採用 Big Endian 表示法,因此在傳輸資料前後,必須進行適當的轉換以確保正確解析。
使用者空間的 echo server 採用單執行緒 + `epoll`,雖能同時維持多條連線 (I/O 多工),但本質上CPU 一次只服務一個事件,屬於序列化處理。
反觀核心模式下, 每個連線任務會被指派給於 pool 中等待的 worker thread,可分佈到任何 CPU 核心上執行,實現多條連線得以**真正並行處理**,更充分利用多核心硬體資源 (但核心模式之實作為 blocking I/O,故雖然能夠利用多核的硬體資源,仍然可能會出現大量閒置等待)。
> benchmark running and analysis TBD....
### khttpd
Socket 初始化以及 `http_server_daemon` 呼叫皆與 kecho 相似不再重複敘述,兩者的差別為 kecho 使用 CMWQ 執行用戶任務,而 khttpd 則使用 `kernel_thread` (line 9)。
且可以注意程式碼第 4 行,藉由檢查是否有 signal 傳送至此執行緒決定是否終止迴圈 (需先呼叫`allow_signal()` 允許執行緒捕捉信號)。
```c=
while (!kthread_should_stop()) {
int err = kernel_accept(param->listen_socket, &socket, 0);
if (err < 0) {
if (signal_pending(current))
break;
pr_err("kernel_accept() error: %d\n", err);
continue;
}
worker = kthread_run(http_server_worker, socket, KBUILD_MODNAME);
// error handling skip...
}
```
相比 cs:app 實作之 TINY Wed Server (HTTP 1.0):
* 額外實現 keep alive 機制。
* 利用 kernel thread 達到並行,能夠服務多個請求,**注意實作仍然為 blocking I/O**。
* 利用 http_parser 等定義好之 API,不必手動寫 http 請求解析器。
* 功能方面則與 TINY 無異,無視 http 請求之 header 以及 body,並單純受理 GET method.
利用以下命令測試性能
```shell
$ ./htstress -n 200000 -c 1 -t 4 http://localhost:8081/
```
* `-n` : 表示對 server 請求連線的數量
* `-c` : 表示總體對 server 的連線數量
* `-t` : 表示使用多少執行緒
可以得到以下結果
```
requests: 200000
good requests: 200000 [100%]
bad requests: 0 [0%]
socket errors: 0 [0%]
seconds: 2.104
requests/sec: 95059.610
```
#### 利用 CMWQ 增進效能
主要改動如下:
```diff
static int __init khttpd_init(void)
{
int err = open_listen_socket(port, backlog, &listen_socket);
// error handling skip...
+ struct workqueue_struct *khttpd_wq = alloc_workqueue(MODULE_NAME, WQ_UNBOUND, 0);
param.listen_socket = listen_socket;
+ param.khttpd_wq = khttpd_wq;
http_server = kthread_run(http_server_daemon, ¶m, KBUILD_MODNAME);
// error handling skip...
}
```
* 首先在模組初始化時建立 workqueue 並加入 param 結構體,供 `http_server_daemon` 使用
```diff=
struct http_server_param {
struct socket *listen_socket;
+ struct workqueue_struct *khttpd_wq;
};
+struct http_service {
+ bool is_stopped;
+ struct list_head worker;
+};
+struct khttpd {
+ struct socket *sock;
+ struct list_head list;
+ struct work_struct khttpd_work;
+};
```
* `struct http_server.h` 有如上改動,第 3 行的功能如上介紹,除外新增了二個資料結構
* `http_service` : 用於管理 work item
* 由於 cmwq worker item 是跑在 worker 執行緒內的上下文而非一個執行緒,所以不能跟原先的 kthread 方式一樣透過信號中止
* 當 `http_server_daemon` 將 `is_stopped` 設為 true,中止所有 worker item
* 而 struct list_head worker 用於遍歷任務鍊結串列以釋放記憶體
* `struct http_service daemon = {.is_stopped = false};` daemon 為全域變數
* `struct khttpd` 透過嵌入 `struct work_struct khttpd_work` 並利用 container_of 巨集傳遞資訊給 `work_func`
```c
static struct work_struct *create_work(struct socket *sk)
{
struct khttpd *work;
if (!(work = kmalloc(sizeof(struct khttpd), GFP_KERNEL)))
return NULL;
work->sock = sk;
INIT_WORK(&work->khttpd_work, http_server_worker);
list_add(&work->list, &daemon.worker);
return &work->khttpd_work;
}
static void free_work(void)
{
struct khttpd *l, *tar;
/* cppcheck-suppress uninitvar */
list_for_each_entry_safe (tar, l, &daemon.worker, list) {
flush_work(&tar->khttpd_work);
kfree(tar);
}
pr_info("free work done\n");
}
```
* `create_work` 利用 INIT_WORK 巨集能夠運行 `http_server_worker` 函式的 `work_struct` 並將其嵌入至 `struct khttpd *work`
* `free_work` 則夠過存於 daemon 內的 `list_head` 釋放各節點記憶體,於卸載模組時呼叫
```diff=
static void http_server_worker(struct work_struct *work)
{
struct khttpd *worker = container_of(work, struct khttpd, khttpd_work);
char *buf;
//skip
struct http_request request;
+ struct socket *socket = worker->sock;
buf = kzalloc(RECV_BUFFER_SIZE, GFP_KERNEL);
// skip
request.socket = socket;
http_parser_init(&parser, HTTP_REQUEST);
parser.data = &request;
+ while (!daemon.is_stopped) {
int ret = http_server_recv(socket, buf, RECV_BUFFER_SIZE - 1);
// skip
http_parser_execute(&parser, &setting, buf, ret);
if (request.complete && !http_should_keep_alive(&parser))
break;
memset(buf, 0, RECV_BUFFER_SIZE);
+ }
kernel_sock_shutdown(socket, SHUT_RDWR);
sock_release(socket);
kfree(buf);
return;
}
```
測試導入 CMWQ 後的效能提昇
```
/htstress -n 200000 -c 1 -t 4 http://localhost:8081/
requests: 200000
good requests: 200000 [100%]
bad requests: 0 [0%]
socket errors: 0 [0%]
seconds: 0.939
requests/sec: 213032.702
```
| banchmark | cmwq_ver | kthread_ver | improve |
| -------- | -------- | -------- | -------- |
| requests/sec | 213032.702 | 95059.610 | **2.24105**
吞吐量約提昇 2.25 倍,造成如此差距之的主要原因為:
* cmwq 已經預先建立好 worker thread,新任務能夠被立即被這些 kworker 執行;而原本的模式接到請求方建立執行緒
* 當請求數量提高,建立執行緒的成本即被放大