Try   HackMD

L11: ktcp

主講人: jserv / 課程討論區: 2024 年系統軟體課程

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
返回「Linux 核心設計/實作」課程進度表

cserv 高效網頁伺服器

cserv 是個高效的網頁伺服器,採用非阻塞式 I/O 的事件驅動架構。單執行緒、支援多核並善用 CPU affinity。

執行方法

  • 編譯
    ​​​​$ make
    
  • 執行
    ​​​​$ ./cserv start
    
  • 結束
    ​​​​$ ./cserv stop
    
  • 檢查設定
    ​​​​$ ./cserv conf
    
    其中設定檔位於 /conf/cserv.conf,可以設定 log 路徑、等級、worker process 數量、每個 worker 最大連線數、coroutine stack 大小、監聽連接埠以及 HTTP request line 跟 header 的大小

原始程式碼目錄結構

  • src/coro 目錄中為 coroutine 相關程式碼
  • src/http 目錄中為 http parsing 相關程式碼
  • src/util 目錄中為一些資料結構跟功能(red-black tree, hashtable, circular buffer, memcache, shared memory, spinlock)

壓力測試與比較

工具

比較對象

  • lwan
    • cserv 參考的對象
  • nginx
    • 市占率第一的網頁伺服器
  • apache
    • 老牌網頁伺服器

市占率來源: w3techs, 25 May 2022

實驗結果

ab

100000 requests with 500 requests at a time(concurrency)
ab -n 100000 -c 500 -k http://127.0.0.1:8081/

Requests per second Time per request (mean, across all concurrent requests)
cserv 17763.63 #/sec 0.056 ms
lwan 15960.17 #/sec 0.063 ms
nginx 53021.65 #/sec 0.019 ms
apache2 23558.77 #/sec 0.042 ms

htstress

100000 requests with 500 requests at a time(concurrency)
./htstress -n 100000 -c 500 127.0.0.1:8081/

requests/sec total time
cserv 21457.851 #/sec 4.660 s
lwan 18758.664 #/sec 5.331 s
nginx 17322.661 #/sec 5.773 s
apache2 15122.679 #/sec 6.613 s

httperf

total 100000 requests with 500 connections

200 * 500 = 100000

httperf --server 127.0.0.1 --port 8081 --num-conn 500 --num-call 200 --http-version 1.0

requests/sec connection rate
cserv 21872.7 #/sec 10936.3 conn/s
lwan 27230.0 #/sec 136.1 conn/s
nginx 16056.9 #/sec 159.0 conn/s
apache2 10087.8 #/sec 98.9 conn/s

Non-blocking I/O multiplexing

  • non-blocking: 利用 timer 達成,攔截(hook)阻塞式的 socket 系統呼叫,使其在 timer 終止時結束

    • 6 個系統呼叫
      ​​​​​​​​// @ /src/syscall_hook.h
      ​​​​​​​​typedef int (*sys_connect)(int sockfd,
      ​​​​​​​​                   const struct sockaddr *addr,
      ​​​​​​​​                   socklen_t addrlen);
      ​​​​​​​​typedef int (*sys_accept)(int sockfd,
      ​​​​​​​​                  struct sockaddr *addr,
      ​​​​​​​​                  socklen_t *addrlen);
      
      ​​​​​​​​typedef ssize_t (*sys_read)(int fd, void *buf, size_t count);
      ​​​​​​​​typedef ssize_t (*sys_recv)(int sockfd, void *buf, size_t len, int flags);
      
      ​​​​​​​​typedef ssize_t (*sys_write)(int fd, const void *buf, size_t count);
      ​​​​​​​​typedef ssize_t (*sys_send)(int sockfd, const void *buf, size_t len, int flags);
      
    • 舉 read 為例,timeout 時便回傳錯誤,達到 non-blocking 特性
      ​​​​​​​​// @ /src/syscall_hook.c        
      ​​​​​​​​ssize_t read(int fd, void *buf, size_t count)
      ​​​​​​​​{
      ​​​​​​​​    ssize_t n;
      ​​​​​​​​    
      ​​​​​​​​    while ((n = real_sys_read(fd, buf, count)) < 0) {
      ​​​​​​​​        if (EINTR == errno)
      ​​​​​​​​            continue;
      
      ​​​​​​​​        if (!fd_not_ready())
      ​​​​​​​​            return -1;
      
      ​​​​​​​​        if (add_fd_event(fd, EVENT_READABLE, event_rw_callback, current_coro()))
      ​​​​​​​​            return -2;
      
      ​​​​​​​​        schedule_timeout(READ_TIMEOUT);
      ​​​​​​​​        del_fd_event(fd, EVENT_READABLE);
      ​​​​​​​​        if (is_wakeup_by_timeout()) {
      ​​​​​​​​            errno = ETIME;
      ​​​​​​​​            return -3;
      ​​​​​​​​        }
      ​​​​​​​​    }
      
      ​​​​​​​​    return n;
      ​​​​​​​​}
      
  • I/O multiplexing: 利用強大的 epoll,透過監聽 events 來決定處理的 I/O

  • timeout 時間設定(單位:毫秒)

//@ /src/syscall_hook.c
#define CONN_TIMEOUT (10 * 1000)
#define ACCEPT_TIMEOUT (3 * 1000)

#define READ_TIMEOUT (10 * 1000)
#define RECV_TIMEOUT (10 * 1000)

#define WRITE_TIMEOUT (10 * 1000)
#define SEND_TIMEOUT (10 * 1000)

#define KEEP_ALIVE 60

System call hooking

  • hook: 鉤子,將正常的系統呼叫跟自己寫的版本掛勾,如此一來當呼叫該 system call 時,就會運行自己寫的版本
  • 真正意義上的 system call hooking 較麻煩,因為要使所有呼叫該符號者都使用 hooked 的版本,要達成這個目標,會需要插入核心模組,並更改 system call table
  • 但若只是想要整個專案範圍下的 hooking,可簡單的宣告同樣名字的函式,並在該函式中呼叫原始版本的系統呼叫
  • 在 Linux 中可用 dlsym(3) 來獲得特定符號的地址,cserv 用此法來獲得原始系統呼叫函式的進入點
  • 如果使用 gcc 編譯,在 #include <dlfcn.h> 之前,需要先 #define _GNU_SOURCE,否則會遇到編譯錯誤
  • 範例程式
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>

typedef ssize_t (*sys_write_t)(int,  const void *, size_t);
static sys_write_t sys_write = NULL;

#define HOOK_SYSCALL(name) \
    sys_##name = (sys_##name##_t) dlsym(RTLD_NEXT, #name)

ssize_t write(int fd, const void *buf, size_t count)
{
    printf("hooked\n");
    ssize_t n;
    while ((n = sys_write(fd, buf, count)) < 0);
    return n;
}

int main()
{
    HOOK_SYSCALL(write);
    write(STDOUT_FILENO, "Hello\n", strlen("Hello\n"));
    return 0;
}

編譯時記得加入 -ldl
執行結果如下

$ ./a.out
hooked
Hello

其中 HOOK_SYSCALL 參考 cserv 中的寫法

//@ /src/syscall_hook.c
#define HOOK_SYSCALL(name) \
    real_sys_##name = (sys_##name) dlsym(RTLD_NEXT, #name)
//...
HOOK_SYSCALL(connect); // real_sys_connect = (sys_connect) dlsym(RTLD_NEXT, connect)

這邊 real_sys_ 就是該 system call 的原始備份,用在每個 hooked system call 中以及像 logger 等不需要 non-blocking 版本的情境

Master/Worker process

  • 在 1 個 cpu core 運行 1 個 worker process
    • 利用 mod 把 worker process 分配給各個 cpu
    ​​​​// @ /src/process.c
    ​​​​for (int i = 0; i < g_worker_processes; i++) {
    ​​​​    struct process *p = &worker[i];
    ​​​​    p->pid = INVALID_PID;
    ​​​​    p->cpuid = i % get_ncpu();
    ​​​​}
    
  • 每個 process 只有一個 thread => 沒有 thread-pool 設計
  • 由一個 Master process 分配多個 worker process 執行工作
  • process 間達到 load balancing

CPU Affinity

執行 ./cserv start 啟動伺服器後使用 ps 命令可以得到以下結果

$ ps -ef | grep cserv
#  UID     PID   PPID C STIME  TTY       TIME           CMD
ycwu4142   793     8  0 00:03 pts/0    00:00:00 cserv: master process
ycwu4142   799   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   800   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   801   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   802   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   803   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   804   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   805   793  0 00:03 pts/0    00:00:00 cserv: worker process
ycwu4142   806   793  0 00:03 pts/0    00:00:00 cserv: worker process

從上面可得知,預設情況下 cserv 在 8 核的機器上建立 8 個 worker process,pid 分別是 799 到 806,從 ppid 都是 793 可以證明每個 worker process 都是由 master process fork 出來。

接著撰寫一個簡單的 shell script 來檢視各 worker process 的 cpu affinity

#!/bin/bash
for i in {799..806};
do
    taskset -pc $i
done

執行後得到以下結果

pid 799's current affinity list: 0
pid 800's current affinity list: 1
pid 801's current affinity list: 2
pid 802's current affinity list: 3
pid 803's current affinity list: 4
pid 804's current affinity list: 5
pid 805's current affinity list: 6
pid 806's current affinity list: 7

可以發現每個 worker process 各使用 1 個不同的 cpu core,充分運用運算資源。

Signal

在 Unix, Unix like, 及其他 POSIX 相容的作業系統中,signal 是一種常用的 IPC(Inter-Process Communication) 方式,可以看作應用程式間的 interrupt,用來通知 process 某個事件的發生,並且中斷該 process 的正常流程。

signal 可藉由多種方式觸發,比如使用 kill systemcall 可以對任意 process 發送任意訊號。
常見的訊號有

  • SIGTSTP: 暫停某個 process ,可以藉由 ctrl+z 發送
  • SIGKILL: 強制立即終止某個 process
  • SIGSEGV: 記憶體區段錯誤時自動發送
  • SIGINT: 中斷某個 process,可以藉由 ctrl+c 發送
  • SIGQUIT: 終止某個 process 並將記憶體中的資訊存到核心 (core dump)

man 7 signal 中有給出詳細的訊號列表

process 接收到訊號後會藉由自身的 signal handler 進行處理,以下是 signal handler 的大致流程,可以發現 signal 與 interrupt 不同的點是,前者會轉發回 process 並在 user mode 中處理,後者則是全部在 kernel mode 中交由核心處理







g



g

user

...

(2)sighandler()

(3)return

...

kernel

(1)dosignal()

 

(4)system_call()

 



上圖改畫自 Signals and Inter-Process Communication p.12

因此我們可以藉由 signal handler 來處理某些特定的訊號,比如 SIGINT,預設的行為是終止 process,以下例子使用 signal system call 來註冊當程式接收到 SIGINT 訊號後的行為

#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <signal.h> #include <string.h> static void signal_handler(int signo) { char *msg = "\nCaught signal!!\n"; write(STDOUT_FILENO, msg, strlen(msg)); exit(EXIT_SUCCESS); } int main(int argc, char *argv[]) { if (signal(SIGINT, signal_handler) == SIG_ERR) { printf("error!\n"); return EXIT_FAILURE; } for(int i=0;; ++i) { printf("%d...\n", i); sleep(1); } printf("exit!\n"); return EXIT_SUCCESS; }

signal handler 中使用 write 而非 printf 是後者為 non-async-signal-safe,用於訊號處理程式時可能會出錯

執行結果如下

$ ./signal
0...
1...
2...
^C  # 按下 ctrl + c
Caught signal!!

如果將第 11 行拿掉,則按下 ctrl + c 後程式不會停止

而因為 signal 系統的作用在不同的系統/版本中可能會有不同的定義,考慮到可攜性,Linux 建議用 sigaction 這個較為通用的系統呼叫

參見: signal(2) 中的 warning

下面是使用 sigaction 改寫的版本,輸出結果如上

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>

static void signal_handler(int signo)
{
    char *msg = "\nCaught signal!!\n";
    write(STDOUT_FILENO, msg, strlen(msg));
    exit(EXIT_SUCCESS);
}

int main(int argc, char *argv[])
{
    struct sigaction sa;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask); // 初始化(清空) signal 的集合
    sigaction(SIGINT, &sa, NULL);

    for (int i=0; ; ++i) {
        printf("%d...\n", i);
        sleep(1);
    }

    printf("exit!\n");
    return EXIT_SUCCESS;
}

在 cserv 中就利用 sigaction 來處理幾種不同的 signal
並且對部分 signal 作了特殊的定義,如下方程式碼中,將 SIGHUP 當作重新設定的訊號

//@ /src/process.h
enum {
    SHUTDOWN_SIGNAL = SIGQUIT,
    TERMINATE_SIGNAL = SIGINT,
    RECONFIGURE_SIGNAL = SIGHUP
};

SIGHUP: hangup,掛斷,預設行為為終止,出現在當某個使用者登出時,系統會發送此訊號給同一 session 中的所有 process

例如當執行 ./cserv stop 時,會對 master process 發送 SIGQUIT 訊號

其中 read_pidfile 會去讀取 ./conf/cserv.pid,來得到 master process 的 pid
pid file: 存放 pid 的檔案,通常用來給其他 service 或 daemon 讀取 pid 以便對其發送 signal

//@ /src/main.c
// ...
    if (str_equal(argv[1], "stop")) {
        send_signal(SHUTDOWN_SIGNAL);
        printf("cserv: stopped.\n");
        return 0;
    }
// ...
static void send_signal(int signo)
{
    int pid = read_pidfile();
    kill(pid, signo);
}

而在 signal.c 中,利用 sigaction 將各 signal 與對應的 signal handler 綁定

//@ /src/signal.c
static struct sys_signal signals[] = {
    {SIGQUIT, "stop", signal_handler},
    {SIGTERM, "force-quit", signal_handler},
    {SIGINT, "force-quit", signal_handler},
    {SIGHUP, "reload", signal_handler},
    {SIGCHLD, "default", signal_handler},
    {SIGSYS, "ignored", SIG_IGN},
    {SIGPIPE, "ignored", SIG_IGN},
     {0, "", NULL},
};
// ...
__INIT static void signal_init()
{
    struct sigaction sa;

    for (struct sys_signal *sig = signals; sig->signo; sig++) {
        memset(&sa, 0, sizeof(struct sigaction));
        sa.sa_handler = sig->handler;
        sigemptyset(&sa.sa_mask);
        if (sigaction(sig->signo, &sa, NULL) == -1) {
            printf("Failed to invoke sigaction(%d)\n", sig->signo);
            exit(0);
        }
    }
}

當 master process 收到 SIGQUIT 訊號時,會將全域變數 g_shall_stop 設為 1 來告知 master process 要停止,並向所有 worker process 發送 SIGQUIT 訊號

//@ /src/signal.c
static void master_signal_handler(int signo)
{
    switch (signo) {
    case SIGQUIT:
        g_shall_stop = 1;
        break;
    case SIGTERM:
    case SIGINT:
        g_shall_exit = 1;
        break;

    case SIGCHLD:
        worker_process_get_status();
        break;
    }
}

//@ /src/process.c
void master_process_cycle()
{
    //...
    for (;;) {
        if (g_shall_stop == 1) {
            WARN("notify worker processes to stop");
            send_signal_to_workers(SHUTDOWN_SIGNAL);
            g_shall_stop = 2;
        }

        //...
    }
    // ...
}

而當 worker process 收到 SIGQUIT 時,也會將其 g_shall_stop 設為 1,以便在 process 迴圈內接收到停止的命令,然後斷線並停止。

//@ /src/signal.c
static void worker_signal_handler(int signo)
{
    switch (signo) {
    case SIGQUIT:
        g_shall_stop = 1;
        INFO("In worker, received SIGQUIT");
        break;
    case SIGTERM:
    case SIGINT:
        g_shall_exit = 1;
        INFO("In worker, received SIGTERM/SIGINT");
        break;
    }
}

//@ /src/process.c
static void worker_accept_cycle(void *args __UNUSED)
{
    for (;;) {
        if (unlikely(g_shall_stop)) {
            set_proc_title("cserv: worker process is shutting down");
            decrease_conn_and_check();
            break;
        }
        // ...
    }
}
// ...
static inline void decrease_conn_and_check()
{
    if (--connection_count == 0)
        exit(0);
}

這邊 g_shall_stop 變數也被用來檢查 worker prosses 的終止是否正常,若 g_shall_stopg_shall_exit 均為 0,但 worker 依然要停止時就是發生異常

//@ /src/process.c
void worker_exit_handler(int pid)
{
    for (int i = 0; i < g_worker_processes; i++) {
        struct process *p = &worker[i];
        if (p->pid == pid)
            p->pid = INVALID_PID;
    }

    /* If worker process exits without receiving the signal, it means
     * abnormal termination.
     */
    if (!g_shall_stop && !g_shall_exit)
        shall_create_worker = true;

    if (worker_empty() && (g_shall_stop || g_shall_exit))
        all_workers_exit = true;
}

最後 master process 會等到所有 worker process 結束才終止

//@ /src/process.c - master_process_cycle()
    if (all_workers_exit) {
        WARN("cserv exit now...");
        log_scan_write();
        delete_pidfile();
        exit(0);
    }

總而言之就是利用 signal 來進行 master/worker process 間的通訊。

Shared memory

  • 用途
    • 讓 master/worker process 間溝通資訊
    • 用來設置 spinlock,在 logger 的初始化、設定各 process 的 accept file descriptor 時使用
    • 用來分配 log_object[] 的空間,讓各個 logger (processes) 均能夠存取
  • shm.*
  • 解析 Linux 共享記憶體機制 中提到數個共享記憶體的實作方法,cserv 採用類似其中 POSIX shared memory 的方式,使用 mmap 來達成
  • 因為 master 以及各個 worker process 都是同一支程式 fork 出來的,因此可以使用 mmap 的匿名共享映射機制,來讓各個 process 共用同一塊記憶體
  • mmap
    ​​​​    #include <sys/mman.h>
    
    ​​​​    void *mmap(void *addr, size_t length, int prot, int flags,
    ​​​​              int fd, off_t offset);
    ​​​​    int munmap(void *addr, size_t length);
    
    • mmap 的功用是將 fd (file descriptor) 映射到一段虛擬記憶體空間,以便在不使用 read, write 等操作的情形下,對檔案或裝置進行操作 (由對該虛擬記憶體空間進行操作達成)
    • 其中 flag 的部分可以選擇 MAP_PRIVATE 或著 MAP_SHARED 來決定這段記憶體是否能讓別的 process 存取
    • 此外,還可以使用 MAP_ANONYMOUS 來達到匿名映射,讓 mmap 不對 fd 做記憶體映射(通常將 fd 填入 -1)。
  • 於是透過上述的 MAP_SHARED 加上 MAP_ANONYMOUS 就可以輕鬆地達成親子 process 間的共享記憶體
  • 範例程式:
#include <sys/mman.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define PG_SIZE 32

int main()
{
    void *addr = mmap(NULL, PG_SIZE, PROT_READ | PROT_WRITE, 
                      MAP_ANONYMOUS | MAP_SHARED, -1, 0);
    
    char *p = addr;
    memset(p, 'A', PG_SIZE);
    
    printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]);
    
    int pid = fork();

    if (pid == 0){
        memset(p, 'B', PG_SIZE);
        printf("I'm child.\n");
    } else if (pid > 0){
        printf("I'm father.\n");
    } else {
        printf("fork error\n");
        exit(EXIT_FAILURE);
    }
    
    printf("content: %c %c %c %c\n", p[0], p[1], p[2], p[3]);

    if (addr == MAP_FAILED) {
        printf("mapping failed\n");
        return EXIT_FAILURE;
    }
    munmap(addr, PG_SIZE);
    return EXIT_SUCCESS;
}

執行結果

content: A A A A
I'm father.
content: B B B B
I'm child.
content: B B B B

可以發現本來共享空間中的 A A A A 被子行程改成了 B B B B 後,親代行程也讀到了 B B B B,以此證明該空間是親子行程的共享記憶體空間

  • cserv 中實際的使用如下
//@ /src/util/shm.c
struct shm {
    char *addr;
    size_t size, offset;
    spinlock_t lock; /* lock while allocating shared memory */
};

static struct shm *shm_object;

首先宣告一個共享記憶體的結構,裡面自帶了一個 spinlock,用來在配置記憶體空間時鎖上,以防止其他 process 同時存取

//@ /src/util/shm.c
/* Allocate without reclaiming */
void *shm_alloc(size_t size_bytes)
{
    size_bytes = ROUND_UP(size_bytes, MEM_ALIGN);

    spin_lock(&shm_object->lock);
    if (unlikely(shm_object->offset + size_bytes > shm_object->size)) {
        spin_unlock(&shm_object->lock);
        return NULL;
    }

    char *addr = shm_object->addr + shm_object->offset;
    shm_object->offset += size_bytes;
    spin_unlock(&shm_object->lock);

    return addr;
}

/* unit: page size */
void *shm_pages_alloc(unsigned int pg_count)
{
    void *addr = mmap(NULL, pg_count * get_page_size(), PROT_READ | PROT_WRITE,
                      MAP_ANONYMOUS | MAP_SHARED, -1, 0);
    return (addr == MAP_FAILED) ? NULL : addr;
}

先看 shm_pages_alloc,顧名思義這個函式就是配製一個 page 的共享記憶體空間,其中 page size 取決於每個機器
這邊可以看到 mmap 的使用,大致上跟前面的範例一樣
shm_alloc 這個函式則是在已經配置的 page 中分配空間出來
假設下圖是一個 shm_page







g



page

addr

...

...

 

 

 



offset

addr + offset



offset->page:o





offset 指向目前的裝到哪邊的位置,size 則是 page size
因此要配置一個 size_bytes 大小的空間,會先將 size_bytes 進位,做記憶體對齊
然後檢查當下的空間夠不夠裝,最後才調整 offset 並返回 addr + offset 的指標,指向這塊空間的起始位置

Coroutine

閱讀 Life of a HTTP request, as seen by my toy web server

  • coroutine
    • 跟 thread 很像,但是採用 cooperative multitasking 而非 preemptively multitasking
    • 擁有自己的 stack 跟 program counter
    • 可以利用 dispatcher 輕易的在 thread 之間轉換
    • 可以中斷並且繼續執行,跟一般的函式不同
    • 是一種在 multi-threading 普及之前產生的概念,優點是不會產生 race-condition
    • 通常由 user 控制
  • 利用 coroutine 處理各個連線 (而非建立新的 thread)
    • 利用可中斷的特性來切換連線
    ​​​​| Main loop |      | Connection 1 |      | Connection 2 |
    ​​​​----------------------------------------------------------
    ​​​​      |
    ​​​​      |
    ​​​​      v
    ​​​​     ***  --> resume --> ***
    ​​​​                         ***
    ​​​​                         ***
    ​​​​     ***  <--  yield <-- ***
    ​​​​     ***
    ​​​​     ***  ------------> resume ------------->   ***
    ​​​​          ***
    ​​​​     ***  <------------  yield <-------------   ***
    ​​​​     ***
    ​​​​     ***
    ​​​​     ***
    ​​​​     ***  --> resume --> ***
    ​​​​                         ***
    

    改繪自 Life of a HTTP request, as seen by my toy web server 中的 Diagram of main loop plus two coroutines

  • 可以看成給 worker 們的 task/job 或者說針對 worker 的專屬排程機制
  • timeout 機制
//@ /src/process.c
void worker_process_cycle()
{
    if (worker_init_proc && worker_init_proc()) {
        ERR("Failed to initialize worker process");
        exit(0);
    }

    schedule_init(g_coro_stack_kbytes, g_worker_connections);
    event_loop_init(g_worker_connections);
    dispatch_coro(worker_accept_cycle, NULL);
    INFO("worker success running...");
    schedule_cycle();
}

每個 worker 運作時會先派發一個 coroutine 來處理 tcp accept (等待 connection 進來並且處理) 的工作,然後進入 coroutine 排程的循環

//@ /src/process.c
static void worker_accept_cycle(void *args __UNUSED)
{
    for (;;) {
        if (unlikely(g_shall_stop)) {
            set_proc_title("cserv: worker process is shutting down");
            decrease_conn_and_check();
            break;
        }

        if (unlikely(g_shall_exit))
            exit(0);

        int connfd = worker_accept();
        if (likely(connfd > 0)) {
            if (dispatch_coro(handle_connection, (void *) (intptr_t) connfd)) {
                WARN("system busy to handle request.");
                close(connfd);
                continue;
            }
            increase_conn();
        } else if (connfd == 0) {
            schedule_timeout(200);
            continue;
        }
    }
}

而在 worker_accept_cycle 中,如果 worker 有新的連線進入,則分派一個 coroutine 來處理,如果無法立即處理則呼叫 schedule_timeout() 將該 coroutine 放進 inactive list 中等待, timeout 時再行處理

//@ /src/coro/sched.c
void schedule_cycle()
{
    for (;;) {
        check_timeout_coroutine();
        run_active_coroutine();

        sched.policy(get_recent_timespan());
        run_active_coroutine();
    }
}
//@ /src/coro/sched.c
static void check_timeout_coroutine()
{
    struct timer_node *node;
    long long now = get_curr_mseconds();

    while ((node = get_recent_timer_node())) {
        if (now < node->timeout)
            return;

        rb_erase(&node->node, &sched.inactive);
        timeout_coroutine_handler(node);
        memcache_free(sched.cache, node);
    }
}

static inline void timeout_coroutine_handler(struct timer_node *node)
{
    struct rb_node *recent;

    while ((recent = node->root.rb_node)) {
        struct coroutine *coro = container_of(recent, struct coroutine, node);
        rb_erase(recent, &node->root);
        coro->active_by_timeout = 1;
        move_to_active_list_tail_direct(coro);
    }
}

每個排程循環都會先檢查有沒有等待超過 timeout 的 coroutine
如果有的話,就將它從 inactive list 中轉移到 active list 的尾端
並且執行 active list 頭部的 coroutine (替換調目前的工作,讓整個排程持續前進)

sched.policy(get_recent_timespan()) 這邊會對應到 event_cycle

(sched.policy = event_cycle;)

//@ /src/event.c
void event_cycle(int ms /* in milliseconds */)
{
    int value = epoll_wait(evloop.epoll_fd, evloop.ev, evloop.max_conn, ms);
    if (value <= 0)
        return;

    for (int i = 0; i < value; i++) {
        struct epoll_event *ev = &evloop.ev[i];
        struct fd_event *fe = &evloop.array[ev->data.fd];
        fe->proc(fe->args);
    }
}

在 event cycle 中利用 epoll 來監聽各個事件,並在觸發事件時通知。
這邊 epoll_wait() 會佔用 get_recent_timespan() 回傳的值,也就是距離目前的 timer_node timeout 還剩下的時間

//@ /src/coro/sched.c
static inline int get_recent_timespan()
{
    struct timer_node *node = get_recent_timer_node();
    if (!node)
        return 10 * 1000; /* at least 10 seconds */

    int timespan = node->timeout - get_curr_mseconds();
    return (timespan < 0) ? 0 : timespan;
}

man epoll_wait(2)
The timeout argument specifies the number of milliseconds that epoll_wait() will block.

這段時間中若有事件觸發,則結束等待,切換處理下一個 coroutine (透過 run_active_coroutine())

  • fast task switching: 以組合語言重寫 context switch 程式碼,達到更高效率的 task switching
    • src/coro/switch.*
  • 在 coroutine 排程中利用紅黑樹進行更高效率的搜尋
    • src/coro/sched.*

Memcache

參見 Site Cache vs Browser Cache vs Server Cache: What’s the Difference?

  • 實作一套 memcache 系統
  • 用來處理 Server Cache,在伺服器端先將網頁內容預先加載好,以便在日後接收到 request 時可以立即反應
  • 在 cserv 中用於兩個地方
    • caching http_request
      • server cache
      • 加速 request 的處理
      • 每次進入 http_request_handler 時都先從 memcache 中尋找,更改之後再存回去 memcache 中
    • caching coroutine 排程中的 timer node
      • active 的 timer node 被放進 memcache
      • 用來判斷這個 timer 是否為 active
      • 利用紅黑樹管理 inactive 的 timer,一旦 active 就將這個 timer 放進 memcache 以便快速存取
  • 程式碼分析
    • 站在呼叫端的角度命名
    • memcache_alloc: 從 memcache 中提取 element 出來
    ​​​​//@ /src/util/memcache.c
    ​​​​void *memcache_alloc(struct memcache *cache)
    ​​​​{
    ​​​​    return (cache->curr) ? remove_element(cache) : malloc(cache->obj_size);
    ​​​​}
    
    ​​​​static inline void *remove_element(struct memcache *cache)
    ​​​​{
    ​​​​    return cache->elements[--cache->curr];
    ​​​​}
    
    • memcache_free: 把 element 放進 memcache
    ​​​​//@ /src/util/memcache.c
    ​​​​void memcache_free(struct memcache *cache, void *element)
    ​​​​{
    ​​​​    add_element(cache, element);
    ​​​​
    ​​​​}
    
    ​​​​static inline void add_element(struct memcache *cache, void *element)
    ​​​​{
    ​​​​    if (cache->curr < cache->cache_size) {
    ​​​​        cache->elements[cache->curr++] = element;
    ​​​​        return;
    ​​​​    }
    
    ​​​​    free(element);
    ​​​​}
    

Logger

  • 利用 circular buffer 實作一套 logger 系統
    • 運用 circular buffer 的意義在於當 log 數量超過限制的時候,可以有效的剔除(覆蓋)舊的資料,即 circular 的特性
    • circular buffer 的實作位於 /src/util/cirbuf.h
  • 預設寫入 /tmp/cserv.log 檔案,更改 /conf/cserv.log 中的 log_path 來設定
  • 分為 CRIT, ERR, WARN, INFO 四種等級
    • /conf/cserv.conf 中的 log_level 為要寫入 log 檔案中的臨界點 (threshold)
    • cser 簡單的運用 enum 的性質來處理 threshold 的判斷
    ​​​​//@ /src/logger.c
    ​​​​void log_out(enum LOG_LEVEL level,
    ​​​​     const char *file,
    ​​​​     int line,
    ​​​​     const char *fmt,
    ​​​​     ...)
    ​​​​{
    ​​​​    if (level > log_level)
    ​​​​        return;
    ​​​​    // ...
    ​​​​}
    
  • 利用 cserv 的每個 process 來作為 logger,因此會有 worker process + 1 個 logger
    • 由呼叫 log_out 的 process 自行處理自己的 log
  • 利用 shared memory 來讓每個 logger 共通 log_object[],以便作業
    • log_object 可視為 logger 陣列,定義了每個 logger 的 pid, logger 的 circular buffer 以及一個決定是否要 flush 的 flag