---
tags: linux2023
---
# 2023q1 Homework7 (ktcp)
contributed by < `linhoward0522` >
## 開發環境
```shell
$ gcc --version
gcc (Ubuntu 11.3.0-1ubuntu1~22.04) 11.3.0
Copyright (C) 2021 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 12
On-line CPU(s) list: 0-11
Vendor ID: GenuineIntel
Model name: 12th Gen Intel(R) Core(TM) i5-12400
CPU family: 6
Model: 151
Thread(s) per core: 2
Core(s) per socket: 6
Socket(s): 1
Stepping: 2
CPU max MHz: 5600.0000
CPU min MHz: 800.0000
BogoMIPS: 4992.00
```
## `CMWQ`
>[`Concurrency Managed Workqueue`](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
### Introduction
許多情況下需要非同步來執行 `process` ,這個情況下最常用的方法就是 `workqueue` 的 API
當需要這樣非同步來執行 `process` 時,會將工作放入 `queue` 中 。 一個獨立的`thread` 會進行非同步執行
- `queue` 稱為 `workqueue`
- `thread` 稱為 `worker`
當 `workqueue` 上有工作時, `worker` 會依序地執行與該工作關聯的功能
當 `workqueue` 沒有工作時, `worker` 就會 `idle`
### Why cmwq?
原本的 `workqueue` 有兩種方式:
- `multi thread` : 每個 `CPU` 有一個 `worker` ,每一個 `multi thread` 需要保持與 `CPU` 大致相同的 `worker` 數量
- `single thread` : 整個系統內只有一個 `worker`
使用`multi thread` 方法會消耗許多的資源,且並行的效果不盡人意
`Concurrency Managed Workqueue (cmwq)` 是針對 `workqueue` 的改善,並著重於以下目標 :
- 保持與原始 `workqueue` API 的兼容性
- 每個 `CPU` 共享所有的 `Worker Pools` 並根據不同需求提供彈性的並行等級,減少資源浪費
- 自動調整 `worker pool` 和 並行等級,使 API 使用者無需擔心這些細節
### The Design
- 為了簡化非同步執行,所以引入了 `work item` 的概念。是一個簡單的結構包含指向非同步執行函式的函式指標,當想要非同步執行函式時,必須設置一個指向該函式的 `work item` ,並放入 `workqueue`
- `worker thread` 依序會處理 `workqueue` 中的任務,若沒有任務 `thread` 會變為 `idle` 狀態,而由`worker-pools` 負責管理這些 `threads`
- 有兩種 `worker-pools` :
- `Bound` : 服務綁定在特定的 `CPU` 上的 `work item` ,在指定的 `CPU` 上執行。且又分兩個 `worker-pools`,一種用於正常優先級的 `work item` ,另一種用於高優先級 `work item`
- `Unbound` : `worker` 可以在多個 `CPU` 上調度,且` worker-pools` 裡面的 `worker` 數量是動態的
- 對於 `worker-pools` 的實作,管理並行等級(有多少任務同時工作)是一個問題。 `CMWQ` 會將並行保持在最低但足夠的等級,以節省資源
- 每當 `worker` 醒來或是睡覺時, `worker-pool` 都會收到通知,並持續追蹤當前可工作的 `worker`
- 通常 `work item` 不會佔用 `CPU` 很多週期,表示保持足夠的並行性來防止工作停止是最佳的
- 只要 `CPU` 上有一個或多個可運行的 `worker` , `worker-pool` 就不會開始執行新工作,但是當最後一個運行的 `worker` 進入睡眠狀態時,它會立即排程一個新的 `worker` ,這樣 `CPU` 就不會在有待處理的 `work item` 時閒置
- 除了 `kthreads` 的記憶體空間外,保持閒置的 `worker` 不會花費任何成本,因此 `CMWQ` 在釋放它們的系統資源之前會保留閒置的 `worker` 一段時間
## 給定的 `kecho` 已使用 `CMWQ`,請陳述其優勢和用法,應重現相關實驗
`kecho_mod.c` 中的 71 行利用 `alloc_workqueue` 來創建 `workqueue`
```cpp
struct workqueue_struct *kecho_wq;
kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);
```
其中 `WQ_UNBOUND` flag 表示 `work item` 不需要綁定在特定的 `CPU` 上執行
進入到 `unbound workqueue` 的 `work item` 會被盡快執行,這樣雖然犧牲了 `locality` ,但有以下好處 :
- 並行等級的波動是被預期的,若使用 `bound workqueue` 可能會創建大量的 `worker` 卻未使用到
- 系統排程器可以更好地管理長時間在 `CPU` 運行的工作
>Work items queued to an unbound wq are served by the special worker-pools which host workers which are not bound to any specific CPU. This makes the wq behave as a simple execution context provider without concurrency management. The unbound worker-pools try to start execution of work items as soon as possible. Unbound wq sacrifices locality but is useful for the following cases.
>- Wide fluctuation in the concurrency level requirement is expected and using bound wq may end up creating large number of mostly unused workers across different CPUs as the issuer hops through different CPUs.
>- Long running CPU intensive workloads which can be better managed by the system scheduler.
`kecho_mod.c` 中的 86 行利用 `destroy_workqueue` 來將 `workqueue` 釋放
```cpp
destroy_workqueue(kecho_wq);
```
>Safely destroy a workqueue. All work currently pending will be done first.
`echo_server.c` 中的 109 行利用 `INIT_WORK` 來將任務初始化 ,透過 `function pointer` 指派任務內容
```cpp
INIT_WORK(&work->kecho_work, echo_server_worker);
```
>initialize all of a work item in one go
`echo_server.c` 中的 160 行利用 `queue_work` 來將任務加入 `workqueue` 中
```cpp
/* start server worker */
queue_work(kecho_wq, work);
```
>We queue the work to the CPU on which it was submitted, but if the CPU dies it can be processed by another CPU.
### 比較 `kecho` 和 `user-echo-server` 的效能
- `kecho` 的效能
![](https://i.imgur.com/VfBVsOK.png)
> 實驗 `bench` 不論設為 `true` 或是 `false` ,效能都差不多
- `user-echo-server` 的效能
![](https://i.imgur.com/K1VbYOv.png)
> 可以看出因為 `kecho` 是在 `kernel` 執行,並且又引入了 `CMWQ`,所以執行時間比 `user-echo-server` 快很多
## 引入 `Concurrency Managed Workqueue (cmwq)`,改寫 `kHTTPd`,分析效能表現和提出改進方案
>參考 [`kecho`](https://github.com/sysprog21/kecho) 來作修改
### `http_server.h`
`http_service` 用來管理 `workqueue`
- `is_stopped` 用來取代本來 `worker` 內的 `kthread_should_stop`,改變 `is_stopped` 可以通知所有 `worker` 停止
- `worker` 使用 `List` 來維護,可以利用 [`The Linux Kernel API`](https://www.kernel.org/doc/html/latest/core-api/kernel-api.html) 來做新增或刪除
```cpp
#define MODULE_NAME "khttpd"
struct http_service {
bool is_stopped;
struct list_head worker;
};
```
`khttpd` 用來管理 `work`
```cpp
struct khttpd {
struct socket *sock;
struct list_head list;
struct work_struct http_work;
};
```
### `main.c`
原本 `khttpd` 核心模組是採用系統預設的 `workqueue` ,為了要引入 `CMWQ`,必須在掛載以及卸載模組時用到 `workqueue` ,所以需要 `alloc_workqueue` 以及 `destroy_workqueue`
```diff
-#include <linux/kthread.h>
#include <linux/sched/signal.h>
#include <linux/tcp.h>
#include <linux/version.h>
...
+struct workqueue_struct *khttpd_wq;
static int __init khttpd_init(void)
{
...
+ khttpd_wq = alloc_workqueue(MODULE_NAME, WQ_UNBOUND, 0);
http_server = kthread_run(http_server_daemon, ¶m, KBUILD_MODNAME);
...
}
static void __exit khttpd_exit(void)
{
send_sig(SIGTERM, http_server, 1);
kthread_stop(http_server);
close_listen_socket(listen_socket);
+ destroy_workqueue(khttpd_wq);
pr_info("module unloaded\n");
}
```
### `http_server.c`
首先使用全域變數 `is_stopped` ,可以用來通知所有 `worker` 是否停止,以及一個指標指向 `workqueue`
```cpp
struct http_service daemon = {.is_stopped = false};
extern struct workqueue_struct *khttpd_wq;
```
- `http_server_worker`
- 建立一個 `worker` ,並使用 `container_of` 巨集來獲得 `socket` 資料
- `while` 迴圈使用 `is_stopped` 來判斷是否結束
```diff
-static int http_server_worker(void *arg)
+static void http_server_worker(struct work_struct *work)
{
+ struct khttpd *worker = container_of(work, struct khttpd, http_work);
char *buf;
...
struct http_request request;
- struct socket *socket = (struct socket *) arg;
+ struct socket *socket = worker->sock;
allow_signal(SIGKILL);
allow_signal(SIGTERM);
static int http_server_worker(void *arg)
buf = kzalloc(RECV_BUFFER_SIZE, GFP_KERNEL);
if (!buf) {
pr_err("can't allocate memory!\n");
- return -1;
+ return;
}
request.socket = socket;
http_parser_init(&parser, HTTP_REQUEST);
parser.data = &request;
- while (!kthread_should_stop()) {
+ while (!daemon.is_stopped) {
int ret = http_server_recv(socket, buf, RECV_BUFFER_SIZE - 1);
if (ret <= 0) {
if (ret)
}
...
kernel_sock_shutdown(socket, SHUT_RDWR);
sock_release(socket);
kfree(buf);
- return 0;
}
```
- `create_work`
- 使用 `kmalloc` 為每一個 `work_item` 動態配置 `kernel space` 的記憶體空間
- 利用 `INIT_WORK` 來將任務初始化 ,透過 `function pointer` 將 `work_item` 綁在對應的函式上
- 最後使用 `list_add` 來加入到 `workqueue` 中
- `free_work`
- 使用 [`list_for_each_entry_safe`](https://www.kernel.org/doc/html/latest/core-api/kernel-api.html#c.list_for_each_entry_safe) 走訪整個 `linked list`
- [`kernel_sock_shutdown`](https://www.kernel.org/doc/html/next/networking/kapi.html#c.kernel_sock_shutdown)斷開 `socket` 的連線
- [`flush_work`](https://www.kernel.org/doc/html/v4.13/driver-api/basics.html#c.flush_workqueue) 等待 `work_item` 執行完畢
- [`sock_release`](https://www.kernel.org/doc/html/next/networking/kapi.html#c.sock_release) 關閉 `socket`
- [`kfree`](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/API-kfree.html ) 釋放從 `kmalloc` 所配置的記憶體空間
- `http_server_daemon`
- 使用 `create_work` 會為每一個連線的請求建立一個 `work_item` 進行處理
- `queue_work` 會將 `work_item` 放入 `workqueue` 中排隊
- 最後當 `daemon` 結束,呼叫 `free_work` 來釋放掉建立連線所分配的記憶體空間
```cpp
int http_server_daemon(void *arg)
{
struct socket *socket;
struct work_struct *work;
struct http_server_param *param = (struct http_server_param *) arg;
allow_signal(SIGKILL);
allow_signal(SIGTERM);
INIT_LIST_HEAD(&daemon.worker);
while (!kthread_should_stop()) {
int err = kernel_accept(param->listen_socket, &socket, 0);
if (err < 0) {
if (signal_pending(current))
break;
pr_err("kernel_accept() error: %d\n", err);
continue;
}
if (unlikely(!(work = create_work(socket)))) {
printk(KERN_ERR MODULE_NAME
": create work error, connection closed\n");
kernel_sock_shutdown(socket, SHUT_RDWR);
sock_release(socket);
continue;
}
/* start server worker */
queue_work(khttpd_wq, work);
}
printk(MODULE_NAME ": daemon shutdown in progress...\n");
daemon.is_stopped = true;
free_work();
return 0;
}
```
### 實驗結果
使用 `make check` 來實驗原始版本,以及引入 `CMWQ` 後的效能
```
origin: CMWQ:
requests: 100000 requests: 100000
good requests: 100000 [100%] good requests: 100000 [100%]
bad requests: 0 [0%] bad requests: 0 [0%]
socker errors: 0 [0%] socker errors: 0 [0%]
seconds: 1.207 seconds: 0.588
requests/sec: 82870.570 requests/sec: 169943.171
```
可以發現 `requests/sec` 提昇了將近一倍