---
title: 2024 年 Linux 核心設計/實作課程作業 —— ktcp
image: https://repository-images.githubusercontent.com/249636967/4e4e8900-6e20-11ea-9abf-57f8123e9c66
description: 檢驗學員對 Linux 核心 kthread 和 workqueue 處理機制的認知
tags: linux2024
---
# L11: ktcp
> 主講人: [jserv](https://wiki.csie.ncku.edu.tw/User/jserv) / 課程討論區: [2024 年系統軟體課程](https://www.facebook.com/groups/system.software2024/)
:mega: 返回「[Linux 核心設計/實作](https://wiki.csie.ncku.edu.tw/linux/schedule)」課程進度表
## cserv 高效網頁伺服器
[cserv](https://github.com/sysprog21/cserv) 是個高效的網頁伺服器,採用非阻塞式 I/O 的事件驅動架構。單執行緒、支援多核並善用 CPU affinity。
### 執行方法
* 編譯
```shell
$ make
```
* 執行
```shell
$ ./cserv start
```
* 結束
```shell
$ ./cserv stop
```
* 檢查設定
```shell
$ ./cserv conf
```
其中設定檔位於 `/conf/cserv.conf`,可以設定 log 路徑、等級、worker process 數量、每個 worker 最大連線數、coroutine stack 大小、監聽連接埠以及 HTTP request line 跟 header 的大小
### 原始程式碼目錄結構
* src/coro 目錄中為 coroutine 相關程式碼
* src/http 目錄中為 http parsing 相關程式碼
* src/util 目錄中為一些資料結構跟功能(red-black tree, hashtable, circular buffer, memcache, shared memory, spinlock)
## 壓力測試與比較
### 工具
* [ab (apache benching tool)](https://httpd.apache.org/docs/current/programs/ab.html)
* [httperf](https://github.com/httperf/httperf)
* [htstress](https://github.com/sysprog21/khttpd/blob/master/htstress.c)
### 比較對象
* [lwan](https://github.com/lpereira/lwan)
* cserv 參考的對象
* [nginx](https://github.com/nginx/nginx)
* 市占率第一的網頁伺服器
* [apache](https://github.com/apache/httpd)
* 老牌網頁伺服器
> 市占率來源: [w3techs, 25 May 2022](https://w3techs.com/technologies/overview/web_server)
### 實驗結果
#### ab
100000 requests with 500 requests at a time(concurrency)
`ab -n 100000 -c 500 -k http://127.0.0.1:8081/`
| | Requests per second | Time per request (mean, across all concurrent requests)|
| -------- | -------- | -------- |
| cserv | 17763.63 #/sec | 0.056 ms |
| lwan | 15960.17 #/sec | 0.063 ms |
| nginx | 53021.65 #/sec | 0.019 ms |
| apache2 | 23558.77 #/sec | 0.042 ms |
#### htstress
100000 requests with 500 requests at a time(concurrency)
`./htstress -n 100000 -c 500 127.0.0.1:8081/`
| | requests/sec | total time |
| -------- | -------- | -------- |
| cserv | 21457.851 #/sec | 4.660 s |
| lwan | 18758.664 #/sec | 5.331 s |
| nginx | 17322.661 #/sec | 5.773 s |
| apache2 | 15122.679 #/sec | 6.613 s |
#### httperf
total 100000 requests with 500 connections
> 200 * 500 = 100000
`httperf --server 127.0.0.1 --port 8081 --num-conn 500 --num-call 200 --http-version 1.0`
| | requests/sec | connection rate |
| -------- | -------- | -------- |
| cserv | 21872.7 #/sec | 10936.3 conn/s |
| lwan | 27230.0 #/sec | 136.1 conn/s |
| nginx | 16056.9 #/sec | 159.0 conn/s |
| apache2 | 10087.8 #/sec | 98.9 conn/s |
## Non-blocking I/O multiplexing
* non-blocking: 利用 timer 達成,攔截(hook)阻塞式的 `socket` 系統呼叫,使其在 timer 終止時結束
* 6 個系統呼叫
```c
// @ /src/syscall_hook.h
typedef int (*sys_connect)(int sockfd,
const struct sockaddr *addr,
socklen_t addrlen);
typedef int (*sys_accept)(int sockfd,
struct sockaddr *addr,
socklen_t *addrlen);
typedef ssize_t (*sys_read)(int fd, void *buf, size_t count);
typedef ssize_t (*sys_recv)(int sockfd, void *buf, size_t len, int flags);
typedef ssize_t (*sys_write)(int fd, const void *buf, size_t count);
typedef ssize_t (*sys_send)(int sockfd, const void *buf, size_t len, int flags);
```
* 舉 read 為例,timeout 時便回傳錯誤,達到 non-blocking 特性
```c
// @ /src/syscall_hook.c
ssize_t read(int fd, void *buf, size_t count)
{
ssize_t n;
while ((n = real_sys_read(fd, buf, count)) < 0) {
if (EINTR == errno)
continue;
if (!fd_not_ready())
return -1;
if (add_fd_event(fd, EVENT_READABLE, event_rw_callback, current_coro()))
return -2;
schedule_timeout(READ_TIMEOUT);
del_fd_event(fd, EVENT_READABLE);
if (is_wakeup_by_timeout()) {
errno = ETIME;
return -3;
}
}
return n;
}
```
* I/O multiplexing: 利用強大的 [epoll](https://man7.org/linux/man-pages/man7/epoll.7.html),透過監聽 events 來決定處理的 I/O
* timeout 時間設定(單位:毫秒)
```c
//@ /src/syscall_hook.c
#define CONN_TIMEOUT (10 * 1000)
#define ACCEPT_TIMEOUT (3 * 1000)
#define READ_TIMEOUT (10 * 1000)
#define RECV_TIMEOUT (10 * 1000)
#define WRITE_TIMEOUT (10 * 1000)
#define SEND_TIMEOUT (10 * 1000)
#define KEEP_ALIVE 60
```
### System call hooking
* hook: 鉤子,將正常的系統呼叫跟自己寫的版本掛勾,如此一來當呼叫該 system call 時,就會運行自己寫的版本
* 真正意義上的 system call hooking 較麻煩,因為要使所有呼叫該符號者都使用 hooked 的版本,要達成這個目標,會需要插入核心模組,並更改 system call table
* 但若只是想要整個專案範圍下的 hooking,可簡單的宣告同樣名字的函式,並在該函式中呼叫原始版本的系統呼叫
* 在 Linux 中可用 [dlsym(3)](https://man7.org/linux/man-pages/man3/dlsym.3.html) 來獲得特定符號的地址,cserv 用此法來獲得原始系統呼叫函式的進入點
* 如果使用 gcc 編譯,在 `#include <dlfcn.h>` 之前,需要先 `#define _GNU_SOURCE`,否則會遇到編譯錯誤
* 範例程式
```c
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
typedef ssize_t (*sys_write_t)(int, const void *, size_t);
static sys_write_t sys_write = NULL;
#define HOOK_SYSCALL(name) \
sys_##name = (sys_##name##_t) dlsym(RTLD_NEXT, #name)
ssize_t write(int fd, const void *buf, size_t count)
{
printf("hooked\n");
ssize_t n;
while ((n = sys_write(fd, buf, count)) < 0);
return n;
}
int main()
{
HOOK_SYSCALL(write);
write(STDOUT_FILENO, "Hello\n", strlen("Hello\n"));
return 0;
}
```
編譯時記得加入 `-ldl`
執行結果如下
```
$ ./a.out
hooked
Hello
```
其中 HOOK_SYSCALL 參考 cserv 中的寫法
```c
//@ /src/syscall_hook.c
#define HOOK_SYSCALL(name) \
real_sys_##name = (sys_##name) dlsym(RTLD_NEXT, #name)
//...
HOOK_SYSCALL(connect); // real_sys_connect = (sys_connect) dlsym(RTLD_NEXT, connect)
```
這邊 real_sys_... 就是該 system call 的原始備份,用在每個 hooked system call 中以及像 logger 等不需要 non-blocking 版本的情境
### Master/Worker process
* 在 1 個 cpu core 運行 1 個 worker process
* 利用 mod 把 worker process 分配給各個 cpu
```c
// @ /src/process.c
for (int i = 0; i < g_worker_processes; i++) {
struct process *p = &worker[i];
p->pid = INVALID_PID;
p->cpuid = i % get_ncpu();
}
```
* 每個 process 只有一個 thread => 沒有 thread-pool 設計
* 由一個 Master process 分配多個 worker process 執行工作
* process 間達到 load balancing
### CPU Affinity
執行 `./cserv start` 啟動伺服器後使用 `ps` 命令可以得到以下結果
```shell
$ ps -ef | grep cserv
# UID PID PPID C STIME TTY TIME CMD
ycwu4142 793 8 0 00:03 pts/0 00:00:00 cserv: master process
ycwu4142 799 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 800 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 801 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 802 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 803 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 804 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 805 793 0 00:03 pts/0 00:00:00 cserv: worker process
ycwu4142 806 793 0 00:03 pts/0 00:00:00 cserv: worker process
```
從上面可得知,預設情況下 cserv 在 8 核的機器上建立 8 個 worker process,pid 分別是 799 到 806,從 ppid 都是 793 可以證明每個 worker process 都是由 master process fork 出來。
接著撰寫一個簡單的 shell script 來檢視各 worker process 的 cpu affinity
```bash
#!/bin/bash
for i in {799..806};
do
taskset -pc $i
done
```
執行後得到以下結果
```
pid 799's current affinity list: 0
pid 800's current affinity list: 1
pid 801's current affinity list: 2
pid 802's current affinity list: 3
pid 803's current affinity list: 4
pid 804's current affinity list: 5
pid 805's current affinity list: 6
pid 806's current affinity list: 7
```
可以發現每個 worker process 各使用 1 個不同的 cpu core,充分運用運算資源。
### Signal
在 Unix, Unix like, 及其他 POSIX 相容的作業系統中,[signal](https://en.wikipedia.org/wiki/Signal_(IPC)) 是一種常用的 IPC(Inter-Process Communication) 方式,可以看作應用程式間的 [interrupt](https://en.wikipedia.org/wiki/Interrupt),用來通知 process 某個事件的發生,並且中斷該 process 的正常流程。
signal 可藉由多種方式觸發,比如使用 [kill](https://man7.org/linux/man-pages/man2/kill.2.html) systemcall 可以對任意 process 發送任意訊號。
常見的訊號有
* SIGTSTP: 暫停某個 process ,可以藉由 ctrl+z 發送
* SIGKILL: 強制立即終止某個 process
* SIGSEGV: 記憶體區段錯誤時自動發送
* SIGINT: 中斷某個 process,可以藉由 ctrl+c 發送
* SIGQUIT: 終止某個 process 並將記憶體中的資訊存到核心 (core dump)
> [man 7 signal](https://man7.org/linux/man-pages/man7/signal.7.html) 中有給出詳細的訊號列表
process 接收到訊號後會藉由自身的 signal handler 進行處理,以下是 signal handler 的大致流程,可以發現 signal 與 interrupt 不同的點是,前者會轉發回 process 並在 user mode 中處理,後者則是全部在 kernel mode 中交由核心處理
```graphviz
digraph g{
node[shape=record]
rankdir=RR
g [label="{user|...|(2)sighandler()|(3)return|...}|{kernel|(1)dosignal()||(4)system_call()|}"]
}
// User | Kernel
// ---------------------------
// ... -> do signal()
// /
// /
// sighandler <-
// |
// v
// return -> system_call()
// /
// /
// ... <-
```
> 上圖改畫自 [Signals and Inter-Process Communication p.12](http://www.cs.unc.edu/~porter/courses/cse506/s16/slides/ipc.pdf)
因此我們可以藉由 signal handler 來處理某些特定的訊號,比如 SIGINT,預設的行為是終止 process,以下例子使用 [signal](https://man7.org/linux/man-pages/man2/signal.2.html) system call 來註冊當程式接收到 SIGINT 訊號後的行為
```c=
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
static void signal_handler(int signo)
{
char *msg = "\nCaught signal!!\n";
write(STDOUT_FILENO, msg, strlen(msg));
exit(EXIT_SUCCESS);
}
int main(int argc, char *argv[])
{
if (signal(SIGINT, signal_handler) == SIG_ERR) {
printf("error!\n");
return EXIT_FAILURE;
}
for(int i=0;; ++i) {
printf("%d...\n", i);
sleep(1);
}
printf("exit!\n");
return EXIT_SUCCESS;
}
```
> signal handler 中使用 `write` 而非 printf 是後者為 non-async-signal-safe,用於訊號處理程式時可能會出錯
執行結果如下
```
$ ./signal
0...
1...
2...
^C # 按下 ctrl + c
Caught signal!!
```
如果將第 11 行拿掉,則按下 ctrl + c 後程式不會停止
而因為 signal 系統的作用在不同的系統/版本中可能會有不同的定義,考慮到可攜性,Linux 建議用 [sigaction](https://man7.org/linux/man-pages/man2/sigaction.2.html) 這個較為通用的系統呼叫
> 參見: [signal(2)](https://man7.org/linux/man-pages/man2/signal.2.html) 中的 warning
下面是使用 sigaction 改寫的版本,輸出結果如上
```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
static void signal_handler(int signo)
{
char *msg = "\nCaught signal!!\n";
write(STDOUT_FILENO, msg, strlen(msg));
exit(EXIT_SUCCESS);
}
int main(int argc, char *argv[])
{
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask); // 初始化(清空) signal 的集合
sigaction(SIGINT, &sa, NULL);
for (int i=0; ; ++i) {
printf("%d...\n", i);
sleep(1);
}
printf("exit!\n");
return EXIT_SUCCESS;
}
```
在 cserv 中就利用 sigaction 來處理幾種不同的 signal
並且對部分 signal 作了特殊的定義,如下方程式碼中,將 SIGHUP 當作重新設定的訊號
```c
//@ /src/process.h
enum {
SHUTDOWN_SIGNAL = SIGQUIT,
TERMINATE_SIGNAL = SIGINT,
RECONFIGURE_SIGNAL = SIGHUP
};
```
> SIGHUP: hangup,掛斷,預設行為為終止,出現在當某個使用者登出時,系統會發送此訊號給同一 session 中的所有 process
例如當執行 `./cserv stop` 時,會對 master process 發送 SIGQUIT 訊號
> 其中 read_pidfile 會去讀取 ./conf/cserv.pid,來得到 master process 的 pid
> pid file: 存放 pid 的檔案,通常用來給其他 service 或 daemon 讀取 pid 以便對其發送 signal
```c
//@ /src/main.c
// ...
if (str_equal(argv[1], "stop")) {
send_signal(SHUTDOWN_SIGNAL);
printf("cserv: stopped.\n");
return 0;
}
// ...
static void send_signal(int signo)
{
int pid = read_pidfile();
kill(pid, signo);
}
```
而在 `signal.c` 中,利用 sigaction 將各 signal 與對應的 signal handler 綁定
```c
//@ /src/signal.c
static struct sys_signal signals[] = {
{SIGQUIT, "stop", signal_handler},
{SIGTERM, "force-quit", signal_handler},
{SIGINT, "force-quit", signal_handler},
{SIGHUP, "reload", signal_handler},
{SIGCHLD, "default", signal_handler},
{SIGSYS, "ignored", SIG_IGN},
{SIGPIPE, "ignored", SIG_IGN},
{0, "", NULL},
};
// ...
__INIT static void signal_init()
{
struct sigaction sa;
for (struct sys_signal *sig = signals; sig->signo; sig++) {
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = sig->handler;
sigemptyset(&sa.sa_mask);
if (sigaction(sig->signo, &sa, NULL) == -1) {
printf("Failed to invoke sigaction(%d)\n", sig->signo);
exit(0);
}
}
}
```
當 master process 收到 SIGQUIT 訊號時,會將全域變數 `g_shall_stop` 設為 1 來告知 master process 要停止,並向所有 worker process 發送 SIGQUIT 訊號
```c
//@ /src/signal.c
static void master_signal_handler(int signo)
{
switch (signo) {
case SIGQUIT:
g_shall_stop = 1;
break;
case SIGTERM:
case SIGINT:
g_shall_exit = 1;
break;
case SIGCHLD:
worker_process_get_status();
break;
}
}
//@ /src/process.c
void master_process_cycle()
{
//...
for (;;) {
if (g_shall_stop == 1) {
WARN("notify worker processes to stop");
send_signal_to_workers(SHUTDOWN_SIGNAL);
g_shall_stop = 2;
}
//...
}
// ...
}
```
而當 worker process 收到 SIGQUIT 時,也會將其 `g_shall_stop` 設為 1,以便在 process 迴圈內接收到停止的命令,然後斷線並停止。
```c
//@ /src/signal.c
static void worker_signal_handler(int signo)
{
switch (signo) {
case SIGQUIT:
g_shall_stop = 1;
INFO("In worker, received SIGQUIT");
break;
case SIGTERM:
case SIGINT:
g_shall_exit = 1;
INFO("In worker, received SIGTERM/SIGINT");
break;
}
}
//@ /src/process.c
static void worker_accept_cycle(void *args __UNUSED)
{
for (;;) {
if (unlikely(g_shall_stop)) {
set_proc_title("cserv: worker process is shutting down");
decrease_conn_and_check();
break;
}
// ...
}
}
// ...
static inline void decrease_conn_and_check()
{
if (--connection_count == 0)
exit(0);
}
```
這邊 `g_shall_stop` 變數也被用來檢查 worker prosses 的終止是否正常,若 `g_shall_stop` 或 `g_shall_exit` 均為 0,但 worker 依然要停止時就是發生異常
```c
//@ /src/process.c
void worker_exit_handler(int pid)
{
for (int i = 0; i < g_worker_processes; i++) {
struct process *p = &worker[i];
if (p->pid == pid)
p->pid = INVALID_PID;
}
/* If worker process exits without receiving the signal, it means
* abnormal termination.
*/
if (!g_shall_stop && !g_shall_exit)
shall_create_worker = true;
if (worker_empty() && (g_shall_stop || g_shall_exit))
all_workers_exit = true;
}
```
最後 master process 會等到所有 worker process 結束才終止
```c
//@ /src/process.c - master_process_cycle()
if (all_workers_exit) {
WARN("cserv exit now...");
log_scan_write();
delete_pidfile();
exit(0);
}
```
總而言之就是利用 signal 來進行 master/worker process 間的通訊。
### Shared memory
* 用途
* 讓 master/worker process 間溝通資訊
* 用來設置 spinlock,在 logger 的初始化、設定各 process 的 accept file descriptor 時使用
* 用來分配 `log_object[]` 的空間,讓各個 logger (processes) 均能夠存取
* 見 `shm.*`
* [解析 Linux 共享記憶體機制](https://hackmd.io/@sysprog/linux-shared-memory#POSIX-%E5%85%B1%E4%BA%AB%E8%A8%98%E6%86%B6%E9%AB%94) 中提到數個共享記憶體的實作方法,cserv 採用類似其中 POSIX shared memory 的方式,使用 mmap 來達成
* 因為 master 以及各個 worker process 都是同一支程式 fork 出來的,因此可以使用 mmap 的匿名共享映射機制,來讓各個 process 共用同一塊記憶體
* [mmap](https://man7.org/linux/man-pages/man2/mmap.2.html)
```c
#include <sys/mman.h>
void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);
int munmap(void *addr, size_t length);
```
* mmap 的功用是將 fd (file descriptor) 映射到一段虛擬記憶體空間,以便在不使用 read, write 等操作的情形下,對檔案或裝置進行操作 (由對該虛擬記憶體空間進行操作達成)
* 其中 flag 的部分可以選擇 `MAP_PRIVATE` 或著 `MAP_SHARED` 來決定這段記憶體是否能讓別的 process 存取
* 此外,還可以使用 `MAP_ANONYMOUS` 來達到匿名映射,讓 mmap 不對 fd 做記憶體映射(通常將 fd 填入 -1)。
* 於是透過上述的 `MAP_SHARED` 加上 `MAP_ANONYMOUS` 就可以輕鬆地達成親子 process 間的共享記憶體
* 範例程式:
```c
#include <sys/mman.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define PG_SIZE 32
int main()
{
void *addr = mmap(NULL, PG_SIZE, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_SHARED, -1, 0);
char *p = addr;
memset(p, 'A', PG_SIZE);
printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]);
int pid = fork();
if (pid == 0){
memset(p, 'B', PG_SIZE);
printf("I'm child.\n");
} else if (pid > 0){
printf("I'm father.\n");
} else {
printf("fork error\n");
exit(EXIT_FAILURE);
}
printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]);
if (addr == MAP_FAILED) {
printf("mapping failed\n");
return EXIT_FAILURE;
}
munmap(addr, PG_SIZE);
return EXIT_SUCCESS;
}
```
執行結果
```
content: A A A A
I'm father.
content: B B B B
I'm child.
content: B B B B
```
可以發現本來共享空間中的 A A A A 被子行程改成了 B B B B 後,親代行程也讀到了 B B B B,以此證明該空間是親子行程的共享記憶體空間
* cserv 中實際的使用如下
```c
//@ /src/util/shm.c
struct shm {
char *addr;
size_t size, offset;
spinlock_t lock; /* lock while allocating shared memory */
};
static struct shm *shm_object;
```
首先宣告一個共享記憶體的結構,裡面自帶了一個 spinlock,用來在配置記憶體空間時鎖上,以防止其他 process 同時存取
```c
//@ /src/util/shm.c
/* Allocate without reclaiming */
void *shm_alloc(size_t size_bytes)
{
size_bytes = ROUND_UP(size_bytes, MEM_ALIGN);
spin_lock(&shm_object->lock);
if (unlikely(shm_object->offset + size_bytes > shm_object->size)) {
spin_unlock(&shm_object->lock);
return NULL;
}
char *addr = shm_object->addr + shm_object->offset;
shm_object->offset += size_bytes;
spin_unlock(&shm_object->lock);
return addr;
}
/* unit: page size */
void *shm_pages_alloc(unsigned int pg_count)
{
void *addr = mmap(NULL, pg_count * get_page_size(), PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_SHARED, -1, 0);
return (addr == MAP_FAILED) ? NULL : addr;
}
```
先看 `shm_pages_alloc`,顧名思義這個函式就是配製一個 page 的共享記憶體空間,其中 page size 取決於每個機器
這邊可以看到 mmap 的使用,大致上跟前面的範例一樣
而 `shm_alloc` 這個函式則是在已經配置的 page 中分配空間出來
假設下圖是一個 shm_page
```graphviz
digraph g{
node[shape=record]
page [label="addr|...|...|<o>||"]
offset [label="addr + offset"]
offset -> page:o
}
```
offset 指向目前的裝到哪邊的位置,size 則是 page size
因此要配置一個 size_bytes 大小的空間,會先將 size_bytes 進位,做記憶體對齊
然後檢查當下的空間夠不夠裝,最後才調整 offset 並返回 addr + offset 的指標,指向這塊空間的起始位置
### Coroutine
> 閱讀 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html)
* [coroutine](https://en.wikipedia.org/wiki/Coroutine)
* 跟 thread 很像,但是採用 [cooperative multitasking](https://en.wikipedia.org/wiki/Cooperative_multitasking) 而非 [ preemptively multitasking](https://en.wikipedia.org/wiki/Preemption_(computing)#PREEMPTIVE)
* 擁有自己的 stack 跟 program counter
* 可以利用 dispatcher 輕易的在 thread 之間轉換
* 可以中斷並且繼續執行,跟一般的函式不同
* 是一種在 multi-threading 普及之前產生的概念,優點是不會產生 race-condition
* 通常由 user 控制
* 利用 coroutine 處理各個連線 (而非建立新的 thread)
* 利用可中斷的特性來切換連線
```
| Main loop | | Connection 1 | | Connection 2 |
----------------------------------------------------------
|
|
v
*** --> resume --> ***
***
***
*** <-- yield <-- ***
***
*** ------------> resume -------------> ***
***
*** <------------ yield <------------- ***
***
***
***
*** --> resume --> ***
***
```
> 改繪自 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html) 中的 Diagram of main loop plus two coroutines
* 可以看成給 worker 們的 task/job 或者說針對 worker 的專屬排程機制
* timeout 機制
```c
//@ /src/process.c
void worker_process_cycle()
{
if (worker_init_proc && worker_init_proc()) {
ERR("Failed to initialize worker process");
exit(0);
}
schedule_init(g_coro_stack_kbytes, g_worker_connections);
event_loop_init(g_worker_connections);
dispatch_coro(worker_accept_cycle, NULL);
INFO("worker success running...");
schedule_cycle();
}
```
每個 worker 運作時會先派發一個 coroutine 來處理 tcp accept (等待 connection 進來並且處理) 的工作,然後進入 coroutine 排程的循環
```c
//@ /src/process.c
static void worker_accept_cycle(void *args __UNUSED)
{
for (;;) {
if (unlikely(g_shall_stop)) {
set_proc_title("cserv: worker process is shutting down");
decrease_conn_and_check();
break;
}
if (unlikely(g_shall_exit))
exit(0);
int connfd = worker_accept();
if (likely(connfd > 0)) {
if (dispatch_coro(handle_connection, (void *) (intptr_t) connfd)) {
WARN("system busy to handle request.");
close(connfd);
continue;
}
increase_conn();
} else if (connfd == 0) {
schedule_timeout(200);
continue;
}
}
}
```
而在 worker_accept_cycle 中,如果 worker 有新的連線進入,則分派一個 coroutine 來處理,如果無法立即處理則呼叫 `schedule_timeout()` 將該 coroutine 放進 inactive list 中等待, timeout 時再行處理
```c
//@ /src/coro/sched.c
void schedule_cycle()
{
for (;;) {
check_timeout_coroutine();
run_active_coroutine();
sched.policy(get_recent_timespan());
run_active_coroutine();
}
}
```
```c
//@ /src/coro/sched.c
static void check_timeout_coroutine()
{
struct timer_node *node;
long long now = get_curr_mseconds();
while ((node = get_recent_timer_node())) {
if (now < node->timeout)
return;
rb_erase(&node->node, &sched.inactive);
timeout_coroutine_handler(node);
memcache_free(sched.cache, node);
}
}
static inline void timeout_coroutine_handler(struct timer_node *node)
{
struct rb_node *recent;
while ((recent = node->root.rb_node)) {
struct coroutine *coro = container_of(recent, struct coroutine, node);
rb_erase(recent, &node->root);
coro->active_by_timeout = 1;
move_to_active_list_tail_direct(coro);
}
}
```
每個排程循環都會先檢查有沒有等待超過 timeout 的 coroutine
如果有的話,就將它從 inactive list 中轉移到 active list 的尾端
並且執行 active list 頭部的 coroutine (替換調目前的工作,讓整個排程持續前進)
`sched.policy(get_recent_timespan())` 這邊會對應到 `event_cycle`
> (`sched.policy = event_cycle;`)
```c
//@ /src/event.c
void event_cycle(int ms /* in milliseconds */)
{
int value = epoll_wait(evloop.epoll_fd, evloop.ev, evloop.max_conn, ms);
if (value <= 0)
return;
for (int i = 0; i < value; i++) {
struct epoll_event *ev = &evloop.ev[i];
struct fd_event *fe = &evloop.array[ev->data.fd];
fe->proc(fe->args);
}
}
```
在 event cycle 中利用 epoll 來監聽各個事件,並在觸發事件時通知。
這邊 `epoll_wait()` 會佔用 `get_recent_timespan()` 回傳的值,也就是距離目前的 timer_node timeout 還剩下的時間
```c
//@ /src/coro/sched.c
static inline int get_recent_timespan()
{
struct timer_node *node = get_recent_timer_node();
if (!node)
return 10 * 1000; /* at least 10 seconds */
int timespan = node->timeout - get_curr_mseconds();
return (timespan < 0) ? 0 : timespan;
}
```
> [man epoll_wait(2)](https://man7.org/linux/man-pages/man2/epoll_wait.2.html)
> The timeout argument specifies the number of milliseconds that epoll_wait() will block.
這段時間中若有事件觸發,則結束等待,切換處理下一個 coroutine (透過 `run_active_coroutine()`)
* fast task switching: 以組合語言重寫 context switch 程式碼,達到更高效率的 task switching
* 見 `src/coro/switch.*`
* 在 coroutine 排程中利用紅黑樹進行更高效率的搜尋
* 見 `src/coro/sched.*`
### Memcache
> 參見 [Site Cache vs Browser Cache vs Server Cache: What’s the Difference?](https://wp-rocket.me/blog/different-types-of-caching/)
* 實作一套 memcache 系統
* 用來處理 Server Cache,在伺服器端先將網頁內容預先加載好,以便在日後接收到 request 時可以立即反應
* 在 cserv 中用於兩個地方
* caching http_request
* server cache
* 加速 request 的處理
* 每次進入 http_request_handler 時都先從 memcache 中尋找,更改之後再存回去 memcache 中
* caching coroutine 排程中的 timer node
* active 的 timer node 被放進 memcache
* 用來判斷這個 timer 是否為 active
* 利用紅黑樹管理 inactive 的 timer,一旦 active 就將這個 timer 放進 memcache 以便快速存取
* 程式碼分析
* 站在呼叫端的角度命名
* memcache_alloc: 從 memcache 中提取 element 出來
```c
//@ /src/util/memcache.c
void *memcache_alloc(struct memcache *cache)
{
return (cache->curr) ? remove_element(cache) : malloc(cache->obj_size);
}
static inline void *remove_element(struct memcache *cache)
{
return cache->elements[--cache->curr];
}
```
* memcache_free: 把 element 放進 memcache
```c
//@ /src/util/memcache.c
void memcache_free(struct memcache *cache, void *element)
{
add_element(cache, element);
}
static inline void add_element(struct memcache *cache, void *element)
{
if (cache->curr < cache->cache_size) {
cache->elements[cache->curr++] = element;
return;
}
free(element);
}
```
### Logger
* 利用 circular buffer 實作一套 logger 系統
* 運用 circular buffer 的意義在於當 log 數量超過限制的時候,可以有效的剔除(覆蓋)舊的資料,即 circular 的特性
* circular buffer 的實作位於 `/src/util/cirbuf.h`
* 預設寫入 `/tmp/cserv.log` 檔案,更改 `/conf/cserv.log` 中的 `log_path` 來設定
* 分為 `CRIT`, `ERR`, `WARN`, `INFO` 四種等級
* `/conf/cserv.conf` 中的 `log_level` 為要寫入 log 檔案中的臨界點 (threshold)
* cser 簡單的運用 enum 的性質來處理 threshold 的判斷
```c
//@ /src/logger.c
void log_out(enum LOG_LEVEL level,
const char *file,
int line,
const char *fmt,
...)
{
if (level > log_level)
return;
// ...
}
```
* 利用 cserv 的每個 process 來作為 logger,因此會有 worker process + 1 個 logger
* 由呼叫 log_out 的 process 自行處理自己的 log
* 利用 shared memory 來讓每個 logger 共通 `log_object[]`,以便作業
* log_object 可視為 logger 陣列,定義了每個 logger 的 pid, logger 的 circular buffer 以及一個決定是否要 flush 的 flag