contributed by < Risheng1128
>
$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 39 bits physical, 48 bits virtual
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 2
Core(s) per socket: 2
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 142
Model name: Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz
Stepping: 9
CPU MHz: 2700.000
CPU max MHz: 3100.0000
CPU min MHz: 400.0000
BogoMIPS: 5399.81
Virtualization: VT-x
L1d cache: 64 KiB
L1i cache: 64 KiB
L2 cache: 512 KiB
L3 cache: 3 MiB
NUMA node0 CPU(s): 0-3
kecho
已使用 CMWQ,請陳述其優勢和用法*
workqueue() functions are deprecated and scheduled for removal",請參閱 Linux 核心的 git log (不要用 Google 搜尋!),揣摩 Linux 核心開發者的考量user-echo-server
運作原理,特別是 epoll 系統呼叫的使用bench
原理,能否比較 kecho
和 user-echo-server
表現?佐以製圖drop-tcp-socket
核心模組運作原理。TIME-WAIT
sockets 又是什麼?在理解 CMWQ 的優勢及用法之前,先理解為什麼需要特別實作 CMWQ ,參考 Why cmwq?
原本的 workqueue 實作可分為兩種 — multi threaded workqueue 及 single threaded workqueue
由於逐年上升的 MT wq 使用者且同時 CPU 的核數逐漸上升,導致某些系統剛啟動時可能就會將預設的 32k PID 空間用完
而需要 CMWQ 的最主要原因在於,原始的 MT wq 浪費了許多的資源,但 MT wq 的 level of concurrency 並沒有來的比較好。MT wq 建立了固定數量的執行緒,但問題出在每個 CPU 上都固定綁定一個執行緒,導致不同 CPU 的執行緒間無法相互轉移,進而降低了系統的效能
CMWQ 著重在以下的實作
以下為 CMWQ 的架構,一共分成 Thread pool 及 Unbound Thread pool
CMWQ 的優勢:
函式 alloc_workqueue
: 分配一個 workqueue。函式 alloc_workqueue
一共有 3 個參數 — @"name"
, @flags
及 @max_active
@"name"
: workqueue 的名稱@flags
:
WQ_UNBOUND
: Work items queued to an unbound wq are served by the special worker-pools which host workers which are not bound to any specific CPUWQ_FREEZABLE
: A freezable wq participates in the freeze phase of the system suspend operations. Work items on the wq are drained and no new work item starts execution until thawedWQ_MEM_RECLAIM
: All wq which might be used in the memory reclaim paths MUST have this flag set. The wq is guaranteed to have at least one execution context regardless of memory pressureWQ_HIGHPRI
: Work items of a highpriority wq are queued to the highpri worker-pool of the target cpu. Highpri worker-pools are served by worker threads with elevated nice levelWQ_CPU_INTENSIVE
: Work items of a CPU intensive wq do not contribute to the concurrency level@max_active
: 決定 workqueue 每個 CPU 上最多可存放的 work item ,假設 max_active = 16
,表示每個 workqueue 最多可以存放 16 個 work item__create_workqueue
為何被移除參考 commit 和 OscarShiang 的開發紀錄 ,可以發現 __create_workqueue
改成 alloc_workqueue
的原因是為了讓新增功能的方法變得更簡潔
This patch makes changes to make new workqueue features available to its users
參考 [PATCH] Support for freezeable workqueues 和 workqueue: introduce create_rt_workqueue 的修改可以發現,在改名成 alloc_workqueue
之前,新增功能的實作常常都是要動到大量的函式宣告,並且增加函式參數,以下給兩個例子
freezeable
extern struct workqueue_struct *__create_workqueue(const char *name,
- int singlethread);
- #define create_workqueue(name) __create_workqueue((name), 0)
- #define create_singlethread_workqueue(name) __create_workqueue((name), 1)
+ int singlethread,
+ int freezeable);
+ #define create_workqueue(name) __create_workqueue((name), 0, 0)
+ #define create_freezeable_workqueue(name) __create_workqueue((name), 0, 1)
+ #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
rt
- #define create_workqueue(name) __create_workqueue((name), 0, 0)
- #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
- #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
+ #define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
+ #define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
+ #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
+ #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
接著從 workqueue: merge feature parameters into flags 可以看到原本功能的實作從函式參數改成使用 flags
的方式
+ enum {
+ WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
+ WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */
+ };
- #define create_workqueue(name) __create_workqueue((name), 0, 0)
- #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
- #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
+ #define create_workqueue(name) \
+ __create_workqueue((name), 0)
+ #define create_freezeable_workqueue(name) \
+ __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD)
+ #define create_singlethread_workqueue(name) \
+ __create_workqueue((name), WQ_SINGLE_THREAD)
改成 flags
後新增功能的方法變得相對簡單許多,以 workqueue: implement high priority workqueue 及 workqueue: implement cpu intensive workqueue 為例
GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
GCWQ_FREEZING = 1 << 3, /* freeze in progress */
+ GCWQ_HIGHPRI_PENDING = 1 << 4, /* highpri works on queue */
WQ_NON_REENTRANT = 1 << 2, /* guarantee non-reentrance */
WQ_RESCUER = 1 << 3, /* has an rescue worker */
WQ_HIGHPRI = 1 << 4, /* high priority */
+ WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */
可以觀察到在標頭檔裡,新增功能的所需要的修正單純許多,已經不需要再修改大量的函式宣告,只需要新增 flag
即可
user-echo-server
運作原理以下為 socket 的流程圖,可以很明顯看到 server 的流程為 socket()
→ bind()
→ listen()
→ accept()
,接著開始收發資料,以下對照檔案 user-echo-server.c
的程式碼
socket()
: 建立一個 socket descriptor/**
* 建立一個 socket descriptor
* PF_INET: 使用 32 bit IP 地址
* SOCK_STREAM: Provides sequenced, reliable, two-way, connection-based byte streams.
*/
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) < 0)
server_err("Fail to create socket", &list);
bind()
: 將 addr 裡的 socket address 和 socket descriptor 連繫起來// 將 addr 和 listener 連繫起來
if (bind(listener, (struct sockaddr *) &addr, sizeof(addr)) < 0)
server_err("Fail to bind", &list);
listen()
: 將 sockfd 轉換成 listening socket ,可以接收來自客戶端的請求,而這邊可以連接 128 個客戶端if (listen(listener, 128) < 0)
server_err("Fail to listen", &list);
epoll_create
: 建立一個 epoll 的 file descriptor// creates a new epoll instance
int epoll_fd;
if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0)
server_err("Fail to create epoll", &list);
epoll_ctl
: 用來新增、修改及移除所要監聽的 file descriptor 的事件類型下面的程式碼表示 epoll_fd
監聽 listener
,並且增加的監聽類型為 EPOLLIN
及 EPOLLET
// EPOLLIN: The associated file is available for read operations
// EPOLLET: Requests edge-triggered notification for the associated file descriptor
static struct epoll_event ev = {.events = EPOLLIN | EPOLLET};
ev.data.fd = listener;
/*
* epoll_ctl: This system call is used to add, modify, or remove entries in the
* interest list of the epoll(7) instance referred to by the file descriptor epfd
* EPOLL_CTL_ADD: Add an entry to the interest list of the epoll file descriptor
*/
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0)
server_err("Fail to control epoll", &list);
epoll_wait
: 從 user space 切換到 kernel space ,開始監聽 timeout
長度的時間並且回傳時間內發生的事件數,這裡的 timeout = -1
,因此表示 block indefinitely/**
* epoll_wait: mode transition 並開始監聽
* EPOLL_RUN_TIMEOUT: block indefinitely
*/
int epoll_events_count;
if ((epoll_events_count = epoll_wait(epoll_fd, events, EPOLL_SIZE,
EPOLL_RUN_TIMEOUT)) < 0)
發生事件後,要判斷是新的 client 還是舊的,用發生事件的 fd
和 listen
判斷,如果相等表示為新的 client
/* EPOLLIN event for listener (new client connection) */
if (events[i].data.fd == listener) {
int client;
...
} else {
...
}
接著分成兩種情況討論,新的 client 與舊的 client ,先討論新的連線
accept()
: 等待來自 client 的連線請求while ((client = accept(listener, (struct sockaddr *) &client_addr, &socklen)) > 0)
接著新增 epoll_fd
的監聽對象 (client) ,並且將 client 加到 linked list 裡
// epoll_fd 新增新的監聽對象 (client)
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client, &ev) < 0)
server_err("Fail to control epoll", &list);
// 將 client 加到 linked list 裡
push_back_client(&list, client, inet_ntoa(client_addr.sin_addr));
如果收到的是舊的 client 的請求,對收到資料進行處理
/* EPOLLIN event for others (new incoming message from client) */
if (handle_message_from_client(events[i].data.fd, &list) < 0)
server_err("Handle message from client", &list);
recv()
: 收到來自 socket 的訊息// 接收資料
if ((len = recv(client, buf, BUF_SIZE, 0)) < 0)
server_err("Fail to receive", list);
最後如果收到資料就使用函式 send()
送到 client ,如果沒有就使用函式 close()
關閉 client 並從 linked list 移除
// 沒收到訊息
if (len == 0) {
// 關閉 client
if (close(client) < 0)
server_err("Fail to close", list);
// 將 client 從 linke list 移除
*list = delete_client(list, client);
printf("After fd=%d is closed, current numbers clients = %d\n", client,
size_list(*list));
} else {
printf("Client #%d :> %s", client, buf);
// 送出資料
if (send(client, buf, BUF_SIZE, 0) < 0)
server_err("Fail to send", list);
}
參考 Linux 核心設計: 針對事件驅動的 I/O 模型演化 裡對於 epoll 的說明
首先 I/O 模型主要分成 Blocking I/O, Non blocking I/O, I/O multiplexing I/O 及 Asynchronous I/O 四種,而 epoll 屬於 I/O multiplexing ,以下為模型的流程圖
比較特別的地方在於 I/O Multiplexing 在呼叫系統呼叫後,會開始計算期間內已經準備好傳送資料的連線數並回傳,再經由其他系統呼叫進行資料的接收
以 user-echo-server
為例,在呼叫函式 epoll_wait
會開始監聽且時間由參數 timeout
決定,並且回傳監聽期間內可以開始接收資料的事件數量,可以對應到上述 I/O 模型等待資料的部份
接著 user-echo-server
使用函式 recv
的部份也可以對應上述 I/O 模型讀取資料的部份
bench
原理看了檔案 bench.c
的程式碼後,可以很清楚的了解其目的是要利用多執行緒的方式測試伺服器的效能
首先是函式 bench
的部份,一開始建立 MAX_THREAD = 1000
個執行緒,接著等到所有執行緒都準備好,使用函式 pthread_cond_broadcast
開始啟動所有執行緒,最後計算每個執行緒的平均處理時間
static void bench(void)
{
for (int i = 0; i < BENCH_COUNT; i++) {
ready = false;
// 建立 client
create_worker(MAX_THREAD);
pthread_mutex_lock(&worker_lock);
ready = true;
/* all workers are ready, let's start bombing kecho */
pthread_cond_broadcast(&worker_wait);
pthread_mutex_unlock(&worker_lock);
/* waiting for all workers to finish the measurement */
for (int x = 0; x < MAX_THREAD; x++)
pthread_join(pt[x], NULL);
idx = 0;
}
// 計算平均時間
for (int i = 0; i < MAX_THREAD; i++)
fprintf(bench_fd, "%d %ld\n", i, time_res[i] /= BENCH_COUNT);
}
函式 pthread_cond_broadcast
參考 pthread_cond_broadcast(3) - Linux man page ,這邊的用途是開始執行所有被 worker_wait
鎖住的執行緒
The pthread_cond_broadcast() function shall unblock all threads currently blocked on the specified condition variable cond.
討論函式 create_worker
,其用途是建立執行緒,預設建立 1000 個執行緒且每個執行緒都是執行函式 bench_worker
static void create_worker(int thread_qty)
{
for (int i = 0; i < thread_qty; i++) {
if (pthread_create(&pt[i], NULL, bench_worker, NULL)) {
puts("thread creation failed");
exit(-1);
}
}
}
開始討論最關鍵的函式 bench_worker
static void *bench_worker(__attribute__((unused)))
{
int sock_fd;
char dummy[MAX_MSG_LEN];
struct timeval start, end;
/* wait until all workers created */
pthread_mutex_lock(&worker_lock);
while (!ready)
// 等待 worker_wait 被 broadcast 或是 signal
if (pthread_cond_wait(&worker_wait, &worker_lock)) {
puts("pthread_cond_wait failed");
exit(-1);
}
pthread_mutex_unlock(&worker_lock);
// 建立 socket descriptor
sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd == -1) {
perror("socket");
exit(-1);
}
struct sockaddr_in info = {
.sin_family = PF_INET,
.sin_addr.s_addr = inet_addr(TARGET_HOST),
.sin_port = htons(TARGET_PORT),
};
// 建立連線
if (connect(sock_fd, (struct sockaddr *) &info, sizeof(info)) == -1) {
perror("connect");
exit(-1);
}
gettimeofday(&start, NULL);
// 送資料給 server
send(sock_fd, msg_dum, strlen(msg_dum), 0);
// 從 server 接收資料
recv(sock_fd, dummy, MAX_MSG_LEN, 0);
gettimeofday(&end, NULL);
shutdown(sock_fd, SHUT_RDWR);
close(sock_fd);
// 比較接受回來的資料和原本的資料是否相同
if (strncmp(msg_dum, dummy, strlen(msg_dum))) {
puts("echo message validation failed");
exit(-1);
}
// 計算經過時間
pthread_mutex_lock(&res_lock);
time_res[idx++] += time_diff_us(&start, &end);
pthread_mutex_unlock(&res_lock);
pthread_exit(NULL);
}
第 11 行使用函式 pthread_cond_wait
,參考 pthread_cond_wait(3) - Linux man page ,可以得知目的是要等待 worker_wait
被 broadcast 或是 signal
The
pthread_cond_timedwait()
andpthread_cond_wait()
functions shall block on a condition variable. They shall be called with mutex locked by the calling thread or undefined behavior results.
接著就是進行 socket 的建立 (第 18 行) 以及連線 (第 31 行) ,最後和 server 送收資料 (第 38 行和第 40 行) 並且開始計算經過時間 (第 54 行)
kecho
和 user-echo-server
表現首先測試 user-echo-server
的效能,輸入命令 ./user-echo-server
並且開啟新的終端機作為 client 輸入命令 ./bench
,最後使用 gnuplot 作圖,輸入命令 make plot
,結果如下
接著測試 kecho
的效能,首先輸入命令 sudo insmod kecho.ko
,接著輸入命令 ./bench
,最後畫圖,結果如下所示
可以明顯觀察 kecho
的速度比 user-echo-server
快
嘗試使用實驗室(效能較好)的電腦測試
user-echo-server
kecho
可以發現效能其實差非常多
drop-tcp-socket
核心模組運作原理輸入命令 sudo insmod drop-tcp-socket.ko
載入核心模組時,會執行函式 drop_tcp_init
static int drop_tcp_init(void)
{
int res = register_pernet_subsys(&droptcp_pernet_ops);
if (res)
return res;
return 0;
}
作業要求
kecho
的執行時期的缺失,提升效能和穩健度 (robustness)
kecho
核心模組,會發生什麼事?kecho
核心模組的連線後,就長期等待,會導致什麼問題?kecho
模組在研究 kecho
的行為前,先分析 kecho
的實作考量,首先討論掛載模組時會執行的函式 kecho_init_module
static int kecho_init_module(void)
{
int error = open_listen(&listen_sock);
if (error < 0) {
printk(KERN_ERR MODULE_NAME ": listen socket open error\n");
return error;
}
param.listen_sock = listen_sock;
/*
* Create a dedicated workqueue instead of using system_wq
* since the task could be a CPU-intensive work item
* if its lifetime of connection is too long, e.g., using
* `telnet` to communicate with kecho. Flag WQ_UNBOUND
* fits this scenario. Note that the trade-off of this
* flag is cache locality.
*
* You can specify module parameter "bench=1" if you won't
* use telnet-like program to interact with the module.
* This earns you better cache locality than using default
* flag, `WQ_UNBOUND`. Note that your machine may going
* unstable if you use telnet-like program along with
* module parameter "bench=1" to interact with the module.
* Since without `WQ_UNBOUND` flag specified, a
* long-running task may delay other tasks in the kernel.
*/
kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);
echo_server = kthread_run(echo_server_daemon, ¶m, MODULE_NAME);
if (IS_ERR(echo_server)) {
printk(KERN_ERR MODULE_NAME ": cannot start server daemon\n");
close_listen(listen_sock);
}
return 0;
}
可以簡單把函式 kecho_init_module
用函式分成三個部份,著重在 alloc_workqueue
及 kthread_run
open_listen
: 建立伺服器並等待連線alloc_workqueue
: 分配一個 workequeuekthread_run
: 用於建立一個立刻執行的執行緒根據註解說明,可以很清楚的知道 kecho
利用參數 bench
決定 workqueue 的設定,參考 Application Programming Interface (API)
kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);
函式 kthread_run
的部份可以參考 Linux核心多執行緒kthread 及 include/linux/kthread.h ,其功能是用來建立執行緒並直接執行,以下為原始碼的部份
/**
* kthread_run - create and wake a thread.
* @threadfn: the function to run until signal_pending(current).
* @data: data ptr for @threadfn.
* @namefmt: printf-style name for the thread.
*
* Description: Convenient wrapper for kthread_create() followed by
* wake_up_process(). Returns the kthread or ERR_PTR(-ENOMEM).
*/
#define kthread_run(threadfn, data, namefmt, ...) \
({ \
struct task_struct *__k \
= kthread_create(threadfn, data, namefmt, ## __VA_ARGS__); \
if (!IS_ERR(__k)) \
wake_up_process(__k); \
__k; \
})
可以清楚知道 kthread_run
先使用 kthread_create
建立執行緒,接著使用 IS_ERR
判斷建立有無出現問題,程式若正常就直接使用 wake_up_process
啟動執行緒並且執行
根據 kecho
原始碼以及 Linux 核心原始碼的註解,可以得知 kthread_run
會做以下的動作
echo_server_daemon
直到 signal_pending(current)
,相關敘述可以在函式 echo_server_daemon
找到param
會作為 echo_server_daemon
的參數傳入"kecho"
作為執行緒的名稱接著可以開始分析 echo_server_daemon
函式
int echo_server_daemon(void *arg)
{
struct echo_server_param *param = arg;
struct socket *sock;
struct work_struct *work;
// 登記要接收的 Signal
allow_signal(SIGKILL);
allow_signal(SIGTERM);
INIT_LIST_HEAD(&daemon.worker);
// 判斷執行緒是否該被中止
while (!kthread_should_stop()) {
/* using blocking I/O */
int error = kernel_accept(param->listen_sock, &sock, 0);
if (error < 0) {
// 檢查當前執行緒是否有 signal 處理
if (signal_pending(current))
break;
printk(KERN_ERR MODULE_NAME ": socket accept error = %d\n", error);
continue;
}
if (unlikely(!(work = create_work(sock)))) {
printk(KERN_ERR MODULE_NAME
": create work error, connection closed\n");
kernel_sock_shutdown(sock, SHUT_RDWR);
sock_release(sock);
continue;
}
/* start server worker */
queue_work(kecho_wq, work);
}
printk(MODULE_NAME ": daemon shutdown in progress...\n");
daemon.is_stopped = true;
free_work();
return 0;
}
函式 echo_server_daemon
會做以下幾件事
SIGKILL
及 SIGTERM
,兩者都是用來結束程式,接著參考 SIGTERM vs SIGKILL: What's the Difference? 可以了解兩者的差異kthread_should_stop
判斷執行緒是否該被中止kernel_accept
檢查是否有上述的 signal 發生,有的話就跳出迴圈並且結束程式create_work
建立工作queue_work
開始工作接著查看函式 create_work
static struct work_struct *create_work(struct socket *sk)
{
struct kecho *work;
// 分配 kecho 結構大小的空間
// GFP_KERNEL: 正常配置記憶體
if (!(work = kmalloc(sizeof(struct kecho), GFP_KERNEL)))
return NULL;
work->sock = sk;
// 初始化已經建立的 work ,且會執行函式 echo_server_worker
INIT_WORK(&work->kecho_work, echo_server_worker);
list_add(&work->list, &daemon.worker);
return &work->kecho_work;
}
很明顯函式 create_work
只是要把每個建立的連線資料作為 linked list 的節點儲存起來,而每個連線都會執行函式 echo_server_worker
(後續會有更多分析)
看完 kecho
大致的流程,加上以下定義的結構,可以完全得知 kecho
是如何管理每個連線
struct echo_service {
bool is_stopped;
struct list_head worker;
};
struct kecho {
struct socket *sock;
struct list_head list;
struct work_struct kecho_work;
};
kecho
使用參數 is_stopped
判斷整個程式是否需要結束,並且使用 worker
作為 linked list 的頭將每個連線資料串連起來
echo_server_worker
在 kecho
裡,每個連線請求被建立後都會執行函式 echo_server_worker
static void echo_server_worker(struct work_struct *work)
{
struct kecho *worker = container_of(work, struct kecho, kecho_work);
unsigned char *buf;
// 取得 buffer 空間
buf = kzalloc(BUF_SIZE, GFP_KERNEL);
if (!buf) {
printk(KERN_ERR MODULE_NAME ": kmalloc error....\n");
return;
}
// 當程式還沒有要中斷前,執行無限迴圈
while (!daemon.is_stopped) {
// 取得資料
int res = get_request(worker->sock, buf, BUF_SIZE - 1);
if (res <= 0) {
if (res) {
printk(KERN_ERR MODULE_NAME ": get request error = %d\n", res);
}
break;
}
// 回傳資料
res = send_request(worker->sock, buf, res);
if (res < 0) {
printk(KERN_ERR MODULE_NAME ": send request error = %d\n", res);
break;
}
// 重置 buffer
memset(buf, 0, res);
}
kernel_sock_shutdown(worker->sock, SHUT_RDWR);
kfree(buf);
}
很明顯函式 echo_server_worker
就是負責取得資料並且回傳相同資料的函式,使用函式 get_request
及 send_request
分別進行資料的收及送
首先觀察函式 get_request
,很明顯就只是用 kernel_recvmsg
接收來自 socket 的資料,不過有蠻多特別的結構可以深入研究
static int get_request(struct socket *sock, unsigned char *buf, size_t size)
{
struct msghdr msg;
struct kvec vec;
int length;
/* kvec setting */
vec.iov_len = size;
vec.iov_base = buf;
/* msghdr setting */
msg.msg_name = 0;
msg.msg_namelen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
/*
* TODO: during benchmarking, such printk() is useless and lead to worse
* result. Add a specific build flag for these printk() would be good.
*/
printk(MODULE_NAME ": start get response\n");
/* get msg */
length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);
printk(MODULE_NAME ": get request = %s\n", buf);
return length;
}
參考 kernel_recvmsg 可以得到其定義
// 從 socket 取得資料 (message)
int kernel_recvmsg (struct socket * sock, // The socket to receive the message from
struct msghdr * msg, // Received message
struct kvec * vec, // Input s/g array for message data
size_t num, // Size of input s/g array
size_t size, // Number of bytes to read
int flags // Message flags (MSG_DONTWAIT, etc...));
接著可以分析結構 msghdr
及 kvec
,分別定義在 include/linux/socket.h 及 include/linux/uio.h
struct msghdr {
void *msg_name; /* ptr to socket address structure */
int msg_namelen; /* size of socket address structure */
struct iov_iter msg_iter; /* data */
/*
* Ancillary data. msg_control_user is the user buffer used for the
* recv* side when msg_control_is_user is set, msg_control is the kernel
* buffer used for all other cases.
*/
union {
void *msg_control;
void __user *msg_control_user;
};
bool msg_control_is_user : 1;
__kernel_size_t msg_controllen; /* ancillary data buffer length */
unsigned int msg_flags; /* flags on received message */
struct kiocb *msg_iocb; /* ptr to iocb for async requests */
};
struct kvec {
void *iov_base; /* and that should *never* hold a userland pointer */
size_t iov_len;
};
有了以上的結構後,可以對應 get_request
的程式碼,對變數 msg
來說沒有特別的設定,而對變數 vec
來說,很明顯可以看到資料最後會送到 vec.iov_base
所指向的地址
接著我很好奇函式 kernel_recvmsg
對參數 vec
的註解
Input s/g array for message data
裡面的 s/g 的意思為 scatter/gather ,搜尋後查到 DMA 有相關的功能,因此可以推論 kernel_recvmsg
在讀取來自 socket 的資料時應該會用到 DMA (不是很確定)
接著可以開始分析函式 send_request
,主要邏輯都和
static int send_request(struct socket *sock, unsigned char *buf, size_t size)
{
int length;
struct kvec vec;
struct msghdr msg;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
vec.iov_base = buf;
vec.iov_len = strlen(buf);
printk(MODULE_NAME ": start send request.\n");
length = kernel_sendmsg(sock, &msg, &vec, 1, size);
printk(MODULE_NAME ": send request = %s\n", buf);
return length;
}
參考 kernel_sendmsg 以下為其定義
int kernel_sendmsg(struct socket * sock, // socket
struct msghdr * msg, // message header
struct kvec * vec, // kernel vec
size_t num, // vec array length
size_t size) // total message data size
和 kernel_recvmsg
的用法差不多,只差在這裡是使用結構 vec
將資料輸出
在處理資料時, kecho
原本的實作會經過不少的 printk ,導致 bench
計算的時間會包含到這些函式,產生多餘的耗能,因此首先需要移除多餘函式的使用
參考 kevinshieh0225 的開發紀錄 裡提到的 sysprog21/concurrent-programs 進行模仿,在程式結束時再把紀錄印出
在 echo_server.h
新增
enum {
TRACE_kzalloc_err = 1, // kzalloc 失敗的次數
TRACE_get_err, // get request 失敗的次數
TRACE_send_err, // send request 失敗的次數
TRACE_recvmsg, // recvmsg 的次數
TRACE_sendmsg, // sendmsg 的次數
TRACE_accept_err, // accept 失敗的次數
TRACE_work_err // 建立 work 失敗的次數
};
struct runtime_state {
atomic_t kzalloc_err, get_err;
atomic_t send_err, recvmsg;
atomic_t sendmsg, accept_err;
atomic_t work_err;
};
#define TRACE(ops) \
do { \
if (TRACE_##ops) \
atomic_add(1, &states.ops); \
} while (0)
首先列舉的成員是為了讓巨集 TRACE(ops)
判斷是否真的有該成員,有的話就使用函式 atomic_add
加 1
在 echo_server.c
新增 do_analysis
用來印出資訊
static void do_analysis(void)
{
__atomic_thread_fence(__ATOMIC_SEQ_CST);
#define TRACE_PRINT(ops) printk(MODULE_NAME ": %s : %d\n", #ops, atomic_read(&states.ops));
TRACE_PRINT(recvmsg);
TRACE_PRINT(sendmsg);
TRACE_PRINT(kzalloc_err);
TRACE_PRINT(get_err);
TRACE_PRINT(send_err);
TRACE_PRINT(accept_err);
TRACE_PRINT(work_err);
}
最後印出結果
[69825.652096] kecho: recvmsg : 40060
[69825.652098] kecho: sendmsg : 20030
[69825.652100] kecho: kzalloc_err : 0
[69825.652101] kecho: get_err : 0
[69825.652102] kecho: send_err : 0
[69825.652103] kecho: accept_err : 0
[69825.652105] kecho: work_err : 0
發現一件有趣的事, recvmsg
的數量居然是 sendmsg
的兩倍,嘗試將 bench.c
的執行緒數量改成 1 後並重新開啟 printk
再測試
[69294.983057] kecho: start get response
[69294.983068] kecho: get request = dummy message
[69294.983073] kecho: start send request.
[69294.983106] kecho: send request = dummy message
[69294.983109] kecho: start get response
[69294.983148] kecho: get request =
參考 CS:APP 的伺服器架構,可以明顯發現,在中斷連線時, client 是會對 server 發出中斷請求,這也是為什麼實際收到資料的次數會是送出的 2 倍
接著稍微修改函式 get_request
,主要判斷如果不是 EOF
的話就將接收資料的次數加 1
static int get_request(struct socket *sock, unsigned char *buf, size_t size)
{
struct msghdr msg;
struct kvec vec;
int length;
/* kvec setting */
vec.iov_len = size;
vec.iov_base = buf;
/* msghdr setting */
msg.msg_name = 0;
msg.msg_namelen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
/* get msg */
length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);
+ if (likely(length)) /* Not EOF */
+ TRACE(recvmsg);
+
return length;
}
另外在結構 runtime_state
新增計算 shutdown 總數的變數 shutdown
,修改程式並重新測試
[602673.678105] kecho: recvmsg : 10002
[602673.678106] kecho: sendmsg : 10002
[602673.678106] kecho: shutdown : 10001
[602673.678107] kecho: kzalloc_err : 0
[602673.678107] kecho: get_err : 0
[602673.678107] kecho: send_err : 0
[602673.678108] kecho: accept_err : 0
[602673.678108] kecho: work_err : 0
如此一來,已經符合接收和發送的總數
最後和修改之前的程式比較效能
含有 printk
不含 printk
也特別用實驗室電腦測試
含有 printk
不含 printk
很明顯的, server 的效率變得更好了
為了模擬更符合現實世界的情況,每次的測試字串也應當不同,參考了 quiz8 - 測驗 1
裡提及的 SWAR 手法,以下為產生隨機字串的程式碼
#define GENRAND64(X) (((X) & 0x7F7F7F7F7F7F7F7F) | 0x2020202020202020)
#define GENRAND8(X) (((X) & 0x7F) | 0x20)
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#define DETECT_CHAR(X, MASK) (DETECT_NULL((X) ^ (MASK)))
static void GenRandString(char *str)
{
int size = (rand() & (MAX_MSG_LEN - 1)) + 1;
uint64_t *lptr = (uint64_t *) str;
while (size >= 8) {
uint64_t rand64 = (uint64_t) rand() << 32 | rand();
*lptr = GENRAND64(rand64);
// 如果偵測到 DEL
if (DETECT_CHAR(*lptr, 0x7F7F7F7F7F7F7F7F))
continue;
lptr++;
size -= 8;
}
char *cptr = (char *) lptr;
while (size) {
*cptr = GENRAND8(rand());
// 如果產生 DEL
if (*cptr == 0x7F)
continue;
cptr++;
size--;
}
*cptr = '\0';
}
上述的程式主要產生長度為 1 ~ MAX_MSG_LEN
的字串,當字串長度大於等於 8 時,會一次計算 8 個字元直到長度小於 8 為止,而選擇的字元範圍參考 ASCII 可顯示的字元,範圍為 32 ~ 126
透過巨集 GENRAND64
產生 8 個位元組且每個位元組的範圍的數值介在 32 ~ 127
,接著透過巨集 DETECT_CHAR
去偵測是否有位元組的值為 127 (為 Delete 字元),如果存在就重新產生一次
以下為部份產生的字串樣貌
str = )v/|,<%$;kgk9?/tcrtp'o7b9rg
str = ?l:2ck{g,l&&0u$h}og~
str = !h+fjmo!l-s#= )w7kv!'t4wp*dsy)y(
str = x9|644 tm&vu4~zfj
str = -zug!v6 1k;h:#j(xdn3y&{m!!v`
str = j)zme2:-#{rs?!rm%}0,
str = =&?4ex7|yvr#t+#&k>tll
str = %:-;}ngs~<~sg6;6
str = =~z#(nw
kecho
首先使用命令 telnet localhost 12345
測試 kecho
可以回傳的長度的限制。直接講結論,當輸入的字串長度大於 4095 時,可以很明顯觀察到回傳的字串是有缺少的,可以從以下 kecho
的程式碼觀察
length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);
函式 kernel_recvmsg
是 kecho
模組中用來接收資料的函式,其中從右數來第二個參數 size
是用來決定接收資料的最長長度,而這裡的大小為巨集 BUF_SIZE
所定義
#define BUF_SIZE 4096
因此最後可以得知目前實作的限制,即接收長度不能超過 4095 ,否則會產生缺少字元的風險
TODO: 思考如何解決字串長度太大的問題中…