# 專題 ###### @XDEv11 @D4nnyLee > [sehttpd](https://hackmd.io/@sysprog/linux2020-sehttpd) > [高效能 Web 伺服器開發](https://hackmd.io/@sysprog/fast-web-server) > [Optimizing web servers for high throughput and low latency](https://dropbox.tech/infrastructure/optimizing-web-servers-for-high-throughput-and-low-latency) > [Efficient IO with io_uring](https://kernel.dk/io_uring.pdf?fbclid=IwAR1QjL8gviL3yOpKgbMk5NMc_OgFH92rfaLRrc-EFJGr_GUpGfXrPt77sPI) > [Epoll vs. io_uring 效能測試與比較](https://hackmd.io/@shanvia/B1Ds1vlAD) > [zzzxxx00019 / sehttpd-IO_URING](https://github.com/zzzxxx00019/sehttpd-IO_URING) > [Shanola / sehttpd](https://github.com/Shanola/sehttpd) > [Timing wheels](https://www.slideshare.net/supperniu/timing-wheels) > [sysprog21 / timeout](https://github.com/sysprog21/timeout) [TOC] ## TODO - [x] 從 github 抓 [liburing](https://github.com/axboe/liburing) 測試 測試 io_uring, [example](https://github.com/axboe/liburing/tree/master/examples) - [ ] 釐清 [liburing](https://github.com/axboe/liburing) 測試失敗原因 - [x] 換成使用 [wrk2](https://github.com/giltene/wrk2) 做 benchmark - [ ] 改進 tick-based 為 tick-less [sysprog21 / timeout](https://github.com/sysprog21/timeout) [Timing wheels](https://www.slideshare.net/supperniu/timing-wheels) - [ ] Concurrent design ## mainloop.c * [arpa/inet.h](https://man7.org/linux/man-pages/man0/arpa_inet.h.0p.html) - definitions for internet operations * [fcntl.h](https://man7.org/linux/man-pages/man0/fcntl.h.0p.html) - file control options * [signal.h](https://www.man7.org/linux/man-pages/man0/signal.h.0p.html) * [sys/epoll.h](https://man7.org/linux/man-pages/man7/epoll.7.html) * [sys/socket.h](https://man7.org/linux/man-pages/man0/sys_socket.h.0p.html) * [unistd.h](https://man7.org/linux/man-pages/man0/unistd.h.0p.html) - standard symbolic constants and types ```cpp! #include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include "http.h" #include "logger.h" #include "timer.h" /* the length of the struct epoll_events array pointed to by *events */ #define MAXEVENTS 1024 #define LISTENQ 1024 ``` ### `open_listenfd()` [*](https://man7.org/linux/man-pages/man3/socket.3p.html) `int socket(int domain, int type, int protocol);` > domain=AF_INET - Internet domain sockets for use with IPv4 addresses. > type=SOCK_STREAM - Provides sequenced, reliable, bidirectional, connection-mode byte streams, and may provide a transmission mechanism for out-of-band data. > protocol=0 - Specifying a protocol of 0 causes socket() to use an unspecified default protocol appropriate for the requested socket type. [*](https://man7.org/linux/man-pages/man3/setsockopt.3p.html) `int setsockopt(int socket, int level, int option_name, const void *option_value, socklen_t option_len);` > level=SOL_SOCKET - Options to be accessed at socket level, not protocol level. > option_name=SO_REUSEADDR - Reuse of local addresses is supported. [*](https://man7.org/linux/man-pages/man3/bind.3p.html) `int bind(int socket, const struct sockaddr *address, socklen_t address_len);` > When a socket is created with [socket(2)](https://man7.org/linux/man-pages/man2/socket.2.html), it exists in a name space (address family) but has no address assigned to it. bind() assigns the address specified by addr to the socket referred to by the file descriptor sockfd. > [`struct sockaddr_in`](https://man7.org/linux/man-pages/man7/ip.7.html) [*](https://man7.org/linux/man-pages/man3/listen.3p.html) `int listen(int socket, int backlog);` > listen for socket connections and limit the queue of incoming connections. ```cpp! static int open_listenfd(int port) { int listenfd, optval = 1; /* Create a socket descriptor */ if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) return -1; /* Eliminate "Address already in use" error from bind. */ if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void *) &optval, sizeof(int)) < 0) return -1; /* Listenfd will be an endpoint for all requests to given port. */ struct sockaddr_in serveraddr = { .sin_family = AF_INET, .sin_addr.s_addr = htonl(INADDR_ANY), .sin_port = htons((unsigned short) port), .sin_zero = {0}, }; if (bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) return -1; /* Make it a listening socket ready to accept connection requests */ if (listen(listenfd, LISTENQ) < 0) return -1; return listenfd; } ``` ### `sock_set_non_blocking()` [*](https://man7.org/linux/man-pages/man3/fcntl.3p.html) `int fcntl(int fildes, int cmd, ...);` > cmd=F_GETFL - Get the file status flags and file access modes, defined in <fcntl.h>, for the file description associated with fildes. > cmd=F_SETFD - Set the file descriptor flags defined in <fcntl.h>, that are associated with fildes, to the third argument, arg, taken as type int. ```cpp! /* set a socket non-blocking. If a listen socket is a blocking socket, after * it comes out from epoll and accepts the last connection, the next accpet * will block unexpectedly. */ static int sock_set_non_blocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { log_err("fcntl"); return -1; } flags |= O_NONBLOCK; int s = fcntl(fd, F_SETFL, flags); if (s == -1) { log_err("fcntl"); return -1; } return 0; } ``` ### `main()` `Makefile` 中的參數 `-DUNUSED="__attribute__((unused))"` 定義了 [`UNUSED`](https://gcc.gnu.org/onlinedocs/gcc-3.2/gcc/Variable-Attributes.html)。 [*](https://man7.org/linux/man-pages/man3/sigaction.3p.html) `int sigaction(int sig, const struct sigaction *restrict act, struct sigaction *restrict oact);` [*](https://man7.org/linux/man-pages/man2/epoll_create1.2.html) `int epoll_create1(int flags);` [*](https://man7.org/linux/man-pages/man2/epoll_ctl.2.html) `int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);` [*](https://man7.org/linux/man-pages/man2/epoll_wait.2.html) `int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);` [*](https://man7.org/linux/man-pages/man3/accept.3p.html) `int accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len);` ```cpp! /* TODO: use command line options to specify */ #define PORT 8081 #define WEBROOT "./www" int main(int argc, char *argv[]) { /* ./sehttpd WEBROOT PORT */ char *webroot = WEBROOT; int port = PORT; if (argc >= 2) webroot = argv[1]; if (argc >= 3) port = atoi(argv[2]); /* when a fd is closed by remote, writing to this fd will cause system * send SIGPIPE to this process, which exit the program */ if (sigaction(SIGPIPE, &(struct sigaction){.sa_handler = SIG_IGN, .sa_flags = 0}, NULL)) { log_err("Failed to install sigal handler for SIGPIPE"); return 0; } int listenfd = open_listenfd(port); int rc UNUSED = sock_set_non_blocking(listenfd); assert(rc == 0 && "sock_set_non_blocking"); /* create epoll and add listenfd */ int epfd = epoll_create1(0 /* flags */); assert(epfd > 0 && "epoll_create1"); struct epoll_event *events = malloc(sizeof(struct epoll_event) * MAXEVENTS); assert(events && "epoll_event: malloc"); http_request_t *request = malloc(sizeof(http_request_t)); init_http_request(request, listenfd, epfd, webroot); struct epoll_event event = { .data.ptr = request, .events = EPOLLIN | EPOLLET, }; epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event); timer_init(); printf("Web server started.\n"); /* epoll_wait loop */ while (1) { int time = find_timer(); debug("wait time = %d", time); int n = epoll_wait(epfd, events, MAXEVENTS, time); handle_expired_timers(); for (int i = 0; i < n; i++) { http_request_t *r = events[i].data.ptr; int fd = r->fd; if (listenfd == fd) { /* we have one or more incoming connections */ while (1) { socklen_t inlen = 1; struct sockaddr_in clientaddr; int infd = accept(listenfd, (struct sockaddr *) &clientaddr, &inlen); if (infd < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /* we have processed all incoming connections */ break; } log_err("accept"); break; } rc = sock_set_non_blocking(infd); assert(rc == 0 && "sock_set_non_blocking"); request = malloc(sizeof(http_request_t)); if (!request) { log_err("malloc"); break; } init_http_request(request, infd, epfd, webroot); event.data.ptr = request; event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl(epfd, EPOLL_CTL_ADD, infd, &event); add_timer(request, TIMEOUT_DEFAULT, http_close_conn); } } else { if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { log_err("epoll error fd: %d", r->fd); close(fd); continue; } do_request(events[i].data.ptr); } } } return 0; } ``` ## [http_parser.c](https://github.com/sysprog21/sehttpd/blob/master/src/http_parser.c) ### `cst_strcmp()` ```cpp=7 /* constant-time string comparison */ #define cst_strcmp(m, c0, c1, c2, c3) \ *(uint32_t *) m == ((c3 << 24) | (c2 << 16) | (c1 << 8) | c0) ``` 針對 Little-Endian 環境設計的常數時間複雜度的字串比較函式。 待比較字串為: * `m` 指向字串的指標 * `c0`, `c1`, `c2`, `c3` 長度為 4 的字串。 > 也就是 4 個字元 :question: 為什麼第二個字串的長度固定為 4 ? :thinking_face: 此函式是用來比較請求方法是否為此網頁伺服器接受的方法,而目前可接受的只有三種: 1. GET 2. POST 3. HEAD 因此最大長度只需要 4 。 > GET 的情況是用 `cst_strcmp(m, 'G', 'E', 'T', ' ')` 來做比較。 ### `http_parse_request_line()` 利用有限狀態機的手法實作。 遍歷 `r->buf` 中 index 從 `r->pos` 到 `r->last - 1` 的所有字元。 每次遍歷都會先紀錄當前指向的字元 `ch` 以及它的指標 `p` 。 並且會根據當前的狀態以及 `ch` 的值決定是否進行狀態轉移。 總共有下列不同的狀態: * `s_start = 0` 接受符合正則表達式 `[\r\n]*[A-Z_]` 的字串。 找到後狀態轉移到 `s_method` ,並紀錄目前的指標 `p` 為 `r->request_start`。 * `s_method` 接受符合 `[A-Z_]*[ ]` 的字串,因此 `[r->request_start, p)` 這個範圍的字串就是此次請求的方法, 利用 `cst_strcmp()` 來確認是否為預期的請求方法。 接著狀態轉移到 `s_spaces_before` * `s_spaces_before_uri` 接受符合 `[ ]*/` 的字串,並紀錄指標為 `r->uri_start` ,然後狀態轉移到 `s_after_slash_in_uri` 。 * `s_after_slash_in_uri` 接受符合 `[^ ][ ]` 的字串,並紀錄指標為 `r->uri_end` ,然後狀態轉移到 `s_http` > 到這邊 `[r->uri_start, r->uri_end)` 這個範圍的字串就是此次請求的 [URI](https://en.wikipedia.org/wiki/Uniform_Resource_Identifier) * `s_http`, `s_http_H`, `s_http_HT`, `s_http_HTT`, `s_http_HTTP` 接受符合 `[ ]*HTTP/` 的字串,然後狀態轉移到 `s_first_major_digit`。 * `s_first_major_digit`, `s_major_digit` 接受符合 `[0-9]+.` 的字串,並將讀取到的十進位數字記錄為 `r->http_major` ,然後狀態轉移到 `s_first_minor_digit`。 * `s_first_minor_digit`, `s_minor_digit`, `s_spaces_after_digit` 接受符合 `[0-9]+[ ]*` 的字串,將讀取到的十進位數字紀錄成 `r->http_minor`。 如果下個字元為 `\n` 則 `goto done;` 否則就狀態轉移到 `s_almost_done` 。 * `s_almost_done` 只接受 `\n` 字元並 `goto done;` 。 :::warning 在這個函式中會去設定 `r->request_end` 這個變數,讓他指向此次請求的字串結尾,但是我在程式碼中完全找不到這個變數會用在哪,因此也不知道為什麼需要這個變數。 ::: 回傳值的部分,只要遇到不符合正則表達式的字串,就會回傳 `HTTP_PARSER_INVALID_METHOD` 。 如果字串符合正則表達式,但是在 parse 完成前就跳出 for 迴圈 (也就是沒有跑到 `goto done;`), 則代表需要再執行一次此函式,所以回傳 `EAGAIN` 。 parse 完成後就回傳 `0` 。 ```cpp int http_parse_request_line(http_request_t *r) { uint8_t ch, *p, *m; size_t pi; enum { s_start = 0, s_method, s_spaces_before_uri, s_after_slash_in_uri, s_http, s_http_H, s_http_HT, s_http_HTT, s_http_HTTP, s_first_major_digit, s_major_digit, s_first_minor_digit, s_minor_digit, s_spaces_after_digit, s_almost_done } state; state = r->state; for (pi = r->pos; pi < r->last; pi++) { p = (uint8_t *) &r->buf[pi % MAX_BUF]; ch = *p; /* TODO: use computed goto for efficient dispatching */ switch (state) { /* HTTP methods: GET, HEAD, POST */ case s_start: r->request_start = p; if (ch == CR || ch == LF) break; if ((ch < 'A' || ch > 'Z') && ch != '_') return HTTP_PARSER_INVALID_METHOD; state = s_method; break; case s_method: if (ch == ' ') { m = r->request_start; switch (p - m) { case 3: if (cst_strcmp(m, 'G', 'E', 'T', ' ')) { r->method = HTTP_GET; break; } break; case 4: if (cst_strcmp(m, 'P', 'O', 'S', 'T')) { r->method = HTTP_POST; break; } if (cst_strcmp(m, 'H', 'E', 'A', 'D')) { r->method = HTTP_HEAD; break; } break; default: r->method = HTTP_UNKNOWN; break; } state = s_spaces_before_uri; break; } if ((ch < 'A' || ch > 'Z') && ch != '_') return HTTP_PARSER_INVALID_METHOD; break; /* space* before URI */ case s_spaces_before_uri: if (ch == '/') { r->uri_start = p; state = s_after_slash_in_uri; break; } switch (ch) { case ' ': break; default: return HTTP_PARSER_INVALID_REQUEST; } break; case s_after_slash_in_uri: switch (ch) { case ' ': r->uri_end = p; state = s_http; break; default: break; } break; /* space+ after URI */ case s_http: switch (ch) { case ' ': break; case 'H': state = s_http_H; break; default: return HTTP_PARSER_INVALID_REQUEST; } break; case s_http_H: switch (ch) { case 'T': state = s_http_HT; break; default: return HTTP_PARSER_INVALID_REQUEST; } break; case s_http_HT: switch (ch) { case 'T': state = s_http_HTT; break; default: return HTTP_PARSER_INVALID_REQUEST; } break; case s_http_HTT: switch (ch) { case 'P': state = s_http_HTTP; break; default: return HTTP_PARSER_INVALID_REQUEST; } break; case s_http_HTTP: switch (ch) { case '/': state = s_first_major_digit; break; default: return HTTP_PARSER_INVALID_REQUEST; } break; /* first digit of major HTTP version */ case s_first_major_digit: if (ch < '1' || ch > '9') return HTTP_PARSER_INVALID_REQUEST; r->http_major = ch - '0'; state = s_major_digit; break; /* major HTTP version or dot */ case s_major_digit: if (ch == '.') { state = s_first_minor_digit; break; } if (ch < '0' || ch > '9') return HTTP_PARSER_INVALID_REQUEST; r->http_major = r->http_major * 10 + ch - '0'; break; /* first digit of minor HTTP version */ case s_first_minor_digit: if (ch < '0' || ch > '9') return HTTP_PARSER_INVALID_REQUEST; r->http_minor = ch - '0'; state = s_minor_digit; break; /* minor HTTP version or end of request line */ case s_minor_digit: if (ch == CR) { state = s_almost_done; break; } if (ch == LF) goto done; if (ch == ' ') { state = s_spaces_after_digit; break; } if (ch < '0' || ch > '9') return HTTP_PARSER_INVALID_REQUEST; r->http_minor = r->http_minor * 10 + ch - '0'; break; case s_spaces_after_digit: switch (ch) { case ' ': break; case CR: state = s_almost_done; break; case LF: goto done; default: return HTTP_PARSER_INVALID_REQUEST; } break; /* end of request line */ case s_almost_done: r->request_end = p - 1; switch (ch) { case LF: goto done; default: return HTTP_PARSER_INVALID_REQUEST; } } } r->pos = pi; r->state = state; return EAGAIN; done: r->pos = pi + 1; if (!r->request_end) r->request_end = p; r->state = s_start; return 0; } ``` ### `http_parse_request_body()` 利用有限狀態機的手法實作。 ## list.h ### `struct list_head` ```cpp! #ifndef LIST_H #define LIST_H #include <stddef.h> typedef struct list_head { struct list_head *prev, *next; } list_head; ``` ### `INIT_LIST_HEAD` [`do {} while(0)`](http://www.bruceblinn.com/linuxinfo/DoWhile.html) 先用 `_ptr` 存 ptr 的值,避免 ptr 有 side effect,被執行多次而產生非預期的結果。 ```cpp! #define INIT_LIST_HEAD(ptr) \ do { \ list_head *_ptr = ptr; \ (_ptr)->next = (_ptr); \ (_ptr->prev) = (_ptr); \ } while (0) ``` ### `list_add()`, `list_add_tail()` ```cpp! /* Insert a new entry to two consecutive entries */ static inline void __list_add(list_head *_new, list_head *prev, list_head *next) { _new->next = next; next->prev = _new; prev->next = _new; _new->prev = prev; } static inline void list_add(list_head *_new, list_head *head) { __list_add(_new, head, head->next); } static inline void list_add_tail(list_head *_new, list_head *head) { __list_add(_new, head->prev, head); } ``` ### `list_del()` ```cpp! /* Delete an entry to two consecutive entries */ static inline void __list_del(list_head *prev, list_head *next) { prev->next = next; next->prev = prev; } static inline void list_del(list_head *entry) { __list_del(entry->prev, entry->next); } ``` ### `list_empty()` ```cpp! static inline int list_empty(list_head *head) { return (head->next == head) && (head->prev == head); } ``` ### `container_of` [`({})`](https://gcc.gnu.org/onlinedocs/gcc/Statement-Exprs.html) 有點類似於 `,` 運算子,藉此達到類似於函式回傳值的效果。 > The last thing in the compound statement should be an expression followed by a semicolon; the value of this subexpression serves as the value of the entire construct. ```cpp! #define container_of(ptr, type, member) \ ({ \ const typeof(((type *) 0)->member) *__mptr = (ptr); \ (type *) ((char *) __mptr - offsetof(type, member)); \ }) ``` ### `list_entry` ```cpp! #define list_entry(ptr, type, member) container_of(ptr, type, member) ``` ### `list_for_each` ```cpp! #define list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) #endif ``` ## logger.h `debug` 這個 marco 在 `NDEBUG` 被定義時,會被前處理器轉為空,否則會轉為 `fprintf` 去輸出相關的錯誤訊息。 [`stderr`](https://www.gnu.org/software/libc/manual/html_node/Standard-Streams.html) [`##__VA_ARGS__`](https://gcc.gnu.org/onlinedocs/gcc/Variadic-Macros.html) ```cpp! #ifndef LOGGER_H #define LOGGER_H #include <errno.h> #include <stdio.h> #ifdef NDEBUG #define debug(MSG, ...) #else #define debug(MSG, ...) \ fprintf(stderr, "[DEBUG] (%s:%d): " MSG "\n", __FILE__, __LINE__, \ ##__VA_ARGS__) #endif #define log_err(MSG, ...) \ fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " MSG "\n", __FILE__, \ __LINE__, errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__) #endif ``` ## timer.h `timer_callback` 被定義為指向一個 `int (http_request_t *)` 函式的指標。 ```cpp! #ifndef TIMER_H #define TIMER_H #include <stdbool.h> #include "http.h" #define TIMEOUT_DEFAULT 500 /* ms */ typedef int (*timer_callback)(http_request_t *req); typedef struct { size_t key; bool deleted; /* if remote client close socket first, set deleted true */ timer_callback callback; http_request_t *request; } timer_node; int timer_init(); int find_timer(); void handle_expired_timers(); void add_timer(http_request_t *req, size_t timeout, timer_callback cb); void del_timer(http_request_t *req); #endif ``` ## timer.c * [sys/time.h](https://man7.org/linux/man-pages/man0/sys_time.h.0p.html) ### prio_queue_t 這邊用了 [binary heap](https://en.wikipedia.org/wiki/Binary_heap) 去實現一個 [priority_queue](https://en.wikipedia.org/wiki/Priority_queue),資料型態使用 `void *`,並用函數指標存放比較函數,達成 generic programming 的設計。 `swim` 與 `sink` 分別代表「浮」與「沉」,在放入或刪除元素後,得以進行調整,維護整個資料結構。 :::info 根結點改放在 `0`,簡化整個設計。 :smiling_imp:XDEv11 ::: ```cpp! #include <assert.h> #include <stdbool.h> #include <stdlib.h> #include <string.h> #include <sys/time.h> #include "logger.h" #include "timer.h" #define TIMER_INFINITE (-1) #define PQ_DEFAULT_SIZE 10 typedef bool (*prio_queue_comparator)(void *pi, void *pj); /* priority queue with binary heap */ typedef struct { void **priv; size_t size; size_t capacity; prio_queue_comparator comp; } prio_queue_t; static bool prio_queue_init(prio_queue_t *ptr, prio_queue_comparator comp, size_t capacity) { ptr->priv = malloc(sizeof(void *) * capacity); if (!ptr->priv) { log_err("prio_queue_init: malloc failed"); return false; } ptr->size = 0; ptr->capacity = capacity; ptr->comp = comp; return true; } static inline bool prio_queue_is_empty(prio_queue_t *ptr) { return ptr->size == 0; } static inline size_t prio_queue_size(prio_queue_t *ptr) { return ptr->size; } static inline void *prio_queue_min(prio_queue_t *ptr) { return prio_queue_is_empty(ptr) ? NULL : ptr->priv[0]; } static bool resize(prio_queue_t *ptr, size_t new_capacity) { if (new_capacity < ptr->size) { log_err("resize: new_capacity to small"); return false; } /* TODO: use memory pool to avoid unexpected fragmentation */ void **new_ptr = malloc(sizeof(void *) * new_capacity); if (!new_ptr) { log_err("resize: malloc failed"); return false; } memcpy(new_ptr, ptr->priv, sizeof(void *) * ptr->size); free(ptr->priv); ptr->priv = new_ptr; ptr->capacity = new_capacity; return true; } static inline void swap(prio_queue_t *ptr, size_t i, size_t j) { void *tmp = ptr->priv[i]; ptr->priv[i] = ptr->priv[j]; ptr->priv[j] = tmp; } static inline void swim(prio_queue_t *ptr, size_t k) { while (k > 0 && ptr->comp(ptr->priv[k], ptr->priv[(k - 1) >> 1])) { swap(ptr, k, (k - 1) >> 1); k = (k - 1) >> 1; } } static inline size_t sink(prio_queue_t *ptr, size_t k) { while ((k << 1) + 1 < ptr->size) { size_t j = (k << 1) + 1; if (j + 1 < ptr->size && ptr->comp(ptr->priv[j + 1], ptr->priv[j])) ++j; if (!ptr->comp(ptr->priv[j], ptr->priv[k])) break; swap(ptr, j, k); k = j; } return k; } /* remove the item with minimum key value from the heap */ static bool prio_queue_delmin(prio_queue_t *ptr) { if (prio_queue_is_empty(ptr)) return true; swap(ptr, 0, ptr->size); --ptr->size; sink(ptr, 0); if (ptr->size > 0 && ptr->size <= ptr->capacity / 4) { if (!resize(ptr, ptr->capacity / 2)) return false; } return true; } /* add a new item to the heap */ static bool prio_queue_insert(prio_queue_t *ptr, void *item) { if (ptr->size == ptr->capacity) { if (!resize(ptr, ptr->capacity * 2)) return false; } ptr->priv[ptr->size] = item; swim(ptr, ptr->size); ++ptr->size; return true; } ``` ### `timer_comp()` ```cpp! static bool timer_comp(void *ti, void *tj) { return ((timer_node *) ti)->key < ((timer_node *) tj)->key; } ``` ### `time_update()` [*](https://man7.org/linux/man-pages/man3/gettimeofday.3p.html)`int gettimeofday(struct timeval *restrict tp, void *restrict tzp);` > The gettimeofday() function shall obtain the current time, expressed as seconds and microseconds since the Epoch. 將秒 `tv_sec` 與微秒 `tv_usec` 轉為毫秒。 ```cpp! static prio_queue_t timer; static size_t current_msec; static void time_update() { struct timeval tv; int rc UNUSED = gettimeofday(&tv, NULL); assert(rc == 0 && "time_update: gettimeofday error"); current_msec = tv.tv_sec * 1000 + tv.tv_usec / 1000; } ``` ### `timer_init()` 初始化 priority_queue 並更新初始時間。 ```cpp! int timer_init() { bool ret UNUSED = prio_queue_init(&timer, timer_comp, PQ_DEFAULT_SIZE); assert(ret && "prio_queue_init error"); // time_update(); return 0; } ``` ### `find_timer()` 找到一個最小並且 `deleted` 不為真的 `timer`,並回傳跟當前時間的差距。 ```cpp! int find_timer() { int time = TIMER_INFINITE; while (!prio_queue_is_empty(&timer)) { time_update(); timer_node *node = prio_queue_min(&timer); assert(node && "prio_queue_min error"); if (node->deleted) { bool ret UNUSED = prio_queue_delmin(&timer); assert(ret && "prio_queue_delmin"); free(node); continue; } time = (int) (node->key - current_msec); time = (time > 0 ? time : 0); break; } return time; } ``` ### `handle_expired_timers()` 把 priority_queue 頂端已經超時或應該被刪除的 `timer` 去除。 ```cpp! void handle_expired_timers() { bool ret UNUSED; while (!prio_queue_is_empty(&timer)) { debug("handle_expired_timers, size = %zu", prio_queue_size(&timer)); time_update(); timer_node *node = prio_queue_min(&timer); assert(node && "prio_queue_min error"); if (node->deleted) { ret = prio_queue_delmin(&timer); assert(ret && "handle_expired_timers: prio_queue_delmin error"); free(node); continue; } if (node->key > current_msec) return; if (node->callback) node->callback(node->request); ret = prio_queue_delmin(&timer); assert(ret && "handle_expired_timers: prio_queue_delmin error"); free(node); } } ``` ### `add_timer()` 新增一個 `timer`,把 key 設定為超時的時間點,並放到 priority_queue 裡面。 ```cpp! void add_timer(http_request_t *req, size_t timeout, timer_callback cb) { timer_node *node = malloc(sizeof(timer_node)); assert(node && "add_timer: malloc error"); time_update(); req->timer = node; node->key = current_msec + timeout; node->deleted = false; node->callback = cb; node->request = req; bool ret UNUSED = prio_queue_insert(&timer, node); assert(ret && "add_timer: prio_queue_insert error"); } ``` ### `del_timer()` 把 `deleted` 設為真,從 priority_queue 中被取出時,才會真正進行記憶體釋放。 ```cpp! void del_timer(http_request_t *req) { // time_update(); timer_node *node = req->timer; assert(node && "del_timer: req->timer is NULL"); node->deleted = true; } ``` --- # io_uring :::info [Efficient IO with io_uring(Linux Documentation)](https://kernel.dk/io_uring.pdf?fbclid=IwAR1QjL8gviL3yOpKgbMk5NMc_OgFH92rfaLRrc-EFJGr_GUpGfXrPt77sPI) 重點整理 ::: ## 1.0 Introduction Linux 本身就有許多 file based IO, * `read(2)` / `write(2)` * `pread(2)` / `pwrite(2)` * 從一個給定的偏移量開始讀取(寫入) * `readv(2)` / `writev(2)` * vector-based * `preadv(2)` / `pwritev(2)` * `preadv2(2)` / `pwritev2(2)` * 加入一些改變行為的 flags 但這些都是同步 IO; 非同步 IO 的部分,POSIX 有 `aio_read(3)` / ` aio_write(3)`,但是效能很差。 linux 的確有的非同步 IO 介面 (aio),但有不少的限制與缺陷。 * 只支援 O_DIRECT (or un-buffered) 的輸入輸出 * 即使滿足所有限制,也不保證一定是非同步 * 過多的資料複製 而現在大多的應用,都沒有使用 aio,而是自己寫 thread pool 來處理非同步 IO,而這應該由 kernel 來做,可以得到更好的效能。 ## 2.0 Improving the status quo (現狀) 當延伸現有的架構是可行時,我們應該這麼做;然而在經過不少的努力後,很明顯地,我們需要一個全新的東西。 ## 3.0 New interface design goals 雖然決定開發一個全新的東西是個困難的決定,但也給了我們完全的自由,以下是幾個設計目標, * Easy to use, hard to misuse. * Extendable. * Feature rich. * Efficiency. * Scalability. ## 4.0 Enter io_uring 撇除上述的設計目標,起初的設計是圍繞在效率之上,以 aio 為借鏡,我們知道過多的記憶體複製,對於效率有著可見的傷害,因此在 submissions or completion events 中,我們不希望有任何的記憶體複製或是 [memory in-directions](https://en.wikipedia.org/wiki/Indirection)。 為了避免複製,kernel 與 application 必須共享某些資料, 最基本的事件,有提交請求與請求完成,這是一個生產者-消費者問題,提交請求時,application 是生產者,kernel 是消費者,而請求完成時,則相反,因此我們需要二個環狀資料結構來建立一個有效率的溝通通道,命名為 submission queue (SQ) 與 completion queue (CQ)。 ### 4.1 DATA STRUCTURES ```cpp! struct io_uring_cqe { __u64 user_data; __s32 res; __u32 flags; }; ``` * 後綴 `_cqe` 代表 Completion Queue Event * `user_data` 可以放置關於請求的資料,它單純只是被從 submission 帶到 completion * `res` 存放請求的執行結果 * `flags` 目前沒有用途 (可以存放其他 metadata) ```cpp! struct io_uring_sqe { __u8 opcode; __u8 flags; __u16 ioprio; __s32 fd; __u64 off; __u64 addr; __u32 len; union { __kernel_rwf_t rw_flags; __u32 fsync_flags; __u16 poll_events; __u32 sync_range_flags; __u32 msg_flags; }; __u64 user_data; union { __u16 buf_index; __u64 __pad2[3]; }; }; ``` * `opcode` (operation code) 指名請求的種類 * `flags` 通用的 flags * `ioprio` 請求的優先權 * `fd` (file descriptor) * `off` (offset) IO 開始存取的偏移 * `addr` IO 存取的位址 * `len` 資料長度(non-vectored IO transfer) / vector 數量(vectored IO transfer) * a union of flags that are specific to the op-code. * `user_data` 直接複製到 cqe * last union * `buf_index` * `__pad2` 確保 sqe 在記憶體中是以 64 bytes 進行對齊 ### 4.2 COMMUNICATION CHANNEL 這是一個生產者消費者問題。 #### CQ ring CQ ring 是 cqe 的陣列,當 kernel 產生了一個新的 cqe,會放入 CQ ring 並更新尾端,而當 application 處理一個 cqe 後,會更新前端。 這邊環的大小為二的冪,而更新尾端或前端只有單純地加一,所以要存取時,需要進行遮罩。 #### SQ ring SQ ring 不同的是,他存放到共享陣列的索引,這樣讓 application 可以一次提交多個請求。(`io_uring_enter()` 的參數 `to_submit`) ## 5.0 io_uring interface `int io_uring_setup(unsigned entries, struct io_uring_params *params);` `entries` 要為二的冪(1 ~ 4096),`params` 會被 kernel 讀取與寫入。 ```cpp! struct io_uring_params { __u32 sq_entries;__u32 cq_entries; __u32 flags;__u32 sq_thread_cpu; __u32 sq_thread_idle; __u32 resv[5]; struct io_sqring_offsets sq_off; struct io_cqring_offsets cq_off; }; ``` 因為 kernel 與 application 共用記憶體,application 需要透過 [`mmap(2)`](https://man7.org/linux/man-pages/man2/mmap.2.html) 來存取共享記憶體。 ```cpp! struct io_sqring_offsets { __u32 head; /* offset of ring head */ __u32 tail; /* offset of ring tail */ __u32 ring_mask; /* ring mask value */ __u32 ring_entries; /* entries in ring */ __u32 flags; /* ring flags */ __u32 dropped; /* number of sqes not submitted */ __u32 array; /* sqe index array */ __u32 resv1; __u64 resv2; }; ``` io_uring API 定義了一些關於 `mmap()` 的宏。 因為 cqes 本身就在 CQ ring 中,而 SQ ring 只是存放索引,所以還需要對映 sqes 的陣列。 ```cpp! #define IORING_OFF_SQ_RING 0ULL #define IORING_OFF_CQ_RING 0x8000000ULL #define IORING_OFF_SQES 0x10000000ULL ``` application 本身會定義類似下面的結構來存放所需變數, ```cpp! struct app_sq_ring { unsigned *head; unsigned *tail; unsigned *ring_mask; unsigned *ring_entries; unsigned *flags; unsigned *dropped; unsigned *array; }; ``` ,並且進行初始化設定。 ```cpp! struct app_sq_ring app_setup_sq_ring(int ring_fd,struct io_uring_params *p) { struct app_sq_ringsqring; void *ptr; ptr = mmap(NULL, p→sq_off.array + p→sq_entries *sizeof(__u32), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING); sring→head = ptr + p→sq_off.head; sring→tail = ptr + p→sq_off.tail; sring→ring_mask = ptr + p→sq_off.ring_mask; sring→ring_entries = ptr + p→sq_off.ring_entries; sring→flags = ptr + p→sq_off.flags; sring→dropped = ptr + p→sq_off.dropped; sring→array = ptr + p→sq_off.array; return sring; } ``` CQ ring 的部分也是透過類似的方式去設定;因為很大一部分的工作,在不同應用中,都只是做一樣的事,liburing 函數庫有提供一系列的輔助函數。 此外,application 也需要有方式通知 kernel,現在有新的請求了,則透過以下的系統呼叫。 `int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t sig);` `flag` 中比較重要的是 `IORING_ENTER_GETEVENTS`,如果它有被設置的話,kernel 會一直持續等到 `min_complete` 個事件。 除非 application 要等 cqe,否則的話,只需要確認 CQ ring tail,而不必呼叫 `io_uring_enter()` 並設置 `IORING_ENTER_GETEVENTS` 來等待。 ### 5.1 SQE ORDERING sqes 通常都是獨立且沒有先後順序之分的,這樣也可以最大程度的獲得平行化帶來的效能;有一種情況是,我們需要所有資料完整的寫入,因為每個寫入完成的順序是隨機的,我們只在意所有寫入都完成的時候。 io_uring 也可以等到所有先前的事件都完成後,才開始處理新的請求,透過設定 `IOSQE_IO_DRAIN`。 ### 5.2 LINKED SQES `IOSQE_IO_DRAIN` 提供了整體的 barrier,而 io_uring 也有提供描述 sqe 的依賴關係,透過設置 `IOSQE_IO_LINK`,從第一個到接著連續的每個有設置的 sqe,形成一個鍊狀的依賴關係,前者完成後,後者才會開始運作。 ### 5.3 TIMEOUT COMMANDS `IORING_OP_TIMEOUT`,有二種類型的觸發機制,第一種是指明等待時間,sqe 中的 `addr` 需要指向這樣的一個結構; ```cpp! struct__kernel_timespec { int64_t tv_sec; longlongtv_nsec; }; ``` 第二種是等待 cqe 發生的次數,填在 sqe 的 `offset`。 ## 6.0 Memory ordering io_uring 安全且有效的溝通,是建立在正確的 [memory ordering](https://en.wikipedia.org/wiki/Memory_ordering) 之上,io_uring 提供了二個簡單的操作, `read_barrier(): Ensure previous writes are visible before doing subsequent memory reads.` `write_barrier(): Order this write after previous writes.` 根據不同的電腦架構,這些有可能是 no-ops,但在另一些上,我們的確需要它們來確保城市正確執行。 看下面 SQ ring 的例子,如果我們沒辦法保證 `sqring→tail` 的寫入,是在前面的寫入都完成並可見之後,kernel 就可能得到一個不完整的 sqe。 ```cpp! 1: sqe→opcode = IORING_OP_READV; 2: sqe→fd = fd; 3: sqe→off = 0; 4: sqe→addr = &iovec; 5: sqe→len = 1; 6: sqe→user_data = some_value; write_barrier();/* ensure previous writes are seen before tail write */ 7: sqring→tail = sqring→tail +1; write_barrier();/* ensure tail write is seen */ ``` 而對於 CQ ring 來說,application 僅僅需要在讀取 `cqring->tail` 前,放置一個 `read_barrier()` 來確保能夠看見 kernel 做的任何寫入。 ## 7.0 liburing library * 提供一些輔助函數,避免每次應用都要寫一些相同又麻煩的程式碼。 * 對於基本的情況,提供一個較簡化的 API。 ### 7.1 LIBURING IO_URING SETUP `int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);` `void io_uring_queue_exit(struct io_uring *ring);` 不需要像之前一樣,`io_uring_queue_init()` 會把基本的初始化做好,包含 CQ ring 與 SQ ring 總共三個記憶體對映。 當 application 完成後,只需透過 `io_uring_queue_exit()` 來進行資源釋放。 ```cpp! struct io_uring ring; io_uring_queue_init(ENTRIES,&ring,0); // ... io_uring_queue_exit(&ring); ``` ### 7.2 LIBURING SUBMISSION AND COMPLETION `struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);` `int io_uring_submit(struct io_uring *ring);` `int io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);` `int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);` `void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe);` 一個提交請求並等待它的完成的範例,透過 liburing 輔助函數如下。 `io_uring_get_sqe()` 回傳一個 sqe 的指標,sqe 填入完成後,透過 `io_uring_submit()` 來通知 kernel。 `io_uring_wait_cqe()` 會等待 cqe,或是我們可以使用 `io_uring_peek_cqe()` 來檢查是否有 cqe,當我們處理完 cqe 時呼叫 `io_uring_cqe_seen()` 來更新 CQ ring 的尾端。 :::info 文件中的範例,最一開始的 sqe 與 cqe 皆應為指標。 :smiling_imp:XDEv11 ::: ```cpp! struct io_uring_sqe* sqe; struct io_uring_cqe* cqe; /*get an sqe and fill in a READV operation */ sqe = io_uring_get_sqe(&ring); io_uring_prep_readv(sqe, fd, &iovec, 1, offset); /*tell the kernel we have an sqe ready for consumption */ io_uring_submit(&ring); /*wait for the sqe to complete */ io_uring_wait_cqe(&ring, &cqe); /* read and process cqe event */ app_handle_cqe(cqe); io_uring_cqe_seen(&ring, cqe); ``` `io_uring_prep_readv()` 只是一個填入 sqe 的輔助函數,還有其他許多類似的輔助函數。 ## 8.0 Advanced use cases and features 除了上述簡單的用法,io_uring 提供了一些需要選擇的特性 (features)。 ### 8.1 FIXED FILES AND BUFFERS 每次 sqe 中的檔案描述符,kernel 都必須重新取得並在完成時丟棄,對於經常 IO 的情境,這會是個很大的開銷,為了減輕這項影響,io_uring 提供一個方法,能夠預先註冊一個 file-set。 `int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);` fd 表示 io_uring instance,若要註冊一個 file-set,`opcode` 使用 `IORING_REGISTER_FILES`,`arg` 指向一個已經打開的檔案描述符的陣列,`nr_args` 存放這個陣列的大小,註冊完成後,sqe 中的 `fd` 放置這個陣列的索引,`flags` 設定 `IOSQE_FIXED_FILE`。 也可以註冊一串的 IO buffers,`opcode` 使用 IORING_REGISTER_BUFFERS,`arg` 指向 struct iovec 的陣列並已經填入每個的位址與長度,`nr_args` 存放陣列大小,註冊完成後,`flags` 設定 `IORING_OP_READ_FIXED` 或 `IORING_OP_WRITE_FIXED`,`addr` 包含在這些 buffers 中的位址,`len` 包含請求的長度。 ### 8.2 POLLED IO 要使用 polling 的話,需要在 `io_uring_setup()` 或 liburing 的 `io_uring_queue_init()` 的 `flags` 加上 `IORING_SETUP_IOPOLL`,而此時必須使用 `io_uring_enter()`,設置 `IORING_ENTER_GETEVENTS` 來取得事件。 只有對於 polling 合理的 op-codes 可以使用,像是任何讀寫的指令 (`IORING_OP_READV`, `IORING_OP_WRITEV`, `IORING_OP_READ_FIXED`, `IORING_OP_WRITE_FIXED`)。 ### 8.3 KERNEL SIDE POLLING 儘管 io_uring 因為使用了更少的系統呼叫,而有著更高的效率,在一些情況下,我們仍然可以進一步的減少系統呼叫;其中一個特性是 kernel side polling,我們不必再透過 `io_uring_enter()` 來通知 kernel 有新的 sqe,而是 kernel 會有另一個執行序來輪詢。 `flags` 必須設置 `IORING_SETUP_SQPOLL`,另外,若希望這個執行序存在於特定的 CPU,可以設定 `IORING_SETUP_SQ_AFF`,並在 `sq_thread_cpu ` 指定。 (特別注意的是,`IORING_SETUP_SQPOLL` 是 privileged operation) 為了避免浪費太多 CPU 資源,執行序會在閒置一段時間後自動休眠,而此時,SQ rings 的 `flags` 會設置 `IORING_SQ_NEED_WAKEUP`,需要透過 `io_uring_enter()` 並設置 `IORING_ENTER_SQ_WAKEUP` 來喚醒。 ```cpp! /* fills in new sqe entries */ add_more_io(); /* * need to call io_uring_enter() to make the kernel notice the new IO * if polled and the thread is now sleeping. */ if((*sqring→flags)& IORING_SQ_NEED_WAKEUP) io_uring_enter(ring_fd, to_submit, to_wait, IORING_ENTER_SQ_WAKEUP); ``` 而休眠等待時間可以透過 io_uring_params 的 `sq_thread_idle` 來設定,單位為毫秒。 ## 9.0 Performance 最後,io_uring 的確達到了一開始的設計目標,透過二個環狀結構 (SQ ring & CQ ring),我們有很有效率的傳送機制,唯一複雜的部份是,我們需要顧慮到記憶體排序 (memory ordering) 的原始特性,但在不同應用間,事情都是相同的,隨著 liburing 的成熟,希望未來能夠符合大部分 application 的期待。 以下的數據已經有點過時,數值的絕對值並沒有太大意義,不過相對比較還是可以當作參考。 ### 9.1 RAW PERFORMANCE 測試的方式有很多種,這邊透過隨機讀取 block device 或檔案,io_uring 透過輪詢 (polling),可以達到 1.7M 4k IOPS,而 aio 只有 608K,當然這樣比較有點不公平,因為 aio 並沒有輪詢 (polling),如果關閉輪詢 (polling),io_uring 還是有 1.2M。 io_uring 也有 no-op 的命令,大約能達到 12M/s ~ 20M/s (messages per second)。 ### 9.2 BUFFERED ASYNC PERFORMANCE 前面提到 kernel 中的 buffered aio 會更有效率,很大的原因是跟 cached / un-cached data 有關,buffered IO 很依賴 kernels page cache 來得到好效能;而 userspace 的 application 無從得知這件事,如果要詢問這個資訊的話,勢必需要更多系統呼叫,而且結果可能幾毫秒後就改變了,因此application 的 thread pool 必須變成非同步的,而這樣至少需要二次 context switch,對於已經 cached 的資料,這將是一個很大的效能損耗。 而 io_uring 就沒有這樣的問題,對於已經 cached 的資料,它的處理效率就如同一般同步的介面。 ## 10.0 Further reading 對於一個全新的介面,目前還沒有被太多的應用採納,即使已經有一定文件說明,研究一個真正的應用,還是會讓你知道如何更好的運用。 ## 11.0 References [1] https://lore.kernel.org/linux-block/20190116175003.17880-1-axboe@kernel.dk/ [2] git://git.kernel.dk/fio [3] git://git.kernel.dk/liburing [4] https://lwn.net/Articles/776703/ --- # 實驗 ## 測試環境 * 作業系統 Kali Linux 2021.1 Linux Kernel v5.10.0 * 硬體 CPU: Intel(R) Core(TM) i5-7300HQ CPU @ 2.50GHz Memory: 11.5 GiB ## 重現過去的實驗 程式碼:[Shanola / sehttpd](https://github.com/Shanola/sehttpd) 報告:[Epoll vs. io_uring 效能測試與比較](https://hackmd.io/@shanvia/B1Ds1vlAD) ### 程式碼修正 直接執行 `make` 指令時會發現錯誤訊息顯示 `http_request_t` 這個型別並沒有 `bid` 這個欄位,因此需要自行加上去。 `http.h`: ```diff @@ -31,6 +31,7 @@ enum http_status { typedef struct { void *root; int fd; + int bid; int event_type; char buf[MAX_BUF]; /* ring buffer */ size_t pos, last; ``` 再次執行 `make` 後會發現和 io_uring 有關的 API 都出現 undefined reference 錯誤,因此判斷為 link 時並沒有 link 到 io_uring 的 library ,因此在 Makefile 做修改,讓必要的 library 也在最後產生執行檔的時候被 link 進來。 `Makefile`: ```diff @@ -32,7 +32,7 @@ deps += $(OBJS:%.o=%.o.d) $(TARGET): $(OBJS) $(VECHO) " LD\t$@\n" - $(Q)$(CC) -o $@ $^ $(LDFLAGS) + $(Q)$(CC) -o $@ $^ -luring $(LDFLAGS) check: all @scripts/test.sh ``` ### io_uring with KeepAlive ```shell $ taskset -c 0 ab -n 100000 -c 100 -k http://127.0.0.1:8081/ 1 ⨯ This is ApacheBench, Version 2.3 <$Revision: 1879490 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking 127.0.0.1 (be patient) Completed 10000 requests Completed 20000 requests Completed 30000 requests Completed 40000 requests Completed 50000 requests Completed 60000 requests Completed 70000 requests Completed 80000 requests Completed 90000 requests Completed 100000 requests Finished 100000 requests Server Software: Server Hostname: 127.0.0.1 Server Port: 8081 Document Path: / Document Length: 0 bytes Concurrency Level: 100 Time taken for tests: 2.326 seconds Complete requests: 100000 Failed requests: 0 Non-2xx responses: 100000 Keep-Alive requests: 100000 Total transferred: 10600000 bytes HTML transferred: 0 bytes Requests per second: 42987.62 [#/sec] (mean) Time per request: 2.326 [ms] (mean) Time per request: 0.023 [ms] (mean, across all concurrent requests) Transfer rate: 4449.89 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 0.1 0 3 Processing: 1 2 1.3 2 27 Waiting: 0 2 1.3 2 27 Total: 1 2 1.3 2 27 Percentage of the requests served within a certain time (ms) 50% 2 66% 2 75% 2 80% 2 90% 3 95% 3 98% 6 99% 8 100% 27 (longest request) ``` ### epoll with KeepAlive ```shell $ taskset -c 0 ab -n 100000 -c 100 -k http://127.0.0.1:8081/ This is ApacheBench, Version 2.3 <$Revision: 1879490 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking 127.0.0.1 (be patient) Completed 10000 requests Completed 20000 requests Completed 30000 requests Completed 40000 requests Completed 50000 requests Completed 60000 requests Completed 70000 requests Completed 80000 requests Completed 90000 requests Completed 100000 requests Finished 100000 requests Server Software: seHTTPd Server Hostname: 127.0.0.1 Server Port: 8081 Document Path: / Document Length: 241 bytes Concurrency Level: 100 Time taken for tests: 3.596 seconds Complete requests: 100000 Failed requests: 1242 (Connect: 0, Receive: 0, Length: 1242, Exceptions: 0) Keep-Alive requests: 98758 Total transferred: 41280844 bytes HTML transferred: 23800678 bytes Requests per second: 27809.19 [#/sec] (mean) Time per request: 3.596 [ms] (mean) Time per request: 0.036 [ms] (mean, across all concurrent requests) Transfer rate: 11210.81 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 0.1 0 3 Processing: 0 4 10.9 0 56 Waiting: 0 1 1.1 0 9 Total: 0 4 10.9 0 56 Percentage of the requests served within a certain time (ms) 50% 0 66% 1 75% 1 80% 2 90% 3 95% 43 98% 45 99% 47 100% 56 (longest request) ``` ### io_uring without KeepAlive ```shell $ taskset -c 0 ab -n 100000 -c 100 http://127.0.0.1:8081/ This is ApacheBench, Version 2.3 <$Revision: 1879490 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking 127.0.0.1 (be patient) apr_pollset_poll: The timeout specified has expired (70007) ``` ### epoll without KeepAlive ```shell $ taskset -c 0 ab -n 100000 -c 100 http://127.0.0.1:8081/ This is ApacheBench, Version 2.3 <$Revision: 1879490 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking 127.0.0.1 (be patient) Completed 10000 requests Completed 20000 requests Completed 30000 requests Completed 40000 requests Completed 50000 requests Completed 60000 requests Completed 70000 requests Completed 80000 requests Completed 90000 requests Completed 100000 requests Finished 100000 requests Server Software: seHTTPd Server Hostname: 127.0.0.1 Server Port: 8081 Document Path: / Document Length: 241 bytes Concurrency Level: 100 Time taken for tests: 4.465 seconds Complete requests: 100000 Failed requests: 0 Total transferred: 36900000 bytes HTML transferred: 24100000 bytes Requests per second: 22395.81 [#/sec] (mean) Time per request: 4.465 [ms] (mean) Time per request: 0.045 [ms] (mean, across all concurrent requests) Transfer rate: 8070.37 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 2 0.3 2 4 Processing: 1 3 0.7 3 15 Waiting: 0 2 0.8 2 15 Total: 3 4 0.7 4 16 Percentage of the requests served within a certain time (ms) 50% 4 66% 4 75% 5 80% 5 90% 5 95% 5 98% 6 99% 6 100% 16 (longest request) ``` ### 結論 1. 在 KeepAlive 的情況下,失敗率確實從原本的約 1% 降為 0% 。 2. 但使用 epoll 時沒有 KeepAlive 的 Request Per Second 比有 KeepAlive 時還低,這部份跟報告的狀況是完全相反的。 3. 在沒有 KeepAlive 的情況下,使用 io_uring 會導致 timeout ,但報告中並沒有遇到此狀況,也因此無法拿來和有 KeepAlive 的情況比較。 ## 使用最新版的 [liburing](https://github.com/axboe/liburing) ### 測試 ```sh $ sudo make runtests [sudo] password for d4nnylee: ... Running test 35fa71a030ca-test: Test 35fa71a030ca-test timed out (may not be a failure) ... Running test connect: cqe 1, res 0, wanted -125 test_connect_timeout(): failed Test connect failed with ret 1 ... Running test link: wrong CQE order, got 1, expected 0 test_link_fail_ordering last failed Test link failed with ret 1 ... Running test poll-mshot-update: submitted -16, 392 poll-many failed Test poll-mshot-update failed with ret 1 ... Running test register-restrictions: submit: -77 test_restrictions_sqe_op failed Test register-restrictions failed with ret 2 ... Running test timeout-new: feature IORING_FEAT_EXT_ARG not supported. Test timeout-new failed with ret 1 ... Tests failed: <connect> <link> <poll-mshot-update> <register-restrictions> <timeout-new> make[1]: *** [Makefile:284: runtests] Error 1 make[1]: Leaving directory '/home/d4nnylee/Documents/liburing/test' make: *** [Makefile:23: runtests] Error 2 ``` 可以看到有部份的測試失敗了,目前還沒釐清原因。 有找到他人遇到相同問題 ([test suite is failing](https://github.com/axboe/liburing/issues/332)),不過目前也還沒看到問題點。 ### 2021 6/27 最新測試狀況 * git HEAD: ==603111a58a04926088bd0493ea5d4d2ff8aaef93== ``` $ sudo make runtests make[1]: Entering directory '/home/d4nnylee/Documents/liburing/test' Running test 232c93d07b74-test: Running test 35fa71a030ca-test: Test 35fa71a030ca-test timed out (may not be a failure) Running test 500f9fbadef8-test: Running test 7ad0e4b2f83c-test: Running test 8a9973408177-test: Running test 917257daa0fe-test: Running test a0908ae19763-test: Running test a4c0b3decb33-test: Running test accept: Running test accept-link: Running test accept-reuse: Running test accept-test: Running test across-fork: Running test splice: Running test b19062a56726-test: Running test b5837bd5311d-test: Running test ce593a6c480a-test: Running test close-opath: Running test connect: Running test cq-full: Running test cq-overflow: Running test cq-peek-batch: Running test cq-ready: Running test cq-size: Running test d4ae271dfaae-test: Running test d77a67ed5f27-test: Running test defer: Running test double-poll-crash: Running test eeed8b54e0df-test: Running test eventfd: Running test eventfd-disable: Running test eventfd-ring: Running test fadvise: Running test fallocate: Running test fc2a85cb02ef-test: Test needs failslab/fail_futex/fail_page_alloc enabled, skipped Running test file-register: Skipping files not supported Running test file-update: Running test files-exit-hang-poll: Running test files-exit-hang-timeout: Running test fixed-link: Running test fsync: Running test hardlink: linkat not supported, skipping Running test io-cancel: 1 -125 child failed 1 test_cancel_req_across_fork() failed Test io-cancel failed with ret 1 Running test io_uring_enter: io_uring_enter(3, 1, 0, 4294967295, (nil)) io_uring_enter(-1, 0, 0, 0, (nil)) io_uring_enter(0, 0, 0, 0, (nil)) io_uring_enter(3, 1, 0, 0, (nil)) Allocating 4096 sqes Submitting 4096 I/Os Done Waiting for 4096 events Reaping 4096 I/Os Submitting invalid sqe index. PASS Running test io_uring_register: RELIMIT_MEMLOCK: 1554682368 (1554682368) io_uring_register(-1, 0, (nil), 0) io_uring_register(3, 0, (nil), 0) io_uring_register(4, 4294967295, (nil), 0) io_uring_register(4, 0, 0x7ffdba275cb0, 1) io_uring_register(4, 0, 0x7ffdba275cb0, 1) io_uring_register(4, 0, 0x7ffdba275cb0, 1) Unable to map a huge page. Try increasing /proc/sys/vm/nr_hugepages by at least 1. Skipping the hugepage test reserve file-backed buffers io_uring_register(4, 0, 0x7ffdba275cb0, 1) io_uring_register(4, 0, 0x7ff7834cc010, 1000000) io_uring_register(4, 0, 0x7ff7834cc010, 1024) Not enough memory for this test, skipping io_uring_submit: opcode: 6 flags: 0x00000000 fd: 4 poll_events: 0x00000005 io_uring_register(4, 2, 0x7ffdba275bec, 1) PASS memfd registration isn't supported, skip Running test io_uring_setup: io_uring_setup(0, 0x7fff5a3199d0), flags: none, feat: none, resv: 0x00000000 0x00000000 0x00000000, sq_thread_cpu: 0 io_uring_setup(1, (nil)), flags: none, feat: none, resv: , sq_thread_cpu: 0 io_uring_setup(1, 0x7fff5a3199d0), flags: none, feat: none, resv: 0x00000001 0x00000001 0x00000001, sq_thread_cpu: 0 io_uring_setup(1, 0x7fff5a3199d0), flags: 0xffffffff, feat: none, resv: 0x00000000 0x00000000 0x00000000, sq_thread_cpu: 0 io_uring_setup(1, 0x7fff5a3199d0), flags: IORING_SETUP_SQ_AFF, feat: none, resv: 0x00000000 0x00000000 0x00000000, sq_thread_cpu: 0 io_uring_setup(1, 0x7fff5a3199d0), flags: IORING_SETUP_SQPOLL|IORING_SETUP_SQ_AFF, feat: none, resv: 0x00000000 0x00000000 0x00000000, sq_thread_cpu: 4 PASS Running test iopoll: Running test lfs-openat: Running test lfs-openat-write: Running test link: Running test link-timeout: Running test link_drain: Running test madvise: Running test mkdir: mkdirat not supported, skipping Running test multicqes_drain: Test multicqes_drain timed out (may not be a failure) Running test nop: Running test nop-all-sizes: Running test open-close: Running test openat2: Running test personality: Running test pipe-eof: Running test pipe-reuse: Running test poll: Running test poll-cancel: Running test poll-cancel-ton: Running test poll-link: Running test poll-many: Running test poll-mshot-update: submitted -16, 392 poll-many failed Test poll-mshot-update failed with ret 1 Running test poll-ring: Running test poll-v-poll: Running test probe: Running test read-write: Running test register-restrictions: submit: -77 test_restrictions_sqe_op failed Test register-restrictions failed with ret 2 Running test rename: Rename not supported, skipping Running test ring-leak: Running test ring-leak2: Running test rw_merge_test: Running test self: Running test send_recv: Non-registered SQPOLL not available, skipping Running test send_recvmsg: Running test shared-wq: Running test short-read: Running test shutdown: Shutdown not supported, skipping Running test sigfd-deadlock: Running test socket-rw: Running test socket-rw-eagain: Running test sq-full: Running test sq-poll-dup: No SQPOLL sharing, skipping No SQPOLL sharing, skipping No SQPOLL sharing, skipping Running test sq-poll-kthread: Running test sq-poll-share: No SQPOLL sharing, skipping Running test sqpoll-disable-exit: Running test sqpoll-exit-hang: Skipping Running test sqpoll-sleep: Running test sq-space_left: Running test stdout: This is a pipe test This is a fixed pipe test Running test submit-reuse: Running test symlink: symlinkat not supported, skipping Running test teardowns: Running test thread-exit: Running test timeout: Running test timeout-new: feature IORING_FEAT_EXT_ARG not supported. Running test timeout-overflow: Skipping Running test unlink: Unlink not supported, skipping Running test wakeup-hang: Running test sendmsg_fs_cve: Running test rsrc_tags: doesn't support rsrc tags, skip Running test statx: Running test sq-full-cpp: Tests failed: <io-cancel> <poll-mshot-update> <register-restrictions> make[1]: *** [Makefile:294: runtests] Error 1 make[1]: Leaving directory '/home/d4nnylee/Documents/liburing/test' make: *** [Makefile:23: runtests] Error 2 ``` `connect`, `link` 的錯誤已經被修正,`timeout-new` 這個測試則是直接跳過。 `poll-mshot-update`, `register-restrictions` 這兩個測試還是不會通過,並且多了 `io-cancel` 會出現錯誤。 ## 使用 [wrk2](https://github.com/giltene/wrk2) 做 benchmark ### 使用到的不同的 sehttpd * 用 epoll: [jwang0306 / sehttpd](https://github.com/jwang0306/sehttpd) * 用 io_uring: [Shanola / sehttpd](https://github.com/Shanola/sehttpd) ### 測試結果 * epoll ```sh $ wrk -t2 -d60s -R1000 -c1000 -L http://127.0.0.1:8081 Running 1m test @ http://127.0.0.1:8081 2 threads and 1000 connections Thread calibration: mean lat.: 32.482ms, rate sampling interval: 190ms Thread calibration: mean lat.: 32.434ms, rate sampling interval: 190ms Thread Stats Avg Stdev Max +/- Stdev Latency -nanus -nanus 0.00us 0.00% Req/Sec 0.00 0.00 0.00 100.00% Latency Distribution (HdrHistogram - Recorded Latency) 50.000% 0.00us 75.000% 0.00us 90.000% 0.00us 99.000% 0.00us 99.900% 0.00us 99.990% 0.00us 99.999% 0.00us 100.000% 0.00us Detailed Percentile spectrum: Value Percentile TotalCount 1/(1-Percentile) 0.000 1.000000 0 inf #[Mean = -nan, StdDeviation = -nan] #[Max = 0.000, Total count = 0] #[Buckets = 27, SubBuckets = 2048] ---------------------------------------------------------- 1000 requests in 1.01m, 360.35KB read Socket errors: connect 0, read 0, write 0, timeout 28600 Requests/sec: 16.53 Transfer/sec: 5.96KB ``` * io_uring ```sh $ wrk -t2 -d60s -R1000 -c1000 -L http://127.0.0.1:8081 Running 1m test @ http://127.0.0.1:8081 2 threads and 1000 connections Thread calibration: mean lat.: 9223372036854776.000ms, rate sampling interval: 10ms Thread calibration: mean lat.: 9223372036854776.000ms, rate sampling interval: 10ms Thread Stats Avg Stdev Max +/- Stdev Latency -nanus -nanus 0.00us 0.00% Req/Sec 0.00 0.00 0.00 100.00% Latency Distribution (HdrHistogram - Recorded Latency) 50.000% 0.00us 75.000% 0.00us 90.000% 0.00us 99.000% 0.00us 99.900% 0.00us 99.990% 0.00us 99.999% 0.00us 100.000% 0.00us Detailed Percentile spectrum: Value Percentile TotalCount 1/(1-Percentile) 0.000 1.000000 0 inf #[Mean = -nan, StdDeviation = -nan] #[Max = 0.000, Total count = 0] #[Buckets = 27, SubBuckets = 2048] ---------------------------------------------------------- 0 requests in 1.01m, 0.00B read Socket errors: connect 0, read 1035, write 6218414, timeout 0 Requests/sec: 0.00 Transfer/sec: 0.00B ``` 上面兩結果可以發現 latency 都是 `-nan` ,這並不是預期會看到的結果。 使用 io_uring 的版本會發生此現象是因為在 [wrk2](https://github.com/giltene/wrk2) 剛開始跑的當下,Server 都會馬上斷線,並顯示 `Error: no buffer space available` 錯誤,且此版本用瀏覽器直接連線到 `127.0.0.1:8081` 時並不會顯示網頁內容,錯誤訊息顯示 `127.0.0.1` 回傳了 invalid response 。 使用 epoll 的版本可以正常的顯示網頁內容,且也不會在剛開始跑 benchmark 的時候馬上斷線,但 latency 還是顯示 `-nan`,目前注意到的部份是有處理的 request 總數都會等於同時連線的數量。 :notes: jserv 針對 [wrk2](https://github.com/giltene/wrk2) 進行以下修改: ```diff diff --git a/src/ae_epoll.c b/src/ae_epoll.c index 4823c28..bcbe7f4 100644 --- a/src/ae_epoll.c +++ b/src/ae_epoll.c @@ -73,7 +73,7 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ - if (mask & AE_READABLE) ee.events |= EPOLLIN; + if (mask & AE_READABLE) ee.events |= (EPOLLIN|EPOLLHUP|EPOLLRDHUP); if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; @@ -114,7 +114,7 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { int mask = 0; struct epoll_event *e = state->events+j; - if (e->events & EPOLLIN) mask |= AE_READABLE; + if (e->events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP)) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; diff --git a/src/net.c b/src/net.c index 75916f7..ef6be92 100644 --- a/src/net.c +++ b/src/net.c @@ -17,7 +17,8 @@ status sock_close(connection *c) { status sock_read(connection *c, size_t *n) { ssize_t r = read(c->fd, c->buf, sizeof(c->buf)); *n = (size_t) r; - return r >= 0 ? OK : ERROR; + if (r == 0) return READ_EOF; + return r > 0 ? OK : ERROR; } status sock_write(connection *c, char *buf, size_t len, size_t *n) { diff --git a/src/net.h b/src/net.h index ed9cbb2..a4648f9 100644 --- a/src/net.h +++ b/src/net.h @@ -9,7 +9,8 @@ typedef enum { OK, ERROR, - RETRY + RETRY, + READ_EOF } status; struct sock { diff --git a/src/wrk.c b/src/wrk.c index 1049f0b..42da0da 100644 --- a/src/wrk.c +++ b/src/wrk.c @@ -653,18 +653,21 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) { static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) { connection *c = data; size_t n; + int read_status = OK; do { - switch (sock.read(c, &n)) { + switch (read_status = sock.read(c, &n)) { case OK: break; case ERROR: goto error; case RETRY: return; + case READ_EOF: break; } if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error; c->thread->bytes += n; } while (n == RECVBUF && sock.readable(c) > 0); + if (read_status == READ_EOF) goto error; return; error: ``` 測試方法: ```shell $ ./wrk -c 4 -t 4 -d 60s -R 16 -H 'Connection: Close' http://127.0.0.1:8081 ``` 參考執行輸出: ``` Running 1m test @ http://127.0.0.1:8081 4 threads and 4 connections Thread calibration: mean lat.: 1.201ms, rate sampling interval: 10ms Thread calibration: mean lat.: 1.189ms, rate sampling interval: 10ms Thread calibration: mean lat.: 1.172ms, rate sampling interval: 10ms Thread calibration: mean lat.: 1.200ms, rate sampling interval: 10ms Thread Stats Avg Stdev Max +/- Stdev Latency 1.14ms 322.00us 2.32ms 66.21% Req/Sec 4.30 21.08 111.00 95.99% 964 requests in 1.00m, 3.64MB read Requests/sec: 16.07 Transfer/sec: 62.04KB ``` > 照著修改之後目前可以得到正常的結果了! > 謝謝教授 > [name=D4nnyLee]