Try   HackMD

Linux 核心專題: 高效網頁伺服器

執行人: SimonLee0316
解說影片

Reviewed by dockyu

可以將實驗結果畫成圖表,比較能表現出數據間的差異

已補上,謝謝。

Reviewed by steven523

網頁伺服器在應對大量請求的同時,如何確保安全性不受影響?例如防範常見的網路攻擊 DDoS、SQL injection、XSS攻擊等,確保資料完整性和保護使用者隱私。

任務簡介

藉由 coroutine 和 POSIX Thread,以 M:N 執行緒模型建構高性能的網頁伺服器。

實驗環境

$ gcc --version
gcc (Ubuntu 12.3.0-1ubuntu1~22.04) 12.3.0
Copyright (C) 2022 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         39 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  16
  On-line CPU(s) list:   0-15
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Core(TM) i7-10700 CPU @ 2.90GHz
    CPU family:          6
    Model:               165
    Thread(s) per core:  2
    Core(s) per socket:  8
    Socket(s):           1
    Stepping:            5
    CPU max MHz:         4800.0000
    CPU min MHz:         800.0000
    BogoMIPS:            5799.77
Caches (sum of all):     
  L1d:                   256 KiB (8 instances)
  L1i:                   256 KiB (8 instances)
  L2:                    2 MiB (8 instances)
  L3:                    16 MiB (1 instance)
NUMA:                    
  NUMA node(s):          1
  NUMA node0 CPU(s):     0-15

TODO: 探討第 8 週測驗題提及的任務排程器

留意其 M:N 執行緒模型如何在 GNU/Linux 落實,特別是 __thread (thread-local storage; tls) 和 C11 Atomics 的使用。

測驗八使用 coroutine 並搭配任務排程器實作任務之 啟動/暫停/恢復/建立/分離/收合操作

__thread 的使用讓每一個執行緒都有自己的變數,根據 TLS 說明其用法。

__thread int i;
extern __thread struct state s;
static __thread char *p;

__thread 可以單獨是用也可以與 extern、static 一起使用,但不可以與其他 storage class specifier 一起使用。

不要急著對程式碼進行逐行解說,你可能會落得「舉燭」的下場。揣摩程式開發者的想法,對整個程式碼進行想法的解讀,解釋到底要解決什麼問題,及用什麼手法。

static __thread struct task_list task_runners
static __thread struct task_list task_yielders

task_runners :儲存正在等待使用 cpu 的 task。
task_yielders :儲存等待排程的 task。

static __thread struct task_map task_paused
static struct task_map task_detached

task_map 是一個 hash-map style 的結構使用多個 binary search tree 又稱(aa-tree) 在 hashed shards 裡面,這允許 map 可以均勻增長, 無須分配,並且性能高於使用 single binary search tree。

task_paused : 儲存被暫停的 task。
task_detached : 儲存被分離的 task。
這裡可以想成被從排程分離的 task。

透過 quick_start 建立並啟動 coroutine:

#define quick_start(entry_, cleanup_, udata_) \
    {                                         \
        void *stack = xmalloc(STACK_SIZE);    \
        assert(stack);                        \
        started++;                            \
        task_start(&(struct task_desc){       \
            .stack = stack,                   \
            .stack_size = STACK_SIZE,         \
            .entry = (entry_),                \
            .udata = (udata_),                \
            .cleanup = (cleanup_),            \
        });                                   \
    }

.entry : 要 coroutine 執行的函式。
.udate : 放入的引數。

void task_start(struct task_desc *desc)
{
    task_init();
    struct coro_desc coro_desc = {
        .entry = task_entry,
        .cleanup = desc->cleanup,
        .stack = desc->stack,
        .stack_size = desc->stack_size,
        .udata = desc->udata,
    };
    task_user_entry = desc->entry;
    coro_start(&coro_desc, false);
}

.entrytask_entry 為真正要執行我們要 coroutine 執行函式的函式。

void coro_start(struct coro_desc *desc, bool final)
{
    /*...*/
    coro_switch0(desc, 0, final);
}
static void coro_switch0(struct coro_desc *desc, struct coro *co, bool final)
{
    struct coro *from = coro_cur ? coro_cur : &coro_thread;
    struct coro *to = desc ? NULL : co ? co : &coro_thread;
    if (from != to) {
        if (final) {
            coro_cleanup_needed = true;
            coro_cleanup_desc = from->desc;
        }
        if (desc) {
            coro_desc = *desc;
            coro_switch1(from, 0, desc->stack, desc->stack_size);
        } else {
            coro_cur = to;
            coro_switch1(from, to, 0, 0);
        }
        coro_cleanup_last();
    }
}
static void coro_switch1(struct coro *from,
                         struct coro *to,
                         void *stack,
                         size_t stack_size)
{
    if (to) {
        _coro_asm_switch(&from->ctx, &to->ctx);
    } else {
        struct coro_asmctx ctx = {0};
        coro_asmctx_make(&ctx, stack, stack_size, 0);
        _coro_asm_switch(&from->ctx, &ctx);
    }
}

切換不同 coroutine,呼叫 coro_switch1,如果是第一次呼叫 desc 不會是 0,所以在 coro_switch1 裡面如果是第一次呼叫的話他的 switch to 參數是 0 ,代表需要透過 coro_asmctx_make 建立一個新的 coroutine ,在切換過去,如果是不是第一次的話就透過 _coro_asm_switch,切換過去。

_coro_asm_switch 是用 x86-64 組合語言撰寫,將目前暫存器內容存回 目前執行 coroutine 內,在載入要切換過去之 task 之 contex。

切換後執行 task_entry

static void task_entry(void *udata)
{
    /* Initialize a new coroutine on the user's stack. */
    /*...*/
    if (task_cur) {
        /* Reschedule the coroutine that started this one */
        task_list_push_back(&task_yielders, co);
        task_list_push_back(&task_yielders, task_cur);
        task_nyielders += 2;
        task_switch(false, false);
    }
    task_cur = co;
    if (task_user_entry) {
        task_user_entry(udata);
    }
    /* This coroutine is finished. Switch to the next coroutine. */
    task_switch(false, true);
}

queue 是「佇列」,不是「實作」

先將要啟動的 coroutine 還有目前在執行的 coroutine 放入先前提到的等待排程的實作 上面,並呼叫切換 task。

static void task_switch(bool resumed_from_main, bool final)
{
    if (task_nrunners == 0) {
        /* No more runners */
        if (task_nyielders == 0 || task_exit_to_main_requested ||
            (!resumed_from_main && task_npaused > 0)) {
            task_return_to_main(final);
            return;
        }
        /* Convert the yielders to runners */
        /*...*/
    }
    task_cur = task_list_pop_front(&task_runners);
    task_nrunners--;
    coro_switch(task_cur->llco, final);
}

如果目前沒有正在等待使用 cpu 的 task 且,

  • 目前沒有等待排程的 coroutine
    或者
  • 沒有 coroutine 請求回到 main
    或者
  • 目前有被暫停的 coroutine (!resumed_from_main && task_npaused > 0)

如果其中一個條件滿足則會 task_return_to_main 回到 main。

如果都不滿足則會將在等待排程實作上的 task 都移動到等待 cpu 的實作。

以上為討論一個 corouine 建立並執行的過程。

一個 task 讓出 cpu :

void task_yield(void)
{
    if (task_cur) {
        task_list_push_back(&task_yielders, task_cur);
        task_nyielders++;
        task_switch(false, false);
    }
}

將自己插到等待排程的實作最後端,呼叫這個函式可以想像成主動放棄 cpu 的 task。

將 task 暫停,從排程中分離:

void task_pause(void)
{
    if (task_cur) {
        task_map_insert(&task_paused, task_cur);
        task_npaused++;
        task_switch(false, false);
    }
}

我們在前面說明儲存被分離的 task 結構中,可以看到前面並沒有 __thread 說明這個結構在多執行緒環境下,會有同步的問題所以需要透過加入適當的互斥鎖機制來確保同步問題。

確認上述說法是否合理。TLS 要解決的問題,是否跟任務排程器相關。

TLS 允許執行緒擁有私自的資料。對於每個執行緒來說,TLS 是獨一無二,不會相互影響,假設任務實作搭配使用 TLS ,且不使用多執行緒,在每個 cpu 上面只有一個執行緒,也就是可以說可以達到每個 cpu 上可以有各自的任務實作,並搭配排程器可以達到並行。

static void task_lock(void)
{
    bool expected = false;
    while (!atomic_compare_exchange_weak(&task_locker, &expected, true)) {
        expected = false;
        sched_yield0();
    }
}

static void task_unlock(void)
{
    atomic_store(&task_locker, false);
}

注意用語。

atomic_compare_exchange_weak 為 C11 提供的 atomic 指令 函式可以確保我們在對共享變數存取操作是 atomic,week 是可以允許 fail spuriously。
atomic_store 一個 atomic 操作寫入共享變數。

以下說明 Quiz8 是如何對 coroutine 做排程的相關操作 :

test_task_start

void test_task_start(void)
{
    /*...*/
    quick_start(co_root_entry, co_cleanup, &(int){99999999});
    /*...*/
}

執行 co_root_entry 了,在裡面會不斷的新建 child coroutine。

void co_child_entry(void *udata)
{
    int udid = *(int *) udata;
    assert(udid == nudid);
}
void co_root_entry(void *udata)
{
    assert(*(int *) udata == 99999999);
    assert(*(int *) task_udata() == 99999999);
    assert(task_info_running() == 1);
    for (int i = 0; i < NCHILDREN; i++) {
        printf("start child %d\n",i);
        assert(started == 1 + i);
        assert(cleaned == 1 + i - 1);
        nudid = i;
        quick_start(co_child_entry, co_cleanup, &(int){i});
    }
}

運行順序如下。

test_task_start
start co_root_entry
start child 0
start co_child_entry udid: 0
start child 1
start co_child_entry udid: 1
start child 2
...
start child 99
start co_child_entry udid: 99

do_test(test_task_start),會建立多個 coroutine 。

不要急著說「總結來說」,你真的懂這個程式如何運作?若是,說得出裡頭的實作缺失、可改進之處嗎?倘若沒有徹底掌握,何以「總結」。

了解。

test_task_sleep

透過 quick_start 呼叫 co_sleep 函式執行,會在呼叫 task_sleep。

void co_sleep(void *udata)
{
    assert(*(int *) udata == 99999999);
    task_sleep(1e8); /* 100ms CPU-based sleep */
}
static void task_sleep(int64_t nanosecs)
{
    int64_t start = getnow();
    while (getnow() - start < nanosecs)
        task_yield();
}

task_sleep 中的 task_yeild 功能主要是將當前 coroutine 放入 task_yielders 的串列當中,這可以表示成主動放棄 cpu 的 coroutine ,可以想成在 sleep 的狀態並等待排程,將自己放入 task_yielders 串列的末端後,在呼叫task_switch 呼叫在 task_nrunners 排隊的 coroutine 執行。

這裡會不斷的將 coroutine 加入 task_yielders 直到 100ms 時間到。

test_task_pause

建立100個 co_pause_one coroutine 還有 一個 co_resume_all coroutine ,並輪流執行。

void test_task_pause(void)
{
    reset_stats();
    for (int i = 0; i < NCHILDREN; i++) {
        quick_start(co_pause_one, co_cleanup, &i);
    }
    quick_start(co_resume_all, co_cleanup, 0);
    while (task_active()) {
        task_resume(0);
    }
    assert(npaused == 0);
}
  • co_pause_one
    建立100個的 co_pause_one coroutine,每次都會做一次順序暫停一次逆序暫停,再做一次順序再做一次逆序。
    順序暫停的實做如下
void co_pause_one(void *udata)
{
    int index = *(int *) udata;
    assert(index == npaused);
    paused[index] = task_id();
    /* pause in order */
    npaused++;
    is_paused[index] = true;
    task_pause();
    //return from pause
    is_paused[index] = false;
    npaused--;
    while (!all_resumed)
        task_yield();
}

將目前被暫停的 coroutine 標記為 true ,並做暫停切換到其他 coroutine (包含 test_task_pause),從暫停回來之後將標記改回 false ,如果還有人在被暫停的話,那就將自己排到task_yielders ,等待所有人都恢復。
程式碼是如何做到逆序暫停?
答: 將 id 較小的 coroutine sleep 久一點 這樣 id 較大的 coroutine 就會比較先被暫停。

void co_pause_one(void *udata)
{
    /*...*/
     /* pause in reverse */
    task_sleep(1e6 * (NCHILDREN - index));
    npaused++;
    is_paused[index] = true;
    task_pause();
    is_paused[index] = false;
    npaused--;
    while (!all_resumed) {
        task_yield();
    }
}
  • co_resume_all
    有 100 個 co_pause_one coroutine 和 1 個 co_resume_all 依序交換執行之下,在 co_pause_one 裡面每一個逆/順序暫停中,最後都會檢查是否所有人都被恢復執行了才會繼續往下一個逆/順序暫停,而負責恢復執行的 coroutine 就是 co_resume_all ,在程式碼中可以看到,在暫停的時候的順序是 1. 順序暫停 2. 逆序暫停 3. 順序暫停 4. 逆序暫停,而恢復執行的順序卻是 1. 順序恢復 2. 順序恢復 3. 順序恢復 4. 逆序恢復 ,這種情況會使就算 coroutine 是早暫停的將會晚恢復執行。
void co_resume_all(void *udata)
{
    (void) udata;

    /* wait for all to be paused */
    while (npaused < NCHILDREN) {
        task_yield();
    }
    all_resumed = false;
    /* resume in order */
    for (int i = 0; i < NCHILDREN; i++) {
        task_resume(paused[i]);
    }
    while (npaused > 0) {
        task_yield();
    }
    all_resumed = true;
    /* resume in order */
    /* resume in reverse order */
    /* resume in reverse order */
    /*...*/
}

注意 task_yield 是將自己插入 task_yielders 末端,這樣的操作是可以再度被執行的。
但是如果使用 task_pause 的話是將自己插入到 task_paused 是一個 task_map 的結構末端, 也就是直到它被 resume 之前都不會被返回 task_yielders

test_task_exit

避免「舉燭」,揣摩程式開發者當初的想法和規劃。

co_one 在啟動co_two,co_three,co_four 後,回到 main 等待所有的 task 都執行完畢,並在最後確認結束的順序是否與規劃的相同。

程式碼運作流程:
exitvals[] : 用來紀錄執行順序。
1. 啟動 co_one coroutine
2. co_one 紀錄 1,並啟動 co_two
3. co_two 將自己放入 task_yielders ,task_sleep(1e7 * 2),它會比 co_three 晚被恢復執行
4. 回到 co_one ,啟動 co_three
5. co_three 將自己放入 task_yielders ,task_sleep(1e7)
6. 回到 co_one ,啟動 co_four
7. co_four 紀錄 4 將自己放入 task_yielders
8. co_one 呼叫 task_exit,回到 test_task_exit
9. test_task_exit 確認是否還有 coroutine 沒有執行完,如果有就去執行它
c void test_task_exit(void) { /*...*/ while (task_active()) { task_resume(0); } /*...*/ }
10. 恢復執行 co_three ,紀錄 3
11. 恢復執行 co_two ,紀錄 2
12. test_task_exit 紀錄-2,表示所有任務完成。
至此 exitvals 內的值為 [1,4,-1,3,2,-2]

test_task_order

測試 coroutine 執行順序。
quickstart(co_yield) -> task_start -> coro_start -> coro_switch0 -> coro_switch1 -> task_entry

static void task_entry(void *udata)
{
    /* Initialize a new coroutine on the user's stack. */
    /*...*/
    if (task_cur) {
        /* Reschedule the coroutine that started this one */
        task_list_push_back(&task_yielders, co);
        task_list_push_back(&task_yielders, task_cur);
        task_nyielders += 2;
        task_switch(false, false);
    }
    task_cur = co;
    if (task_user_entry) {
        task_user_entry(udata);
    }
    /* This coroutine is finished. Switch to the next coroutine. */
    task_switch(false, true);
}

注意用語!

  1. 啟動 co_yield 是第一個 coroutine 所以沒有 task_cur,接著執行 co_yield
    task_cur = co_yield
    task_nrunners = {} , task_nyielders = {}
    a = [A]
    quick_start(co_yield1) -> ->task_entry
  2. task_cur 是 co_yield,所以將 co_yeld1 和 co_yeld0 插入 task_nyielders
    task_nrunners = {} , task_nyielders = { co_yield1 -> co_yield }
    task_cur = co_yield1
    呼叫 task_switch()
  3. task_switch() 現在沒有 runners ,所以將 task_nyielders 全部搬到 runners
    task_cur = co_yield1
    task_nrunners = { co_yield1 -> co_yield } , task_nyielders = {}
    並取第一個 runners switch 過去執行,所以是 co_yield1。
  4. 啟動 co_yeild1
    a = [AB]
    呼叫 task_yield(),將自己插到 task_nyielders。
    task_cur = co_yield1
    task_nrunners = { co_yield } , task_nyielders = {co_yield1}
    呼叫 task_switch()
  5. 從runners 取第一個 task = co_yield
    task_nrunners = {} , task_nyielders = {co_yield1}
    a = [ABC]
    quick_start(co_yield2) -> -> task_entry
  6. task_cur 是 co_yield,所以將 co_yeld2 和 co_yield 插入 task_nyielders
    task_nrunners = {} , task_nyielders = { co_yield1 -> co_yield2 -> co_yield }
    呼叫 task_switch()
  7. 沒有 runners 所以將 task_nyielders 全部搬近來
    task_nrunners = { co_yield1 -> co_yield2 -> co_yield } , task_nyielders = {}
    task_cur = co_yield1
    取第一個 runner 執行 = co_yield1
  8. co_yield1
    a = [ABCD]
    執行結束回到 task_entry() ,並呼叫 task_switch
    task_cur = co_yield1
    task_nrunners = { co_yield2 -> co_yield } , task_nyielders = {}
    並取第一個 runners switch 過去執行,所以是 co_yield2。
    task_cur = co_yield2
  9. co_yield2
    a = [ABCDE]
    呼叫 task_yield(),將自己插到 task_nyielders。
    task_cur = co_yield2
    task_nrunners = { co_yield } , task_nyielders = { co_yield2 }
    並取第一個 runners switch 過去執行,所以是 co_yield。
    task_cur = co_yield
  10. co_yield
    a = [ABCDEF]
    呼叫 task_yield(),將自己插到 task_nyielders。
    task_cur = co_yield
    task_nrunners = {} , task_nyielders = { co_yield2 -> co_yield }
    沒有 runners 所以將 task_nyielders 全部搬近來
    task_nrunners = { co_yield2 -> co_yield } , task_nyielders = {}
    並取第一個 runners switch 過去執行,所以是 co_yield。
    task_cur = co_yield2
  11. co_yield2
    a = [ABCDEFG]
    執行結束回到 task_entry ,並呼叫 task_switch
    task_nrunners = { co_yield } , task_nyielders = {}
    並取第一個 runners switch 過去執行,所以是 co_yield。
  12. co_yield
    a = [ABCDEFGH]
    執行結束回到 task_entry ,並呼叫 task_switch
    task_nrunners = {} , task_nyielders = {}
    結束回到 test_task_order

test_task_detach

建立兩個 pthread 分別執行 threda0thread1
執行後透過 pthread_join 等待 pthread 完成並回到 main。

threda0 程式碼分析:
啟動 100 個 coroutine ,並當被暫停的 coroutine 數量為 100 的時後(意思就是 task_paused 串列數量為 100),就開始將 task_paused 的所有 coroutine 搬到 task_detached 串列上。

void *thread0(void *arg)
{
    (void) arg;
    reset_stats();
    for (int i = 0; i < NCHILDREN; i++) {
        quick_start(co_thread_one, co_cleanup, &i);
    }
    while (task_active()) {
        if (task_info_paused() == NCHILDREN) {
            /* Detach all paused */
            for (int i = 0; i < NCHILDREN; i++) {
                task_detach(thpaused[i]);
            }
        }
        task_resume(0);
    }
    return NULL;
}

thread1 程式碼分析:
如果 所有 coroutine 還沒都被 detached 的會就會卡在迴圈裡面。
離開迴圈後將所有 coroutine 從 task_detached 串列移回到 task_paused 串列,並把他們重新加回去 task_yielders 讓他們可以繼續被執行。

void *thread1(void *arg)
{
    (void) arg;
    reset_stats();
    while (task_info_detached() < NCHILDREN) {
        /* Wait */
    }

    /* Attach all paused */
    for (int i = 0; i < NCHILDREN; i++) {
        task_attach(thpaused[i]);
        task_resume(thpaused[i]);
    }
    while (task_active()) {
        task_resume(0);
    }
    return NULL;
}

co_thread_one 程式碼分析:
將自己加到 task_yielders 串列直到 時間到。
在將自己放到 task_paused 串列。

void co_thread_one(void *udata)
{
    int index = *(int *) udata;
    int64_t id = task_id();
    thpaused[index] = id;
    task_sleep(1e6);
    task_pause();
}

task_detach 程式碼分析:

給定 要 detach 的 coroutine id ,找到並加到 task_detached 結構。

void task_detach(int64_t id)
{
    struct task *co = task_map_delete(&task_paused, &(struct task){.id = id});
    if (co) {
        task_npaused--;
        task_lock();
        task_map_insert(&task_detached, co);
        task_ndetached++;
        task_unlock();
    }
}

task_attach 程式碼分析:

給定 要 attach 的 coroutine id ,找到並加到 task_paused 結構。

工程人員該避免說「差不多」,要是有人說林志玲跟令堂的 DNA 序列有 99% 以上相同,就說「差不多」是台灣最美麗的女人,這樣對嗎?

使用精準的說法,避免「舉燭」。

了解。

void task_attach(int64_t id)
{
    task_lock();
    struct task *co = task_map_delete(&task_detached, &(struct task){.id = id});
    if (co) {
        task_ndetached--;
    }
    task_unlock();
    if (co) {
        task_map_insert(&task_paused, co);
        task_npaused++;
    }
}

將上方超連結指向的內容整合到本頁面。

TODO: 擴充上述任務排程器為高效網頁伺服器

第 8 週測驗題提及的任務排程器擴充為網頁伺服器,關於測試和效能分析,見 ktcp
應比較 NGINX、cserv,和本實作的效能表現

透過 fork (cpu 可用數量)個監聽行程 ,並綁定在特定 cpu ,這樣可以達到同時處理多個連線請求。

監聽特定埠號 (port number)

#define PORT 9999

透過 start , stop,開啟跟關閉伺服器

./tcp_server start   # Start the web server.
./tcp_server stop    # Stop the web server.

啟動伺服器

先創建 socket 並將網路協定設定為 IPV4,類型為 TCP。
為了防止 TCP/IP 在關閉 socket 後會處於 TIME-WAIT 狀態,使用 setsockopt 設定 SO_REUSEADDR 選項,好讓我們可以重新綁定 socket。
設定 TCP_CORK ,提高傳輸效率。
使用 bind and listen 將 socket 綁定到特定 port 和 address ,使其成為一個監聽 socket ,準備接受連線請求。

/* Eliminate 'Address already in use' error during the binding process */
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval, sizeof(int)) < 0) {
        perror("Set socket option SO_REUSEADDR failed");
        return -1;
    }

    /* Setting TCP_CORK option */
    if (setsockopt(listenfd, IPPROTO_TCP, TCP_CORK, (const void *)&optval, sizeof(int)) < 0) {
        perror("Set socket option TCP_CORK failed");
        return -1;
    }

建立 cpu 可用數量個行程並綁定在特定 cpu 上面,先取得目前可以使用的 cpu 數量。
sysconf 取得目前可用 cpu 數量。

int num_workers = sysconf(_SC_NPROCESSORS_ONLN);

cpu_set_t
初始化一個 CPU 集合。
將 0-15 號 CPU 添加到集合中。
使用 sched_setaffinity 函数 函式將行程的 CPU 親和力設定為指定的 CPU 集合。

務必使用本課程教材規範的詞彙!
不要捨近求遠,回去看教材。

建立 coroutine 去循環接受請求。

void create_workers(int listenfd, int num_workers) {

    for (int i = 0; i < num_workers; i++) {
        pid_t pid = fork();
        if (pid == 0) {  // Child
            cpu_set_t cpuset;
            CPU_ZERO(&cpuset);
            CPU_SET(i % num_workers, &cpuset);
            if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) < 0) {
                perror("Could not set CPU affinity");
                exit(1);
            }
            quick_start(worker_process_cycle, co_cleanup, &(int){listenfd});
            
        } else if (pid > 0) {  // Parent
            pids[i] = pid;
            printf("Child process %d created and assigned to CPU %d\n", pid, i % num_workers);
        } else {
            perror("Fork failed");
        }
    }
    create_pidfile(PID_FILE, pids, num_workers);
    // Parent process waits for children to exit
    while (wait(NULL) > 0);
}

確認行程是否有綁定在特定 cpu 。

$ ps -ef | grep tcp_serv
simon      76329   47602  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76330   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76331   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76332   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76333   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76334   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76335   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76336   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76337   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76338   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76339   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76340   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76341   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76342   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76343   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76344   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
simon      76345   76329  0 00:12 pts/0    00:00:00 ./tcp_server start
$ ./test.sh 
pid 76330's current affinity list: 0
pid 76331's current affinity list: 1
pid 76332's current affinity list: 2
pid 76333's current affinity list: 3
pid 76334's current affinity list: 4
pid 76335's current affinity list: 5
pid 76336's current affinity list: 6
pid 76337's current affinity list: 7
pid 76338's current affinity list: 8
pid 76339's current affinity list: 9
pid 76340's current affinity list: 10
pid 76341's current affinity list: 11
pid 76342's current affinity list: 12
pid 76343's current affinity list: 13
pid 76344's current affinity list: 14
pid 76345's current affinity list: 15

worker_process_cycle 負責接受請求,handle_client 處理請求並給予回應。
accept 是一個 Blocking I/O,它會一直在這裡阻塞直到收到請求。
收到請求之後會建立 coroutine 去處理請求並給予回應。

注意書寫規範:

  • 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致

收到

void worker_process_cycle(void *udata) {
  int listenfd = *(int *)udata;
  int connfd;
  struct sockaddr_in client_addr;
  socklen_t client_addr_len = sizeof(client_addr);

  // accept cilent request
  while (1) {
    connfd =
        accept(listenfd, (struct sockaddr *)&client_addr, &client_addr_len);
    if (connfd < 0) {
      perror("Accept failed");
      continue;
    }
    quick_start(handle_client, co_cleanup, &(int){connfd});
  }
}
void handle_client(void *udata) {
  int connfd = *(int *)udata;
  char buffer[BUFFER_SIZE];
  int bytes_read;

  // read client request
  bytes_read = read(connfd, buffer, BUFFER_SIZE - 1);
  if (bytes_read < 0) {
    perror("Failed to read from socket");
    close(connfd);
    return;
  }

  // build http response
  char *response = "HTTP/1.1 200 OK\r\n"
                   "Content-Type: text/html\r\n"
                   "Content-Length: 13\r\n"
                   "\r\n"
                   "Hello, world!";

  // send http response to client
  write(connfd, response, strlen(response));

  // close client connect
  close(connfd);
}

關閉伺服器

在開啟伺服器之後,會先將 fork 的子行程,紀錄在 tmp/tcp_server.pid ,這樣當要停止的時候才可以透過 read_pidfile 將所有子行程停止。

注意用語,務必使用本課程教材規範的術語!

create_worker 中如果是親代行程 (pid > 0),會將子行程的 pid 紀錄在 pids[] ,在使用 create_pidfile 將值存入目錄裡面。

注意書寫規範:

  • 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
int main(int argc, char **argv) {
  /*...*/
  if (!strcmp(argv[1], "stop")) {
    send_signal(SIGTERM);
    printf("cserv: stopped.\n");
    return 0;
  }
  /*...*/
  return 0;
}

static void send_signal(int signo) {
  int num_pids = 0;
  read_pidfile(PID_FILE, pids, &num_pids);
  for (int i = 0; i < num_pids; i++) {
    if (kill(pids[i], signo) == 0) {
      printf("Stopped process %d successfully.\n", pids[i]);
    } else {
      perror("Failed to stop the process");
    }
  }
  unlink(PID_FILE);
}

而親代行程會在 create_workers 裡面等到所有子行程都停止才會關閉 listenfd 並停止。

void create_workers(int listenfd, int num_workers) {  
  /*...*/
  create_pidfile(PID_FILE, pids, num_workers);
  // Parent process waits for children to exit
  while (wait(NULL) > 0);
}

解讀實驗結果。

使用 coroutine 去接收請求還有處理請求

fork 完子行程之後,會去產生一個 coroutine 去接收請求,而在接受到請求之後會在產生一個 coroutine 去處理請求。

注意書寫規範:

  • 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
void create_workers(int listenfd, int num_workers) {
  for (int i = 0; i < num_workers; i++) {
    pid_t pid = fork();
    if (pid == 0) { // Child
      /*...*/
      quick_start(worker_process_cycle, co_cleanup, &(int){listenfd});

    } /*...*/
  }
  /*...*/
}
void worker_process_cycle(void *udata) {
  /*...*/
  // accept cilent request
  while (1) {
    connfd =
        accept(listenfd, (struct sockaddr *)&client_addr, &client_addr_len);
    /*...*/
    quick_start(handle_client, co_cleanup, &(int){connfd});
  }
}

壓力測試與比較

工具

比較對象

htstress
100000 requests with 500 requests at a time(concurrency)

 ./htstress -n 100000 -c 500 http://localhost:9999/
0 requests
10000 requests
20000 requests
30000 requests
40000 requests
50000 requests
60000 requests
70000 requests
80000 requests
90000 requests

requests:      100000
good requests: 100000 [100%]
bad requests:  0 [0%]
socket errors: 0 [0%]
seconds:       2.263
requests/sec:  44194.207
request/sec seconds
apache 37318.529 2.680
nginx 45460.600 2.200
cserv 46730.129 2.140
tcp_server 44194.207 2.263

stress test

解釋實驗結果並量化分析。

從實驗結果中可以得出,在使用並行程式去處理並行的連線請求可以達到不錯的效果

使用精準描述,注意 I/O multiplexing

TODO: 提出進一步效能提升的方案

善用 perf, Ftrace 等工具,找出效能瓶頸,並持續提升效能。

使用 perf 可以看到在關閉連線佔據大量 cpu cycle

+   31.66%     0.19%  tcp_server  libc.so.6         [.] __close             

引入 keep-alive 機制,減少反覆建立和關閉連線的成本。