# Linux2022 final project -- sehttpd
contributed by < [tommy2234](https://github.com/tommy2234/sehttpd) >
> [題目描述](https://hackmd.io/@sysprog/linux2022-sehttpd)
> [GitHub: sehttpd](https://github.com/tommy2234/sehttpd)
---
## epoll
epoll 能夠同時監視著多個 file descriptors ,確認它們是否準備好做 I/O。
> The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them.
### Interest list
epoll 的監視名單。
> the set of file descriptors that the process has registered an interest in monitoring.
### ready list
ready list 是 interest list 的 subset ,準備好做 I/O 的 fd 會被加入其中。
ready list 是由 kernel 來動態 maintain ,當 fd 需要發生 IO 時, kernel 就會將其加入 epoll 的 ready list
> the set of file descriptors that are "ready" for I/O. The ready list is a subset of (or, more precisely, a set of references to) the file descriptors in the interest list that is dynamically populated by the kernel as a result of I/O activity on those file descriptors.
### epoll_create()
建立一個 epoll instance (建立一個監視者)
### epoll_ctl()
`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);`
epoll_ctl 用來對建立的 epoll instance 做操作,如加入、修改、移除 interest list 中的 fd。
> This system call is used to add, modify, or remove entries in the interest list of the epoll(7) instance referred to by the file descriptor epfd. It requests that the operation op be performed for the targe file descriptor, fd.
- epdf : epoll instance
- fd : 要操作的 fd
#### struct epoll_event
fd 所對應的資訊和設定都會包含在此結構體當中,並且伴隨 fd 一起在加入時被 epoll 記錄下來。
```c
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
```
- `data`
我們可以將想紀錄的資訊都紀錄在 data 中,利用 ptr 指向任意的包含 fd 資訊的結構體。
在 sehttpd 中我們會將 ptr 指向 http_request_t ,也就是來自此 client fd 的 http_request。
- `events`
events 欄位是一個 bit mask ,可視為此 fd 的設定。
我們可以將許多 event type or 在一起,以此告知 epoll 需要監視 fd 的哪些 IO 行為(read, write, hang up ...)
也可以透過它設定 edge trigger 或是 level trigger (default) 。
#### op
操作選項
1. `EPOLL_CTL_ADD`
加入新的 fd 到 interest list 之中,其對應的資訊和設定都會在 event
2. `EPOLL_CTL_MOD`
將 fd 附帶的 epoll_event 更換為 event 欄位中的新 event。
3. `EPOLL_CTL_DEL`
將 fd 從 interest list 中移除。
### Level-triggered and edge-triggered
1. Level-trigger (default)
只要 fd 還處在 ready 的狀態,epoll 就會在你呼叫 epoll_wait() 時不斷通知你。
2. Edge-trigger
透過 EPOLLET flag 設定此模式。
epoll 只在第一次檢測到 fd 的 IO 行為時通知你,也就是說該 fd 只會在 epoll_wait() 給使用者的 events set 中出現一次,直到下一次 IO 才會再出現,因此在此模式下對 fd 的資料讀寫必須一次做完。
因為我們需要在檢測到 IO 時就一次將 fd 讀取至乾涸,所以勢必會使用一個 while 迴圈不斷呼叫 read,因此==在 edge-trigger 模式下,file descriptor 必須設定為 non-blocking==才能使最後一次 `read()` return `EAGAIN`,否則最後一次 `read()` 在讀不到任何資料的情況下會 block 住。
> An application that employs the EPOLLET flag should use nonblocking file descriptors to avoid having a blocking read or write starve a task that is handling multiple file descriptors.
3. `EPOLLONESHOT`
在 edge trigger 模式下,常見搭配的 flag 是 `EPOLLONESHOT`。
和 edge trigger 的概念一樣,此 flag 會使該 fd 只被檢測到一次 I/O 之後就失效,也就是只會被 `epoll_wait()` 抽取出一次,因此我們必須==利用 `epoll_ctl()` 的 `EPOLL_CTL_MOD` 操作將 fd rearm==。
> the caller has the option to specify the EPOLLONESHOT flag, to tell epoll to disable the associated file descriptor after the receipt of an event with epoll_wait(2). When the EPOLLONESHOT flag is specified, it is the caller's responsibility to rearm the file descriptor using epoll_ctl(2) with EPOLL_CTL_MOD.
---
## epoll web server
### http_request_t
這個 struct 中紀錄著一個 client request 相關的所有資訊,包含 client fd, request method, 還有用來接收 request 的 buffer 等等。
```c
typedef struct {
void *root;
int fd;
int epfd;
char buf[MAX_BUF]; /* ring buffer */
size_t pos, last;
int state;
void *request_start;
int method;
void *uri_start, *uri_end;
int http_major, http_minor;
void *request_end;
struct list_head list; /* store http header */
void *cur_header_key_start, *cur_header_key_end;
void *cur_header_value_start, *cur_header_value_end;
void *timer;
} http_request_t;
```
### Mainloop
sehttpd 在單一執行緒之下使用 epoll 系統呼叫達到多工。
我們在 mainloop 中呼叫 epoll_wait 取得狀態有變化的 event ,也就是想做 I/O 的 fd ,然後用 for 迴圈 (line 9) 將這些 event 一個個處理完畢。
這些 event 分為兩種情境:
1. listenfd is ready (line 12)
> 這意味著有新的 client 要建立連線,此時我們遵循 socket 流程呼叫 accept 得到 client 的 fd,並將其加入 epoll 的監視行列中。
在加入新 fd 之後並不會立刻回應 client 的 request,而是等到下一輪的 while 迴圈,這個 fd 會在呼叫 epoll_wait 時被取出,然後在情境 2 中呼叫 do_request 回應。
2. ready 的是其他 fd (line 46)
這代表已經加入 interest list 的 clent 需要讀寫,此時我們呼叫 do_request 回應其需求。
```c=
/* epoll_wait loop */
while (1) {
int time = find_timer();
debug("wait time = %d", time);
int n = epoll_wait(epfd, events, MAXEVENTS, time);
handle_expired_timers();
printf("n events:%d\n", n);
for (int i = 0; i < n; i++) {
http_request_t *r = events[i].data.ptr;
int fd = r->fd;
if (listenfd == fd) {
/* we have one or more incoming connections */
while (1) {
socklen_t inlen = 1;
struct sockaddr_in clientaddr;
int infd = accept(listenfd, (struct sockaddr *) &clientaddr,
&inlen);
//printf("connection count : %d fd:%d\n", ++count, infd);
if (infd < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
/* we have processed all incoming connections */
break;
}
log_err("accept");
break;
}
rc = sock_set_non_blocking(infd);
assert(rc == 0 && "sock_set_non_blocking");
request = malloc(sizeof(http_request_t));
if (!request) {
log_err("malloc");
break;
}
init_http_request(request, infd, epfd, WEBROOT);
event.data.ptr = request;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
printf("add fd:%d\n", infd);
epoll_ctl(epfd, EPOLL_CTL_ADD, infd, &event);
add_timer(request, TIMEOUT_DEFAULT, http_close_conn);
}
} else {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
log_err("epoll error fd: %d", r->fd);
close(fd);
continue;
}
printf("do req for fd:%d\n", ((http_request_t *)(events[i].data.ptr))->fd);
do_request(events[i].data.ptr);
}
}
}
```
---
## 改進程式碼
### Fix http parse error
在 do_request() 中時常輸出錯誤訊息:
`[ERROR] (src/http.c:253: errno: Resource temporarily unavailable) rc != 0`
用 gdb 看看發生什麼事
```c
Breakpoint 1, do_request (ptr=0x5555555a58a0) at src/http.c:253
253 log_err("rc != 0");
@(gdb) p rc
$1 = 10
```
印出 rc 發現 rc = 10 ,對應到 `HTTP_PARSER_INVALID_METHOD`
```c
enum http_parser_retcode {
HTTP_PARSER_INVALID_METHOD = 10,
HTTP_PARSER_INVALID_REQUEST,
HTTP_PARSER_INVALID_HEADER
};
```
印出方才 read() 得到的字串,還有頭尾位置,發現 r->pos 已處在 `MAX_BUF`(8124),代表上一次呼叫 read() 時已由於已將 `remain_size` 用盡,到達 ring buffer 末端,因此現在回到 ring buffer 頭部繼續讀取。
```c
@(gdb) p plast
$2 = 0x5555555a58b0 "gent: ApacheBench/2.3\r\nAccept: */*\r\n\r\n\r\nHost: localhost:8081\r\nUser-Agent: ApacheBench/2.3\r\nAccept: */*\r\n\r\nGET /"
@(gdb) p r->last
$3 = 8162
@(gdb) p r->pos
$4 = 8124
```
印出 r->pos 往前推的一小段字串,發現頭尾是可相接的,印證了我的想法。
```c
@(gdb) p &r->buf[r->pos - 10]
$11 = 0x5555555a7862 "81\r\nUser-A"
```
為和這樣會造成 parse 失敗呢?
此時 state 正處在 1 ,對應到 `s_method`,而 `http_parse_request_line()` 回傳 `HTTP_PARSER_INVALID_METHOD` 的地方在 第 34 行。
觀察 `http_parse_request_line()` 和 `http_parse_request_body()` 的 parse 流程,發現其運用的是 finite state machine 的方式,雖然能夠透過 r->state 紀錄上次停在哪個 state ,卻沒有紀錄此 state 是屬於 parse line 還是 parse body 時的 state,而每次呼叫完 read() 都是從 `http_parse_request_line()` 開始呼叫,因此當然會在 parse 時出錯。
很明顯此時的 state = 1 要對應到 `http_parse_request_body()` 裡的 `s_key` 才是正確的。
```c
@(gdb) p r->state
$5 = 1
```
```c=
case s_method:
if (ch == ' ') {
m = r->request_start;
switch (p - m) {
case 3:
if (cst_strcmp(m, 'G', 'E', 'T', ' ')) {
r->method = HTTP_GET;
break;
}
break;
case 4:
if (cst_strcmp(m, 'P', 'O', 'S', 'T')) {
r->method = HTTP_POST;
break;
}
if (cst_strcmp(m, 'H', 'E', 'A', 'D')) {
r->method = HTTP_HEAD;
break;
}
break;
default:
r->method = HTTP_UNKNOWN;
break;
}
state = s_spaces_before_uri;
break;
}
if ((ch < 'A' || ch > 'Z') && ch != '_')
return HTTP_PARSER_INVALID_METHOD;
break;
```
解決方案:
==方案一==
紀錄 parse 的進度,`r->progress == PARSE_LINE` 時才要呼叫 `http_parse_request_line()`
```diff
diff --git a/src/http.c b/src/http.c
index 61c315d..266c9c4 100644
--- a/src/http.c
+++ b/src/http.c
@@ -246,12 +246,14 @@ void do_request(void *ptr)
assert(r->last - r->pos < MAX_BUF && "request buffer overflow!");
/* about to parse request line */
- rc = http_parse_request_line(r);
- if (rc == EAGAIN)
- continue;
- if (rc != 0) {
- log_err("rc != 0");
- goto err;
+ if(r->progress == PARSE_LINE) {
+ rc = http_parse_request_line(r);
+ if (rc == EAGAIN)
+ continue;
+ if (rc != 0) {
+ log_err("rc != 0");
+ goto err;
+ }
}
debug("uri = %.*s", (int) (r->uri_end - r->uri_start),
diff --git a/src/http.h b/src/http.h
index b43dc5a..ff5970e 100644
--- a/src/http.h
+++ b/src/http.h
@@ -26,6 +26,11 @@ enum http_status {
HTTP_NOT_FOUND = 404,
};
+enum http_parse_progress {
+ PARSE_LINE = 0,
+ PARSE_BODY,
+};
+
#define MAX_BUF 8124
typedef struct {
@@ -35,6 +40,7 @@ typedef struct {
char buf[MAX_BUF]; /* ring buffer */
size_t pos, last;
int state;
+ int progress;
void *request_start;
int method;
void *uri_start, *uri_end;
@@ -84,7 +90,7 @@ static inline void init_http_request(http_request_t *r,
{
r->fd = fd, r->epfd = epfd;
r->pos = r->last = 0;
- r->state = 0;
+ r->state = r->progress = 0;
r->root = root;
INIT_LIST_HEAD(&(r->list));
}
```
==方案 2==
如果能將 request 一次讀取並處理完畢就不會有以上的問題,因此可以選擇放棄 ring buffer 的技巧,每次都讀取到 buffer 的頭部,並且一次讀取 MAX_BUF 的長度,將 fd 的資料榨乾。
使用 ring buffer 的好處是,當我們遇到一個內容超級長的 request,可以分段讀取,然後用 finite state machine 的技巧分段處理。
但其實長度大於 MAX_BUF 的 request 很罕見,缺點是需要處理 buffer 尾端長度不足的問題會使程式碼變得複雜。
另一個缺點是, buffer 尾端長度不足時,一個 `read()` system call 會變成兩個,而且這情況很常發生,會降低效能。
==綜合以上原因,我選擇採用方案 2 解決此 bug。==
改寫後的 `do_request()` 如下:
- 移除 for loop,一次讀取完畢。
- request 處理完畢後將 r->pos 和 r->last 歸零
```c
void do_request(void *ptr)
{
http_request_t *r = ptr;
int fd = r->fd;
int rc;
char filename[SHORTLINE];
webroot = r->root;
del_timer(r);
int n = read(fd, r->buf, MAX_BUF);
if (n == 0) /* EOF */
goto err;
if (n < 0) {
if (errno != EAGAIN) {
log_err("read err, and errno = %d", errno);
goto err;
}
goto end;
}
r->last += n;
/* about to parse request line */
rc = http_parse_request_line(r);
if (rc == EAGAIN)
goto end;
if (rc != 0) {
log_err("rc != 0, rc = %d", rc);
goto err;
}
debug("uri = %.*s", (int) (r->uri_end - r->uri_start),
(char *) r->uri_start);
rc = http_parse_request_body(r);
if (rc == EAGAIN)
goto end;
if (rc != 0) {
log_err("rc != 0 rc = %d", rc);
goto err;
}
/* handle http header */
http_out_t *out = malloc(sizeof(http_out_t));
if (!out) {
log_err("no enough space for http_out_t");
exit(1);
}
init_http_out(out, fd);
parse_uri(r->uri_start, r->uri_end - r->uri_start, filename);
struct stat sbuf;
if (stat(filename, &sbuf) < 0) {
do_error(fd, filename, "404", "Not Found", "Can't find the file");
goto end;
}
if (!(S_ISREG(sbuf.st_mode)) || !(S_IRUSR & sbuf.st_mode)) {
do_error(fd, filename, "403", "Forbidden", "Can't read the file");
goto end;
}
out->mtime = sbuf.st_mtime;
http_handle_header(r, out);
assert(list_empty(&(r->list)) && "header list should be empty");
if (!out->status)
out->status = HTTP_OK;
serve_static(fd, filename, sbuf.st_size, out);
if (!out->keep_alive) {
debug("no keep_alive! ready to close");
free(out);
goto close;
}
free(out);
end:;
r->pos = r->last = 0;
struct epoll_event event = {
.data.ptr = ptr,
.events = EPOLLIN | EPOLLET | EPOLLONESHOT,
};
epoll_ctl(r->epfd, EPOLL_CTL_MOD, r->fd, &event);
add_timer(r, TIMEOUT_DEFAULT, http_close_conn);
return;
err:
close:
/* TODO: handle the timeout raised by inactive connections */
rc = http_close_conn(r);
assert(rc == 0 && "do_request: http_close_conn");
}
```
:::warning
嘗試提交 pull request,以修正 request 讀取議題。
:notes: jserv
:::
### Use sendfile() for zero-copy
sendfile 將一個 fd 寫到另一個 fd ,且動作在核心完成,因此達成 zero-copy。
```diff=
@@ -195,14 +196,9 @@ static void serve_static(int fd,
int srcfd = open(filename, O_RDONLY, 0);
assert(srcfd > 2 && "open error");
- char *srcaddr = mmap(NULL, filesize, PROT_READ, MAP_PRIVATE, srcfd, 0);
- assert(srcaddr != (void *) -1 && "mmap error");
+ n = sendfile(fd, srcfd, 0, filesize);
+ assert(n == filesize && "mmap error");
close(srcfd);
-
- writen(fd, srcaddr, filesize);
-
- munmap(srcaddr, filesize);
}
```
---
## Condition variable
在引入 thread pool 之前,我先研究 pthread 提供的 condition variable。
>Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs.
Condition variable 是一個 synchronization(同步)機制,使得一個 thread 能夠在某處持續等待,直到某個特定事件發生才被喚醒。
一個 condition variable 總是和一個 mutex lock 成對出現,互相搭配使用。
### Why use condition variable?
為何我們需要大費周章使用 condition variables ,何不單純使用 mutex 就好?
的確,如果只是為了 mutual exclution ,使用 mutex 已經足夠,但是當我們遇到像是 thread pool 這般 `single-producer-mutiple-comsumer` 的情境, condiotion variable 就派上用場了。
condition variable 和 mutex 最大的差別在於它能夠使一個 thread 得知『某個事件發生了』。以 thread pool 為例,單純使用 mutex 勢必會讓多個 thread polling 在同一個 job queue 之上,即使 queue 為空也會重覆地競爭鎖、檢查 queue 中是否有工作可做,這樣無疑是浪費 cpu cycle 的行為。
使用 condition variable ,我們可以讓 workers 等待在 queue 之外,直到有工作插入 queue 或是 queue 非空才喚醒 workers 到 queue 中抽取工作,當 thread 數量眾多時,能夠省下 polling 的資源,對效能有所提昇。
### pthread_cond_wait()
`pthread_cond_wait()` 會使呼叫它的 thread block,直到被 `pthread_cond_broadcast()` 或 `pthread_cond_signal()` 喚醒為止。
在呼叫前,calling thread 必須先取得 lock ,在呼叫後會自動 release lock ,而當 thread 被喚醒之後會自動取得 lock。
🚨🚨🚨
呼叫 `pthread_cond_wait()` 時必須以一個 while 迴圈包覆起來,在 thread 被喚醒時檢查條件是否為真,以防止 [suprious wakup](https://en.wikipedia.org/wiki/Spurious_wakeup)。
### pthread_cond_signal()
> The _pthread\_cond\_signal_() function shall unblock at least one of the threads that are blocked on the specified condition variable _cond_ (if any threads are blocked on _cond_).
`pthread_cond_signal()` 會喚醒一個等待在特定 condition variable 的 thread,並且在呼叫後必須主動呼叫 `pthread_mutex_unlock()` 以釋放 lock ,然後從 `pthread_cond_wait()` 被喚醒的 thread 會自動取得 lock。
至由該喚醒哪個 thread,交由系統的 scheduling policy 來決定。
### pthread_cond_broadcast()
pthread_cond_broadcast() 會喚醒所有等待在特定 condition variable 的 threads。同樣地,使用者也必須主動釋放 lock,然後所有被喚醒的 threads 會一同競爭 lock。
---
## Thread pool
![](https://i.imgur.com/Xmr5DlF.png)
我研究 [Pithikos](https://github.com/Pithikos/C-Thread-Pool) 的 thread pool library ,進行改寫和整合。
程式碼在我的 [epoll branch](https://github.com/tommy2234/sehttpd/tree/epoll)
### Semaphore & Binary semaphore
Semaphore 是一個同步物件,用於保持在 0 至指定最大值之間的一個計數值。當執行緒完成一次對該 semaphore 的等待時,該計數值減一;當執行緒完成一次對 semaphore 物件的釋放時,計數值加一。當計數值為0,則執行緒等待該 semaphore 物件不再能成功直至該 semaphore 變成 signaled 狀態。
Semaphore 可視為一個區域的『名額』,代表著當前有多少 threads 能夠進入該區域。
Binary semaphore 的 semaphore 值只能是 0 or 1 ,在邏輯上等同於一個 mutex。
bsem 結構體包含一個 mutex 和 condition variable,v 代表 semaphore 的值。
```c
/* Binary semaphore */
typedef struct bsem {
pthread_mutex_t mutex;
pthread_cond_t cond;
int v;
} bsem;
```
### Job
由於我們只需要在頭部或尾端進行 push 或 pop 的操作,因此 Job queue 只需要是一個單向的 linked list,由眾多 job 組成。
一個 job 包含一個將要執行的 function pointer。
```c
typedef struct job{
struct job* prev; /* pointer to previous job */
void (*function)(void* arg); /* function pointer */
void* arg; /* function's argument */
} job;
```
### bsem_post()
Semaphore 的值變成 1,並喚醒一個 thread。
```c
/* Post to at least one thread */
static void bsem_post(bsem *bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
bsem_p->v = 1;
pthread_cond_signal(&bsem_p->cond);
pthread_mutex_unlock(&bsem_p->mutex);
}
```
### bsem_wait()
等待 condition variable 直到 Semaphore 的值變成 1,被喚醒後將 Semaphore 設為 0。
使用 while 迴圈防止 [suprious wake](https://en.wikipedia.org/wiki/Spurious_wakeup)
```c
/* Wait on semaphore until semaphore has value 1 */
static void bsem_wait(bsem* bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
while (bsem_p->v != 1) {
pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
}
bsem_p->v = 0;
pthread_mutex_unlock(&bsem_p->mutex);
}
```
### Add job -- jobqueue_push()
讀寫前要先取得 read-write lock (line 4),寫入完畢後呼叫 `bsem_post()`(line 21) 喚醒一個 worker thread,表示『有工作給你啦!』
```c=
/* Add (allocated) job to queue */
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
pthread_mutex_lock(&jobqueue_p->rwmutex);
newjob->prev = NULL;
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
jobqueue_p->front = newjob;
jobqueue_p->rear = newjob;
break;
default: /* if jobs in queue */
jobqueue_p->rear->prev = newjob;
jobqueue_p->rear = newjob;
}
jobqueue_p->len++;
bsem_post(jobqueue_p->has_jobs);
pthread_mutex_unlock(&jobqueue_p->rwmutex);
}
```
### Get job -- jobqueue_pull()
此函式主要由 worker threads 呼叫,如果 queue 中不只一個工作,則呼叫 `bsem_post()` 喚醒下一個 worker thread 來抽取工作。
```c
/* Get first job from queue(removes it from queue)
* Notice: Caller MUST hold a mutex
*/
static struct job* jobqueue_pull(jobqueue* jobqueue_p){
pthread_mutex_lock(&jobqueue_p->rwmutex);
job* job_p = jobqueue_p->front;
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
break;
case 1: /* if one job in queue */
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->len = 0;
break;
default: /* if >1 jobs in queue */
jobqueue_p->front = job_p->prev;
jobqueue_p->len--;
/* more than one job in queue -> post it */
bsem_post(jobqueue_p->has_jobs);
}
pthread_mutex_unlock(&jobqueue_p->rwmutex);
return job_p;
}
```
### Workers -- thread_do()
這是每個 worker thread 執行的函式。先等待在 queue 之外直到 queue 中有任務時被喚醒(line 3),然後到 queue 中抽取工作(line 14) 並執行(line 18)。
以下節錄重點的 while loop。
```c=
while(threads_keepalive){
bsem_wait(thpool_p->jobqueue.has_jobs);
if (threads_keepalive){
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working++;
pthread_mutex_unlock(&thpool_p->thcount_lock);
/* Read job from queue and execute it */
void (*func_buff)(void*);
void* arg_buff;
job* job_p = jobqueue_pull(&thpool_p->jobqueue);
if (job_p) {
func_buff = job_p->function;
arg_buff = job_p->arg;
func_buff(arg_buff);
free(job_p);
}
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working--;
if (!thpool_p->num_threads_working) {
pthread_cond_signal(&thpool_p->threads_all_idle);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
}
}
```
### Interface
只要先呼叫 `thpool_init()` 指定需要幾個 thread,再呼叫 `thpool_add_work()` 即可插入工作了。
### Epoll 結合 thread pool
一樣由 main thread 使用 epoll 處理連線,然後將 do_request()
交給 workers 去執行。
```diff
- do_request(events[i].data.ptr);
+ thpool_add_work(pool, do_request, events[i].data.ptr);
```
### 設置 critical section
所有 access 到 shared memory 的地方都要以 lock 包覆起來,例如存放 timer 的 priority queue 就是一塊 shared memory。
呼叫 add_timer(), find_timer(), handle_expired_timers() 這些函式時都要先上鎖。
```c
pthread_mutex_lock(&timer_lock);
add_timer(request, TIMEOUT_DEFAULT, http_close_conn);
pthread_mutex_unlock(&timer_lock);
```
---
## 加入 memory pool
定義 memory pool 的結構體
想法:
將 pool 當成一個 stack ,透過 next 指向 top of stack
```c
struct {
http_request_t **pool;
int next;
pthread_mutex_t lock;
int size;
} req_pool;
```
取出時需要先將 pool 上鎖,然後回傳 next 指向的位置。
結束後 next 會增加一 ,指向下一個要被取出的結構體。
```c
void get_request(http_request_t **r)
{
pthread_mutex_t *lock = &req_pool.lock;
pthread_mutex_lock(lock);
int idx = req_pool.next++;
if (idx == req_pool.size) {
printf("Too much client connected!\n");
pthread_mutex_unlock(lock);
*r = NULL;
}
*r = req_pool.pool[idx];
pthread_mutex_unlock(lock);
}
```
釋放時將 `http_request_t` 結構體放回 top of stack ,也就是將 top of stack 的指標指向要被釋放的結構體。
```c
int free_request(http_request_t *r)
{
pthread_mutex_lock(&req_pool.lock);
int idx = --req_pool.next;
req_pool.pool[idx] = r;
pthread_mutex_unlock(&req_pool.lock);
return 0;
}
```
以上設計使得取出和釋放的操作都是 O(1)
---
## io_uring
io_uring 是在 linux 5.1 首次出現的 asynchronous I/O 機制,由 Jens Axboe 提出。
雖然性能相較於原有的 AIO 並沒有太大提昇,但是解決了 AIO 存在的一些問題
1. AIO 只支援 direct I/O 模式的 storage file ,而 io_uring 支援所有類型的 I/O,包含 cached files、direct-access files 和 blocking sockets,也支援更多的 asynchronous system call ,例如 accept ,而非僅限於 read/write system call。
2. AIO 在某些情況下會 block ,或是出現無法預測的行為,而 io_uring 在設計上是真正的非同步 I/O ,系統呼叫時只會將 request 加入 queue ,等待 kernel thread 去完成,不會做其他多餘的事,因此可以保證不會 block。
3. io_uring 的設計靈活、可擴展
### io_uring 基本架構
![](https://i.imgur.com/bgK10Pb.png)
每個 io_uring instance 都有兩個 ring buffer,在 user space 和 kernel space 之間共享,達成 zero copy。
1. Submission queue
2. Completion queue
user 身為生產者,將待完成的 requeust 加入 submission queue ,kernel 則作為消費者,從 queue 中抽取 request 執行,完成之後將結果放在 completion 讓 user 收割。
### submission queue
一個 submission queue entry 的結構如下
```c
struct io_uring_sqe {
__u8 opcode;
__u8 flags;
__u16 ioprio;
__s32 fd;
__u64 off;
__u64 addr;
__u32 len;
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events;
__u32 sync_range_flags;
__u32 msg_flags;
};
__u64 user_data;
union {
__u16 buf_index;
__u64 __pad2[3];
};
};
```
- opcode
將要進行的 operation
- fd
要操作的 file descriptor
- flags
一個 bit mask ,用來做額外的設定
### completion queue
一個 completion queue entry 的結構如下
```c
struct io_uring_cqe {
__u64 user_data;
__s32 res;
__u32 flags;
};
```
`user_data` 指向使用者自行定義的結構體,`res` 則包含 system call 的 return value,例如 read/write。
## io_uring web server
我使用作者本人撰寫的 high level library -- [liburing](https://github.com/axboe/liburing) ,並參考 [io_uring echo server](https://github.com/frevib/io_uring-echo-server/blob/master/io_uring_echo_server.c) 的架構來設計 web server。
### timeout 機制
我透過設定 io_uring 提供的 `IOSQE_IO_LINK` flag 來達成 timeout 機制。
> When this flag is specified, it forms a link with the next SQE in the submission ring. That next SQE will not be started before this one completes.
設定此 flag 會將一個 entry 和 ring 上的下一個 entry 連結在一起,當另一個 request 發生 timeout ,兩個 request 都會被標記為完成並出現在 completion queue 中。
因此,在新增一個 I/O request 之後,我可以將另一個 timeout operation 和它連結在一起,達成 timer 的效果。
### add_timeout_request()
在新增 read/write request 之後呼叫此函式,藉此將這些 request 和一個 timeout operation link 在一起。
```c
void add_timeout_request(struct io_uring *ring, http_request_t *r)
{
struct __kernel_timespec ts;
ts.tv_sec = TIMEOUT_DEFAULT / 1000;
r->event_type = TIMEOUT;
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_link_timeout(sqe, &ts, 0);
io_uring_sqe_set_flags(sqe, 0);
io_uring_sqe_set_data(sqe, r);
}
```
### add_accept_request()
新增一個 accept operation 到 submission queue。
```c
void add_accept_request(struct io_uring *ring,
http_request_t *r,
int listenfd,
struct sockaddr *client_addr,
socklen_t *client_len)
{
r->event_type = ACCEPT;
r->fd = listenfd;
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, listenfd, client_addr, client_len, 0);
io_uring_sqe_set_flags(sqe, 0);
io_uring_sqe_set_data(sqe, r);
io_uring_submit(ring);
}
```
### add_read_request()
新增一個 read operation 到 submission queue。
```c
void add_read_request(struct io_uring *ring, http_request_t *r)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
int clientfd = r->fd;
r->event_type = READ;
io_uring_prep_recv(sqe, clientfd, r->buf, MAX_BUF, 0);
io_uring_sqe_set_flags(sqe, IOSQE_IO_LINK);
io_uring_sqe_set_data(sqe, r);
r = get_request(r->tid);
add_timeout_request(ring, r);
io_uring_submit(ring);
}
```
### add_write_request()
新增一個 write operation 到 submission queue。
原本在 epoll 的版本中,對 client fd 寫入都是呼叫 write,現在可以此函式取代 write。
```c
void add_write_request(struct io_uring *ring, http_request_t *r, void *buf)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
char *src = buf;
r->event_type = WRITE;
size_t len = strlen(src);
io_uring_prep_send(sqe, r->fd, src, len, 0);
io_uring_sqe_set_flags(sqe, IOSQE_IO_LINK);
io_uring_sqe_set_data(sqe, r);
r = get_request(r->tid);
add_timeout_request(ring, r);
io_uring_submit(ring);
}
```
### Main loop
根據不同的 event type 做對應的處理:
- accept
接受到一個新的 client 連線,此時呼叫 add_read_request() 準備接收 client 的 request。
- read
對一個 client fd 讀取資料完畢。
如果 read_bytes <= 0 代表發生 timeout 或者讀取發生錯誤,此時關閉 client 連線。
如果 read_bytes > 0 代表讀取成功,執行 do_request。
- write
對一個 client fd 寫入資料完畢。
write_bytes <= 0 代表發生 timeout 或者寫入發生錯誤。
如果寫入失敗或者連線類型不是 http keep-alive, 就要關閉連線。
若寫入成功且連線類型是 keep-alive ,則呼叫 add_read_request() ,準備接收 client 的下一個 request。
- timeout
每個 timeout operation 都連結著某個 read/write 的操作,所以這個 timeout event 代表某個操作已經完成或是發生 timeout 。此時呼叫 free_request() 將記憶體歸還到 memory pool 。
```c
void *server_loop(void *arg)
{
thread_data_t *data = arg;
int listenfd = data->listenfd;
int tid = data->tid;
struct io_uring *ring = &data->ring;
http_request_t *r = get_request(tid);
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
add_accept_request(ring, r, listenfd, (struct sockaddr *) &client_addr,
&client_len);
while (threads_alive) {
struct io_uring_cqe *cqe;
io_uring_wait_cqe(ring, &cqe);
http_request_t *cqe_req = io_uring_cqe_get_data(cqe);
enum event_types type = cqe_req->event_type;
if (type == ACCEPT) {
add_accept_request(ring, r, listenfd,
(struct sockaddr *) &client_addr, &client_len);
int clientfd = cqe->res;
if (clientfd >= 0) {
http_request_t *request = get_request(tid);
init_http_request(request, clientfd, WEBROOT, ring);
add_read_request(ring, request);
}
} else if (type == READ) {
int read_bytes = cqe->res;
if (read_bytes <= 0) {
if (read_bytes < 0)
fprintf(stderr, "Async request failed: %s for event: %d\n",
strerror(-cqe->res), cqe_req->event_type);
http_close_conn(cqe_req);
} else {
do_request(cqe_req, read_bytes);
}
} else if (type == WRITE) {
int write_bytes = cqe->res;
if (write_bytes <= 0) {
if (write_bytes < 0)
fprintf(stderr, "Async request failed: %s for event: %d\n",
strerror(-cqe->res), cqe_req->event_type);
http_close_conn(cqe_req);
} else {
if (cqe_req->keep_alive == false)
http_close_conn(cqe_req);
else
add_read_request(ring, cqe_req);
}
} else if (type == TIMEOUT) {
free_request(cqe_req);
}
io_uring_cq_advance(ring, 1);
}
io_uring_queue_exit(ring);
return NULL;
}
```
---
## io_uring + multi-threading
我根據 io_uring 作者 Jens Axboe 在 liburing 的 [issue #109](https://github.com/axboe/liburing/issues/109) 中所提到的建議結合 multithread。
> So let me ask, why aren't you just using a ring per thread in your thread pool? Most users of io_uring won't be using thread pools at all, since one of the goal was not to need them as blocking processing will be done internally anyway. You can share the async backend between rings for all your threads, so the overhead should be fairly minimal. And then you don't need to share a ring at all.
在每個 thread 之間分享同一個 ring 會在插入任務時導致競爭,因此需要將 ring 上鎖,但是經過我的嘗試這會使效能大幅下降。
比較好的方法如下:
使每個 thread 都持有一個 io_uring 的 instance,也就是將每個 thread 的 ring 分開,如此一來一個 thread 只會將任務插入自己的 ring,這樣就不需要擔心 data race。
除此之外,可以在初始化 io_uring 時透過 `IORING_SETUP_ATTACH_WQ` flag 使每個 ring 共享同一個 asynchronous backend。
此 flag 開啟時,每個 ring 都可以插入任務,但是會共享同一個 kernel thread pool,而不是獨自擁有一個 kernel thread pool,如此一來可以降低 overhead 。
除了 github issue 的討論,Jens Axboe 在提出 [patch](https://lore.kernel.org/lkml/c40338a9989a45ec38f36e5937365eca6a089795.1580170474.git.asml.silence@gmail.com/) 時也有相關的說明。
> This allows creation of "sibling" io_urings, where we prefer to keep the SQ/CQ private, but want to share the async backend to minimize the amount of overhead associated with having multiple rings that belong to the same backend.
初始化的程式碼如下:
```c
pthread_t *threads = malloc(NT * sizeof(pthread_t));
thread_data_t *thread_data = malloc(NT * sizeof(thread_data_t));
/* Initialize io_uring and create threads */
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
io_uring_queue_init_params(QUEUE_DEPTH, &thread_data[0].ring, ¶ms);
params.flags = IORING_SETUP_ATTACH_WQ;
params.wq_fd = thread_data[0].ring.ring_fd;
for (int i = 1; i < NT; i++) {
io_uring_queue_init_params(QUEUE_DEPTH, &thread_data[i].ring, ¶ms);
thread_data[i].listenfd = listenfd;
thread_data[i].tid = i;
pthread_create(&threads[i], NULL, &server_loop, &thread_data[i]);
}
```
---
## 效能測試
### 測試環境
:::spoiler
```shell
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 39 bits physical, 48 bits virtual
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 167
Model name: 11th Gen Intel(R) Core(TM) i9-11900 @ 2.50GHz
```
:::
### 測量工具
使用 wrk 測量 HTTP keep-alive 模式
使用 htstress 測量沒有 HTTP keep-alive 的模式
- 測量方式
開啟 4 個執行緒發送 request ,連續測量 20 次之後取平均
### 測量結果
在製作資工系專題時, epoll 的版本還存在某些小 bug 沒有修正,因此當時的測量結果是 io_uring 略勝一籌。
後來我在學期末做了多次改進,發現其實 epoll 的版本好過 io_uring,甚至在 HTTP keep-alive 模式關閉時的差距更明顯。
- HTTP Keep-alive 模式下時的效能
![](https://i.imgur.com/rc5LjlF.png)
- 關閉 HTTP keep-alive 時的效能
![](https://i.imgur.com/WrvBXM8.png)
### 結論
雖然在我實做的網頁伺服器中 epoll 的表現似乎比 io_uring 更好,但是並不能直接把伺服器的效能當成兩種 I/O 模式比拼的結果,因為一個伺服器要做的事遠不只於 I/O,除此之外我在 epoll 和 io_uring 兩種版本的實做也有若干差異,例如 timer 的機制,以及結合多執行緒的方式都有不同的設計。
實際上 epoll 和 io_uring 的效能比較確實存在一些爭議,例如在 [Jens Axboe 的測試](https://twitter.com/axboe/status/1362271500489793539?s=20&t=u9ZEykiZawK4TqVURx4jKA
)中 io_uring 明顯勝過 epoll,但是在 [issue #189](https://github.com/axboe/liburing/issues/189) 中卻有人吵得沸沸揚揚。