--- tags: linux2022 --- # cserv 高效網頁伺服器 contributed by < `Xx-oX` > ## [cserv](https://github.com/sysprog21/cserv) > `cserv` is an event-driven and non-blocking web server. * `cserv` 是一個高效的網頁伺服器 * 採用非阻塞式 I/O 的事件驅動架構 * 單執行緒、支援多核並具備 CPU 親和性 ### 執行方法 * 編譯 ```shell $ make ``` * 執行 ```shell $ ./cserv start ``` * 結束 ```shell $ ./cserv stop ``` * 檢查設定 ```shell $ ./cserv conf ``` 其中設定檔位於 `/conf/cserv.conf`,可以設定 log 路徑、等級、worker process 數量、每個 worker 最大連線數、coroutine stack 大小、監聽連接埠以及 HTTP request line 跟 header 的大小 :::spoiler ``` # cserv configuration file # # 1. Each line is a configuration entry. # 2. The only supported expression: "key = value" # 3. Each entry is case sensitive. # Path of log file log_path = /tmp/cserv.log # Levels control which events are processed by logger. # Levels are cumulative; a logger can process logged objects at the level that # is set for the logger, and at all levels above the set level. # Available levels: CRIT, ERR, WARN, INFO log_level = CRIT # Number of worker processes. # default: the number of cpu available to the current process # For heavy disk I/O, set the value bigger than cpu number and reduce # worker_connections. worker_processes = default # Connection limit of each worker process. # system-wide access count = worker_processes * worker_connections worker_connections = 1024 # Coroutine stack size, aligned with PAGE_SIZE # default: PAGE_SIZE # maximum system stack memory = coroutine_stack_sizekbytes * worker_connections * worker_processes coroutine_stack_kbytes = default # Listen for incoming connection and bind on specific port. # sample: listen = 127.0.0.1:8081 # listen = 8081 listen = 8081 # HTTP request line and request header total size, in KiB. # default: 2 KiB client_header_buffer_kbytes = default ``` ::: > 非常簡單明瞭 >< ## 壓力測試與比較 ### 工具 * [ab (apache benching tool)](https://httpd.apache.org/docs/current/programs/ab.html) * [httperf](https://github.com/httperf/httperf) * [htstress](https://github.com/sysprog21/khttpd/blob/master/htstress.c) * compile: `$ gcc -std=gnu11 -Wall -Wextra -Werror -o htstress htstress.c -lpthread` * Usage: ``` htstress [options] [http://]hostname[:port]/path Options: -n, --number total number of requests (0 for inifinite, Ctrl-C to abort) -c, --concurrency number of concurrent connections -t, --threads number of threads (set this to the number of CPU cores) -u, --udaddr path to unix domain socket -h, --host host to use for http request -d, --debug debug HTTP response --help display this message ``` ### 比較對象 * [lwan](https://github.com/lpereira/lwan) * cserv 參考的對象 * [nginx](https://github.com/nginx/nginx) * 市占率第一的 web server * [apache](https://github.com/apache/httpd) * 老牌 web server > 市占率來源: [w3techs, 25 May 2022](https://w3techs.com/technologies/overview/web_server) ### 實驗結果 :::spoiler 實驗環境 ```shell $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian Address sizes: 43 bits physical, 48 bits virtual CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 17 Model name: AMD Ryzen 7 2700U with Radeon Vega Mobile Gfx Stepping: 0 Frequency boost: enabled CPU MHz: 1700.000 CPU max MHz: 2200.0000 CPU min MHz: 1600.0000 BogoMIPS: 4391.49 Virtualization: AMD-V L1d cache: 128 KiB L1i cache: 256 KiB L2 cache: 2 MiB L3 cache: 4 MiB NUMA node0 CPU(s): 0-7 ``` ::: :::warning 在 windows (裝在 M.2 介面的 SSD)上的 wsl 測試時處理量比在 Linux 中還大,初估可能是將作業系統灌在外接硬碟 (usb3.0 介面的 HDD)導致的 I/O bound。 ::: :::warning 尚在研究如何調整 nginx 跟 apache 的設定,試圖控制多執行緒的變因 ::: #### ab 100000 requests with 500 requests at a time(concurrency) `ab -n 100000 -c 500 -k http://127.0.0.1:8081/` | | Requests per second | Time per request (mean, across all concurrent requests)| | -------- | -------- | -------- | | cserv | 17763.63 #/sec | 0.056 ms | | lwan | 15960.17 #/sec | 0.063 ms | | nginx | 53021.65 #/sec | 0.019 ms | | apache2 | 23558.77 #/sec | 0.042 ms | #### htstress 100000 requests with 500 requests at a time(concurrency) `./htstress -n 100000 -c 500 127.0.0.1:8081/` | | requests/sec | total time | | -------- | -------- | -------- | | cserv | 21457.851 #/sec | 4.660 s | | lwan | 18758.664 #/sec | 5.331 s | | nginx | 17322.661 #/sec | 5.773 s | | apache2 | 15122.679 #/sec | 6.613 s | #### httperf total 100000 requests with 500 connections > 200 * 500 = 100000 `httperf --server 127.0.0.1 --port 8081 --num-conn 500 --num-call 200 --http-version 1.0` | | requests/sec | connection rate | | -------- | -------- | -------- | | cserv | 21872.7 #/sec | 10936.3 conn/s | | lwan | 27230.0 #/sec | 136.1 conn/s | | nginx | 16056.9 #/sec | 159.0 conn/s | | apache2 | 10087.8 #/sec | 98.9 conn/s | :::spoiler log detail cserv ``` httperf --client=0/1 --server=127.0.0.1 --port=8081 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200 Maximum connect burst length: 1 Total: connections 500 requests 1000 replies 500 test-duration 0.046 s Connection rate: 10936.3 conn/s (0.1 ms/conn, <=1 concurrent connections) Connection time [ms]: min 0.1 avg 0.1 max 0.2 median 0.5 stddev 0.0 Connection time [ms]: connect 0.0 Connection length [replies/conn]: 1.000 Request rate: 21872.7 req/s (0.0 ms/req) Request size [B]: 86.0 Reply rate [replies/s]: min 0.0 avg 0.0 max 0.0 stddev 0.0 (0 samples) Reply time [ms]: response 0.0 transfer 0.0 Reply size [B]: header 82.0 content 111.0 footer 0.0 (total 193.0) Reply status: 1xx=0 2xx=500 3xx=0 4xx=0 5xx=0 CPU time [s]: user 0.01 system 0.03 (user 25.4% system 74.1% total 99.5%) Net I/O: 3898.2 KB/s (31.9*10^6 bps) Errors: total 500 client-timo 0 socket-timo 0 connrefused 0 connreset 500 Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0 ``` lwan ``` httperf --client=0/1 --server=127.0.0.1 --port=8080 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200 Maximum connect burst length: 1 Total: connections 500 requests 100000 replies 100000 test-duration 3.672 s Connection rate: 136.1 conn/s (7.3 ms/conn, <=1 concurrent connections) Connection time [ms]: min 2.5 avg 7.3 max 25.8 median 7.5 stddev 3.2 Connection time [ms]: connect 0.0 Connection length [replies/conn]: 200.000 Request rate: 27230.0 req/s (0.0 ms/req) Request size [B]: 86.0 Reply rate [replies/s]: min 0.0 avg 0.0 max 0.0 stddev 0.0 (0 samples) Reply time [ms]: response 0.0 transfer 0.0 Reply size [B]: header 164.0 content 1142.0 footer 0.0 (total 1306.0) Reply status: 1xx=0 2xx=0 3xx=0 4xx=100000 5xx=0 CPU time [s]: user 2.03 system 1.63 (user 55.3% system 44.4% total 99.7%) Net I/O: 37019.0 KB/s (303.3*10^6 bps) Errors: total 0 client-timo 0 socket-timo 0 connrefused 0 connreset 0 Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0 ``` nginx ``` httperf --client=0/1 --server=127.0.0.1 --port=80 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200 Maximum connect burst length: 1 Total: connections 500 requests 50500 replies 50000 test-duration 3.145 s Connection rate: 159.0 conn/s (6.3 ms/conn, <=1 concurrent connections) Connection time [ms]: min 4.1 avg 6.3 max 12.2 median 6.5 stddev 1.4 Connection time [ms]: connect 0.0 Connection length [replies/conn]: 100.000 Request rate: 16056.9 req/s (0.1 ms/req) Request size [B]: 86.0 Reply rate [replies/s]: min 0.0 avg 0.0 max 0.0 stddev 0.0 (0 samples) Reply time [ms]: response 0.0 transfer 0.0 Reply size [B]: header 249.0 content 10918.0 footer 0.0 (total 11167.0) Reply status: 1xx=0 2xx=50000 3xx=0 4xx=0 5xx=0 CPU time [s]: user 1.71 system 1.43 (user 54.4% system 45.5% total 99.9%) Net I/O: 174734.4 KB/s (1431.4*10^6 bps) Errors: total 500 client-timo 0 socket-timo 0 connrefused 0 connreset 500 Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0 ``` apache ``` httperf --client=0/1 --server=127.0.0.1 --port=80 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200 Maximum connect burst length: 1 Total: connections 500 requests 51000 replies 50500 test-duration 5.056 s Connection rate: 98.9 conn/s (10.1 ms/conn, <=1 concurrent connections) Connection time [ms]: min 6.0 avg 10.1 max 25.6 median 9.5 stddev 2.8 Connection time [ms]: connect 0.0 Connection length [replies/conn]: 101.000 Request rate: 10087.8 req/s (0.1 ms/req) Request size [B]: 86.0 Reply rate [replies/s]: min 10000.4 avg 10000.4 max 10000.4 stddev 0.0 (1 samples) Reply time [ms]: response 0.1 transfer 0.0 Reply size [B]: header 309.0 content 10918.0 footer 0.0 (total 11227.0) Reply status: 1xx=0 2xx=50500 3xx=0 4xx=0 5xx=0 CPU time [s]: user 3.02 system 2.03 (user 59.8% system 40.1% total 99.9%) Net I/O: 110369.7 KB/s (904.1*10^6 bps) Errors: total 500 client-timo 0 socket-timo 0 connrefused 0 connreset 500 Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0 ``` ::: ## 細節分析 ### 目錄樹 ``` ~/linux2022/cserv (master) » tree . ├── common.mk ├── conf │   ├── cserv.conf │   └── cserv.pid ├── LICENSE ├── Makefile ├── README.md └── src ├── coro │   ├── sched.c │   ├── sched.h │   ├── switch.c │   └── switch.h ├── env.c ├── env.h ├── event.c ├── event.h ├── http │   ├── http.c │   ├── parse.c │   ├── parse.h │   ├── request.c │   ├── request.h │   ├── response.c │   └── response.h ├── internal.h ├── logger.c ├── logger.h ├── main.c ├── process.c ├── process.h ├── signal.c ├── syscall_hook.c └── util ├── buffer.h ├── cirbuf.h ├── conf.c ├── conf.h ├── hashtable.c ├── hashtable.h ├── list.h ├── memcache.c ├── memcache.h ├── net.h ├── rbtree.c ├── rbtree.h ├── shm.c ├── shm.h ├── spinlock.h ├── str.h ├── system.c └── system.h ``` * 其中 * src/coro 目錄中為 coroutine 相關程式碼 * src/http 目錄中為 http parsing 相關程式碼 * src/util 目錄中為一些酷資料結構跟功能(紅黑樹、hashtable、circular buffer、memcache、shared memory、spinlock...)的實作 * src 目錄中為主要程式碼部份 ### Non-blocking I/O multiplexing * non-blocking: 利用 timer 達成,攔截(hook)阻塞式的 `socket system call`,使其在 timer 終止時結束 * 6 個 system call ```c // @ /src/syscall_hook.h typedef int (*sys_connect)(int sockfd, const struct sockaddr *addr, socklen_t addrlen); typedef int (*sys_accept)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); typedef ssize_t (*sys_read)(int fd, void *buf, size_t count); typedef ssize_t (*sys_recv)(int sockfd, void *buf, size_t len, int flags); typedef ssize_t (*sys_write)(int fd, const void *buf, size_t count); typedef ssize_t (*sys_send)(int sockfd, const void *buf, size_t len, int flags); ``` * 舉 read 為例,timeout 時便回傳錯誤,達到 non-blocking 特性 ```c // @ /src/syscall_hook.c ssize_t read(int fd, void *buf, size_t count) { ssize_t n; while ((n = real_sys_read(fd, buf, count)) < 0) { if (EINTR == errno) continue; if (!fd_not_ready()) return -1; if (add_fd_event(fd, EVENT_READABLE, event_rw_callback, current_coro())) return -2; schedule_timeout(READ_TIMEOUT); del_fd_event(fd, EVENT_READABLE); if (is_wakeup_by_timeout()) { errno = ETIME; return -3; } } return n; } ``` * I/O multiplexing: 利用強大的 [epoll](https://man7.org/linux/man-pages/man7/epoll.7.html),透過監聽 events 來決定處理的 I/O * timeout 時間設定(單位:毫秒) ```c //@ /src/syscall_hook.c #define CONN_TIMEOUT (10 * 1000) #define ACCEPT_TIMEOUT (3 * 1000) #define READ_TIMEOUT (10 * 1000) #define RECV_TIMEOUT (10 * 1000) #define WRITE_TIMEOUT (10 * 1000) #define SEND_TIMEOUT (10 * 1000) #define KEEP_ALIVE 60 ``` > TODO: 調整 timeout 時間,並作實驗 ### System call hooking * hook: 鉤子,將正常的 system call 跟自己寫的版本掛勾,如此一來當呼叫該 system call 時,就會運行自己寫的版本 * 真正意義上的 system call hooking 比較麻煩,因為要使所有呼叫該 symbol 的人都使用 hooked 的版本,要達成這個目標,會需要插入 kernel module,並且更改 system call table * 但如果只是想要整個專案範圍下的 hooking,可以簡單的宣告同樣名字的函式,並在該函式中呼叫原始版本的 system call * 在 linux 中可以利用 [dlsym](https://man7.org/linux/man-pages/man3/dlsym.3.html) 來獲得特定 symbol 的地址,cserv 中使用這個方式來獲得原始 system call 的備份 * 如果使用 gcc 編譯,在 `#include <dlfcn.h>` 之前,需要先 `#define _GNU_SOURCE`,否則會報錯 * 範例程式 ```c #define _GNU_SOURCE #include <dlfcn.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> typedef ssize_t (*sys_write_t)(int, const void *, size_t); static sys_write_t sys_write = NULL; #define HOOK_SYSCALL(name) \ sys_##name = (sys_##name##_t) dlsym(RTLD_NEXT, #name) ssize_t write(int fd, const void *buf, size_t count) { printf("hooked\n"); ssize_t n; while ((n = sys_write(fd, buf, count)) < 0); return n; } int main() { HOOK_SYSCALL(write); write(STDOUT_FILENO, "Hello\n", strlen("Hello\n")); return 0; } ``` 編譯的時候記得加入 `-ldl` 執行結果如下 ``` $ ./a.out hooked Hello ``` 其中 HOOK_SYSCALL 參考了 cserv 中的寫法 ```c //@ /src/syscall_hook.c #define HOOK_SYSCALL(name) \ real_sys_##name = (sys_##name) dlsym(RTLD_NEXT, #name) //... HOOK_SYSCALL(connect); // real_sys_connect = (sys_connect) dlsym(RTLD_NEXT, connect) ``` 這邊 real_sys_... 就是該 system call 的原始備份,用在每個 hooked system call 中以及像 logger 等不需要 non-blocking 版本的情境 ### Master/Worker process * (理想中)在 1 個 cpu core 運行 1 個 worker process => 驗證見 CPU Affinity 部分 * 利用 mod 把 worker process 分配給各個 cpu ```c // @ /src/process.c for (int i = 0; i < g_worker_processes; i++) { struct process *p = &worker[i]; p->pid = INVALID_PID; p->cpuid = i % get_ncpu(); } ``` * 每個 process 只有一個 thread => 沒有 thread-pool 設計 * 由一個 Master process 分配多個 worker process 執行工作 * process 間達到 load balancing * 還在釐清方法 ### CPU Affinity 執行 `./cserv start` 啟動伺服器後使用 `ps` 命令可以得到以下結果 ```shell $ ps -ef | grep cserv # UID PID PPID C STIME TTY TIME CMD ycwu4142 793 8 0 00:03 pts/0 00:00:00 cserv: master process ycwu4142 799 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 800 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 801 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 802 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 803 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 804 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 805 793 0 00:03 pts/0 00:00:00 cserv: worker process ycwu4142 806 793 0 00:03 pts/0 00:00:00 cserv: worker process ``` 從上面可得知,預設情況下 cserv 在 8 核的機器上面開了 8 個 worker process,pid 分別是 799 到 806,從 ppid 都是 793 可以證明每個 worker process 都是由 master process fork 出來的 接著撰寫一個簡單的 shell script 來檢視各 worker process 的 cpu affinity ```bash #!/bin/bash for i in {799..806}; do taskset -pc $i done ``` 執行後得到以下結果 ``` pid 799's current affinity list: 0 pid 800's current affinity list: 1 pid 801's current affinity list: 2 pid 802's current affinity list: 3 pid 803's current affinity list: 4 pid 804's current affinity list: 5 pid 805's current affinity list: 6 pid 806's current affinity list: 7 ``` 可以發現每個 worker process 各使用 1 個不同的 cpu core,完整的榨乾運算資源 ### Signal 在 Unix, Unix like, 及其他 POSIX 相容的作業系統中,[signal](https://en.wikipedia.org/wiki/Signal_(IPC)) 是一種常用的 IPC(Inter-Process Communication) 方式,可以看作應用程式間的 [interrupt](https://en.wikipedia.org/wiki/Interrupt),用來通知 process 某個事件的發生,並且中斷該 process 的正常流程。 signal 可以藉由多種方式觸發,比如使用 [kill](https://man7.org/linux/man-pages/man2/kill.2.html) systemcall 可以對任意 process 發送任意訊號。 常見的訊號有 * SIGTSTP: 暫停某個 process ,可以藉由 ctrl+z 發送 * SIGKILL: 強制立即終止某個 process * SIGSEGV: 記憶體區段錯誤時自動發送 * SIGINT: 中斷某個 process,可以藉由 ctrl+c 發送 * SIGQUIT: 終止某個 process 並將記憶體中的資訊存到核心 (core dump) > [man 7 signal](https://man7.org/linux/man-pages/man7/signal.7.html) 中有給出詳細的訊號列表 process 接收到訊號後會藉由自身的 signal handler 進行處理,以下是 signal handler 的大致流程,可以發現 signal 與 interrupt 不同的點是,前者會轉發回 process 並在 user mode 中處理,後者則是全部在 kernel mode 中交由核心處理 ```graphviz digraph g{ node[shape=record] rankdir=RR g [label="{user|...|(2)sighandler()|(3)return|...}|{kernel|(1)dosignal()||(4)system_call()|}"] } // User | Kernel // --------------------------- // ... -> do signal() // / // / // sighandler <- // | // v // return -> system_call() // / // / // ... <- ``` > 上圖改畫自 [Signals and Inter-Process Communication p.12](http://www.cs.unc.edu/~porter/courses/cse506/s16/slides/ipc.pdf) 因此我們可以藉由 signal handler 來處理某些特定的訊號,比如 SIGINT,預設的行為是終止 process,以下例子使用 [signal](https://man7.org/linux/man-pages/man2/signal.2.html) system call 來註冊當程式接收到 SIGINT 訊號後的行為 ```c= #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <signal.h> #include <string.h> static void signal_handler(int signo) { char *msg = "\nCaught signal!!\n"; write(STDOUT_FILENO, msg, strlen(msg)); exit(EXIT_SUCCESS); } int main(int argc, char *argv[]) { if (signal(SIGINT, signal_handler) == SIG_ERR) { printf("error!\n"); return EXIT_FAILURE; } for(int i=0;; ++i) { printf("%d...\n", i); sleep(1); } printf("exit!\n"); return EXIT_SUCCESS; } ``` > signal handler 中使用 write 而非 printf 是因為 printf 為 non-async-signal-safe,用於訊號處理程式時有機率出錯 > [ref1](https://unix.stackexchange.com/questions/609210/why-printf-is-not-asyc-signal-safe-function) > [ref2](https://stackoverflow.com/questions/16891019/how-to-avoid-using-printf-in-a-signal-handler) 執行結果如下 ``` $ ./signal 0... 1... 2... ^C # 按下 ctrl + c Caught signal!! ``` 如果將第 11 行拿掉,則按下 ctrl + c 後程式不會停止 而因為 signal system call 的作用在不同的系統/版本中可能會有不同的定義,i.e. 缺乏可攜性,更推薦使用 [sigaction](https://man7.org/linux/man-pages/man2/sigaction.2.html) 這個較為通用的 system call > 來源: [man2 signal](https://man7.org/linux/man-pages/man2/signal.2.html) 中的 warning 下面是使用 sigaction 改寫的版本,輸出結果如上 ```c #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <signal.h> #include <string.h> static void signal_handler(int signo) { char *msg = "\nCaught signal!!\n"; write(STDOUT_FILENO, msg, strlen(msg)); exit(EXIT_SUCCESS); } int main(int argc, char *argv[]) { struct sigaction sa; sa.sa_handler = signal_handler; sigemptyset(&sa.sa_mask); // 初始化(清空) signal 的集合 sigaction(SIGINT, &sa, NULL); for (int i=0; ; ++i) { printf("%d...\n", i); sleep(1); } printf("exit!\n"); return EXIT_SUCCESS; } ``` > 範例參考資料 > [gnu c example](https://www.gnu.org/software/libc/manual/html_node/Initial-Signal-Actions.html) > [Linux进程间通信(一): 信号 signal()、sigaction()](https://www.cnblogs.com/52php/p/5813867.html) 在 cserv 中就利用 sigaction 來處理幾種不同的 signal 並且對部分 signal 作了特殊的定義,如下方程式碼中,將 SIGHUP 當作重新設定的訊號 ```c //@ /src/process.h enum { SHUTDOWN_SIGNAL = SIGQUIT, TERMINATE_SIGNAL = SIGINT, RECONFIGURE_SIGNAL = SIGHUP }; ``` > SIGHUP: hangup,掛斷,預設行為為終止,出現在當某個使用者登出時,系統會發送此訊號給同一 session 中的所有 process :::warning 猜測: 可以藉由發送 SIGHUP 訊號來達成在不中止程式的情形下,改變 cserv 監聽的 port 等相關動作 ::: > TODO: 實驗、驗證 > 後來發現程式中並沒有使用到 `RECONFIGURE_SIGNAL` 例如當執行 `./cserv stop` 時,會對 master process 發送 SIGQUIT 訊號 > 其中 read_pidfile 會去讀取 ./conf/cserv.pid,來得到 master process 的 pid > pid file: 存放 pid 的檔案,通常用來給其他 service 或 daemon 讀取 pid 以便對其發送 signal ```c //@ /src/main.c // ... if (str_equal(argv[1], "stop")) { send_signal(SHUTDOWN_SIGNAL); printf("cserv: stopped.\n"); return 0; } // ... static void send_signal(int signo) { int pid = read_pidfile(); kill(pid, signo); } ``` 而在 `signal.c` 中,利用 sigaction 將各 signal 與對應的 signal handler 綁定 ```c //@ /src/signal.c static struct sys_signal signals[] = {{SIGQUIT, "stop", signal_handler}, {SIGTERM, "force-quit", signal_handler}, {SIGINT, "force-quit", signal_handler}, {SIGHUP, "reload", signal_handler}, {SIGCHLD, "default", signal_handler}, {SIGSYS, "ignored", SIG_IGN}, {SIGPIPE, "ignored", SIG_IGN}, {0, "", NULL}}; // ... __INIT static void signal_init() { struct sigaction sa; for (struct sys_signal *sig = signals; sig->signo; sig++) { memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = sig->handler; sigemptyset(&sa.sa_mask); if (sigaction(sig->signo, &sa, NULL) == -1) { printf("Failed to invoke sigaction(%d)\n", sig->signo); exit(0); } } } ``` 當 master process 收到 SIGQUIT 訊號時,會將全域變數 `g_shall_stop` 設為 1 來告知 master process 要停止了,並向所有 worker process 發送 SIGQUIT 訊號 ```c //@ /src/signal.c static void master_signal_handler(int signo) { switch (signo) { case SIGQUIT: g_shall_stop = 1; break; case SIGTERM: case SIGINT: g_shall_exit = 1; break; case SIGCHLD: worker_process_get_status(); break; } } //@ /src/process.c void master_process_cycle() { //... for (;;) { if (g_shall_stop == 1) { WARN("notify worker processes to stop"); send_signal_to_workers(SHUTDOWN_SIGNAL); g_shall_stop = 2; } //... } // ... } ``` 而當 worker process 收到 SIGQUIT 時,也會將他們的 `g_shall_stop` 設為 1,以便在 process 迴圈內接收到停止的命令,然後將 connection 斷開並停止 ```c //@ /src/signal.c static void worker_signal_handler(int signo) { switch (signo) { case SIGQUIT: g_shall_stop = 1; INFO("In worker, received SIGQUIT"); break; case SIGTERM: case SIGINT: g_shall_exit = 1; INFO("In worker, received SIGTERM/SIGINT"); break; } } //@ /src/process.c static void worker_accept_cycle(void *args __UNUSED) { for (;;) { if (unlikely(g_shall_stop)) { set_proc_title("cserv: worker process is shutting down"); decrease_conn_and_check(); break; } // ... } } // ... static inline void decrease_conn_and_check() { if (--connection_count == 0) exit(0); } ``` 這邊 g_shall_stop 變數也被用來檢查 worker prosses 的終止是否正常,若 g_shall_stop 或 g_shall_exit 均為 0,但 worker 依然要停止時就是發生異常 ```c //@ /src/process.c void worker_exit_handler(int pid) { for (int i = 0; i < g_worker_processes; i++) { struct process *p = &worker[i]; if (p->pid == pid) p->pid = INVALID_PID; } /* If worker process exits without receiving the signal, it means * abnormal termination. */ if (!g_shall_stop && !g_shall_exit) shall_create_worker = true; if (worker_empty() && (g_shall_stop || g_shall_exit)) all_workers_exit = true; } ``` 最後 master process 會等到所有 worker process 結束才終止 ```c //@ /src/process.c - master_process_cycle() if (all_workers_exit) { WARN("cserv exit now..."); log_scan_write(); delete_pidfile(); exit(0); } ``` 總而言之就是利用 signal 來進行 master/worker process 間的簡單溝通 ### Shared memory * 用途 * 讓 master/worker process 間溝通資訊 * 用來設置 spinlock,在 logger 的初始化、設定各 process 的 accept file descriptor 時使用 * 用來分配 `log_object[]` 的空間,讓各個 logger (processes) 均能夠存取 * 見 `shm.*` * [解析 Linux 共享記憶體機制](https://hackmd.io/@sysprog/linux-shared-memory#POSIX-%E5%85%B1%E4%BA%AB%E8%A8%98%E6%86%B6%E9%AB%94) 中提到了數個共享記憶體的實作方法,cserv 採用了類似其中 POSIX shared memory 的方式,使用 mmap 來達成 * 因為 master 以及各個 worker process 都是同一支程式 fork 出來的,因此可以使用 mmap 的匿名共享映射機制,來讓各個 process 共用同一塊記憶體 * [mmap](https://man7.org/linux/man-pages/man2/mmap.2.html) ```c #include <sys/mman.h> void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset); int munmap(void *addr, size_t length); ``` * mmap 的功用是將 fd (file descriptor) 映射到一段虛擬記憶體空間,以便在不使用 read, write 等操作的情形下,對檔案或裝置進行操作 (由對該虛擬記憶體空間進行操作達成) * 其中 flag 的部分可以選擇 `MAP_PRIVATE` 或著 `MAP_SHARED` 來決定這段記憶體是否能讓別的 process 存取 * 此外,還可以使用 `MAP_ANONYMOUS` 來達到匿名映射,讓 mmap 不對 fd 做記憶體映射(通常將 fd 填入 -1)。 * 於是透過上述的 `MAP_SHARED` 加上 `MAP_ANONYMOUS` 就可以輕鬆地達成親子 process 間的共享記憶體 * 範例程式: ```c #include <sys/mman.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #define PG_SIZE 32 int main() { void *addr = mmap(NULL, PG_SIZE, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, -1, 0); char *p = addr; memset(p, 'A', PG_SIZE); printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]); int pid = fork(); if (pid == 0){ memset(p, 'B', PG_SIZE); printf("I'm child.\n"); } else if (pid > 0){ printf("I'm father.\n"); } else { printf("fork error\n"); exit(EXIT_FAILURE); } printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]); if (addr == MAP_FAILED) { printf("mapping failed\n"); return EXIT_FAILURE; } munmap(addr, PG_SIZE); return EXIT_SUCCESS; } ``` 執行結果 ``` content: A A A A I'm father. content: B B B B I'm child. content: B B B B ``` 可以發現本來共享空間中的 A A A A 被子行程改成了 B B B B 後,親代行程也讀到了 B B B B,以此證明該空間是親子行程的共享記憶體空間 * cserv 中實際的使用如下 ```c //@ /src/util/shm.c struct shm { char *addr; size_t size, offset; spinlock_t lock; /* lock while allocating shared memory */ }; static struct shm *shm_object; ``` 首先宣告一個共享記憶體的結構,裡面自帶了一個 spinlock,用來在配置記憶體空間時鎖上,以防止其他 process 同時存取 ```c //@ /src/util/shm.c /* Allocate without reclaiming */ void *shm_alloc(size_t size_bytes) { size_bytes = ROUND_UP(size_bytes, MEM_ALIGN); spin_lock(&shm_object->lock); if (unlikely(shm_object->offset + size_bytes > shm_object->size)) { spin_unlock(&shm_object->lock); return NULL; } char *addr = shm_object->addr + shm_object->offset; shm_object->offset += size_bytes; spin_unlock(&shm_object->lock); return addr; } /* unit: page size */ void *shm_pages_alloc(unsigned int pg_count) { void *addr = mmap(NULL, pg_count * get_page_size(), PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, -1, 0); return (addr == MAP_FAILED) ? NULL : addr; } ``` 先看 `shm_pages_alloc`,顧名思義這個函式就是配製一個 page 的共享記憶體空間,其中 page size 取決於每個機器 這邊可以看到 mmap 的使用,大致上跟前面的範例一樣 而 `shm_alloc` 這個函式則是在已經配置的 page 中分配空間出來 假設下圖是一個 shm_page ```graphviz digraph g{ node[shape=record] page [label="addr|...|...|<o>||"] offset [label="addr + offset"] offset -> page:o } ``` offset 指向目前的裝到哪邊的位置,size 則是 page size 因此要配置一個 size_bytes 大小的空間,會先將 size_bytes 進位,做記憶體對齊 然後檢查當下的空間夠不夠裝,最後才調整 offset 並返回 addr + offset 的指標,指向這塊空間的起始位置 ### Coroutine > 閱讀 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html) * [coroutine](https://en.wikipedia.org/wiki/Coroutine) * 跟 thread 很像,但是採用 [cooperative multitasking](https://en.wikipedia.org/wiki/Cooperative_multitasking) 而非 [ preemptively multitasking](https://en.wikipedia.org/wiki/Preemption_(computing)#PREEMPTIVE) * 擁有自己的 stack 跟 program counter * 可以利用 dispatcher 輕易的在 thread 之間轉換 * 可以中斷並且繼續執行,跟一般的函式不同 * 是一種在 multi-threading 普及之前產生的概念,優點是不會產生 race-condition * 通常由 user 控制 * 利用 coroutine 處理各個連線 (而非建立新的 thread) * 利用可中斷的特性來切換連線 ``` | Main loop | | Connection 1 | | Connection 2 | ---------------------------------------------------------- | | v *** --> resume --> *** *** *** *** <-- yield <-- *** *** *** ------------> resume -------------> *** *** *** <------------ yield <------------- *** *** *** *** *** --> resume --> *** *** ``` > 改繪自 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html) 中的 Diagram of main loop plus two coroutines * 可以看成給 worker 們的 task/job 或者說針對 worker 的專屬排程機制 * timeout 機制 ```c //@ /src/process.c void worker_process_cycle() { if (worker_init_proc && worker_init_proc()) { ERR("Failed to initialize worker process"); exit(0); } schedule_init(g_coro_stack_kbytes, g_worker_connections); event_loop_init(g_worker_connections); dispatch_coro(worker_accept_cycle, NULL); INFO("worker success running..."); schedule_cycle(); } ``` 每個 worker 運作時會先派發一個 coroutine 來處理 tcp accept (等待 connection 進來並且處理) 的工作,然後進入 coroutine 排程的循環 ```c //@ /src/process.c static void worker_accept_cycle(void *args __UNUSED) { for (;;) { if (unlikely(g_shall_stop)) { set_proc_title("cserv: worker process is shutting down"); decrease_conn_and_check(); break; } if (unlikely(g_shall_exit)) exit(0); int connfd = worker_accept(); if (likely(connfd > 0)) { if (dispatch_coro(handle_connection, (void *) (intptr_t) connfd)) { WARN("system busy to handle request."); close(connfd); continue; } increase_conn(); } else if (connfd == 0) { schedule_timeout(200); continue; } } } ``` 而在 worker_accept_cycle 中,如果 worker 有新的連線進入,則分派一個 coroutine 來處理,如果無法立即處理則呼叫 `schedule_timeout()` 將該 coroutine 放進 inactive list 中等待, timeout 時再行處理 ```c //@ /src/coro/sched.c void schedule_cycle() { for (;;) { check_timeout_coroutine(); run_active_coroutine(); sched.policy(get_recent_timespan()); run_active_coroutine(); } } ``` ```c //@ /src/coro/sched.c static void check_timeout_coroutine() { struct timer_node *node; long long now = get_curr_mseconds(); while ((node = get_recent_timer_node())) { if (now < node->timeout) return; rb_erase(&node->node, &sched.inactive); timeout_coroutine_handler(node); memcache_free(sched.cache, node); } } static inline void timeout_coroutine_handler(struct timer_node *node) { struct rb_node *recent; while ((recent = node->root.rb_node)) { struct coroutine *coro = container_of(recent, struct coroutine, node); rb_erase(recent, &node->root); coro->active_by_timeout = 1; move_to_active_list_tail_direct(coro); } } ``` 每個排程循環都會先檢查有沒有等待超過 timeout 的 coroutine 如果有的話,就將它從 inactive list 中轉移到 active list 的尾端 並且執行 active list 頭部的 coroutine (替換調目前的工作,讓整個排程持續前進) `sched.policy(get_recent_timespan())` 這邊會對應到 `event_cycle` > (`sched.policy = event_cycle;`) ```c //@ /src/event.c void event_cycle(int ms /* in milliseconds */) { int value = epoll_wait(evloop.epoll_fd, evloop.ev, evloop.max_conn, ms); if (value <= 0) return; for (int i = 0; i < value; i++) { struct epoll_event *ev = &evloop.ev[i]; struct fd_event *fe = &evloop.array[ev->data.fd]; fe->proc(fe->args); } } ``` 在 event cycle 中利用 epoll 來監聽各個事件,並在觸發事件時通知。 這邊 `epoll_wait()` 會佔用 `get_recent_timespan()` 回傳的值,也就是距離目前的 timer_node timeout 還剩下的時間 ```c //@ /src/coro/sched.c static inline int get_recent_timespan() { struct timer_node *node = get_recent_timer_node(); if (!node) return 10 * 1000; /* at least 10 seconds */ int timespan = node->timeout - get_curr_mseconds(); return (timespan < 0) ? 0 : timespan; } ``` > [man epoll_wait(2)](https://man7.org/linux/man-pages/man2/epoll_wait.2.html) > The timeout argument specifies the number of milliseconds that epoll_wait() will block. 這段時間中若有事件觸發,則結束等待,切換處理下一個 coroutine (透過 `run_active_coroutine()`) :::warning 這樣算是 preemptive 或者是 non-preemptive 的排程呢? 我認為對於 kernel 來說是 non-preemptive,而整個排程機制則是 preemptive 的 有點矛盾,不知道如何解釋 ::: * fast task switching => 重寫 context switch 組合語言,針對此程式達到更高效率的 task switching * 見 `src/coro/switch.*` * 在 coroutine 排程中利用紅黑樹進行更高效率的搜尋 * 見 `src/coro/sched.*` ### Memcache > 參見 [Site Cache vs Browser Cache vs Server Cache: What’s the Difference?](https://wp-rocket.me/blog/different-types-of-caching/) * 實作一套 memcache 系統 * 用來處理 Server Cache,在伺服器端先將網頁內容預先加載好,以便在日後接收到 request 時可以立即反應 * 在 cserv 中用於兩個地方 * caching http_request * server cache * 加速 request 的處理 * 每次進入 http_request_handler 時都先從 memcache 中尋找,更改之後再存回去 memcache 中 * caching coroutine 排程中的 timer node * active 的 timer node 被放進 memcache * 用來判斷這個 timer 是否為 active * 利用紅黑樹管理 inactive 的 timer,一旦 active 就將這個 timer 放進 memcache 以便快速存取 * 程式碼分析 * 站在呼叫端的角度命名 * memcache_alloc: 從 memcache 中提取 element 出來 ```c //@ /src/util/memcache.c void *memcache_alloc(struct memcache *cache) { return (cache->curr) ? remove_element(cache) : malloc(cache->obj_size); } static inline void *remove_element(struct memcache *cache) { return cache->elements[--cache->curr]; } ``` * memcache_free: 把 element 放進 memcache ```c //@ /src/util/memcache.c void memcache_free(struct memcache *cache, void *element) { add_element(cache, element); } static inline void add_element(struct memcache *cache, void *element) { if (cache->curr < cache->cache_size) { cache->elements[cache->curr++] = element; return; } free(element); } ``` :::warning 不了解的地方: ```c=43 // @/src/util/memcache.c max_cache_size >>= 2; while (cache->curr < max_cache_size) { void *element = malloc(cache->obj_size); if (!element) { free_pool(cache); return NULL; } add_element(cache, element); } ``` 為何 curr 的值要是 max_cache_size 的 1/4? ::: ### Logger * 利用 circular buffer 實作一套 logger 系統 * 運用 circular buffer 的意義在於當 log 數量超過限制的時候,可以有效的剔除(覆蓋)舊的資料,即 circular 的特性 * circular buffer 的實作位於 `/src/util/cirbuf.h` * 預設寫入 `/tmp/cserv.log` 檔案,更改 `/conf/cserv.log` 中的 `log_path` 來設定 * 分為 `CRIT`, `ERR`, `WARN`, `INFO` 四種等級 * `/conf/cserv.conf` 中的 `log_level` 為要寫入 log 檔案中的臨界點 (threshold) * cser 簡單的運用 enum 的性質來處理 threshold 的判斷 ```c //@ /src/logger.c void log_out(enum LOG_LEVEL level, const char *file, int line, const char *fmt, ...) { if (level > log_level) return; // ... } ``` * 利用 cserv 的每個 process 來作為 logger,因此會有 worker process + 1 個 logger * 由呼叫 log_out 的 process 自行處理自己的 log * 利用 shared memory 來讓每個 logger 共通 `log_object[]`,以便作業 * log_object 可視為 logger 陣列,定義了每個 logger 的 pid, logger 的 circular buffer 以及一個決定是否要 flush 的 flag * 驗證正確性:預計單獨把 logger 拉出來,建立一套具備 scalability 以及 portability 的 log 套件 * 剛好最近的其他專案需要,一石二鳥 ### Http * 實作了一套解析 http 表頭的 parser * 見 /src/http/* ## 改動 ### syscall_hook.h 把 `syscall_hook.c` 中的型態宣告以及 `real_sys_write` 搬到專屬的標頭檔 * 目的:讓 `logger.c` 在調用 `real_sys_write` 時不需要再做多餘的宣告 ```c //@ /src/syscall_hook.h #ifndef CORE_SYSCALL_HOOK_H #define CORE_SYSCALL_HOOK_H #include <sys/socket.h> typedef int (*sys_connect)(int sockfd, const struct sockaddr *addr, socklen_t addrlen); typedef int (*sys_accept)(int sockfd, struct sockaddr *addr, socklen_t *addrlen); typedef ssize_t (*sys_read)(int fd, void *buf, size_t count); typedef ssize_t (*sys_recv)(int sockfd, void *buf, size_t len, int flags); typedef ssize_t (*sys_write)(int fd, const void *buf, size_t count); typedef ssize_t (*sys_send)(int sockfd, const void *buf, size_t len, int flags); /* declared in syscall_hook.c */ extern sys_write real_sys_write; #endif ``` ## TODO Note - [ ] shm 用來在不中斷程式的情形下對其做一系列操作 - [ ] 驗證 memcache 實作的正確性 - [ ] 驗證 logger 實作的正確性 - [ ] syscall_hook.c 中攔截 6 個系統呼叫,利用 timeout 達成 non-blocking,注意 1) timeout 時間長度 2) 是否需要攔截其他用到的系統呼叫 - [ ] 研究 signal.c 中 master 跟 worker 的交互溝通 - [ ] 如何動態變更 listen 的 port - [ ] /http 中的 Keep alive 部分功能是否完整 - [ ] epoll 的 LT/ET 考量 - [ ] /http 中 switch-case 換成 computed goto 的效能提升 - [ ] 參考/比較 [lwan](https://github.com/lpereira/lwan) 等專案