---
tags: linux2022
---
# cserv 高效網頁伺服器
contributed by < `Xx-oX` >
## [cserv](https://github.com/sysprog21/cserv)
> `cserv` is an event-driven and non-blocking web server.
* `cserv` 是一個高效的網頁伺服器
* 採用非阻塞式 I/O 的事件驅動架構
* 單執行緒、支援多核並具備 CPU 親和性
### 執行方法
* 編譯
```shell
$ make
```
* 執行
```shell
$ ./cserv start
```
* 結束
```shell
$ ./cserv stop
```
* 檢查設定
```shell
$ ./cserv conf
```
其中設定檔位於 `/conf/cserv.conf`,可以設定 log 路徑、等級、worker process 數量、每個 worker 最大連線數、coroutine stack 大小、監聽連接埠以及 HTTP request line 跟 header 的大小
:::spoiler
```
# cserv configuration file
#
# 1. Each line is a configuration entry.
# 2. The only supported expression: "key = value"
# 3. Each entry is case sensitive.
# Path of log file
log_path = /tmp/cserv.log
# Levels control which events are processed by logger.
# Levels are cumulative; a logger can process logged objects at the level that
# is set for the logger, and at all levels above the set level.
# Available levels: CRIT, ERR, WARN, INFO
log_level = CRIT
# Number of worker processes.
# default: the number of cpu available to the current process
# For heavy disk I/O, set the value bigger than cpu number and reduce
# worker_connections.
worker_processes = default
# Connection limit of each worker process.
# system-wide access count = worker_processes * worker_connections
worker_connections = 1024
# Coroutine stack size, aligned with PAGE_SIZE
# default: PAGE_SIZE
# maximum system stack memory = coroutine_stack_sizekbytes * worker_connections * worker_processes
coroutine_stack_kbytes = default
# Listen for incoming connection and bind on specific port.
# sample: listen = 127.0.0.1:8081
# listen = 8081
listen = 8081
# HTTP request line and request header total size, in KiB.
# default: 2 KiB
client_header_buffer_kbytes = default
```
:::
> 非常簡單明瞭 ><
## 壓力測試與比較
### 工具
* [ab (apache benching tool)](https://httpd.apache.org/docs/current/programs/ab.html)
* [httperf](https://github.com/httperf/httperf)
* [htstress](https://github.com/sysprog21/khttpd/blob/master/htstress.c)
* compile:
`$ gcc -std=gnu11 -Wall -Wextra -Werror -o htstress htstress.c -lpthread`
* Usage:
```
htstress [options] [http://]hostname[:port]/path
Options:
-n, --number total number of requests (0 for inifinite, Ctrl-C to abort)
-c, --concurrency number of concurrent connections
-t, --threads number of threads (set this to the number of CPU cores)
-u, --udaddr path to unix domain socket
-h, --host host to use for http request
-d, --debug debug HTTP response
--help display this message
```
### 比較對象
* [lwan](https://github.com/lpereira/lwan)
* cserv 參考的對象
* [nginx](https://github.com/nginx/nginx)
* 市占率第一的 web server
* [apache](https://github.com/apache/httpd)
* 老牌 web server
> 市占率來源: [w3techs, 25 May 2022](https://w3techs.com/technologies/overview/web_server)
### 實驗結果
:::spoiler
實驗環境
```shell
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 43 bits physical, 48 bits virtual
CPU(s): 8
On-line CPU(s) list: 0-7
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
NUMA node(s): 1
Vendor ID: AuthenticAMD
CPU family: 23
Model: 17
Model name: AMD Ryzen 7 2700U with Radeon Vega Mobile Gfx
Stepping: 0
Frequency boost: enabled
CPU MHz: 1700.000
CPU max MHz: 2200.0000
CPU min MHz: 1600.0000
BogoMIPS: 4391.49
Virtualization: AMD-V
L1d cache: 128 KiB
L1i cache: 256 KiB
L2 cache: 2 MiB
L3 cache: 4 MiB
NUMA node0 CPU(s): 0-7
```
:::
:::warning
在 windows (裝在 M.2 介面的 SSD)上的 wsl 測試時處理量比在 Linux 中還大,初估可能是將作業系統灌在外接硬碟 (usb3.0 介面的 HDD)導致的 I/O bound。
:::
:::warning
尚在研究如何調整 nginx 跟 apache 的設定,試圖控制多執行緒的變因
:::
#### ab
100000 requests with 500 requests at a time(concurrency)
`ab -n 100000 -c 500 -k http://127.0.0.1:8081/`
| | Requests per second | Time per request (mean, across all concurrent requests)|
| -------- | -------- | -------- |
| cserv | 17763.63 #/sec | 0.056 ms |
| lwan | 15960.17 #/sec | 0.063 ms |
| nginx | 53021.65 #/sec | 0.019 ms |
| apache2 | 23558.77 #/sec | 0.042 ms |
#### htstress
100000 requests with 500 requests at a time(concurrency)
`./htstress -n 100000 -c 500 127.0.0.1:8081/`
| | requests/sec | total time |
| -------- | -------- | -------- |
| cserv | 21457.851 #/sec | 4.660 s |
| lwan | 18758.664 #/sec | 5.331 s |
| nginx | 17322.661 #/sec | 5.773 s |
| apache2 | 15122.679 #/sec | 6.613 s |
#### httperf
total 100000 requests with 500 connections
> 200 * 500 = 100000
`httperf --server 127.0.0.1 --port 8081 --num-conn 500 --num-call 200 --http-version 1.0`
| | requests/sec | connection rate |
| -------- | -------- | -------- |
| cserv | 21872.7 #/sec | 10936.3 conn/s |
| lwan | 27230.0 #/sec | 136.1 conn/s |
| nginx | 16056.9 #/sec | 159.0 conn/s |
| apache2 | 10087.8 #/sec | 98.9 conn/s |
:::spoiler log detail
cserv
```
httperf --client=0/1 --server=127.0.0.1 --port=8081 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200
Maximum connect burst length: 1
Total: connections 500 requests 1000 replies 500 test-duration 0.046 s
Connection rate: 10936.3 conn/s (0.1 ms/conn, <=1 concurrent connections)
Connection time [ms]: min 0.1 avg 0.1 max 0.2 median 0.5 stddev 0.0
Connection time [ms]: connect 0.0
Connection length [replies/conn]: 1.000
Request rate: 21872.7 req/s (0.0 ms/req)
Request size [B]: 86.0
Reply rate [replies/s]: min 0.0 avg 0.0 max 0.0 stddev 0.0 (0 samples)
Reply time [ms]: response 0.0 transfer 0.0
Reply size [B]: header 82.0 content 111.0 footer 0.0 (total 193.0)
Reply status: 1xx=0 2xx=500 3xx=0 4xx=0 5xx=0
CPU time [s]: user 0.01 system 0.03 (user 25.4% system 74.1% total 99.5%)
Net I/O: 3898.2 KB/s (31.9*10^6 bps)
Errors: total 500 client-timo 0 socket-timo 0 connrefused 0 connreset 500
Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0
```
lwan
```
httperf --client=0/1 --server=127.0.0.1 --port=8080 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200
Maximum connect burst length: 1
Total: connections 500 requests 100000 replies 100000 test-duration 3.672 s
Connection rate: 136.1 conn/s (7.3 ms/conn, <=1 concurrent connections)
Connection time [ms]: min 2.5 avg 7.3 max 25.8 median 7.5 stddev 3.2
Connection time [ms]: connect 0.0
Connection length [replies/conn]: 200.000
Request rate: 27230.0 req/s (0.0 ms/req)
Request size [B]: 86.0
Reply rate [replies/s]: min 0.0 avg 0.0 max 0.0 stddev 0.0 (0 samples)
Reply time [ms]: response 0.0 transfer 0.0
Reply size [B]: header 164.0 content 1142.0 footer 0.0 (total 1306.0)
Reply status: 1xx=0 2xx=0 3xx=0 4xx=100000 5xx=0
CPU time [s]: user 2.03 system 1.63 (user 55.3% system 44.4% total 99.7%)
Net I/O: 37019.0 KB/s (303.3*10^6 bps)
Errors: total 0 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0
```
nginx
```
httperf --client=0/1 --server=127.0.0.1 --port=80 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200
Maximum connect burst length: 1
Total: connections 500 requests 50500 replies 50000 test-duration 3.145 s
Connection rate: 159.0 conn/s (6.3 ms/conn, <=1 concurrent connections)
Connection time [ms]: min 4.1 avg 6.3 max 12.2 median 6.5 stddev 1.4
Connection time [ms]: connect 0.0
Connection length [replies/conn]: 100.000
Request rate: 16056.9 req/s (0.1 ms/req)
Request size [B]: 86.0
Reply rate [replies/s]: min 0.0 avg 0.0 max 0.0 stddev 0.0 (0 samples)
Reply time [ms]: response 0.0 transfer 0.0
Reply size [B]: header 249.0 content 10918.0 footer 0.0 (total 11167.0)
Reply status: 1xx=0 2xx=50000 3xx=0 4xx=0 5xx=0
CPU time [s]: user 1.71 system 1.43 (user 54.4% system 45.5% total 99.9%)
Net I/O: 174734.4 KB/s (1431.4*10^6 bps)
Errors: total 500 client-timo 0 socket-timo 0 connrefused 0 connreset 500
Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0
```
apache
```
httperf --client=0/1 --server=127.0.0.1 --port=80 --uri=/ --http-version=1.0 --send-buffer=4096 --recv-buffer=16384 --num-conns=500 --num-calls=200
Maximum connect burst length: 1
Total: connections 500 requests 51000 replies 50500 test-duration 5.056 s
Connection rate: 98.9 conn/s (10.1 ms/conn, <=1 concurrent connections)
Connection time [ms]: min 6.0 avg 10.1 max 25.6 median 9.5 stddev 2.8
Connection time [ms]: connect 0.0
Connection length [replies/conn]: 101.000
Request rate: 10087.8 req/s (0.1 ms/req)
Request size [B]: 86.0
Reply rate [replies/s]: min 10000.4 avg 10000.4 max 10000.4 stddev 0.0 (1 samples)
Reply time [ms]: response 0.1 transfer 0.0
Reply size [B]: header 309.0 content 10918.0 footer 0.0 (total 11227.0)
Reply status: 1xx=0 2xx=50500 3xx=0 4xx=0 5xx=0
CPU time [s]: user 3.02 system 2.03 (user 59.8% system 40.1% total 99.9%)
Net I/O: 110369.7 KB/s (904.1*10^6 bps)
Errors: total 500 client-timo 0 socket-timo 0 connrefused 0 connreset 500
Errors: fd-unavail 0 addrunavail 0 ftab-full 0 other 0
```
:::
## 細節分析
### 目錄樹
```
~/linux2022/cserv (master) » tree
.
├── common.mk
├── conf
│ ├── cserv.conf
│ └── cserv.pid
├── LICENSE
├── Makefile
├── README.md
└── src
├── coro
│ ├── sched.c
│ ├── sched.h
│ ├── switch.c
│ └── switch.h
├── env.c
├── env.h
├── event.c
├── event.h
├── http
│ ├── http.c
│ ├── parse.c
│ ├── parse.h
│ ├── request.c
│ ├── request.h
│ ├── response.c
│ └── response.h
├── internal.h
├── logger.c
├── logger.h
├── main.c
├── process.c
├── process.h
├── signal.c
├── syscall_hook.c
└── util
├── buffer.h
├── cirbuf.h
├── conf.c
├── conf.h
├── hashtable.c
├── hashtable.h
├── list.h
├── memcache.c
├── memcache.h
├── net.h
├── rbtree.c
├── rbtree.h
├── shm.c
├── shm.h
├── spinlock.h
├── str.h
├── system.c
└── system.h
```
* 其中
* src/coro 目錄中為 coroutine 相關程式碼
* src/http 目錄中為 http parsing 相關程式碼
* src/util 目錄中為一些酷資料結構跟功能(紅黑樹、hashtable、circular buffer、memcache、shared memory、spinlock...)的實作
* src 目錄中為主要程式碼部份
### Non-blocking I/O multiplexing
* non-blocking: 利用 timer 達成,攔截(hook)阻塞式的 `socket system call`,使其在 timer 終止時結束
* 6 個 system call
```c
// @ /src/syscall_hook.h
typedef int (*sys_connect)(int sockfd,
const struct sockaddr *addr,
socklen_t addrlen);
typedef int (*sys_accept)(int sockfd,
struct sockaddr *addr,
socklen_t *addrlen);
typedef ssize_t (*sys_read)(int fd, void *buf, size_t count);
typedef ssize_t (*sys_recv)(int sockfd, void *buf, size_t len, int flags);
typedef ssize_t (*sys_write)(int fd, const void *buf, size_t count);
typedef ssize_t (*sys_send)(int sockfd, const void *buf, size_t len, int flags);
```
* 舉 read 為例,timeout 時便回傳錯誤,達到 non-blocking 特性
```c
// @ /src/syscall_hook.c
ssize_t read(int fd, void *buf, size_t count)
{
ssize_t n;
while ((n = real_sys_read(fd, buf, count)) < 0) {
if (EINTR == errno)
continue;
if (!fd_not_ready())
return -1;
if (add_fd_event(fd, EVENT_READABLE, event_rw_callback, current_coro()))
return -2;
schedule_timeout(READ_TIMEOUT);
del_fd_event(fd, EVENT_READABLE);
if (is_wakeup_by_timeout()) {
errno = ETIME;
return -3;
}
}
return n;
}
```
* I/O multiplexing: 利用強大的 [epoll](https://man7.org/linux/man-pages/man7/epoll.7.html),透過監聽 events 來決定處理的 I/O
* timeout 時間設定(單位:毫秒)
```c
//@ /src/syscall_hook.c
#define CONN_TIMEOUT (10 * 1000)
#define ACCEPT_TIMEOUT (3 * 1000)
#define READ_TIMEOUT (10 * 1000)
#define RECV_TIMEOUT (10 * 1000)
#define WRITE_TIMEOUT (10 * 1000)
#define SEND_TIMEOUT (10 * 1000)
#define KEEP_ALIVE 60
```
> TODO: 調整 timeout 時間,並作實驗
### System call hooking
* hook: 鉤子,將正常的 system call 跟自己寫的版本掛勾,如此一來當呼叫該 system call 時,就會運行自己寫的版本
* 真正意義上的 system call hooking 比較麻煩,因為要使所有呼叫該 symbol 的人都使用 hooked 的版本,要達成這個目標,會需要插入 kernel module,並且更改 system call table
* 但如果只是想要整個專案範圍下的 hooking,可以簡單的宣告同樣名字的函式,並在該函式中呼叫原始版本的 system call
* 在 linux 中可以利用 [dlsym](https://man7.org/linux/man-pages/man3/dlsym.3.html) 來獲得特定 symbol 的地址,cserv 中使用這個方式來獲得原始 system call 的備份
* 如果使用 gcc 編譯,在 `#include <dlfcn.h>` 之前,需要先 `#define _GNU_SOURCE`,否則會報錯
* 範例程式
```c
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
typedef ssize_t (*sys_write_t)(int, const void *, size_t);
static sys_write_t sys_write = NULL;
#define HOOK_SYSCALL(name) \
sys_##name = (sys_##name##_t) dlsym(RTLD_NEXT, #name)
ssize_t write(int fd, const void *buf, size_t count)
{
printf("hooked\n");
ssize_t n;
while ((n = sys_write(fd, buf, count)) < 0);
return n;
}
int main()
{
HOOK_SYSCALL(write);
write(STDOUT_FILENO, "Hello\n", strlen("Hello\n"));
return 0;
}
```
編譯的時候記得加入 `-ldl`
執行結果如下
```
$ ./a.out
hooked
Hello
```
其中 HOOK_SYSCALL 參考了 cserv 中的寫法
```c
//@ /src/syscall_hook.c
#define HOOK_SYSCALL(name) \
real_sys_##name = (sys_##name) dlsym(RTLD_NEXT, #name)
//...
HOOK_SYSCALL(connect); // real_sys_connect = (sys_connect) dlsym(RTLD_NEXT, connect)
```
這邊 real_sys_... 就是該 system call 的原始備份,用在每個 hooked system call 中以及像 logger 等不需要 non-blocking 版本的情境
### Master/Worker process
* (理想中)在 1 個 cpu core 運行 1 個 worker process => 驗證見 CPU Affinity 部分
* 利用 mod 把 worker process 分配給各個 cpu
```c
// @ /src/process.c
for (int i = 0; i < g_worker_processes; i++) {
struct process *p = &worker[i];
p->pid = INVALID_PID;
p->cpuid = i % get_ncpu();
}
```
* 每個 process 只有一個 thread => 沒有 thread-pool 設計
* 由一個 Master process 分配多個 worker process 執行工作
* process 間達到 load balancing
* 還在釐清方法
### CPU Affinity
執行 `./cserv start` 啟動伺服器後使用 `ps` 命令可以得到以下結果
```shell
$ ps -ef | grep cserv
# UID PID PPID C STIME TTY TIME CMD
ycwu4142 793 8 0 00:03 pts/0 00:00:00 cserv: master process
ycwu4142 799 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 800 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 801 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 802 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 803 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 804 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 805 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 806 793 0 00:03 pts/0 00:00:00 cserv: worker process
```
從上面可得知,預設情況下 cserv 在 8 核的機器上面開了 8 個 worker process,pid 分別是 799 到 806,從 ppid 都是 793 可以證明每個 worker process 都是由 master process fork 出來的
接著撰寫一個簡單的 shell script 來檢視各 worker process 的 cpu affinity
```bash
#!/bin/bash
for i in {799..806};
do
taskset -pc $i
done
```
執行後得到以下結果
```
pid 799's current affinity list: 0
pid 800's current affinity list: 1
pid 801's current affinity list: 2
pid 802's current affinity list: 3
pid 803's current affinity list: 4
pid 804's current affinity list: 5
pid 805's current affinity list: 6
pid 806's current affinity list: 7
```
可以發現每個 worker process 各使用 1 個不同的 cpu core,完整的榨乾運算資源
### Signal
在 Unix, Unix like, 及其他 POSIX 相容的作業系統中,[signal](https://en.wikipedia.org/wiki/Signal_(IPC)) 是一種常用的 IPC(Inter-Process Communication) 方式,可以看作應用程式間的 [interrupt](https://en.wikipedia.org/wiki/Interrupt),用來通知 process 某個事件的發生,並且中斷該 process 的正常流程。
signal 可以藉由多種方式觸發,比如使用 [kill](https://man7.org/linux/man-pages/man2/kill.2.html) systemcall 可以對任意 process 發送任意訊號。
常見的訊號有
* SIGTSTP: 暫停某個 process ,可以藉由 ctrl+z 發送
* SIGKILL: 強制立即終止某個 process
* SIGSEGV: 記憶體區段錯誤時自動發送
* SIGINT: 中斷某個 process,可以藉由 ctrl+c 發送
* SIGQUIT: 終止某個 process 並將記憶體中的資訊存到核心 (core dump)
> [man 7 signal](https://man7.org/linux/man-pages/man7/signal.7.html) 中有給出詳細的訊號列表
process 接收到訊號後會藉由自身的 signal handler 進行處理,以下是 signal handler 的大致流程,可以發現 signal 與 interrupt 不同的點是,前者會轉發回 process 並在 user mode 中處理,後者則是全部在 kernel mode 中交由核心處理
```graphviz
digraph g{
node[shape=record]
rankdir=RR
g [label="{user|...|(2)sighandler()|(3)return|...}|{kernel|(1)dosignal()||(4)system_call()|}"]
}
// User | Kernel
// ---------------------------
// ... -> do signal()
// /
// /
// sighandler <-
// |
// v
// return -> system_call()
// /
// /
// ... <-
```
> 上圖改畫自 [Signals and Inter-Process Communication p.12](http://www.cs.unc.edu/~porter/courses/cse506/s16/slides/ipc.pdf)
因此我們可以藉由 signal handler 來處理某些特定的訊號,比如 SIGINT,預設的行為是終止 process,以下例子使用 [signal](https://man7.org/linux/man-pages/man2/signal.2.html) system call 來註冊當程式接收到 SIGINT 訊號後的行為
```c=
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
static void signal_handler(int signo)
{
char *msg = "\nCaught signal!!\n";
write(STDOUT_FILENO, msg, strlen(msg));
exit(EXIT_SUCCESS);
}
int main(int argc, char *argv[])
{
if (signal(SIGINT, signal_handler) == SIG_ERR) {
printf("error!\n");
return EXIT_FAILURE;
}
for(int i=0;; ++i) {
printf("%d...\n", i);
sleep(1);
}
printf("exit!\n");
return EXIT_SUCCESS;
}
```
> signal handler 中使用 write 而非 printf 是因為 printf 為 non-async-signal-safe,用於訊號處理程式時有機率出錯
> [ref1](https://unix.stackexchange.com/questions/609210/why-printf-is-not-asyc-signal-safe-function)
> [ref2](https://stackoverflow.com/questions/16891019/how-to-avoid-using-printf-in-a-signal-handler)
執行結果如下
```
$ ./signal
0...
1...
2...
^C # 按下 ctrl + c
Caught signal!!
```
如果將第 11 行拿掉,則按下 ctrl + c 後程式不會停止
而因為 signal system call 的作用在不同的系統/版本中可能會有不同的定義,i.e. 缺乏可攜性,更推薦使用 [sigaction](https://man7.org/linux/man-pages/man2/sigaction.2.html) 這個較為通用的 system call
> 來源: [man2 signal](https://man7.org/linux/man-pages/man2/signal.2.html) 中的 warning
下面是使用 sigaction 改寫的版本,輸出結果如上
```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
static void signal_handler(int signo)
{
char *msg = "\nCaught signal!!\n";
write(STDOUT_FILENO, msg, strlen(msg));
exit(EXIT_SUCCESS);
}
int main(int argc, char *argv[])
{
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask); // 初始化(清空) signal 的集合
sigaction(SIGINT, &sa, NULL);
for (int i=0; ; ++i) {
printf("%d...\n", i);
sleep(1);
}
printf("exit!\n");
return EXIT_SUCCESS;
}
```
> 範例參考資料
> [gnu c example](https://www.gnu.org/software/libc/manual/html_node/Initial-Signal-Actions.html)
> [Linux进程间通信(一): 信号 signal()、sigaction()](https://www.cnblogs.com/52php/p/5813867.html)
在 cserv 中就利用 sigaction 來處理幾種不同的 signal
並且對部分 signal 作了特殊的定義,如下方程式碼中,將 SIGHUP 當作重新設定的訊號
```c
//@ /src/process.h
enum {
SHUTDOWN_SIGNAL = SIGQUIT,
TERMINATE_SIGNAL = SIGINT,
RECONFIGURE_SIGNAL = SIGHUP
};
```
> SIGHUP: hangup,掛斷,預設行為為終止,出現在當某個使用者登出時,系統會發送此訊號給同一 session 中的所有 process
:::warning
猜測: 可以藉由發送 SIGHUP 訊號來達成在不中止程式的情形下,改變 cserv 監聽的 port 等相關動作
:::
> TODO: 實驗、驗證
> 後來發現程式中並沒有使用到 `RECONFIGURE_SIGNAL`
例如當執行 `./cserv stop` 時,會對 master process 發送 SIGQUIT 訊號
> 其中 read_pidfile 會去讀取 ./conf/cserv.pid,來得到 master process 的 pid
> pid file: 存放 pid 的檔案,通常用來給其他 service 或 daemon 讀取 pid 以便對其發送 signal
```c
//@ /src/main.c
// ...
if (str_equal(argv[1], "stop")) {
send_signal(SHUTDOWN_SIGNAL);
printf("cserv: stopped.\n");
return 0;
}
// ...
static void send_signal(int signo)
{
int pid = read_pidfile();
kill(pid, signo);
}
```
而在 `signal.c` 中,利用 sigaction 將各 signal 與對應的 signal handler 綁定
```c
//@ /src/signal.c
static struct sys_signal signals[] = {{SIGQUIT, "stop", signal_handler},
{SIGTERM, "force-quit", signal_handler},
{SIGINT, "force-quit", signal_handler},
{SIGHUP, "reload", signal_handler},
{SIGCHLD, "default", signal_handler},
{SIGSYS, "ignored", SIG_IGN},
{SIGPIPE, "ignored", SIG_IGN},
{0, "", NULL}};
// ...
__INIT static void signal_init()
{
struct sigaction sa;
for (struct sys_signal *sig = signals; sig->signo; sig++) {
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = sig->handler;
sigemptyset(&sa.sa_mask);
if (sigaction(sig->signo, &sa, NULL) == -1) {
printf("Failed to invoke sigaction(%d)\n", sig->signo);
exit(0);
}
}
}
```
當 master process 收到 SIGQUIT 訊號時,會將全域變數 `g_shall_stop` 設為 1 來告知 master process 要停止了,並向所有 worker process 發送 SIGQUIT 訊號
```c
//@ /src/signal.c
static void master_signal_handler(int signo)
{
switch (signo) {
case SIGQUIT:
g_shall_stop = 1;
break;
case SIGTERM:
case SIGINT:
g_shall_exit = 1;
break;
case SIGCHLD:
worker_process_get_status();
break;
}
}
//@ /src/process.c
void master_process_cycle()
{
//...
for (;;) {
if (g_shall_stop == 1) {
WARN("notify worker processes to stop");
send_signal_to_workers(SHUTDOWN_SIGNAL);
g_shall_stop = 2;
}
//...
}
// ...
}
```
而當 worker process 收到 SIGQUIT 時,也會將他們的 `g_shall_stop` 設為 1,以便在 process 迴圈內接收到停止的命令,然後將 connection 斷開並停止
```c
//@ /src/signal.c
static void worker_signal_handler(int signo)
{
switch (signo) {
case SIGQUIT:
g_shall_stop = 1;
INFO("In worker, received SIGQUIT");
break;
case SIGTERM:
case SIGINT:
g_shall_exit = 1;
INFO("In worker, received SIGTERM/SIGINT");
break;
}
}
//@ /src/process.c
static void worker_accept_cycle(void *args __UNUSED)
{
for (;;) {
if (unlikely(g_shall_stop)) {
set_proc_title("cserv: worker process is shutting down");
decrease_conn_and_check();
break;
}
// ...
}
}
// ...
static inline void decrease_conn_and_check()
{
if (--connection_count == 0)
exit(0);
}
```
這邊 g_shall_stop 變數也被用來檢查 worker prosses 的終止是否正常,若 g_shall_stop 或 g_shall_exit 均為 0,但 worker 依然要停止時就是發生異常
```c
//@ /src/process.c
void worker_exit_handler(int pid)
{
for (int i = 0; i < g_worker_processes; i++) {
struct process *p = &worker[i];
if (p->pid == pid)
p->pid = INVALID_PID;
}
/* If worker process exits without receiving the signal, it means
* abnormal termination.
*/
if (!g_shall_stop && !g_shall_exit)
shall_create_worker = true;
if (worker_empty() && (g_shall_stop || g_shall_exit))
all_workers_exit = true;
}
```
最後 master process 會等到所有 worker process 結束才終止
```c
//@ /src/process.c - master_process_cycle()
if (all_workers_exit) {
WARN("cserv exit now...");
log_scan_write();
delete_pidfile();
exit(0);
}
```
總而言之就是利用 signal 來進行 master/worker process 間的簡單溝通
### Shared memory
* 用途
* 讓 master/worker process 間溝通資訊
* 用來設置 spinlock,在 logger 的初始化、設定各 process 的 accept file descriptor 時使用
* 用來分配 `log_object[]` 的空間,讓各個 logger (processes) 均能夠存取
* 見 `shm.*`
* [解析 Linux 共享記憶體機制](https://hackmd.io/@sysprog/linux-shared-memory#POSIX-%E5%85%B1%E4%BA%AB%E8%A8%98%E6%86%B6%E9%AB%94) 中提到了數個共享記憶體的實作方法,cserv 採用了類似其中 POSIX shared memory 的方式,使用 mmap 來達成
* 因為 master 以及各個 worker process 都是同一支程式 fork 出來的,因此可以使用 mmap 的匿名共享映射機制,來讓各個 process 共用同一塊記憶體
* [mmap](https://man7.org/linux/man-pages/man2/mmap.2.html)
```c
#include <sys/mman.h>
void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);
int munmap(void *addr, size_t length);
```
* mmap 的功用是將 fd (file descriptor) 映射到一段虛擬記憶體空間,以便在不使用 read, write 等操作的情形下,對檔案或裝置進行操作 (由對該虛擬記憶體空間進行操作達成)
* 其中 flag 的部分可以選擇 `MAP_PRIVATE` 或著 `MAP_SHARED` 來決定這段記憶體是否能讓別的 process 存取
* 此外,還可以使用 `MAP_ANONYMOUS` 來達到匿名映射,讓 mmap 不對 fd 做記憶體映射(通常將 fd 填入 -1)。
* 於是透過上述的 `MAP_SHARED` 加上 `MAP_ANONYMOUS` 就可以輕鬆地達成親子 process 間的共享記憶體
* 範例程式:
```c
#include <sys/mman.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define PG_SIZE 32
int main()
{
void *addr = mmap(NULL, PG_SIZE, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_SHARED, -1, 0);
char *p = addr;
memset(p, 'A', PG_SIZE);
printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]);
int pid = fork();
if (pid == 0){
memset(p, 'B', PG_SIZE);
printf("I'm child.\n");
} else if (pid > 0){
printf("I'm father.\n");
} else {
printf("fork error\n");
exit(EXIT_FAILURE);
}
printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]);
if (addr == MAP_FAILED) {
printf("mapping failed\n");
return EXIT_FAILURE;
}
munmap(addr, PG_SIZE);
return EXIT_SUCCESS;
}
```
執行結果
```
content: A A A A
I'm father.
content: B B B B
I'm child.
content: B B B B
```
可以發現本來共享空間中的 A A A A 被子行程改成了 B B B B 後,親代行程也讀到了 B B B B,以此證明該空間是親子行程的共享記憶體空間
* cserv 中實際的使用如下
```c
//@ /src/util/shm.c
struct shm {
char *addr;
size_t size, offset;
spinlock_t lock; /* lock while allocating shared memory */
};
static struct shm *shm_object;
```
首先宣告一個共享記憶體的結構,裡面自帶了一個 spinlock,用來在配置記憶體空間時鎖上,以防止其他 process 同時存取
```c
//@ /src/util/shm.c
/* Allocate without reclaiming */
void *shm_alloc(size_t size_bytes)
{
size_bytes = ROUND_UP(size_bytes, MEM_ALIGN);
spin_lock(&shm_object->lock);
if (unlikely(shm_object->offset + size_bytes > shm_object->size)) {
spin_unlock(&shm_object->lock);
return NULL;
}
char *addr = shm_object->addr + shm_object->offset;
shm_object->offset += size_bytes;
spin_unlock(&shm_object->lock);
return addr;
}
/* unit: page size */
void *shm_pages_alloc(unsigned int pg_count)
{
void *addr = mmap(NULL, pg_count * get_page_size(), PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_SHARED, -1, 0);
return (addr == MAP_FAILED) ? NULL : addr;
}
```
先看 `shm_pages_alloc`,顧名思義這個函式就是配製一個 page 的共享記憶體空間,其中 page size 取決於每個機器
這邊可以看到 mmap 的使用,大致上跟前面的範例一樣
而 `shm_alloc` 這個函式則是在已經配置的 page 中分配空間出來
假設下圖是一個 shm_page
```graphviz
digraph g{
node[shape=record]
page [label="addr|...|...|<o>||"]
offset [label="addr + offset"]
offset -> page:o
}
```
offset 指向目前的裝到哪邊的位置,size 則是 page size
因此要配置一個 size_bytes 大小的空間,會先將 size_bytes 進位,做記憶體對齊
然後檢查當下的空間夠不夠裝,最後才調整 offset 並返回 addr + offset 的指標,指向這塊空間的起始位置
### Coroutine
> 閱讀 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html)
* [coroutine](https://en.wikipedia.org/wiki/Coroutine)
* 跟 thread 很像,但是採用 [cooperative multitasking](https://en.wikipedia.org/wiki/Cooperative_multitasking) 而非 [ preemptively multitasking](https://en.wikipedia.org/wiki/Preemption_(computing)#PREEMPTIVE)
* 擁有自己的 stack 跟 program counter
* 可以利用 dispatcher 輕易的在 thread 之間轉換
* 可以中斷並且繼續執行,跟一般的函式不同
* 是一種在 multi-threading 普及之前產生的概念,優點是不會產生 race-condition
* 通常由 user 控制
* 利用 coroutine 處理各個連線 (而非建立新的 thread)
* 利用可中斷的特性來切換連線
```
| Main loop | | Connection 1 | | Connection 2 |
----------------------------------------------------------
|
|
v
*** --> resume --> ***
***
***
*** <-- yield <-- ***
***
*** ------------> resume -------------> ***
***
*** <------------ yield <------------- ***
***
***
***
*** --> resume --> ***
***
```
> 改繪自 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html) 中的 Diagram of main loop plus two coroutines
* 可以看成給 worker 們的 task/job 或者說針對 worker 的專屬排程機制
* timeout 機制
```c
//@ /src/process.c
void worker_process_cycle()
{
if (worker_init_proc && worker_init_proc()) {
ERR("Failed to initialize worker process");
exit(0);
}
schedule_init(g_coro_stack_kbytes, g_worker_connections);
event_loop_init(g_worker_connections);
dispatch_coro(worker_accept_cycle, NULL);
INFO("worker success running...");
schedule_cycle();
}
```
每個 worker 運作時會先派發一個 coroutine 來處理 tcp accept (等待 connection 進來並且處理) 的工作,然後進入 coroutine 排程的循環
```c
//@ /src/process.c
static void worker_accept_cycle(void *args __UNUSED)
{
for (;;) {
if (unlikely(g_shall_stop)) {
set_proc_title("cserv: worker process is shutting down");
decrease_conn_and_check();
break;
}
if (unlikely(g_shall_exit))
exit(0);
int connfd = worker_accept();
if (likely(connfd > 0)) {
if (dispatch_coro(handle_connection, (void *) (intptr_t) connfd)) {
WARN("system busy to handle request.");
close(connfd);
continue;
}
increase_conn();
} else if (connfd == 0) {
schedule_timeout(200);
continue;
}
}
}
```
而在 worker_accept_cycle 中,如果 worker 有新的連線進入,則分派一個 coroutine 來處理,如果無法立即處理則呼叫 `schedule_timeout()` 將該 coroutine 放進 inactive list 中等待, timeout 時再行處理
```c
//@ /src/coro/sched.c
void schedule_cycle()
{
for (;;) {
check_timeout_coroutine();
run_active_coroutine();
sched.policy(get_recent_timespan());
run_active_coroutine();
}
}
```
```c
//@ /src/coro/sched.c
static void check_timeout_coroutine()
{
struct timer_node *node;
long long now = get_curr_mseconds();
while ((node = get_recent_timer_node())) {
if (now < node->timeout)
return;
rb_erase(&node->node, &sched.inactive);
timeout_coroutine_handler(node);
memcache_free(sched.cache, node);
}
}
static inline void timeout_coroutine_handler(struct timer_node *node)
{
struct rb_node *recent;
while ((recent = node->root.rb_node)) {
struct coroutine *coro = container_of(recent, struct coroutine, node);
rb_erase(recent, &node->root);
coro->active_by_timeout = 1;
move_to_active_list_tail_direct(coro);
}
}
```
每個排程循環都會先檢查有沒有等待超過 timeout 的 coroutine
如果有的話,就將它從 inactive list 中轉移到 active list 的尾端
並且執行 active list 頭部的 coroutine (替換調目前的工作,讓整個排程持續前進)
`sched.policy(get_recent_timespan())` 這邊會對應到 `event_cycle`
> (`sched.policy = event_cycle;`)
```c
//@ /src/event.c
void event_cycle(int ms /* in milliseconds */)
{
int value = epoll_wait(evloop.epoll_fd, evloop.ev, evloop.max_conn, ms);
if (value <= 0)
return;
for (int i = 0; i < value; i++) {
struct epoll_event *ev = &evloop.ev[i];
struct fd_event *fe = &evloop.array[ev->data.fd];
fe->proc(fe->args);
}
}
```
在 event cycle 中利用 epoll 來監聽各個事件,並在觸發事件時通知。
這邊 `epoll_wait()` 會佔用 `get_recent_timespan()` 回傳的值,也就是距離目前的 timer_node timeout 還剩下的時間
```c
//@ /src/coro/sched.c
static inline int get_recent_timespan()
{
struct timer_node *node = get_recent_timer_node();
if (!node)
return 10 * 1000; /* at least 10 seconds */
int timespan = node->timeout - get_curr_mseconds();
return (timespan < 0) ? 0 : timespan;
}
```
> [man epoll_wait(2)](https://man7.org/linux/man-pages/man2/epoll_wait.2.html)
> The timeout argument specifies the number of milliseconds that epoll_wait() will block.
這段時間中若有事件觸發,則結束等待,切換處理下一個 coroutine (透過 `run_active_coroutine()`)
:::warning
這樣算是 preemptive 或者是 non-preemptive 的排程呢?
我認為對於 kernel 來說是 non-preemptive,而整個排程機制則是 preemptive 的
有點矛盾,不知道如何解釋
:::
* fast task switching => 重寫 context switch 組合語言,針對此程式達到更高效率的 task switching
* 見 `src/coro/switch.*`
* 在 coroutine 排程中利用紅黑樹進行更高效率的搜尋
* 見 `src/coro/sched.*`
### Memcache
> 參見 [Site Cache vs Browser Cache vs Server Cache: What’s the Difference?](https://wp-rocket.me/blog/different-types-of-caching/)
* 實作一套 memcache 系統
* 用來處理 Server Cache,在伺服器端先將網頁內容預先加載好,以便在日後接收到 request 時可以立即反應
* 在 cserv 中用於兩個地方
* caching http_request
* server cache
* 加速 request 的處理
* 每次進入 http_request_handler 時都先從 memcache 中尋找,更改之後再存回去 memcache 中
* caching coroutine 排程中的 timer node
* active 的 timer node 被放進 memcache
* 用來判斷這個 timer 是否為 active
* 利用紅黑樹管理 inactive 的 timer,一旦 active 就將這個 timer 放進 memcache 以便快速存取
* 程式碼分析
* 站在呼叫端的角度命名
* memcache_alloc: 從 memcache 中提取 element 出來
```c
//@ /src/util/memcache.c
void *memcache_alloc(struct memcache *cache)
{
return (cache->curr) ? remove_element(cache) : malloc(cache->obj_size);
}
static inline void *remove_element(struct memcache *cache)
{
return cache->elements[--cache->curr];
}
```
* memcache_free: 把 element 放進 memcache
```c
//@ /src/util/memcache.c
void memcache_free(struct memcache *cache, void *element)
{
add_element(cache, element);
}
static inline void add_element(struct memcache *cache, void *element)
{
if (cache->curr < cache->cache_size) {
cache->elements[cache->curr++] = element;
return;
}
free(element);
}
```
:::warning
不了解的地方:
```c=43
// @/src/util/memcache.c
max_cache_size >>= 2;
while (cache->curr < max_cache_size) {
void *element = malloc(cache->obj_size);
if (!element) {
free_pool(cache);
return NULL;
}
add_element(cache, element);
}
```
為何 curr 的值要是 max_cache_size 的 1/4?
:::
### Logger
* 利用 circular buffer 實作一套 logger 系統
* 運用 circular buffer 的意義在於當 log 數量超過限制的時候,可以有效的剔除(覆蓋)舊的資料,即 circular 的特性
* circular buffer 的實作位於 `/src/util/cirbuf.h`
* 預設寫入 `/tmp/cserv.log` 檔案,更改 `/conf/cserv.log` 中的 `log_path` 來設定
* 分為 `CRIT`, `ERR`, `WARN`, `INFO` 四種等級
* `/conf/cserv.conf` 中的 `log_level` 為要寫入 log 檔案中的臨界點 (threshold)
* cser 簡單的運用 enum 的性質來處理 threshold 的判斷
```c
//@ /src/logger.c
void log_out(enum LOG_LEVEL level,
const char *file,
int line,
const char *fmt,
...)
{
if (level > log_level)
return;
// ...
}
```
* 利用 cserv 的每個 process 來作為 logger,因此會有 worker process + 1 個 logger
* 由呼叫 log_out 的 process 自行處理自己的 log
* 利用 shared memory 來讓每個 logger 共通 `log_object[]`,以便作業
* log_object 可視為 logger 陣列,定義了每個 logger 的 pid, logger 的 circular buffer 以及一個決定是否要 flush 的 flag
* 驗證正確性:預計單獨把 logger 拉出來,建立一套具備 scalability 以及 portability 的 log 套件
* 剛好最近的其他專案需要,一石二鳥
### Http
* 實作了一套解析 http 表頭的 parser
* 見 /src/http/*
## 改動
### syscall_hook.h
把 `syscall_hook.c` 中的型態宣告以及 `real_sys_write` 搬到專屬的標頭檔
* 目的:讓 `logger.c` 在調用 `real_sys_write` 時不需要再做多餘的宣告
```c
//@ /src/syscall_hook.h
#ifndef CORE_SYSCALL_HOOK_H
#define CORE_SYSCALL_HOOK_H
#include <sys/socket.h>
typedef int (*sys_connect)(int sockfd,
const struct sockaddr *addr,
socklen_t addrlen);
typedef int (*sys_accept)(int sockfd,
struct sockaddr *addr,
socklen_t *addrlen);
typedef ssize_t (*sys_read)(int fd, void *buf, size_t count);
typedef ssize_t (*sys_recv)(int sockfd, void *buf, size_t len, int flags);
typedef ssize_t (*sys_write)(int fd, const void *buf, size_t count);
typedef ssize_t (*sys_send)(int sockfd, const void *buf, size_t len, int flags);
/* declared in syscall_hook.c */
extern sys_write real_sys_write;
#endif
```
## TODO Note
- [ ] shm 用來在不中斷程式的情形下對其做一系列操作
- [ ] 驗證 memcache 實作的正確性
- [ ] 驗證 logger 實作的正確性
- [ ] syscall_hook.c 中攔截 6 個系統呼叫,利用 timeout 達成 non-blocking,注意 1) timeout 時間長度 2) 是否需要攔截其他用到的系統呼叫
- [ ] 研究 signal.c 中 master 跟 worker 的交互溝通
- [ ] 如何動態變更 listen 的 port
- [ ] /http 中的 Keep alive 部分功能是否完整
- [ ] epoll 的 LT/ET 考量
- [ ] /http 中 switch-case 換成 computed goto 的效能提升
- [ ] 參考/比較 [lwan](https://github.com/lpereira/lwan) 等專案