Try   HackMD

2020q1 Homework4 (khttpd)

contributed by < KYG-yaya573142 >

H08: khttpd

預期目標

模組掛載如何使用初始化的參數

khttpd 可在掛載模組時自訂 portbacklog 的初始化數值,相關的程式碼如下

static ushort port = DEFAULT_PORT;
module_param(port, ushort, S_IRUGO);
static ushort backlog = DEFAULT_BACKLOG;
module_param(backlog, ushort, S_IRUGO);

/include/linux/moduleparam.h 觀察可以發現 module_param 被很多層 wrapper 包住,以下截取相關的部分

#define module_param(name, type, perm)				\
	module_param_named(name, name, type, perm)
    
#define module_param_named(name, value, type, perm)			   \
	param_check_##type(name, &(value));				   \
	module_param_cb(name, &param_ops_##type, &value, perm);		   \
	__MODULE_PARM_TYPE(name, #type)
    
#define module_param_cb(name, ops, arg, perm)				      \
	__module_param_call(MODULE_PARAM_PREFIX, name, ops, arg, perm, -1, 0)
    
#define MODULE_PARAM_PREFIX /* empty */
  • 最終負責的部分是 __module_param_call
    • module_param(port, ushort, S_IRUGO) 為例,展開後是 __module_param_call(MODULE_PARAM_PREFIX, port, &param_ops_ushort, &port, S_IRUGO, -1, 0)
  • param_check_##type(name, &(value)) 會在 compile-time checking,可參閱 __param_check 的註解
  • __MODULE_PARM_TYPE 實際上會展開成 __MODULE_INFO,相關的說明可參閱 fibdrv 作業說明 - Linux 核心模組掛載機制
/* This is the fundamental function for registering boot/module
   parameters. */
#define __module_param_call(prefix, name, ops, arg, perm, level, flags)	\
	/* Default value instead of permissions? */			\
	static const char __param_str_##name[] = prefix #name;		\
	static struct kernel_param __moduleparam_const __param_##name	\
	__used								\
    __attribute__ ((unused,__section__ ("__param"),aligned(sizeof(void *)))) \
	= { __param_str_##name, THIS_MODULE, ops,			\
	    VERIFY_OCTAL_PERMISSIONS(perm), level, flags, { arg } }
  • # - Stringification
  • ## - Concatenation
  • 關鍵字 __attribute__ 會告知 compiler 以下事項
    • 此變數可能不會被使用,不須警告
    • 要放在 __param 這個 section,可用 objdump -s khttpd.ko 查詢
    • 要以 pointer 的大小進行對齊
  • 因此 __module_param_call 最終會定義一個資料類型為 struct kernel_param 的物件並放在 khttpd.ko 的 __param 這個 section

根據 struct kernel_param 的定義,以 module_param(port, ushort, S_IRUGO) 為例進行展開如下

struct kernel_param {
	const char *name = "port";
	struct module *mod = THIS_MODULE;
	const struct kernel_param_ops *ops = &param_ops_ushort;
	const u16 perm = 0444; //S_IRUGO
	s8 level = -1;
	u8 flags = 0;
	union {
		void *arg = &port;
		const struct kparam_string *str;
		const struct kparam_array *arr;
	};
}__param_port;

readelf -r khttpd.ko 觀察,可以看到 __param 內確實有對應的資料

Relocation section '.rela__param' at offset 0xcb78 contains 8 entries:
  Offset          Info           Type           Sym. Value    Sym. Name + Addend
000000000000  000500000001 R_X86_64_64       0000000000000000 .rodata + 1240
000000000008  004200000001 R_X86_64_64       0000000000000000 __this_module + 0
000000000010  004000000001 R_X86_64_64       0000000000000000 param_ops_ushort + 0
000000000020  000b00000001 R_X86_64_64       0000000000000000 .data + 4
000000000028  000500000001 R_X86_64_64       0000000000000000 .rodata + 1248
000000000030  004200000001 R_X86_64_64       0000000000000000 __this_module + 0
000000000038  004000000001 R_X86_64_64       0000000000000000 param_ops_ushort + 0
000000000048  000b00000001 R_X86_64_64       0000000000000000 .data + 6.data

再用 objdump -s -j .rodata khttpd.ko 對照 .rodata 的部分,可以確認 __param 內的兩筆資料就是 module_param(port, ushort, S_IRUGO)module_param(backlog, ushort, S_IRUGO)

khttpd.ko:     file format elf64-x86-64

Contents of section .rodata:
 0000 00000000 00000000 00000000 00000000  ................
 ...
 1240 6261636b 6c6f6700 706f7274 00        backlog.port.  

接著使用 strace 觀察 insmod 時使用的 system calls,比對指定參數和不指定參數的狀況,可以發現差異主要是 finit_module 的參數會不同

$ sudo strace insmod khttpd.ko
...
finit_module(3, "", 0)                  = 0
...

$ sudo strace insmod khttpd.ko port=1999
...
finit_module(3, "port=1999", 0)         = 0
...

finit_module 是 system call,其定義可以在 /kernel/module.c 中找到

SYSCALL_DEFINE3(finit_module, int, fd, const char __user *, uargs, int, flags)
{
	...
	return load_module(&info, uargs, flags);
}
  • 主要的實作則是 load_module
  • finit_module(3, "port=1999", 0) 的第 2 個參數 "port=1999" 對照到 uargs
/* Allocate and load the module: note that size of section 0 is always
   zero, and we rely on this for optional sections. */
static int load_module(struct load_info *info, const char __user *uargs,
		       int flags)
{
	...
	/* Now copy in args */
	mod->args = strndup_user(uargs, ~0UL >> 1);
	if (IS_ERR(mod->args)) {
		err = PTR_ERR(mod->args);
		goto free_arch_cleanup;
	}
	...
	/* Module is ready to execute: parsing args may do that. */
	after_dashes = parse_args(mod->name, mod->args, mod->kp, mod->num_kp,
				  -32768, 32767, mod,
				  unknown_module_param_cb);
	...
}
  • "port=1999" 會被複製到 mod->args
  • 接著使用 parse_args 來執行
/* Args looks like "foo=bar,bar2 baz=fuz wiz". */
char *parse_args(const char *doing,
		 char *args,
		 const struct kernel_param *params,
		 unsigned num,
		 s16 min_level,
		 s16 max_level,
		 void *arg,
		 int (*unknown)(char *param, char *val,
				const char *doing, void *arg))
{
	char *param, *val, *err = NULL;

	/* Chew leading spaces */
	args = skip_spaces(args);

	if (*args)
		pr_debug("doing %s, parsing ARGS: '%s'\n", doing, args);

	while (*args) {
		int ret;
		int irq_was_disabled;

		args = next_arg(args, &param, &val);
		/* Stop at -- */
		if (!val && strcmp(param, "--") == 0)
			return err ?: args;
		irq_was_disabled = irqs_disabled();
		ret = parse_one(param, val, doing, params, num,
				min_level, max_level, arg, unknown);
	...
    }
  • next_arg 會解析原本的 "param=val" 然後將結果存到變數 paramval
  • parse_one 是真正將數值更新到對應的 kernel_param 物件的函數
static int parse_one(char *param,
		     char *val,
		     const char *doing,
		     const struct kernel_param *params,
		     unsigned num_params,
		     s16 min_level,
		     s16 max_level,
		     void *arg,
		     int (*handle_unknown)(char *param, char *val,
				     const char *doing, void *arg))
{
	unsigned int i;
	int err;

	/* Find parameter */
	for (i = 0; i < num_params; i++) {
		if (parameq(param, params[i].name)) {
		...
			kernel_param_lock(params[i].mod);
			if (param_check_unsafe(&params[i]))
				err = params[i].ops->set(val, &params[i]);
			else
				err = -EPERM;
			kernel_param_unlock(params[i].mod);
			return err;
		}
	}

	if (handle_unknown) {
		pr_debug("doing %s: %s='%s'\n", doing, param, val);
		return handle_unknown(param, val, doing, arg);
	}

	pr_debug("Unknown argument '%s'\n", param);
	return -ENOENT;
}
  • if (parameq(param, params[i].name)) 尋找參數對應到的 kernel_param 物件
  • err = params[i].ops->set(val, &params[i]) 使用 kernel_param 物件中參照的 handler 來設置數值
    • 接續前面的例子, ops 指的就是 param_ops_ushort

kHTTPd 與 CSAPP server 的異同

CSAPP 中提到的 Web server 架構如下圖

kHTTPd 執行 server 的流程如下







khttpd


cluster_init

khttpd_init


cluster_daemon

http_server_daemon



proc1

open_listen_socket

sock_create
kernel_bind
kernel_listen



next1

kthread_run



proc1->next1





accept

kernel_accept



next1->accept





next2

kthread_run



accept->next2





worker1

worker



next2->worker1





worker2

worker



next2->worker2





worker3

worker



next2->worker3





比對兩者,可以發現差異主要是

  • CSAPP 的 server 架構在 user space 實行
    • CSAPP 的示意圖雖然沒有使用 thread,但後續的課程有改寫為使用 thread 的 server
  • kHTTPd 的 server 架構則在 kernel space 實行
  • 對應的 kernel api 如下

觀察 Linux Networking api

觀察 /net/socket.ckernel_bindkernel_listenkernel_accept 的實作,會發現全部都是呼叫 sock->ops 中對應的 function pointer,顯然不同的 protocol family 會定義各自的實作,再根據呼叫 sock_create 時使用的參數來取用對應的 function pointer

int kernel_bind(struct socket *sock, struct sockaddr *addr, int addrlen)
{
	return sock->ops->bind(sock, addr, addrlen);
}

int kernel_listen(struct socket *sock, int backlog)
{
	return sock->ops->listen(sock, backlog);
}

int kernel_accept(struct socket *sock, struct socket **newsock, int flags)
{
	...
	err = sock->ops->accept(sock, *newsock, flags, true);
	...
}

sock->ops 會在 sock_create 內進行初始化,但整個流程很不直觀,首先觀察 sock_create 中的 pf->create

int __sock_create(struct net *net, int family, int type, int protocol,
			 struct socket **res, int kern)
{
	...
	const struct net_proto_family *pf;
	...
	pf = rcu_dereference(net_families[family]);
	...
	err = pf->create(net, sock, protocol, kern);
	...
}

/net/ipv4/af_inet.c 可以發現 pf->create 其實就是 inet_create (註:inet_init 會呼叫 sock_register 來初始化 net_families[])

static const struct net_proto_family inet_family_ops = {
	.family = PF_INET,
	.create = inet_create,
	.owner	= THIS_MODULE,
};

觀察 inet_create 會發現 sock->ops 就是在這裡被設置的,其中 list_for_each_entry_rcu 會將 answer 指向 inetsw[] 中對應的 type/protocol pair

static int inet_create(struct net *net, struct socket *sock, int protocol,
		       int kern)
{
	...
	/* Look for the requested type/protocol pair. */
lookup_protocol:
	err = -ESOCKTNOSUPPORT;
	rcu_read_lock();
	list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
		...
        }
	...
	sock->ops = answer->ops;
	...
}

進一步查看 inetsw 會發現其中確實記錄著 type/protocol pair 的相關資料,例如當 type 是 SOCK_STREAM 時,對應的 opsinet_stream_ops

/* Upon startup we insert all the elements in inetsw_array[] into
 * the linked list inetsw.
 */
static struct inet_protosw inetsw_array[] =
{
	{
		.type =       SOCK_STREAM,
		.protocol =   IPPROTO_TCP,
		.prot =       &tcp_prot,
		.ops =        &inet_stream_ops,
		.flags =      INET_PROTOSW_PERMANENT |
			      INET_PROTOSW_ICSK,
	},
	...
}

最後就可以在 inet_stream_ops[] 中找到對應函數的實際名稱了

const struct proto_ops inet_stream_ops = {
	.family		   = PF_INET,
	.owner		   = THIS_MODULE,
	.release	   = inet_release,
	.bind		   = inet_bind,
	.connect	   = inet_stream_connect,
	.socketpair	   = sock_no_socketpair,
	.accept		   = inet_accept,
	.getname	   = inet_getname,
	.poll		   = tcp_poll,
	.ioctl		   = inet_ioctl,
	.gettstamp	   = sock_gettstamp,
	.listen		   = inet_listen,
	.shutdown	   = inet_shutdown,
	.setsockopt	   = sock_common_setsockopt,
	.getsockopt	   = sock_common_getsockopt,
	.sendmsg	   = inet_sendmsg,
	.recvmsg	   = inet_recvmsg,
#ifdef CONFIG_MMU
	.mmap		   = tcp_mmap,
#endif
	.sendpage	   = inet_sendpage,
	.splice_read	   = tcp_splice_read,
	.read_sock	   = tcp_read_sock,
	.sendmsg_locked    = tcp_sendmsg_locked,
	.sendpage_locked   = tcp_sendpage_locked,
	.peek_len	   = tcp_peek_len,
#ifdef CONFIG_COMPAT
	.compat_setsockopt = compat_sock_common_setsockopt,
	.compat_getsockopt = compat_sock_common_getsockopt,
	.compat_ioctl	   = inet_compat_ioctl,
#endif
	.set_rcvlowat	   = tcp_set_rcvlowat,
};

整合 fibdrv

khttpd 運作邏輯

khttpd 由 1 個 kthread http_server_daemon 負責 kernel_accept,每次 accept 成功後會產生 kthread http_server_worker 來實際執行任務。

static int http_server_worker(void *arg)
{
    char *buf;
    struct http_parser parser;
    struct http_parser_settings setting = {
        .on_message_begin = http_parser_callback_message_begin,
        .on_url = http_parser_callback_request_url,
        .on_header_field = http_parser_callback_header_field,
        .on_header_value = http_parser_callback_header_value,
        .on_headers_complete = http_parser_callback_headers_complete,
        .on_body = http_parser_callback_body,
        .on_message_complete = http_parser_callback_message_complete};
    ...
    while (!kthread_should_stop()) {
        int ret = http_server_recv(socket, buf, RECV_BUFFER_SIZE - 1);
        if (ret <= 0) {
            if (ret)
                pr_err("recv error: %d\n", ret);
            break;
        }
        http_parser_execute(&parser, &setting, buf, ret);
        if (request.complete && !http_should_keep_alive(&parser))
            break;
    }
    ...
}
  • 運作的架構使用 HTTP Parser APIs
    • 每個連線都會有一個對應的 http_parser 物件
    • 根據設置在 http_parser_settings 物件的 callback functions 來處理 HTTP Messages
    • callback functions 需藉由 parser->data 來取用 kthread scope 的資料 (thread safe)
  • 運作的邏輯大致如下
    • 初始化 HTTP Parser 相關的部分,其中 parser->data 會指向 worker 用來儲存資料的 http_request 物件
    • http_server_recv 從 socket 接收訊息至 buf
    • http_parser_execute 開始執行 HTTP Parser 的各個 callback functions
    • 處理 url (使用 on_url 指向的 callback)
    • 依序處理 HTTP header fields 和 values (使用 on_header_fieldon_header_value 指向的 callback)
    • 處理 HTTP body (使用 on_body 指向的 callback)
    • http_server_response 執行對應的動作,產生送回給 client 的資料
    • http_server_send 送回資料給 client

加入 fibdrv

http_server_response 原本執行的動作很單純,將 URL 輸出至 kernel ring buffer,然後根據 client request 的內容回傳不同的固定訊息給 client

static int http_server_response(struct http_request *request, int keep_alive)
{
    char *response;

    pr_info("requested_url = %s\n", request->request_url);
    if (request->method != HTTP_GET)
        response = keep_alive ? HTTP_RESPONSE_501_KEEPALIVE : HTTP_RESPONSE_501;
    else
        response = keep_alive ? HTTP_RESPONSE_200_KEEPALIVE_DUMMY
                              : HTTP_RESPONSE_200_DUMMY;
    http_server_send(request->socket, response, strlen(response));
    return 0;
}

接下來會改寫 http_server_response,追加能計算費氏數列的功能,改寫的過程主要可分為兩個項目

  • http_server_response - 追加從 url 偵測關鍵字並呼叫 do_fibonacci 的能力
  • do_fibonacci - 除了計算費氏數列外,還包含如何從 url 拆解所需參數、如何實現字串轉與數值的轉換、以及簡易的防呆處理

http_server_response

#include "fibdrv.h"
...
#define HTTP_RESPONSE_200                                      \
    ""                                                         \
    "HTTP/1.1 %s" CRLF "Server: " KBUILD_MODNAME CRLF          \
    "Content-Type: text/plain" CRLF "Content-Length: %lu" CRLF \
    "Connection: %s" CRLF CRLF "%s" CRLF
...
static int http_server_response(struct http_request *request, int keep_alive)
{
    char *connection = keep_alive? "Keep-Alive" : "Close";
    char *status = "501 Not Implemented";
    char *body = "501 Not Implemented";
    char *response = NULL;
    char *target = NULL;
    bool flag = 0;

    pr_info("requested_url = %s\n", request->request_url);
    if (request->method == HTTP_GET) {
        status = "200 OK";
        body = "Hello World!"; //default msg
    }

    /* fib function entry */
    if ((target = strstr(request->request_url, "/fib/"))) {
        flag = 1;
        body = do_fibonacci(target);
    }
    
    response = kmalloc(MSG_BUFFER_SIZE, GFP_KERNEL);
    snprintf(response, MSG_BUFFER_SIZE, HTTP_RESPONSE_200, status, strlen(body), connection, body);
    http_server_send(request->socket, response, strlen(response));
    kfree(response);
    if (flag)
        kfree(body);
    return 0;
}
  • if ((target = strstr(request->request_url, "/fib/"))) 負責偵測是否需要呼叫 do_fibonacci,關鍵字為 "\fib\"
    • 使用 strstr 而不是 strnstr 是因為我有改寫 http_parser_callback_request_url,確保 request->request_url 一定是 null terminated,後續討論會提到
  • 由於 do_fibonacci 會使用 kmalloc 來存放回傳的字串,因此 server 回傳資料後須 kfree
  • 只是單純要加入費氏數列計算的功能的話,並不需要將整個 http_server_response 改寫,但目前這種寫法保留了更好的擴充性
    • 只要將 body 指向欲回傳的內容,snprintf 就會負責組成完整的回傳內容
    • connectionstatus 也可以用一樣的方式更換內容
    • 注意 snprintf 的 src 和 dest 不能 overlap,這也是為何 do_fibonacci 會再 kmalloc 的原因

do_fibonacci

static char *do_fibonacci(char* num_ptr)
{
    int n = -1;
    char *msg = kmalloc(MSG_BUFFER_SIZE, GFP_KERNEL);
    char *next_tok = NULL;
    int num_length = 0;
    num_ptr += 5; //skip "/fib/"
    next_tok = strstr(num_ptr, "/");
    if (!next_tok)
        num_length = strlen(num_ptr);
    else
        num_length = (uint64_t) next_tok - (uint64_t) num_ptr;
    strncpy(msg, num_ptr, num_length); //no modification on original URL
    msg[num_length] = '\0';
    kstrtoint(msg, 10, &n);
    
    if (n < 0) {
        snprintf(msg, MSG_BUFFER_SIZE, "fib(%d): invalid arguments!", n);
    } else {
        long long result = fib_sequence_fdouble(n);
        snprintf(msg, MSG_BUFFER_SIZE, "fib(%d): %lld", n, result);
    }
    return msg;
}
  • do_fibonacci 是費氏數列計算函數 fib_sequence_fdouble 的 wrapper,負責處理字串的部分
  • 傳入 do_fibonacci 的 url 會包含關鍵字之後的部分,如 "\fib\..."
  • 使用 strstr(num_ptr, "/") 來定位數字所在的位置,因此可接受數字用 '/' 結尾或是數字直接置於 url 末端
    • ".../fib/22"
    • ".../fib/22/"
  • 使用 kstrtoint 來將數字字串轉換為數值,由於 kstrtoint 只能處理 null terminated 的字串,須確保數字的末端是 '\0'
    • 雖然可以直接使用 request->request_url 的空間,但為了擴充性 (例如還有別的函式會針對 url 進行判斷),會先將原本的 url strcpy 到新的空間再進行處理
  • 一樣使用 snprintf 將費氏數列結果轉換成字串並回傳
  • 防呆可以預防非數字、負數與無參數的狀況
    • ".../fib/"
    • ".../fib/-87"
    • ".../fib//87"
    • ".../fib/nan"
  • 目前 fib_sequence_fdouble 是含有缺陷的簡易版實作,尚未處裡 overflow 與應用大數運算,待補完 fibdrv 相關的部分再修正 欠的債遲早要還的

htstress.c 使用 epoll 系統呼叫,其作用為何?

epoll 簡介

man 7 epoll

The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them.

epoll API 可用來監測數個 file descriptors (fd) 是否有可用的 I/O 事件,epoll API 的關鍵是 epoll instance (一個 kernel data structure - struct eventpoll),使用時概念上可將 epoll instance 視為包含兩個清單的物件

  • interest list - 包含欲監測的 fd
  • ready list - 包含有可用 I/O 事件的 fd (註:ready list 是 interest list 的子集合)

以下簡單介紹 epoll API 的使用方式

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_create1(int flags);
  • 創立一個 epoll instance 並回傳其 fd
    • 實際上是創立一個 struct file,並將 struct eventpoll 放在 file->private_data,這也是為何 epoll instance 是藉由 fd 來使用,且使用完畢後須 close
  • size - 若 Linux 核心版本大於 2.6.8 不會有實際用途,但輸入值須大於 0,詳見 man epoll_create
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 修改 epoll instance 中的 interest list
  • epfd - 欲修改的 epoll instance 對應的 file descriptor
  • op - 欲進行的操作
    • EPOLL_CTL_ADD 新增 fd 至 interest list
    • EPOLL_CTL_DEL 從 interest list 中移除 fd
    • EPOLL_CTL_MOD 修改 interest list 中 fd 被註冊的 event
  • fd - 受操作的目標 fd
  • event - 指向一個與 fd 關聯的 epoll_event 物件,而 epoll_event 包含 eventsdata 這兩個資料成員
    • event->events - 以 bitmask 的形式記錄欲對目標 fd 監測的事件種類,例如
      • EPOLLIN 監測 fd 可被 read
      • EPOLLOUT 監測 fd 可被 write
      • EPOLLERR 監測 fd 發生錯誤
      • 以 OR 操作來同時註冊多個 event
      • 尚有其他 event,詳見 man epoll_ctl
    • event->data - 紀錄與 fd 關聯的資料
      • union 的方式定義,視使用者用途來決定如何使用,以 socket 的應用來說就是登記 fd
typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
  • 等待 epoll instance 中的 ready list 是否有可用的 fd
  • epfd - 欲等待的 epoll instance 對應的 file descriptor
  • events - epoll_wait 會將 ready list 中可用的 fd 對應的 epoll_event 存入 events 指向的 buffer
    • events->events 紀錄此 fd 可用的 I/O 事件
    • events->data 會與最近一次對此 fd 執行 epoll_ctl 時註冊的內容一致
  • maxevents - epoll_wait 回傳 epoll_event 的數量上限
  • timeout - 指定 epoll_wait 等待的時間 (ms)
    • -1 代表無等待時間限制
    • 0 代表 ready list 無可用 fd 會直接返回*
  • 其餘說明詳見 man epoll_wait

epoll vs select

epollselect 具有類似的作用,但 epoll 有更好的效能,也沒有監測數量上限的問題。在 epollWiki page 提到,相較於 select 的時間複雜度為

O(n)epoll 的時間複雜度僅為
O(1)
,然而從 /fs/eventpoll.c 觀察 epoll_ctl 的實作,可以發現是使用 RB tree 來管理加入 epoll instance 的 fd,因此準確來說是 epoll_wait 的時間複雜度為
O(1)
,而 epoll_ctl 的時間複雜度為
O(log(n))

/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 * Avoid increasing the size of this struct, there can be many thousands
 * of these on a server and we do not want this to take another cache line.
 */
struct epitem {
	union {
		/* RB tree node links this structure to the eventpoll RB tree */
		struct rb_node rbn;
		/* Used to free the struct epitem */
		struct rcu_head rcu;
	};

	/* List header used to link this structure to the eventpoll ready list */
	struct list_head rdllink;
    ...
}

同時也可以觀察到,只有在 epoll_ctl 的時候會使用 copy_from_user

/*
 * The following function implements the controller interface for
 * the eventpoll file that enables the insertion/removal/change of
 * file descriptors inside the interest set.
 */
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	struct epoll_event epds;

	if (ep_op_has_event(op) &&
	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
		return -EFAULT;

	return do_epoll_ctl(epfd, op, fd, &epds, false);
}

接著從 /fs/select.c 觀察 select 的實作,會發現每次 select 都會使用 copy_from_user,而且還用了不只一次 (在 core_sys_select 也有使用)

static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,
		       fd_set __user *exp, struct __kernel_old_timeval __user *tvp)
{
	struct timespec64 end_time, *to = NULL;
	struct __kernel_old_timeval tv;
	int ret;

	if (tvp) {
		if (copy_from_user(&tv, tvp, sizeof(tv)))
			return -EFAULT;

		to = &end_time;
		if (poll_select_set_timeout(to,
				tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
				(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
			return -EINVAL;
	}

	ret = core_sys_select(n, inp, outp, exp, to);
	return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
}

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct __kernel_old_timeval __user *, tvp)
{
	return kern_select(n, inp, outp, exp, tvp);
}

select 每次執行都需對整個 fd_set 內註冊的 fd 進行檢驗,導致時間複雜度是

O(n),在執行過程還使用了數次的 copy_from_user;相較之下 epoll_wait 僅需確認 ready list 中是否存在可用的 fd,時間複雜度僅為
O(1)
,就算使用 epoll_ctl 修改列表時的複雜度也僅為
O(log(n))
,且只會用到一次 copy_from_user,綜合上述因素,造成 epoll 的效能好於 select

htstress.c 的運作原理

htstress 是一個 client,根據使用者定義的參數向 server 請求數據,最後會列出 server 回應每個連線平均所花費的時間,使用者需定義的參數如下

Usage: htstress [options] [http://]hostname[:port]/path
Options:
   -n, --number       total number of requests (0 for inifinite, Ctrl-C to abort)
   -c, --concurrency  number of concurrent connections
   -t, --threads      number of threads (set this to the number of CPU cores)
   -u, --udaddr       path to unix domain socket
   -h, --host         host to use for http request
   -d, --debug        debug HTTP response
   --help             display this message
  • -n 向 server 請求的 request 總數
  • -c 每個 worker thread 向 server 連線的總數
  • -t 建立的 worker thread 總數

htstress 的運作主要分成兩個部分,負責連線前置作業與列出測試結果的 main,以及實際和 server 建立連線的 worker thread,以下將簡單列出兩者的運作流程與重點,首先是 main 的部分

Created with Raphaël 2.2.0htstressparse argumentsparse URLgetaddrinfo()start_time()pthread_create()output resultworker(s)

接下來是 worker thread 的部分

Created with Raphaël 2.2.0workerepoll_create()socket()connect()epoll_ctl()epoll_wait()communicate with serverall requests?end_time()returnyesno
  • 每個 worker 會向 server 建立共 concurrency 個連線
  • 與 server 建立連線成功後,將 socket fd 加入 epoll
  • 接下來會是一個 for loop,會不斷迴圈直到所有 worker thread 累積處理完畢的 request 數量達到指定的數量
    • 首先使用 epoll_wait 來監測 socket,再以輪詢的方式依序使用 epoll_wait 回傳可用的 fd
    • 利用 socket 註冊的 epoll event 來區分此 socket 和 server 應進行的步驟
      • 每個 socket 最初都是 EPOLLOUT,也就是監測 socket 是否能向 server 傳送資料
      • 傳送成功後,會將註冊的 epoll event 改為 EPOLLIN,也就是監測 socket 是否能向 server 接收資料
      • 接收資料成功後,將完成的 request 計數加 1
    • 接收資料成功後,檢查所有 worker thread 累積處理完畢的 request 數量是否已達標
      • 若尚未達標,再次迴圈
      • 若已足夠,使用 end_time 紀錄結束時間並結束 thread
  • 離開迴圈的同時也代表結束時間已由 end_time 確認,此時 main 就可以輸出測試結果了

可改善的部分

http_parser_callback_request_url

static int http_parser_callback_request_url(http_parser *parser,
                                            const char *p,
                                            size_t len)
{
    struct http_request *request = parser->data;
    size_t old_len = strlen(request->request_url);
    if ((len + old_len) > 127) {  // max length = 128 - 1 ('\0')
        pr_err("url error: url truncated!\n");
        len = 127 - old_len;
    }
    strncat(request->request_url, p, len);
    return 0;
}
  • strncat 保證會 null terminated,但須確保有足夠的空間
  • 確保寫入的大小不超過 127 bytes
  • 由於已經確保 request->request_url 會 null terminated,搜尋 URL 關鍵字時可安全的使用 strstr

recv error

使用 wget 測試時,發現會出現以下錯誤訊息

[139290.526853] khttpd: recv error: -104

查詢後從 errno.h 得知錯誤訊息是 "Connection reset by peer",也就是上個連線被其中一方強制關閉,經確認後可以發現 kernel thread 在建立連線時確實被告知是 Keep-Alive,所以應該是 client 主動關閉連線。查看相關的程式碼,會發現這個錯誤其實不會對 server 造成實質的影響,只會列出錯誤訊息後將連線提早中斷

    while (!kthread_should_stop()) {
        int ret = http_server_recv(socket, buf, RECV_BUFFER_SIZE - 1);
        if (ret <= 0) {
            if (ret)
                pr_err("recv error: %d\n", ret);
            break;
        }
        http_parser_execute(&parser, &setting, buf, ret);
        if (request.complete && !http_should_keep_alive(&parser))
            break;
    }

查詢 GNU Wget 1.18 Manual: HTTP Options 發現可以追加參數來避免 wget 將連線標註為 Keep-Alive,進而避免錯誤訊息出現

$> wget localhost:8081/fib/2 --no-http-keep-alive

如何正確的停止 kthread

khttpd 中的 kthread 具有 3 種結束方式

  • 接收到 SIGKILLSIGTERM
  • 受到 kthread_stop 終止
  • 發生錯誤 (連線錯誤、記憶體配置錯誤等因素)

接下來會針對 kthread_stop 以及 signal 的使用方式進行說明,再針對 khttpd 實作中的問題進行討論

kthread_stop

閱覽 kthread_createkthread_stop 的說明,會發現其中明確的指出 2 種停止 kthread 的方法,且兩種方法互斥

  • 直接 do_exit,前提是沒有其他地方會 kthread_stop 此 kthread
  • 呼叫 kthread_stop 來使 kthread_should_stop 為真後,kthread 才 return (return 的數值會是 kthread_stop 的返回值)

進一步根據此篇 stackoverflow 的文章 的說明,一路查詢 kthread_run 的實作,可以看到最終是 kthread 這個函式會呼叫使用者定義的函式,接著再根據該函示 return 的數值來 do_exit,也就是說直接在 kthread 內使用 returndo_exit 會有一樣的結果,因此上述的說明可以再進一步理解為

  • kthread_stop 只能對還在運行的 kthread 使用,也就是不能用在已經 return 或是 do_exit 的 kthread
  • 在 kthread 中使用 return 或是 do_exit 結束具有一樣的結果

signal in kthread

在 kernel thread 中不像在 user space 時可以使用 signal handler 來處理 signal,需自行建立檢查點來檢查是否有收到 signal,具體的用法可參考 http_server_daemon

int http_server_daemon(void *arg)
{
    ...
    allow_signal(SIGKILL);
    allow_signal(SIGTERM);

    while (!kthread_should_stop()) {
        int err = kernel_accept(param->listen_socket, &socket, 0);
        if (err < 0) {
            if (signal_pending(current))
                break;
            pr_err("kernel_accept() error: %d\n", err);
            continue;
        }
        ...
    }
    return 0;
}
  • 先以 allow_signal 登記要接收的 signal
  • 再以 signal_pending 主動確認是否有 signal
  • 其他 thread 中可以使用 send_sig 來發送 signal 到指定的 kthread
  • user 可以 sudo kill -s signal [pid] 來發送 signal

http_server_daemon 的實作中可以發現 signal_pending 只有在 kernel_accept 失敗時才會被呼叫,經實測後發現這麼做的目的為

  • 確保 kernel_accept 在成功連線後,直到完成建立 worker 前不會被 signal 中斷
  • 避免模組卸載時 http_server_daemon 卡在 kernel_accept 這個步驟,因為 kernel_accept 會被 signal 中斷 (但目前我不知道 kernel_accept 可以被中斷的原理)

使用 kthread_stop + signal

將上述 kthread_stop 與 signal 的部分放在一起思考,會發現若要同時使用兩者,需要注意 kthread_stop 不能用在已經被 signal 停止的 kthread 上,而目前我想到能避免這個狀況的方式為

  • 讓 kthread 在收到 signal 後只是停止原本執行的作業,但仍需等到 kthread_stop 被呼叫才 returndo_exit
  • 避免對已經停止的 kthread 使用 kthread_stop

首先是第一個方式,讓 kthread 在收到 signal 後只是停止原本執行的作業,等到 kthread_stop 被呼叫才 returndo_exit。等待的迴圈目前是使用 schedule_timeout,另外根據這篇 stackoverflow 的討論set_current_state(TASK_INTERRUPTIBLE) 應該寫在第 2 個迴圈外,避免 race condition

static int http_server_worker(void *arg)
{
    while (!kthread_should_stop()) {
        /* do work here */
        if (signal_pending(current))
            break;
    }
    ...
    /* wait for kthread_stop to be called */
    set_current_state(TASK_INTERRUPTIBLE);
    while (!kthread_should_stop()) {
        schedule_timeout(timeout);
    }
    return 0;
}

第二個方式是避免對已經停止的 kthread 使用 kthread_stop,目前想到的實作方式為

  • http_server_daemon 執行 worker 前先用 get_task_struct 來增加 worker 的 reference count,如此一來就算 kthread 已經停止,仍可以安全的對其使用 kthread_stop
    • 但不適合用於 khttpd,因為這代表所有 task_struct 都需要等到模組卸載時才會被釋放
  • 建立可靠的清單,用於追蹤還在運作中 worker,避免對已經停止的 kthread 使用 kthread_stop
    • 這點與模組卸載時如何停止所有 kthread 類似,於下個小節一併討論

模組卸載時如何停止所有 kthread

觀察 khttpd 模組卸載的程式碼 khttpd_exit

static void __exit khttpd_exit(void)
{
    send_sig(SIGTERM, http_server, 1);
    kthread_stop(http_server);
    close_listen_socket(listen_socket);
    pr_info("module unloaded\n");
}
  • 使用 send_sig 來避免 http_server_daemon 卡在 kernel_accept
    • 若移除 send_sig,卸載模組時很可能會卡住,直到下一個連線建立完成後才會成功卸載
  • 模組卸載時僅停止 http_server_daemon (http_serverhttp_server_daemon 對應的 task_struct),卻沒有停止 server 產生的 http_server_worker,這造成模組卸載後標註為 Keep-Alive 的 kthread 仍然存在
    • 可以使用 $ ps -ef | grep khttpd 確認

顯然要避免模組卸載後 worker 仍然存在的情況,方法就是在模組卸載時追加停止 worker 的步驟,然而綜合前面的討論結果,會發現最麻煩的部分其實是 kthread_stop 的使用,主因是 worker 有可能在 kthread_stop 呼叫之前就停止了 (收到 signal、連線中斷、發生錯誤等因素),因此如果要同時使用 signal + kthread_stop,並同時保證模組卸載時會確實停止所有 worker,可能實行的方式有

  • 建立可靠的清單,用於追蹤還在運作中 worker
    • worker 一旦停止運作 (收到 signal、連線中斷、發生錯誤等因素),就必須從清單中移除
    • 最後模組卸載時,根據清單來對運作中的 worker 執行 kthread_stop
    • 同時要避免查詢清單時顯示還在運作中,但剛好在呼叫 kthread_stop 的前一瞬間就停止的 race condition
    • 由於涉及同步的議題,之後如果有時間會嘗試探討
  • 乾脆不要依賴 kthread_stopkthread_should_stop 來停止 worker,建立 global flag 並將 worker 的迴圈改為以此 flag 的數值為判斷依據,當 server 停止時才改變 flag 使所有 worker 跳離迴圈並停止
    • kecho 就是使用這種方式,配合使用 cmwq 來確認所有 worker 在模組卸載時停止
    • 目前暫定使用此方式修正,後續會進行討論

引入 Concurrency Managed Workqueue

接下來會參照 kecho 的寫法,將 cmwq 應用到 khttpd,以下說明會根據檔案名稱列出相關修改的部分

main.c

workqueue 除了在模組掛載與卸載時會被取用,http_server_daemon 也會動態的將建立的 worker 新增進去,因此 workqueue 的 pointer 須為 global variable

struct workqueue_struct *khttpd_wq;

在模擬掛載及卸載時,分別追加創立及清除 workqueue 的部分

static int __init khttpd_init(void)
{
    ...
+   khttpd_wq = alloc_workqueue("khpptd_wq", WQ_UNBOUND, 0);
    http_server = kthread_run(http_server_daemon, &param, KBUILD_MODNAME);
    ...
}

static void __exit khttpd_exit(void)
{
    send_sig(SIGTERM, http_server, 1);
    kthread_stop(http_server);
    close_listen_socket(listen_socket);
+   destroy_workqueue(khttpd_wq);
    pr_info("module unloaded\n");
}

http_server.h

workqueue 傳遞參數的方式與 kthread_create 不同,為了讓 http_server_daemon 將 socket 的資訊傳遞給 worker,需要定義資料結構將參數包起來,worker 再配合使用 container_of 來取得參數

struct khttpd_server {
    bool is_stopped;
    struct list_head worker_head;
};

struct khttpd
{
    struct socket *socket;
    struct list_head list;
    struct work_struct worker;
};
  • 因為 worker 的數量不固定,代表需要動態配置 khttpd 資料結構的空間,因此會再搭配使用 The Linux Kernel API 中 List Management Functions 的部分的來管理,確保模組卸載時能 kfree 所有配置的記憶體
  • is_stopped 用來取代本來 worker 內的 kthread_should_stop,當 http_server_daemon 停止後會改變 is_stopped 來通知所有 worker 停止

http_server.c

首先新增需要的 global variable

struct khttpd_server daemon = {.is_stopped = false};
extern struct workqueue_struct *khttpd_wq;

修改最關鍵的 http_server_daemon

int http_server_daemon(void *arg)
{
    struct socket *socket;
-   struct task_struct *worker;
+   struct work_struct *worker;
    struct http_server_param *param = (struct http_server_param *) arg;
    
    ...
+   INIT_LIST_HEAD(&daemon.worker_head);
    while (!kthread_should_stop()) {
        ...
-       worker = kthread_run(http_server_worker, socket, KBUILD_MODNAME);
+       if (!(worker = create_work(socket))) {
+           pr_err("can't create more worker process\n");
+           kernel_sock_shutdown(socket, SHUT_RDWR);
+           sock_release(socket);
+           continue;
+       }
+       /* start server worker */
+       queue_work(khttpd_wq, worker);
    }
+   daemon.is_stopped = true; /* notify all worker to stop */
+   free_work();
    return 0;
}
  • 接收連線前先使用 INIT_LIST_HEAD 初始化 worker list
  • create_work 負責初始化 worker 所需的訊息,之後會提到
  • 使用 queue_work 將 worker 推送至 workqueue 執行
  • 一旦 daemon 結束,會設置 is_stopped 來通知所有 worker 停止,並呼叫 free_work 來清除所有登記於 worker list 內的 worker

接下來看 create_workfree_work 兩個輔助函數的實作

static struct work_struct *create_work(struct socket *socket)
{
    struct khttpd *work;
    if (!(work = kmalloc(sizeof(struct khttpd), GFP_KERNEL)))
        return NULL;
    work->socket = socket;
    INIT_WORK(&work->worker, http_server_worker);
    list_add(&work->list, &daemon.worker_head);
    return &work->worker;
}
  • 負責配置 khttpd 資料結構所需的空間
  • 將 worker 所需的相關資料都加入 khttpd 資料結構
  • 最後使用 list_addkhttpd 資料結構加入 worker list
static void free_work(void)
{
    struct khttpd *tar, *tmp;
    list_for_each_entry_safe (tar, tmp, &daemon.worker_head, list) {
        kernel_sock_shutdown(tar->socket, SHUT_RDWR);
        flush_work(&tar->worker);
        sock_release(tar->socket);
        kfree(tar);
    }
}
  • 使用 list_for_each_entry_safe 來依序清除 worker list
  • 當初 worker 是 kmalloc 配置來的,記得 kfree
  • 雖然 worker 在結束時也會 kernel_sock_shutdown,但在此使用 kernel_sock_shutdown 可以避免 worker 因為仍在連線狀態而卡在 http_server_recv

最後是微幅修改 http_server_worker

-static int http_server_worker(void *arg)
+static void http_server_worker(struct work_struct *work)
{
+   struct khttpd *worker = container_of(work, struct khttpd, worker);
    ...
-   allow_signal(SIGKILL);
-   allow_signal(SIGTERM);
    ...
-   while (!kthread_should_stop()) {
+   while (!daemon.is_stopped) {
        ...
    }
    kernel_sock_shutdown(worker->socket, SHUT_RDWR);
-   sock_release(worker->socket);
    kfree(buf);
}
  • 改變傳遞參數的方式
  • 用不到 signal
  • 主迴圈改為根據 is_stopped 判斷是否結束
  • 須注意不能在 worker 結束時就 sock_release,這會導致模組卸載時 free_work 對已經不存在的 socket 執行 sock_release

效能差異

直接使用 make check 確認

修改前

requests:      100000
good requests: 100000 [100%]
bad requests:  0 [0%]
socker errors: 0 [0%]
seconds:       3.368
requests/sec:  29687.994

修改成 cmwq 後

requests:      100000
good requests: 100000 [100%]
bad requests:  0 [0%]
socker errors: 0 [0%]
seconds:       1.852
requests/sec:  53986.148

reference

GNU Internet Socket Example
Driver porting: more module changes
Common Variable Attributes
Concurrency Managed Workqueue (cmwq)
HTTP Messages
The method to epoll’s madness

tags: linux 2020