Try   HackMD

2022q1 Homework6 (kecho)

contributed by < Risheng1128 >

作業說明
作業區

實驗環境

$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0

Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   39 bits physical, 48 bits virtual
CPU(s):                          4
On-line CPU(s) list:             0-3
Thread(s) per core:              2
Core(s) per socket:              2
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           142
Model name:                      Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz
Stepping:                        9
CPU MHz:                         2700.000
CPU max MHz:                     3100.0000
CPU min MHz:                     400.0000
BogoMIPS:                        5399.81
Virtualization:                  VT-x
L1d cache:                       64 KiB
L1i cache:                       64 KiB
L2 cache:                        512 KiB
L3 cache:                        3 MiB
NUMA node0 CPU(s):               0-3

自我檢查清單

  • 給定的 kecho 已使用 CMWQ,請陳述其優勢和用法
  • 核心文件 Concurrency Managed Workqueue (cmwq) 提到 "The original create_*workqueue() functions are deprecated and scheduled for removal",請參閱 Linux 核心的 git log (不要用 Google 搜尋!),揣摩 Linux 核心開發者的考量
  • 解釋 user-echo-server 運作原理,特別是 epoll 系統呼叫的使用
  • 是否理解 bench 原理,能否比較 kechouser-echo-server 表現?佐以製圖
  • 解釋 drop-tcp-socket 核心模組運作原理。TIME-WAIT sockets 又是什麼?

為什麼需要 Concurrency Managed Workqueue (CMWQ)?

在理解 CMWQ 的優勢及用法之前,先理解為什麼需要特別實作 CMWQ ,參考 Why cmwq?

原本的 workqueue 實作可分為兩種 — multi threaded workqueue 及 single threaded workqueue

  • multi threaded workqueue: 每個 CPU 都有一個 worker thread ,因此每個 multi threaded workqueue 都會有和 CPU 數相同的 worker thread
  • single threaded workqueue: 整個系統只有一個 worker thread

由於逐年上升的 MT wq 使用者且同時 CPU 的核數逐漸上升,導致某些系統剛啟動時可能就會將預設的 32k PID 空間用完

而需要 CMWQ 的最主要原因在於,原始的 MT wq 浪費了許多的資源,但 MT wq 的 level of concurrency 並沒有來的比較好。MT wq 建立了固定數量的執行緒,但問題出在每個 CPU 上都固定綁定一個執行緒,導致不同 CPU 的執行緒間無法相互轉移,進而降低了系統的效能

CMWQ 著重在以下的實作

  • 維持和原本的 workqueue API 的相容性
  • 在每個 CPU 建立統一的 worker pools 並且被所有的 workqueue 共享,為了提昇 level of concurrency 的彈性以及不浪費太多資源
  • 自動調節 worker pool 以及 level concurrency ,讓 API 的使用者不用注意細節

使用 CMWQ 的優勢

以下為 CMWQ 的架構,一共分成 Thread pool 及 Unbound Thread pool

  • Thread pool: 分成 normal thread pool 及 high priority thread ,分別管理普通的執行緒和高優先權的執行緒。其總數是固定的,如果有 n 個 CPU ,則會建立 2n 的 thread pool
  • Unbound Thread pool: 這種 thread pool 是動態建立且可運行在任意的 CPU 上

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

CMWQ 的優勢:

  • 使用者不需要管理執行緒, CMWQ 會自動管理執行緒,可以避免建立過多的執行緒,降低系統效能
  • 在原本的 workqueue 執行緒是被綁定在 CPU 上,而 CMWQ 除了有綁定特定 CPU 的 thread pool ,也有可以執行在任意 CPU 的 Unbound thread pool ,可以分配執行緒給 idle 的 CPU

CMWQ 的 API 用法

函式 alloc_workqueue: 分配一個 workqueue。函式 alloc_workqueue 一共有 3 個參數 — @"name", @flags@max_active

  • @"name": workqueue 的名稱
  • @flags:
    • WQ_UNBOUND: Work items queued to an unbound wq are served by the special worker-pools which host workers which are not bound to any specific CPU
    • WQ_FREEZABLE: A freezable wq participates in the freeze phase of the system suspend operations. Work items on the wq are drained and no new work item starts execution until thawed
    • WQ_MEM_RECLAIM: All wq which might be used in the memory reclaim paths MUST have this flag set. The wq is guaranteed to have at least one execution context regardless of memory pressure
    • WQ_HIGHPRI: Work items of a highpriority wq are queued to the highpri worker-pool of the target cpu. Highpri worker-pools are served by worker threads with elevated nice level
    • WQ_CPU_INTENSIVE: Work items of a CPU intensive wq do not contribute to the concurrency level
  • @max_active: 決定 workqueue 每個 CPU 上最多可存放的 work item ,假設 max_active = 16 ,表示每個 workqueue 最多可以存放 16 個 work item

解讀函式 __create_workqueue 為何被移除

參考 commitOscarShiang 的開發紀錄 ,可以發現 __create_workqueue 改成 alloc_workqueue 的原因是為了讓新增功能的方法變得更簡潔

This patch makes changes to make new workqueue features available to its users

參考 [PATCH] Support for freezeable workqueuesworkqueue: introduce create_rt_workqueue 的修改可以發現,在改名成 alloc_workqueue 之前,新增功能的實作常常都是要動到大量的函式宣告,並且增加函式參數,以下給兩個例子

  • freezeable workqueues → 新增參數 freezeable
  extern struct workqueue_struct *__create_workqueue(const char *name,
-								 int singlethread);
- #define create_workqueue(name) __create_workqueue((name), 0)
- #define create_singlethread_workqueue(name) __create_workqueue((name), 1)
+ 								 int singlethread,
+								 int freezeable);
+ #define create_workqueue(name) __create_workqueue((name), 0, 0)
+ #define create_freezeable_workqueue(name) __create_workqueue((name), 0, 1)
+ #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
  • real time workqueues → 新增參數 rt
- #define create_workqueue(name) __create_workqueue((name), 0, 0)
- #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
- #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
+ #define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
+ #define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
+ #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
+ #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)

接著從 workqueue: merge feature parameters into flags 可以看到原本功能的實作從函式參數改成使用 flags 的方式

+ enum {
+ 	WQ_FREEZEABLE		= 1 << 0, /* freeze during suspend */
+ 	WQ_SINGLE_THREAD	= 1 << 1, /* no per-cpu worker */
+ };

- #define create_workqueue(name) __create_workqueue((name), 0, 0)
- #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
- #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)
+ #define create_workqueue(name)					\
+ 	__create_workqueue((name), 0)
+ #define create_freezeable_workqueue(name)			\
+ 	__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD)
+ #define create_singlethread_workqueue(name)			\
+ 	__create_workqueue((name), WQ_SINGLE_THREAD)

改成 flags 後新增功能的方法變得相對簡單許多,以 workqueue: implement high priority workqueueworkqueue: implement cpu intensive workqueue 為例

  • high priority workqueue
  GCWQ_DISASSOCIATED    = 1 << 2,	/* cpu can't serve workers */
  GCWQ_FREEZING         = 1 << 3,	/* freeze in progress */
+ GCWQ_HIGHPRI_PENDING  = 1 << 4,	/* highpri works on queue */
  • cpu intensive workqueue
  WQ_NON_REENTRANT	= 1 << 2, /* guarantee non-reentrance */
  WQ_RESCUER		= 1 << 3, /* has an rescue worker */
  WQ_HIGHPRI		= 1 << 4, /* high priority */
+ WQ_CPU_INTENSIVE	= 1 << 5, /* cpu instensive workqueue */

可以觀察到在標頭檔裡,新增功能的所需要的修正單純許多,已經不需要再修改大量的函式宣告,只需要新增 flag 即可

解釋 user-echo-server 運作原理

以下為 socket 的流程圖,可以很明顯看到 server 的流程為 socket()bind()listen()accept() ,接著開始收發資料,以下對照檔案 user-echo-server.c 的程式碼

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

  • socket(): 建立一個 socket descriptor
/**
 * 建立一個 socket descriptor
 * PF_INET: 使用 32 bit IP 地址
 * SOCK_STREAM: Provides sequenced, reliable, two-way, connection-based byte  streams.
 */
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) < 0)
	server_err("Fail to create socket", &list);
  • bind(): 將 addr 裡的 socket address 和 socket descriptor 連繫起來
// 將 addr 和 listener 連繫起來
if (bind(listener, (struct sockaddr *) &addr, sizeof(addr)) < 0)
	server_err("Fail to bind", &list);
  • listen(): 將 sockfd 轉換成 listening socket ,可以接收來自客戶端的請求,而這邊可以連接 128 個客戶端
if (listen(listener, 128) < 0)
        server_err("Fail to listen", &list);
  • epoll_create: 建立一個 epoll 的 file descriptor
// creates a new epoll instance
int epoll_fd;
if ((epoll_fd = epoll_create(EPOLL_SIZE)) < 0)
	server_err("Fail to create epoll", &list);
  • epoll_ctl: 用來新增、修改及移除所要監聽的 file descriptor 的事件類型

下面的程式碼表示 epoll_fd 監聽 listener ,並且增加的監聽類型為 EPOLLINEPOLLET

// EPOLLIN: The associated file is available for read operations
// EPOLLET: Requests edge-triggered notification for the associated file descriptor
static struct epoll_event ev = {.events = EPOLLIN | EPOLLET};
ev.data.fd = listener;

/*
 * epoll_ctl: This system call is used to add, modify, or remove entries in the
 * interest list of the epoll(7) instance referred to by the file descriptor epfd
 * EPOLL_CTL_ADD: Add an entry to the interest list of the epoll file descriptor
 */
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener, &ev) < 0)
	server_err("Fail to control epoll", &list);
  • epoll_wait: 從 user space 切換到 kernel space ,開始監聽 timeout 長度的時間並且回傳時間內發生的事件數,這裡的 timeout = -1 ,因此表示 block indefinitely
/**
 * epoll_wait: mode transition 並開始監聽
 * EPOLL_RUN_TIMEOUT: block indefinitely
 */
int epoll_events_count;
if ((epoll_events_count = epoll_wait(epoll_fd, events, EPOLL_SIZE,
									 EPOLL_RUN_TIMEOUT)) < 0)

發生事件後,要判斷是新的 client 還是舊的,用發生事件的 fdlisten 判斷,如果相等表示為新的 client

/* EPOLLIN event for listener (new client connection) */
if (events[i].data.fd == listener) {
	int client;
	...
} else {
	...
}

接著分成兩種情況討論,新的 client 與舊的 client ,先討論新的連線

  • accept(): 等待來自 client 的連線請求
while ((client = accept(listener, (struct sockaddr *) &client_addr, &socklen)) > 0)

接著新增 epoll_fd 的監聽對象 (client) ,並且將 client 加到 linked list 裡

// epoll_fd 新增新的監聽對象 (client)
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client, &ev) < 0)
	server_err("Fail to control epoll", &list);
// 將 client 加到 linked list 裡
push_back_client(&list, client, inet_ntoa(client_addr.sin_addr));

如果收到的是舊的 client 的請求,對收到資料進行處理

/* EPOLLIN event for others (new incoming message from client) */
if (handle_message_from_client(events[i].data.fd, &list) < 0)
	server_err("Handle message from client", &list);
  • recv(): 收到來自 socket 的訊息
// 接收資料
if ((len = recv(client, buf, BUF_SIZE, 0)) < 0)
	server_err("Fail to receive", list);

最後如果收到資料就使用函式 send() 送到 client ,如果沒有就使用函式 close() 關閉 client 並從 linked list 移除

// 沒收到訊息
if (len == 0) {
	// 關閉 client
	if (close(client) < 0)
		server_err("Fail to close", list);
	// 將 client 從 linke list 移除
	*list = delete_client(list, client);
	printf("After fd=%d is closed, current numbers clients = %d\n", client,
		   size_list(*list));
} else {
	printf("Client #%d :> %s", client, buf);
	// 送出資料
	if (send(client, buf, BUF_SIZE, 0) < 0)
		server_err("Fail to send", list);
}

EPOLL 系統呼叫

參考 Linux 核心設計: 針對事件驅動的 I/O 模型演化 裡對於 epoll 的說明

首先 I/O 模型主要分成 Blocking I/O, Non blocking I/O, I/O multiplexing I/O 及 Asynchronous I/O 四種,而 epoll 屬於 I/O multiplexing ,以下為模型的流程圖

比較特別的地方在於 I/O Multiplexing 在呼叫系統呼叫後,會開始計算期間內已經準備好傳送資料的連線數並回傳,再經由其他系統呼叫進行資料的接收

user-echo-server 為例,在呼叫函式 epoll_wait 會開始監聽且時間由參數 timeout 決定,並且回傳監聽期間內可以開始接收資料的事件數量,可以對應到上述 I/O 模型等待資料的部份

接著 user-echo-server 使用函式 recv 的部份也可以對應上述 I/O 模型讀取資料的部份

理解 bench 原理

看了檔案 bench.c 的程式碼後,可以很清楚的了解其目的是要利用多執行緒的方式測試伺服器的效能

首先是函式 bench 的部份,一開始建立 MAX_THREAD = 1000 個執行緒,接著等到所有執行緒都準備好,使用函式 pthread_cond_broadcast 開始啟動所有執行緒,最後計算每個執行緒的平均處理時間

static void bench(void)
{
    for (int i = 0; i < BENCH_COUNT; i++) {
        ready = false;
        // 建立 client
        create_worker(MAX_THREAD);

        pthread_mutex_lock(&worker_lock);

        ready = true;

        /* all workers are ready, let's start bombing kecho */
        pthread_cond_broadcast(&worker_wait);

        pthread_mutex_unlock(&worker_lock);

        /* waiting for all workers to finish the measurement */
        for (int x = 0; x < MAX_THREAD; x++)
            pthread_join(pt[x], NULL);

        idx = 0;
    }
    // 計算平均時間
    for (int i = 0; i < MAX_THREAD; i++)
        fprintf(bench_fd, "%d %ld\n", i, time_res[i] /= BENCH_COUNT);
}

函式 pthread_cond_broadcast 參考 pthread_cond_broadcast(3) - Linux man page ,這邊的用途是開始執行所有被 worker_wait 鎖住的執行緒

The pthread_cond_broadcast() function shall unblock all threads currently blocked on the specified condition variable cond.

討論函式 create_worker ,其用途是建立執行緒,預設建立 1000 個執行緒且每個執行緒都是執行函式 bench_worker

static void create_worker(int thread_qty)
{
    for (int i = 0; i < thread_qty; i++) {
        if (pthread_create(&pt[i], NULL, bench_worker, NULL)) {
            puts("thread creation failed");
            exit(-1);
        }
    }
}

開始討論最關鍵的函式 bench_worker

static void *bench_worker(__attribute__((unused))) { int sock_fd; char dummy[MAX_MSG_LEN]; struct timeval start, end; /* wait until all workers created */ pthread_mutex_lock(&worker_lock); while (!ready) // 等待 worker_wait 被 broadcast 或是 signal if (pthread_cond_wait(&worker_wait, &worker_lock)) { puts("pthread_cond_wait failed"); exit(-1); } pthread_mutex_unlock(&worker_lock); // 建立 socket descriptor sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd == -1) { perror("socket"); exit(-1); } struct sockaddr_in info = { .sin_family = PF_INET, .sin_addr.s_addr = inet_addr(TARGET_HOST), .sin_port = htons(TARGET_PORT), }; // 建立連線 if (connect(sock_fd, (struct sockaddr *) &info, sizeof(info)) == -1) { perror("connect"); exit(-1); } gettimeofday(&start, NULL); // 送資料給 server send(sock_fd, msg_dum, strlen(msg_dum), 0); // 從 server 接收資料 recv(sock_fd, dummy, MAX_MSG_LEN, 0); gettimeofday(&end, NULL); shutdown(sock_fd, SHUT_RDWR); close(sock_fd); // 比較接受回來的資料和原本的資料是否相同 if (strncmp(msg_dum, dummy, strlen(msg_dum))) { puts("echo message validation failed"); exit(-1); } // 計算經過時間 pthread_mutex_lock(&res_lock); time_res[idx++] += time_diff_us(&start, &end); pthread_mutex_unlock(&res_lock); pthread_exit(NULL); }

第 11 行使用函式 pthread_cond_wait ,參考 pthread_cond_wait(3) - Linux man page ,可以得知目的是要等待 worker_wait 被 broadcast 或是 signal

The pthread_cond_timedwait() and pthread_cond_wait() functions shall block on a condition variable. They shall be called with mutex locked by the calling thread or undefined behavior results.

接著就是進行 socket 的建立 (第 18 行) 以及連線 (第 31 行) ,最後和 server 送收資料 (第 38 行和第 40 行) 並且開始計算經過時間 (第 54 行)

比較 kechouser-echo-server 表現

首先測試 user-echo-server 的效能,輸入命令 ./user-echo-server 並且開啟新的終端機作為 client 輸入命令 ./bench ,最後使用 gnuplot 作圖,輸入命令 make plot ,結果如下

接著測試 kecho 的效能,首先輸入命令 sudo insmod kecho.ko ,接著輸入命令 ./bench ,最後畫圖,結果如下所示

可以明顯觀察 kecho 的速度比 user-echo-server

嘗試使用實驗室(效能較好)的電腦測試

  • user-echo-server

  • kecho

可以發現效能其實差非常多

解釋 drop-tcp-socket 核心模組運作原理

輸入命令 sudo insmod drop-tcp-socket.ko 載入核心模組時,會執行函式 drop_tcp_init

static int drop_tcp_init(void)
{
    int res = register_pernet_subsys(&droptcp_pernet_ops);
    if (res)
        return res;
    return 0;
}

開發紀錄

作業要求

  • 在 GitHub 上 fork kecho,目標是修正 kecho 的執行時期的缺失,提升效能和穩健度 (robustness)
    • 若使用者層級的程式頻繁傳遞過長的字串給 kecho 核心模組,會發生什麼事?
    • 參照 kecho pull request #1,嘗試比較 kthread 為基礎的實作和 CMWQ,指出兩者效能的落差並解釋
    • 如果使用者層級的程式建立與 kecho 核心模組的連線後,就長期等待,會導致什麼問題?
    • 研讀 Linux Applications Performance: Introduction,嘗試將上述實作列入考量,比較多種 TCP 伺服器實作手法的效能表現

掛載 kecho 模組

在研究 kecho 的行為前,先分析 kecho 的實作考量,首先討論掛載模組時會執行的函式 kecho_init_module

static int kecho_init_module(void)
{
    int error = open_listen(&listen_sock);
    if (error < 0) {
        printk(KERN_ERR MODULE_NAME ": listen socket open error\n");
        return error;
    }

    param.listen_sock = listen_sock;

    /*
     * Create a dedicated workqueue instead of using system_wq
     * since the task could be a CPU-intensive work item
     * if its lifetime of connection is too long, e.g., using
     * `telnet` to communicate with kecho. Flag WQ_UNBOUND
     * fits this scenario. Note that the trade-off of this
     * flag is cache locality.
     *
     * You can specify module parameter "bench=1" if you won't
     * use telnet-like program to interact with the module.
     * This earns you better cache locality than using default
     * flag, `WQ_UNBOUND`. Note that your machine may going
     * unstable if you use telnet-like program along with
     * module parameter "bench=1" to interact with the module.
     * Since without `WQ_UNBOUND` flag specified, a
     * long-running task may delay other tasks in the kernel.
     */
    kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);
    echo_server = kthread_run(echo_server_daemon, &param, MODULE_NAME);
    if (IS_ERR(echo_server)) {
        printk(KERN_ERR MODULE_NAME ": cannot start server daemon\n");
        close_listen(listen_sock);
    }

    return 0;
}

可以簡單把函式 kecho_init_module 用函式分成三個部份,著重在 alloc_workqueuekthread_run

  • open_listen: 建立伺服器並等待連線
  • alloc_workqueue: 分配一個 workequeue
  • kthread_run: 用於建立一個立刻執行的執行緒

根據註解說明,可以很清楚的知道 kecho 利用參數 bench 決定 workqueue 的設定,參考 Application Programming Interface (API)

kecho_wq = alloc_workqueue(MODULE_NAME, bench ? 0 : WQ_UNBOUND, 0);

函式 kthread_run 的部份可以參考 Linux核心多執行緒kthreadinclude/linux/kthread.h ,其功能是用來建立執行緒並直接執行,以下為原始碼的部份

/**
 * kthread_run - create and wake a thread.
 * @threadfn: the function to run until signal_pending(current).
 * @data: data ptr for @threadfn.
 * @namefmt: printf-style name for the thread.
 *
 * Description: Convenient wrapper for kthread_create() followed by
 * wake_up_process().  Returns the kthread or ERR_PTR(-ENOMEM).
 */
#define kthread_run(threadfn, data, namefmt, ...)			   \
({									   \
	struct task_struct *__k						   \
		= kthread_create(threadfn, data, namefmt, ## __VA_ARGS__); \
	if (!IS_ERR(__k))						   \
		wake_up_process(__k);					   \
	__k;								   \
})

可以清楚知道 kthread_run 先使用 kthread_create 建立執行緒,接著使用 IS_ERR 判斷建立有無出現問題,程式若正常就直接使用 wake_up_process 啟動執行緒並且執行

根據 kecho 原始碼以及 Linux 核心原始碼的註解,可以得知 kthread_run 會做以下的動作

  • 執行緒會執行函式 echo_server_daemon 直到 signal_pending(current) ,相關敘述可以在函式 echo_server_daemon 找到
  • 變數 param 會作為 echo_server_daemon 的參數傳入
  • 字串 "kecho" 作為執行緒的名稱

接著可以開始分析 echo_server_daemon 函式

int echo_server_daemon(void *arg)
{
    struct echo_server_param *param = arg;
    struct socket *sock;
    struct work_struct *work;

    // 登記要接收的 Signal
    allow_signal(SIGKILL);
    allow_signal(SIGTERM);

    INIT_LIST_HEAD(&daemon.worker);

    // 判斷執行緒是否該被中止
    while (!kthread_should_stop()) {
        /* using blocking I/O */
        int error = kernel_accept(param->listen_sock, &sock, 0);
        if (error < 0) {
            // 檢查當前執行緒是否有 signal 處理
            if (signal_pending(current))
                break;
            printk(KERN_ERR MODULE_NAME ": socket accept error = %d\n", error);
            continue;
        }

        if (unlikely(!(work = create_work(sock)))) {
            printk(KERN_ERR MODULE_NAME
                   ": create work error, connection closed\n");
            kernel_sock_shutdown(sock, SHUT_RDWR);
            sock_release(sock);
            continue;
        }

        /* start server worker */
        queue_work(kecho_wq, work);
    }

    printk(MODULE_NAME ": daemon shutdown in progress...\n");

    daemon.is_stopped = true;
    free_work();

    return 0;
}

函式 echo_server_daemon 會做以下幾件事

  1. 登記會使用到的 signal ,這裡用到 SIGKILLSIGTERM ,兩者都是用來結束程式,接著參考 SIGTERM vs SIGKILL: What's the Difference? 可以了解兩者的差異
  2. 使用 kthread_should_stop 判斷執行緒是否該被中止
  3. 使用 kernel_accept 檢查是否有上述的 signal 發生,有的話就跳出迴圈並且結束程式
  4. 和 client 建立連線後,使用函式 create_work 建立工作
  5. 一切準備就緒,使用函式 queue_work 開始工作

接著查看函式 create_work

static struct work_struct *create_work(struct socket *sk)
{
    struct kecho *work;

    // 分配 kecho 結構大小的空間
    // GFP_KERNEL: 正常配置記憶體
    if (!(work = kmalloc(sizeof(struct kecho), GFP_KERNEL)))
        return NULL;

    work->sock = sk;

    // 初始化已經建立的 work ,且會執行函式 echo_server_worker
    INIT_WORK(&work->kecho_work, echo_server_worker);

    list_add(&work->list, &daemon.worker);

    return &work->kecho_work;
}

很明顯函式 create_work 只是要把每個建立的連線資料作為 linked list 的節點儲存起來,而每個連線都會執行函式 echo_server_worker (後續會有更多分析)

看完 kecho 大致的流程,加上以下定義的結構,可以完全得知 kecho 是如何管理每個連線

struct echo_service {
    bool is_stopped;
    struct list_head worker;
};

struct kecho {
    struct socket *sock;
    struct list_head list;
    struct work_struct kecho_work;
};






G



echo_service

echo_service

is_stopped

worker



kecho0

kecho 0

sock

list

kecho_work



echo_service:h->kecho0:l





kecho0:l->echo_service:h





kecho1

kecho 1

sock

list

kecho_work



kecho0:l->kecho1:l





kecho1:l->kecho0:l





block
...



kecho1:l->block:l





kechon

kecho n

sock

list

kecho_work



kechon:l->block:l





block:l->kecho1:l





block:l->kechon:l





kecho 使用參數 is_stopped 判斷整個程式是否需要結束,並且使用 worker 作為 linked list 的頭將每個連線資料串連起來

執行函式 echo_server_worker

kecho 裡,每個連線請求被建立後都會執行函式 echo_server_worker

static void echo_server_worker(struct work_struct *work)
{
    struct kecho *worker = container_of(work, struct kecho, kecho_work);
    unsigned char *buf;

    // 取得 buffer 空間
    buf = kzalloc(BUF_SIZE, GFP_KERNEL);
    if (!buf) {
        printk(KERN_ERR MODULE_NAME ": kmalloc error....\n");
        return;
    }

    // 當程式還沒有要中斷前,執行無限迴圈
    while (!daemon.is_stopped) {
        // 取得資料
        int res = get_request(worker->sock, buf, BUF_SIZE - 1);
        if (res <= 0) {
            if (res) {
                printk(KERN_ERR MODULE_NAME ": get request error = %d\n", res);
            }
            break;
        }
        // 回傳資料
        res = send_request(worker->sock, buf, res);
        if (res < 0) {
            printk(KERN_ERR MODULE_NAME ": send request error = %d\n", res);
            break;
        }
        // 重置 buffer
        memset(buf, 0, res);
    }

    kernel_sock_shutdown(worker->sock, SHUT_RDWR);
    kfree(buf);
}

很明顯函式 echo_server_worker 就是負責取得資料並且回傳相同資料的函式,使用函式 get_requestsend_request 分別進行資料的收及送

首先觀察函式 get_request ,很明顯就只是用 kernel_recvmsg 接收來自 socket 的資料,不過有蠻多特別的結構可以深入研究

static int get_request(struct socket *sock, unsigned char *buf, size_t size)
{
    struct msghdr msg;
    struct kvec vec;
    int length;

    /* kvec setting */
    vec.iov_len = size;
    vec.iov_base = buf;

    /* msghdr setting */
    msg.msg_name = 0;
    msg.msg_namelen = 0;
    msg.msg_control = NULL;
    msg.msg_controllen = 0;
    msg.msg_flags = 0;

    /*
     * TODO: during benchmarking, such printk() is useless and lead to worse
     * result. Add a specific build flag for these printk() would be good.
     */
    printk(MODULE_NAME ": start get response\n");
    /* get msg */
    length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);
    printk(MODULE_NAME ": get request = %s\n", buf);

    return length;
}

參考 kernel_recvmsg 可以得到其定義

// 從 socket 取得資料 (message)
int kernel_recvmsg (struct socket * sock, // The socket to receive the message from
                    struct msghdr * msg,  // Received message
                    struct kvec * vec,    // Input s/g array for message data
                    size_t num,           // Size of input s/g array
                    size_t size,          // Number of bytes to read
                    int flags             // Message flags (MSG_DONTWAIT, etc...));

接著可以分析結構 msghdrkvec ,分別定義在 include/linux/socket.hinclude/linux/uio.h

struct msghdr {
	void		*msg_name;	/* ptr to socket address structure */
	int		msg_namelen;	/* size of socket address structure */
	struct iov_iter	msg_iter;	/* data */

	/*
	 * Ancillary data. msg_control_user is the user buffer used for the
	 * recv* side when msg_control_is_user is set, msg_control is the kernel
	 * buffer used for all other cases.
	 */
	union {
		void		*msg_control;
		void __user	*msg_control_user;
	};
	bool		msg_control_is_user : 1;
	__kernel_size_t	msg_controllen;	/* ancillary data buffer length */
	unsigned int	msg_flags;	/* flags on received message */
	struct kiocb	*msg_iocb;	/* ptr to iocb for async requests */
};

struct kvec {
	void *iov_base; /* and that should *never* hold a userland pointer */
	size_t iov_len;
};

有了以上的結構後,可以對應 get_request 的程式碼,對變數 msg 來說沒有特別的設定,而對變數 vec 來說,很明顯可以看到資料最後會送到 vec.iov_base 所指向的地址

接著我很好奇函式 kernel_recvmsg 對參數 vec 的註解

Input s/g array for message data

裡面的 s/g 的意思為 scatter/gather ,搜尋後查到 DMA 有相關的功能,因此可以推論 kernel_recvmsg 在讀取來自 socket 的資料時應該會用到 DMA (不是很確定)

接著可以開始分析函式 send_request ,主要邏輯都和

static int send_request(struct socket *sock, unsigned char *buf, size_t size)
{
    int length;
    struct kvec vec;
    struct msghdr msg;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_control = NULL;
    msg.msg_controllen = 0;
    msg.msg_flags = 0;

    vec.iov_base = buf;
    vec.iov_len = strlen(buf);

    printk(MODULE_NAME ": start send request.\n");

    length = kernel_sendmsg(sock, &msg, &vec, 1, size);

    printk(MODULE_NAME ": send request = %s\n", buf);

    return length;
}

參考 kernel_sendmsg 以下為其定義

int kernel_sendmsg(struct socket * sock, // socket
                   struct msghdr * msg,  // message header
                   struct kvec * vec,    // kernel vec
                   size_t num,           // vec array length
                   size_t size)          // total message data size

kernel_recvmsg 的用法差不多,只差在這裡是使用結構 vec 將資料輸出

改寫 benchmarking

在處理資料時, kecho 原本的實作會經過不少的 printk ,導致 bench 計算的時間會包含到這些函式,產生多餘的耗能,因此首先需要移除多餘函式的使用

參考 kevinshieh0225 的開發紀錄 裡提到的 sysprog21/concurrent-programs 進行模仿,在程式結束時再把紀錄印出

echo_server.h 新增

enum {
    TRACE_kzalloc_err = 1, // kzalloc 失敗的次數
    TRACE_get_err,         // get request 失敗的次數
    TRACE_send_err,        // send request 失敗的次數
    TRACE_recvmsg,     	   // recvmsg 的次數
    TRACE_sendmsg,         // sendmsg 的次數
    TRACE_accept_err,      // accept 失敗的次數
    TRACE_work_err         // 建立 work 失敗的次數
};

struct runtime_state {
    atomic_t kzalloc_err, get_err;
    atomic_t send_err, recvmsg;
    atomic_t sendmsg, accept_err;
    atomic_t work_err;
};

#define TRACE(ops)                           \
    do {                                     \
        if (TRACE_##ops)                     \
            atomic_add(1, &states.ops); \
    } while (0)

首先列舉的成員是為了讓巨集 TRACE(ops) 判斷是否真的有該成員,有的話就使用函式 atomic_add1

echo_server.c 新增 do_analysis 用來印出資訊

static void do_analysis(void)
{
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
#define TRACE_PRINT(ops) printk(MODULE_NAME ": %s : %d\n", #ops, atomic_read(&states.ops));
    TRACE_PRINT(recvmsg);
    TRACE_PRINT(sendmsg);
    TRACE_PRINT(kzalloc_err);
    TRACE_PRINT(get_err);
    TRACE_PRINT(send_err);
    TRACE_PRINT(accept_err);
    TRACE_PRINT(work_err);
}

最後印出結果

[69825.652096] kecho: recvmsg     : 40060
[69825.652098] kecho: sendmsg     : 20030
[69825.652100] kecho: kzalloc_err : 0
[69825.652101] kecho: get_err     : 0
[69825.652102] kecho: send_err    : 0
[69825.652103] kecho: accept_err  : 0
[69825.652105] kecho: work_err    : 0

發現一件有趣的事, recvmsg 的數量居然是 sendmsg 的兩倍,嘗試將 bench.c 的執行緒數量改成 1 後並重新開啟 printk 再測試

[69294.983057] kecho: start get response
[69294.983068] kecho: get request = dummy message
[69294.983073] kecho: start send request.
[69294.983106] kecho: send request = dummy message
[69294.983109] kecho: start get response
[69294.983148] kecho: get request = 

參考 CS:APP 的伺服器架構,可以明顯發現,在中斷連線時, client 是會對 server 發出中斷請求,這也是為什麼實際收到資料的次數會是送出的 2 倍

接著稍微修改函式 get_request ,主要判斷如果不是 EOF 的話就將接收資料的次數加 1

static int get_request(struct socket *sock, unsigned char *buf, size_t size)
{
    struct msghdr msg;
    struct kvec vec;
    int length;

    /* kvec setting */
    vec.iov_len = size;
    vec.iov_base = buf;

    /* msghdr setting */
    msg.msg_name = 0;
    msg.msg_namelen = 0;
    msg.msg_control = NULL;
    msg.msg_controllen = 0;
    msg.msg_flags = 0;

    /* get msg */
    length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);
+   if (likely(length))  /* Not EOF */
+       TRACE(recvmsg);
+
    return length;
}

另外在結構 runtime_state 新增計算 shutdown 總數的變數 shutdown ,修改程式並重新測試

[602673.678105] kecho: recvmsg : 10002
[602673.678106] kecho: sendmsg : 10002
[602673.678106] kecho: shutdown : 10001
[602673.678107] kecho: kzalloc_err : 0
[602673.678107] kecho: get_err : 0
[602673.678107] kecho: send_err : 0
[602673.678108] kecho: accept_err : 0
[602673.678108] kecho: work_err : 0

如此一來,已經符合接收和發送的總數

最後和修改之前的程式比較效能

  • 含有 printk

  • 不含 printk

也特別用實驗室電腦測試

  • 含有 printk

  • 不含 printk

很明顯的, server 的效率變得更好了

建立獨特字串

為了模擬更符合現實世界的情況,每次的測試字串也應當不同,參考了 quiz8 - 測驗 1 裡提及的 SWAR 手法,以下為產生隨機字串的程式碼

#define GENRAND64(X) (((X) & 0x7F7F7F7F7F7F7F7F) | 0x2020202020202020)
#define GENRAND8(X)  (((X) & 0x7F) | 0x20)

#define DETECT_NULL(X)         (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#define DETECT_CHAR(X, MASK)   (DETECT_NULL((X) ^ (MASK)))

static void GenRandString(char *str)
{
    int size = (rand() & (MAX_MSG_LEN - 1)) + 1;
    uint64_t *lptr = (uint64_t *) str;

    while (size >= 8) {
        uint64_t rand64 = (uint64_t) rand() << 32 | rand();
        *lptr = GENRAND64(rand64);
        // 如果偵測到 DEL
        if (DETECT_CHAR(*lptr, 0x7F7F7F7F7F7F7F7F))
            continue;
        lptr++;
        size -= 8;
    }

    char *cptr = (char *) lptr;

    while (size) {
        *cptr = GENRAND8(rand());
        // 如果產生 DEL
        if (*cptr == 0x7F)
            continue;
        cptr++;
        size--;
    }   
    *cptr = '\0';
}

上述的程式主要產生長度為 1 ~ MAX_MSG_LEN 的字串,當字串長度大於等於 8 時,會一次計算 8 個字元直到長度小於 8 為止,而選擇的字元範圍參考 ASCII 可顯示的字元,範圍為 32 ~ 126

透過巨集 GENRAND64 產生 8 個位元組且每個位元組的範圍的數值介在 32 ~ 127 ,接著透過巨集 DETECT_CHAR 去偵測是否有位元組的值為 127 (為 Delete 字元),如果存在就重新產生一次

以下為部份產生的字串樣貌

str = )v/|,<%$;kgk9?/tcrtp'o7b9rg
str = ?l:2ck{g,l&&0u$h}og~
str = !h+fjmo!l-s#= )w7kv!'t4wp*dsy)y(
str = x9|644 tm&vu4~zfj
str = -zug!v6 1k;h:#j(xdn3y&{m!!v`
str = j)zme2:-#{rs?!rm%}0,
str = =&?4ex7|yvr#t+#&k>tll
str = %:-;}ngs~<~sg6;6
str = =~z#(nw

傳遞過長的字串給 kecho

首先使用命令 telnet localhost 12345 測試 kecho 可以回傳的長度的限制。直接講結論,當輸入的字串長度大於 4095 時,可以很明顯觀察到回傳的字串是有缺少的,可以從以下 kecho 的程式碼觀察

length = kernel_recvmsg(sock, &msg, &vec, size, size, msg.msg_flags);

函式 kernel_recvmsgkecho 模組中用來接收資料的函式,其中從右數來第二個參數 size 是用來決定接收資料的最長長度,而這裡的大小為巨集 BUF_SIZE 所定義

#define BUF_SIZE 4096

因此最後可以得知目前實作的限制,即接收長度不能超過 4095 ,否則會產生缺少字元的風險

TODO: 思考如何解決字串長度太大的問題中

參照 kecho pull request #1