---
tags: concurrency
---
# [並行程式設計](https://hackmd.io/@sysprog/concurrency): 排程器原理
==[解說錄影 (上)](https://youtu.be/Y2uYOsRev7o)==
==[解說錄影 (下)](https://youtu.be/kZAq41Y3-cw)==
## 緣起
本講座嘗試運用 Linux 系統呼叫,建構使用者空間的任務排程器,探討 coroutine、執行緒、行程,及排程器的原理,隨後該任務排程器會擴充為支援搶佔式多工的實作,後者將進一步擴充以對應到多核處理器和 Linux 核心 CPU 排程器的原理。本講座以「做中學」的手法,讓參與者得以兼顧作業系統原理和排程器的具體設計議題。
## 探究排程器
> 搭配 [Scheduling Internals](https://tontinton.com/posts/scheduling-internals/) 及 [Using the Thread Scheduler](https://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.adaptive_partitioning_en_user_guide/aps_details.html) 學習。
![fiber](https://hackmd.io/_uploads/Bym_WJGpp.png)
Wikipedia: [Fiber](https://en.wikipedia.org/wiki/Fiber_(computer_science))
進階的手法可用 TLS (thread-local storage),見[並行程式設計: 建立相容於 POSIX Thread 的實作](https://hackmd.io/@sysprog/concurrency-thread-package)
> [學習實作小型作業系統](https://hackmd.io/@tina0405/S1Ab0A9pX)
> [Re: [問卦] 精通作業系統對Coding有什麼幫助?](https://www.ptt.cc/man/Gossiping/D601/DBC0/DC08/DAEC/M.1587751043.A.28E.html)
## [coroutine](https://en.wikipedia.org/wiki/Coroutine) 並非新概念
早期電腦僅有單一 CPU (甚至沒有 microprocessor 技術),但那時的作業系統就有多任務並行的需求:當一個任務正在執行時,另一個任務到來,於是作業系統 (早期的術語稱為 monitor) 就會判斷是否切換到新的任務,若是,作業系統會保存目前的 context,然後切換到另一個任務。
[coroutine](https://en.wikipedia.org/wiki/Coroutine) 於 1958 年由 Melvin Conway 提出,引述 [Wikipidia](https://en.wikipedia.org/wiki/Coroutine) 的描述:
> Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.
延伸閱讀: [UNIX 作業系統 fork/exec 系統呼叫的前世今生](https://hackmd.io/@sysprog/unix-fork-exec)
一圖勝千語:
![image](https://hackmd.io/_uploads/S1T01TZ6T.png)
將函式呼叫從原本的面貌: [ [source](https://www.csl.mtu.edu/cs4411.ck/www/NOTES/non-local-goto/coroutine.html) ]
![image](https://hackmd.io/_uploads/HyHJxa-aa.png)
轉換成以下:
![image](https://hackmd.io/_uploads/r13xga-6a.png)
除了上述所列的實用情境外,也用於網頁伺服器和虛擬機器等領域。而關於 preemptive multitasking ,請不要侷限於定義上,在其他實作中,coroutine 也可藉由 [signal](https://man7.org/linux/man-pages/man7/signal.7.html) 等相關操作實作 [preemptive](https://www.ptt.cc/bbs/C_and_CPP/M.1584751926.A.0D6.html) 。
在分類上,coroutine 分成 stackful 和 stackless 兩種。一般來說 stackful 可自行切換 subroutine ,而 stackless 需要提供 `suspend` 和 `resume` 等功能自行手動切換。然而,stackful 需要有配置記憶體空間儲存 stack 等資訊,在 context switch 上會比 stackless 沒有效率(需要考量複製現存資訊、cache locality 等議題)。在 [C++20](https://en.cppreference.com/w/cpp/language/coroutines) ,C++ 提供 coroutine 的功能,而在 2014 年時,有一份關於 coroutine 新增至 C++ Standard Library 的報告,可參見:[Stackful Coroutines and Stackless Resumable Functions](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4232.pdf); 2018 年的報告: [Working Draft, C++ Extensions for Coroutines](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4775.pdf);也可看此部落格所寫的文章:[Stackless vs. Stackful Coroutines](https://blog.varunramesh.net/posts/stackless-vs-stackful-coroutines/)。另外,其他實作案例包含,知名虛擬機器 QEMU 的 [coroutine](https://www.youtube.com/watch?v=2gNszk7OjIY) [[1]](https://www.qemu.org/docs/master/devel/block-coroutine-wrapper.html)[[2]](http://blog.vmsplice.net/2014/01/coroutines-in-qemu-basics.html) 及 Meta 的 folly 中所使用的 [coro](https://github.com/facebook/folly/tree/c2c5b28d3273a24bd4fe5393b3fb728cfe541284/folly/experimental/coro)。甚至 LLVM 或者說 Clang 編譯器也有提供 [builtin coroutine support](https://clang.llvm.org/docs/LanguageExtensions.html#c-coroutines-support-builtins) 。
以非 C/C++ 的語言來說,由 Google 開發的 [Golang](https://en.wikipedia.org/wiki/Go_(programming_language)) 也有 [Goroutine](https://go.dev/tour/concurrency/1) 。除此之外,Google 也有對 [Linux Kernel](https://lwn.net/Articles/863386/) 提出 [User-managed concurrency groups](https://lwn.net/Articles/879398/) ,相當於使用層級執行緒,該實作方式由作業系統進行 context switch 。
coroutine 當中的 context switch 實作方式,可藉由 [setjmp](https://en.cppreference.com/w/c/program/setjmp) / [longjmp](https://man7.org/linux/man-pages/man3/longjmp.3p.html)、[組合語言](https://github.com/hnes/libaco/blob/master/acosw.S) 或 POSIX 的 [ucontext](https://pubs.opengroup.org/onlinepubs/7908799/xsh/ucontext.h.html) 實作。各有利弊需要考量運用情境,例如最簡單的 setjmp / longjmp ,可在 cppreference 的描述可見,對於變數的型態有相關的限制:
> Upon return to the scope of setjmp, all accessible objects, floating-point status flags, and other components of the abstract machine have the same values as they had when longjmp was executed, **except for the non-[volatile](https://en.cppreference.com/w/c/language/volatile) local variables** in the function containing the invocation of setjmp, whose values are indeterminate if they have been changed since the setjmp invocation.
而 [volatile](https://lwn.net/Articles/233479/) 會造成編譯器無法對其變數進行最佳化,而導致執行效率無法提昇。並且,若要在 signal handler 以 setjmp / longjmp 實作,則須考量 glibc 等函式庫所作的安全防範措施。可見 [CS170: Project 2 - User Mode Thread Library (20% of project score)](https://sites.cs.ucsb.edu/~chris/teaching/cs170/projects/proj2.html)。
延伸閱讀: [coroutine 的歷史、現在和未來](https://blog.youxu.info/2014/12/04/coroutine/)
### 為何現在仍有 coroutine 的需求?
任務可概略分為:
* CPU 密集型
* I/O 密集型
早期伺服器為了解決並行需求,採用一個網路連線對應一個行程 (如 Apache HTTP Server,即 `httpd`)。後來 CPU 發展到多核,該架構調整為一個網路連線對應到一個執行緒,或者是事先配置的的 thread pool。
逐漸作業系統和應用程式框架提出 event loop 的開發介面,即 EventLoop + Callback 的方式,該模式存在多個變種,如: loop per thread,但共同特點都是 callback function 中處理事件。隨著軟體規模的擴展,callback 的變得更複雜,不乏有巢狀呼叫 (nested),這對於程式開發和除錯帶來嚴重的負面影響。
[goroutine](https://tour.golang.org/concurrency/1) 的出現帶來新的改變 —— 以同步的方式處理原本非同步執行的程式碼,於是變得更直覺、程式碼易於閱讀且除錯,適合 I/O 密集型任務,這背後的機制就是 coroutine。相較於執行緒,coroutine 佔用的記憶體空間更少,且通常不需要作業系統介入,從而在高度並行的情境可帶來優異的效能表現。
### 案例探討
QEMU 內建 coroutine
* [QEMU 中的協程: qemu-coroutine](https://royhunter.github.io/2016/06/24/qemu-coroutine/)
* [include/qemu/coroutine.h](https://github.com/qemu/qemu/blob/master/include/qemu/coroutine.h)
* [include/qemu/coroutine_int.h](https://github.com/qemu/qemu/blob/master/include/qemu/coroutine_int.h)
* [util/qemu-coroutine.c](https://github.com/qemu/qemu/blob/master/util/qemu-coroutine.c)
* [util/coroutine-sigaltstack.c](https://github.com/qemu/qemu/blob/master/util/coroutine-sigaltstack.c)
* [util/coroutine-ucontext.c](https://github.com/qemu/qemu/blob/master/util/coroutine-ucontext.c)
* [tests/unit/test-coroutine.c](https://github.com/qemu/qemu/blob/master/tests/unit/test-coroutine.c)
LLVM 提供 coroutine 支援
[Coroutine](https://hackmd.io/@YLowy/HkT8-S20D)
FreeRTOS 的 [coroutine](https://www.freertos.org/kernel/coroutines.html)
- [Coroutine in Depth - Context Switch](http://jinke.me/2018-09-14-coroutine-context-switch/)
- [Fibers, Oh My!](https://graphitemaster.github.io/fibers/)
- [Doing multitasking the hard way](https://notnik.cc/posts/async/)
### 實作分析
* [coro](https://github.com/sysprog21/concurrent-programs/tree/master/coro)
* [tinync](https://github.com/sysprog21/concurrent-programs/tree/master/tinync)
* [fiber](https://github.com/sysprog21/concurrent-programs/tree/master/fiber)
* [cserv](https://github.com/sysprog21/cserv)
* [minicoro](https://github.com/edubart/minicoro)
延伸閱讀: [Evolution of the x86 context switch in Linux](https://www.maizure.org/projects/evolution_x86_context_switch_linux/)
## coroutine 實作體驗
### 協同式多工
> [coro](https://github.com/sysprog21/concurrent-programs/tree/master/coro): 使用 setjmp/longjmp
`struct task` 有 2 個成員,分別為用於 `setjmp()` 的 `jmp_buf` `env` 以及用於排程的 `list`。`main` 函式中,`registered_task` 是 函式指標陣列,其中 function 的傳入參數是 generic type void pointer。在 `schedule` 函式中,利用 [ASLR](https://en.wikipedia.org/wiki/Address_space_layout_randomization) 設定亂數種子。接著呼叫 `setjmp(sched)`,每當新的任務加進排程,會呼叫 `longjmp(sched, 1)`,讓 `schedule()` 可繼續將新任務加進排程。
而 `task0` 和 `task1` 的結構一樣,有三大部份:第一部份根據 `schedule()` 設定的參數決定迴圈次數,將 task 加入排程後呼叫 `longjmp(sched, 1)`,讓 `schedule()` 可繼續將新任務加進排程。當所有任務被加入排程後,`schedule()` 會呼叫 ` task_switch()`,並根據 list 的 first entry `longjmp` 回該 task 的 `setjmp` 位置:
```c
jmp_buf env;
static int n;
n = *(int *) arg;
printf("Task 0: n = %d\n", n);
if (setjmp(env) == 0) {
task_add(&tasklist, env);
longjmp(sched, 1);
}
```
第二部份是兩個 task 交互排程,每次執行一個 loop 後,呼叫 `task_add()` 重新將 task 加入 list 的尾端,並呼叫 `task_switch` 指定 list 的 first task 執行:
```c
for (int i = 0; i < n; i++) {
if (setjmp(env) == 0) {
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 0: resume\n");
}
```
第三部份完成該 task,會呼叫 `longjmp(sched, 1)` 跳到 `schedule()`,接著會執行 `task_join(&tasklist)` 執行尚未執行完的 task:
```c
printf("Task 1: complete\n");
longjmp(sched, 1);
```
亦可使用 `goto` 來達成 coroutine,只是限制較大,參見[你所不知道的 C 語言: goto 和流程控制篇](https://hackmd.io/@sysprog/c-control-flow)。
延伸閱讀: [A Minimal User-Level Thread Package](https://homepage.divms.uiowa.edu/~jones/opsys/threads/)
### 搶佔式多工
> [preempt_sched](https://github.com/sysprog21/concurrent-programs/tree/master/preempt_sched): 使用 `SIGALRM`
> 背景知識: [Linux 核心搶佔](https://hackmd.io/@sysprog/linux-preempt)
其中 `list.h` 取自 [linux-list/include/list.h](https://github.com/sysprog21/linux-list/blob/master/include/list.h)。該 ULT 原理是透過 [SIGALRM signal](https://www.gnu.org/software/libc/manual/html_node/Alarm-Signals.html) 來模擬作業系統的 timer 中斷,設定的時間間隔為 10 ms,於是每 10 ms 觸發模擬的 timer 中斷,帶動 ULT 排程器,實作 round-robin 排程策略。留意到 [trampoline](https://en.wikipedia.org/wiki/Trampoline_(computing)) 的使用:
> Trampolines (sometimes referred to as indirect jump vectors) are memory locations holding addresses pointing to interrupt service routines, I/O routines, etc. Execution jumps into the trampoline and then immediately jumps out, or bounces, hence the term trampoline.
![image](https://hackmd.io/_uploads/Hy5rR3ZT6.png)
> 出處: [Trampoline mirror may push laser pulse through fabric of the Universe](https://arstechnica.com/science/2019/09/trampoline-mirror-may-push-laser-pulse-through-fabric-of-the-universe/)
預期程式執行輸出:
```
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[2] sort: end
[3] sort: end
[1] sort: end
```
注意後 3 行的順序可能是 $3 \to 2 \to 1$ 或 $2 \to 3 \to 1$。
程式碼改成 preemptive scheduling,需要借助 timer。參考 [alarm(2)](https://man7.org/linux/man-pages/man2/alarm.2.html) 以及 [ualarm(3)](https://man7.org/linux/man-pages/man3/ualarm.3.html),使得 timer 在指定時間間隔以後送出 `SIGALRM` signal。若沒有進行訊號處理,預設是會使得程式中止,我們需要將上述程式碼 `SIGSEGV` 改為 `SIGALRM`,做對應的訊號處理。
> 注意:若 timeslice 過小,會發生錯誤。
### The very first scheduler
在 linux 核心早期 (v0.01~v2.4.x) 在〈[A brief history of the Linux Kernel’s process scheduler: The very first scheduler, v0.01](https://dev.to/satorutakeuchi/a-brief-history-of-the-linux-kernel-s-process-scheduler-the-very-first-scheduler-v0-01-9e4)〉中稱作 "The very first scheduler",只使用 20 行左右的程式碼。在 v0.01 中,所有 task 用一個陣列表示,同時作為 runqueue (這裡 runqueue 應該是指陣列中所有可執行的 task 的集合) 使用,陣列長度為 64 (即最大容許的 task 數量為 64)。若陣列的某個 entry 為空,則表示成 NULL。
排程器的 timeslice 為 150 ms,而 interval timer 每 10 ms 會發出一次中斷,對應的 handler function 會減少目前執行的 task `Current` 的 timeslice,當 timeslice 歸零時,排程器會從 runqueue 中找出下一個 task。
在後續的版本,timeslice 和 timer 的 interval 都有做一些調整。
### The scheduling algorithm
以下是 Linux v0.01 的排程器策略:
1. 從 runqueue 中以倒序找出第一個可執行且剩餘 timeslice 不為 0 的 task (這邊原文描述有錯誤,應修正為找出 runqueue 中剩餘 timeslice 最大且狀態為 runnable 的 task)。
2. 如果所有可執行的 task 的 timeslice 歸零,則 reset 所有 task 的 timeslice。這裡排程器會給所有 task 150 ms 的 timeslice,並且額外給 sleeping task 一半的 current timeslice (後者是為了要讓剛被喚醒的 task 更早被排程,增加 interactivity)。
假設某一時間陣列的狀態如下,timeslice 的單位是 10 ms:
![image](https://hackmd.io/_uploads/ryMijhZ6T.png)
排程器從 runqueue 中以倒序找出剩餘 timeslice 最大的 task,挑選出 `t1`:
![image](https://hackmd.io/_uploads/SkuojhZT6.png)
`t1` 一直執行直到 timeslice 用完:
![image](https://hackmd.io/_uploads/SyRiohbTp.png)
同理,排程器從 runqueue 中以倒序找出剩餘 timeslice 最大的 task。如果 runqueue 中有多個 tasks 同時擁有最大的剩餘 timeslice,則選取第一個找到的 task,因此挑選出 `t2`:
![image](https://hackmd.io/_uploads/Skzdh3Z6T.png)
最後,所有可執行的 task 都用盡自己的 timeslice:
![image](https://hackmd.io/_uploads/SkH2o3ZTT.png)
則 reset 所有 task 的 timeslice,runnable 的 task 增加 150 ms 的 timeslice,sleep 的 task 增加 150 ms 加上原本剩餘 timeslice 一半的 timeslice (`t4` 為 sleep,reset 後的 timeslice 為 150 + 120 / 2 = 210):
![image](https://hackmd.io/_uploads/Bkkai2ZTT.png)
相關程式碼可在 [v0.01/include/linux/sched.h](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/include/linux/sched.h) 以及 [v0.01/kernel/sched.c](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/kernel/sched.c) 找到:
```c
// include/linux/sched.h
#define NR_TASKS 64
#define FIRST_TASK task[0]
#define LAST_TASK task[NR_TASKS-1]
```
```c=
// kernel/sched.c
/*
* 'schedule()' is the scheduler function. This is GOOD CODE! There
* probably won't be any reason to change this, as it should work well
* in all circumstances (ie gives IO-bound processes good response etc).
* The one thing you might take a look at is the signal-handler code here.
*
* NOTE!! Task 0 is the 'idle' task, which gets called when no other
* tasks can run. It can not be killed, and it cannot sleep. The 'state'
* information in task[0] is never used.
*/
void schedule(void)
{
int i, next, c;
struct task_struct ** p;
/* check alarm, wake up any interruptible tasks that have got a signal */
for(p = &LAST_TASK ; p > &FIRST_TASK ; --p)
if (*p) {
if ((*p)->alarm && (*p)->alarm < jiffies) {
(*p)->signal |= (1<<(SIGALRM-1));
(*p)->alarm = 0;
}
if ((*p)->signal && (*p)->state==TASK_INTERRUPTIBLE)
(*p)->state=TASK_RUNNING;
}
/* this is the scheduler proper: */
while (1) {
c = -1;
next = 0;
i = NR_TASKS;
p = &task[NR_TASKS];
while (--i) {
if (!*--p)
continue;
if ((*p)->state == TASK_RUNNING && (*p)->counter > c)
c = (*p)->counter, next = i;
}
if (c)
break;
for(p = &LAST_TASK ; p > &FIRST_TASK ; --p)
if (*p)
(*p)->counter = ((*p)->counter >> 1) + (*p)->priority;
}
switch_to(next);
}
```
`task` 是一個指向 `struct task_struct` 的指標的陣列,大小為 64,`FIRST_TASK` 和 `LAST_TASK` 巨集分別代表陣列的頭和尾。
程式碼第 17 到 25 行將所有 interruptible 並且有收到 signal 的 task 喚醒,從 waiting 狀態變成 running 的狀態 (注意,這裡的 running 狀態不代表執行的狀態,而是 runnable 的狀態,即 ready to run)。
`jiffies` 的實作在在 [kernel/system_call.s:158](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/kernel/system_call.s)。該檔案除了 system-call low-level handling routines,也包括 timer-interrupt handler。其中有使用到 `incl _jiffies` 指令,用來計算 timer interrupt 發生的次數。
程式碼第 27 行開始即是排程器本身。程式碼第 32 到 37 對應到上面排程器策略的第 1 點,以倒序找出 runqueue 中剩餘 timeslice 最大的 task,並用 `next` 代表該 task 的編號 (慣用 nr)。如果所有 runnable 的 task 的 timeslice 用光,會執行第 40 ~ 42 行的程式,對應到排程器策略的第 2 點。這裡是增加 `(*p)->priority` 的 timeslice,並非固定 150 ms 的 timeslice,而可調整:
```c
int sys_nice(long increment)
{
if (current->priority-increment > 0)
current->priority -= increment;
return 0;
}
```
## neco
[neco](https://github.com/tidwall/neco) 是個 C 語言撰寫的函式庫,藉由 coroutine 提供並行處理功能。它體積小且速度快,旨在降低並行 I/O 和網路程式設計的難度。建構於以下元件:
* [llco](https://github.com/tidwall/llco): low-level coroutine
* [sco](https://github.com/tidwall/sco): coroutine scheduler
特色:
* coroutine: 啟動、休眠、暫停、恢復、建立和收合
* 同步處理:channels, generators, mutexes, condition variables, 和 waitgroups
* coroutine 生命週期由內建的排程器管理
* 快速的使用者層級的上下文切換。
* stackful coroutine,可支援巢狀執行
詳情見 [Neco C API](https://github.com/tidwall/neco/blob/main/docs/API.md)。
[sco](https://github.com/tidwall/sco) 運用 C11 Atomics 來確保並行的程式碼得以正確運作,請對照閱讀 [並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics)。
[bluebox](https://github.com/tidwall/bluebox) 是個迷你的 Redis 實作,旨在展示 [neco](https://github.com/tidwall/neco) 的功能,支持 SET、GET、DEL 和 PING 操作。較單執行緒的 Redis 有更高的資料吞吐量和更低的延遲。
## 混合使用者和核心層級執行緒的模式
![image](https://hackmd.io/_uploads/rJn1pQbW0.png)
[M:N userspace threading subsystem backed](https://lkml.org/lkml/2020/7/22/1204)
video: [User-level threads....... with threads](https://youtu.be/KXuZi9aeGTw)
[ghOSt: Fast & Flexible User-Space Delegation of Linux Scheduling](https://dl.acm.org/doi/10.1145/3477132.3483542)
* [video](https://youtu.be/j4ABe4dsbIY)
* source: [ghost-kernel](https://github.com/google/ghost-kernel), [ghost-userspace](https://github.com/google/ghost-userspace)
> ghOSt 透過新增一優先序低於 CFS 的 scheduling class (考量為 agent crash 時還有 CFS 作為 fallback 排程器),使 workload 在此 class 中做排程。而排程相關邏輯取決於 userspace ghOSt agent 的實作。agent 透過 syscall 以及 BPF 與向 kernel 傳達排程決策,kernel 則透過 shared memory (ringbuffer) 傳遞 agent 感興趣的資訊 (例如 task 建立、task 阻塞、timer tick 等)。
> ghOSt 也使用到 BPF,其中一個例子是在 CPU 即將 idle 時所呼叫的 pick_next_task_ghost() 函式中掛載 BPF,並透過 BPF map 取得 agent 存放的 runnable task 來執行。此外,agent 也可以針對需求設計自己的 BPF program,增加排程決策的靈活性。
## Go 語言的多工特徵
[goroutine](https://tour.golang.org/concurrency/1) 是 Go 程式語言中的使用者層級的執行緒 (user-level thread, ULT),在語言層級支援多工處理,範例程式碼:
```go
func main() {
go say("world")
say("hello")
}
```
示意圖:
![image](https://hackmd.io/_uploads/ryfg37-WC.png)
> `say("world")` 執行於 goroutine 上,達到並行
換言之,goroutine 建立一組 ULT,語法:
```go
go f(x, y, z)
```
以 `go` 開頭的函式呼叫可使 `f` 執行在另一個 goroutine,其中 `f`, `x`, `y`, `z` 取自目前的 goroutine,注意到 `main` 函式也是執行於 goroutine 上。當名為 `main` 的 goroutine 執行結束,其他 goroutine 會跟著關閉。
多執行緒環境中,經常需要處理的是執行緒之間的狀態管理,其中一個常見的操作是等待 (wait),例如執行緒 `A` 需要等執行緒 `B` 運算並取得資料後,方可繼續執行。伴隨著 [goroutine](https://tour.golang.org/concurrency/1) 存在,還有個名為 [channel](https://tour.golang.org/concurrency/2) 的機制,最初是 goroutine 之間通訊的用途,但因其 blocking 的特性,也可作為等待 goroutine 的用途。
程式碼範例:
```go
func say(s string, c chan string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
c <- "FINISH"
}
func main() {
ch := make(chan string)
go say("world", ch)
go say("hello", ch)
<-ch
<-ch
}
```
![image](https://hackmd.io/_uploads/HJax3Xb-A.png)
> 系統執行二個 goroutine,即 `say("world", ch)` 和 `say("hello", ch)`,因此需要等待 2 個 `FINISH` 推入 Channel 中,才能結束 main goroutine。
Go channel 區分「讀」和「寫」的操作:
```cpp
Write(chan<- int)
Read(<-chan int)
ReadWrite(chan int)
```
兩者的區分就看 `<-` 符號放在哪邊: `chan<-` 指向自己就是「寫」,`<-chan` 離開自己就是「讀」。
[channel](https://github.com/sysprog21/concurrent-programs/tree/master/channel) 透過 C11 Atomics 和 Linux 核心提供的 [futex](https://man7.org/linux/man-pages/man2/futex.2.html) 系統呼叫,模擬 Go 程式語言的 [goroutine](https://tour.golang.org/concurrency/1) 及 [channel](https://tour.golang.org/concurrency/2) 機制。
## 應用 coroutine 到事件驅動的場景
[NGINX 開發日誌](https://www.nginx.com/blog/thread-pools-boost-performance-9x/)中以送貨員以及排隊等候的機制去看待這個問題
![](https://hackmd.io/_uploads/ByZBMI7d2.png)
假設有一堆顧客,他們需要取貨,而有些貨物(1, 2, 3 號箱) 近在眼前,同時也很多貨物存在遙遠的倉庫中,此時注意這邊這位工作者 (worker) 只會盲目的遵從目前顧客的要求,並且服務目前的顧客,假設有一個顧客要求比較遠的 4 號箱子,如此一來造成護衛效應,擋住後面也許有機會服務更快的顧客。
讓我們再想想另外一個情形,當這個 worker 學乖了,他利用一個送貨區,當每次有太遠的貨物需要取貨時,他並不會自己慢慢跑過去,而是一通電話叫送貨人員送來。如此一來就可更快的服務到其他顧客,與此同時目前顧客也不會因服務其他人而導致他的任務沒有進展。
![](https://hackmd.io/_uploads/BJGXXUQdn.png)
而 NGINX 的 worker 也一樣,當一個進程可能需要很長的操作時間時,他不會自己處理,而是將這個任務放進 TaskQueue 中,而任何空閒的 Thread 都可將此 queue 中的任務取出來執行。
TasksQueue 以及 Thread Pool 在這邊扮演的角色就是送貨員以及送貨車,Taskqueue 會接收由 worker process 來的工作,處理完之後再把結果送回 worker process 中。
![](https://hackmd.io/_uploads/SkgcLwlDn.png)
為驗證這個事實,可先使用以下命令
```shell
$ sudo apt-get install nginx
$ ps -ef --forest | grep nginx
# output:
# root 920 1 0 Fri 30 ? 00:00:00 nginx: master process /usr/sbin/nginx -g daemon on; master_process on;
# www-data 922 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 923 920 0 Fri 30 ? 00:00:40 \_ nginx: worker process
# www-data 925 920 0 Fri 30 ? 00:00:07 \_ nginx: worker process
# www-data 926 920 0 Fri 30 ? 00:00:29 \_ nginx: worker process
# www-data 928 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 929 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 931 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 932 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 933 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 934 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 935 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
# www-data 936 920 0 Fri 30 ? 00:00:00 \_ nginx: worker process
```
數一下的確可發現 `nginx: worker process` 存在於 `nginx: master process` 底下。
參照[事件驅動伺服器:原理和實例](https://hackmd.io/@sysprog/event-driven-server)對於 nginx 的介紹
- 組成
nginx 主要由一個 Master process 開始做初始化,包含 worker 建立與管理。而 worker 一旦建立且初始化, Master Process 會將其綁定在特定的 CPU 上,當其 Process 被終止或 crash 被 Master Process 發現則可回收其資源。
- 工作
其 worker 主要負責監測事件發生,也就是 epoll,當監聽到事件發生, worker 就會將任務丟至 queue 裡面,thread pool 裡的 thread 則負責從 queue 中領取任務並 lock 住避免任務被重複領取,執行完成後就把任務放置 finish queue,這邊就類似 io_uring 的 SQ(submission queue) 與 CQ(complete queue)
- Signal
Process 間的溝通使用 signal,nginx 在 [ngx_process.c](https://github.com/nginx/nginx/blob/4b0266174814e6cf60a275321121dbaab084ee64/src/os/unix/ngx_process.c) 定義 signal 種類與其 signal_headler,但經過 `grep -r ngx_signal_handler`,當接收到訊號,就會觸發對應的 signal_handler,讓某些跟 Signal 相關的數值發生改變,而 [ngx_process_cycle.c](https://github.com/nginx/nginx/blob/master/src/os/unix/ngx_process_cycle.c) 內的 `ngx_master_process_cycle` 和 `ngx_worker_process_cycle` 會檢查這些數值去做對應的動作。
### 利用 Coroutine 實作 Event loop
[lwan](https://lwan.ws/) 利用 coroutine 實作 event loop,主要結構還是 `epoll_wait`,但在監聽到 event 發生時處理 I/O 事件時會 resume 至目標連線的 corotine,這時處理 I/O 結束時會有以下狀況:
- 結束連線
- 完成 I/O 處理後 keep alive
第一個狀況監聽到處理完就會直接結束,但第二個 coroutine 機制就會有明顯的效果,在 close 前處於等待時就可 yield 至 event loop 繼續處理監聽到的 I/O 事件。
```graphviz
digraph demo{
rankdir=TB;
ranksep=0;
splines=false;
{
node[shape=plaintext group=g0 width=0];
1[label=""];
2[label=""];
3[label=""];
4[label=""];
5[label=""];
6[label=""];
7[label=""];
8[label=""];
emp[label=""];
1->emp->2->3->4->5->6->7->8[penwidth=0 arrowsize=0];
};
{
node[shape=rect group=g1 width=0.2];
main[label="main loop" width=2];
main1[label=""];
main2[label=""];
main3[label=""];
main4[label=""];
main->main1
main1->main2->main3->main4[style=dashed arrowsize=0];
};
{
node[shape=rect group=g2 width=0.2];
conn1[label="connection 1" width=2];
conn1_coro1[label=""];
conn1_coro2[label=""];
conn1->conn1_coro1->conn1_coro2[style=dashed arrowsize=0];
};
{
node[shape=rect group=g3 width=0.2];
conn2[label="connection 2" width=2];
conn2_coro1[label=""];
conn2->conn2_coro1[style=dashed arrowsize=0];
}
{
main1:se->conn1_coro1:nw[constrait=true];
conn1_coro1:sw->main2:ne;
main2:se->conn2_coro1:nw;
conn2_coro1:sw->main3:ne;
main3:se->conn1_coro2:nw;
conn1_coro2:sw->main4:ne;
}
{
{rank=same 1 main conn1 conn2};
{rank=same 2 main1};
{rank=same 3 conn1_coro1};
{rank=same 4 main2};
{rank=same 5 conn2_coro1};
{rank=smae 6 main3};
{rank=same 7 conn1_coro2};
{rank=same 8 main4};
};
}
```
```graphviz
digraph state{
rankdir=LR
{
node[shape=plaintext]
1[label=""]
2[label=""]
1->Resume;
2->Yeild[dir=back];
}
}
```
> 參考 [Life of a HTTP request, as seen by my toy web server](https://tia.mat.br/posts/2014/10/06/life_of_a_http_request.html#coroutines) 的示意圖
在 [lwan-thread.c](https://github.com/lpereira/lwan/blob/master/src/lib/lwan-thread.c) 裡可看到 `lwan_thread_io_loop`
```c
for (;;) {
int timeout = turn_timer_wheel(&tq, t, epoll_fd);
int n_fds = epoll_wait(epoll_fd, events, max_events, timeout);
bool created_coros = false;
if (UNLIKELY(n_fds < 0)) {
if (errno == EBADF || errno == EINVAL)
break;
continue;
}
for (struct epoll_event *event = events; n_fds--; event++) {
struct lwan_connection *conn = event->data.ptr;
assert(!(conn->flags & CONN_ASYNC_AWAIT));
...
if (!conn->coro) {
if (UNLIKELY(!spawn_coro(conn, &switcher, &tq))) {
send_last_response_without_coro(t->lwan, conn, HTTP_UNAVAILABLE);
continue;
}
created_coros = true;
}
resume_coro(&tq, conn, epoll_fd);
}
if (created_coros)
timeouts_add(t->wheel, &tq.timeout, 1000);
}
```
在 event loop 中,會先檢查 `conn->coro` 是否建立,若沒有建立則會呼叫 `spawn_coro` 幫連線建立一個 coroutine,有了 coroutine 之後便會 resume 至連線處理事件。
### 建立 connect structure 與 Accept connection
在 lwan 在剛開始會建立一個 array of connection structure,這個陣列是依照可接受連線最大數量 `SOMAXCONN` 決定陣列大小。
```c
static int backlog_size;
static void init_backlog_size(void)
{
#ifdef __linux__
FILE *somaxconn;
somaxconn = fopen("/proc/sys/net/core/somaxconn", "re");
if (somaxconn) {
int tmp;
if (fscanf(somaxconn, "%d", &tmp) == 1)
backlog_size = tmp;
fclose(somaxconn);
}
#endif
if (!backlog_size)
backlog_size = SOMAXCONN;
}
```
`SOMAXCONN` 在 linux 中可在 `/proc/sys/net/core/somaxconn` 看到系統預設值
```shell
$ cat /proc/sys/net/core/somaxconn
4096
```
其他若沒有 `somaxconn`,lwan 在 [socket.h](https://github.com/lpereira/lwan/blob/master/src/lib/missing/sys/socket.h) 預設 `SOMAXCONN` 預設 128。
```c
ALWAYS_INLINE int
lwan_connection_get_fd(const struct lwan *lwan, const struct lwan_connection *conn)
{
return (int)(intptr_t)(conn - lwan->conns);
}
```
而且在文中提到取得 fd 的方法是「目標 `conn` 的位址 - `conn` 的起始位址」,在 lwan 連線建立時回傳的 fd 會當成 `conn` 的 index 來使用。
### handler trie
針對不同種類的 request 有不一樣的 handler,這些 handler 會在 main function 定義好,[lwan](https://lwan.ws/) 裡將 request handler search 使用 [trie](https://en.wikipedia.org/wiki/Trie) 結構實作出來。
Trie 的結構與 huffman 編碼類似,但 trie 在 node 之間的走訪是依據 char 的 ASCII code 進行走訪,只要輸入特定的 prefix (string 型態),就可沿著 prefix 找到我們要找的 output leaf,而 Time Complexiy 保證為 $O(n)$,其中 n 為 prefix 長度,這樣的架構也在某些情境可取代 hash table(比較不會發生碰撞)。
lwan lookup handler 的原理是給定一個特定的 prefix,接下來照著 prefix 往下存取到 leaf,其 leaf 就有我們所要找的 handler function。在 trie 的結構上 lwan 將 node 設計成以下結構。
```c
struct lwan_trie_node {
struct lwan_trie_node *next[8];
struct lwan_trie_leaf *leaf;
int ref_count;
};
```
lwan 為了符合 x86_64 cacheline 的長度,每個節點最多只給 8 個,這樣剛好一個 node 填滿一個 cacheline。
再者根據 lwan 原本的設計 [256 pointers per node](https://github.com/lpereira/lwan/blob/b2c9b37e63c7ffedfcbd00c25349ab9501dc4985/lwan-trie.c#L27-L31) 對虛擬記憶體負擔太大,因此將 array 縮減成 8 pointers per node。
然而可在 [lwan-trie.c](https://github.com/lpereira/lwan/blob/master/src/lib/lwan-trie.c) 裡的 function 發現,index 是根據 `*key & 7` 決定的(key 為 prefix)
```c
for (knode = &trie->root; *key; knode = &node->next[(int)(*key++ & 7)])
GET_NODE();
```
> 位於 `lwan_trie_add()` 函式
這樣的設計雖說可能會發生碰撞,但該設計較符合硬體架構。
## 針對 Arm64,實作原生的 [coroutine](https://github.com/lpereira/lwan/blob/master/src/lib/lwan-coro.c)
> 相關 [pull request](https://github.com/lpereira/lwan/pull/356)
不依賴 [ucontext](https://pubs.opengroup.org/onlinepubs/7908799/xsh/ucontext.h.html),在 lwan 既有的 coro 程式碼提供 Arm64 組合語言實作,比對 ucontext 和自行實作的效能落差,過程中應當要留意相關 ABI 規範。
> 參照 [minicoro](https://github.com/edubart/minicoro) 實作
lwan 的 coroutine 是讓整個網頁伺服器輕量化重要的一環,藉由 coroutine 與 non-blocking 機制使得整個 request 處理簡化,以下方程式為例:
```c
while (true) {
struct lwan_request_parser_helper helper = {
.buffer = &buffer,
.next_request = next_request,
.error_when_n_packets = error_when_n_packets,
.header_start = header_start,
};
struct lwan_request request = {.conn = conn,
.global_response_headers = &lwan->headers,
.fd = fd,
.response = {.buffer = &strbuf},
.flags = flags,
.proxy = &proxy,
.helper = &helper};
...
if (next_request && *next_request) {
conn->flags |= CONN_CORK;
if (!(conn->flags & CONN_EVENTS_WRITE))
coro_yield(coro, CONN_CORO_WANT_WRITE);
} else {
conn->flags &= ~CONN_CORK;
coro_yield(coro, CONN_CORO_WANT_READ);
}
/* Ensure string buffer is reset between requests, and that the backing
* store isn't over 2KB. */
lwan_strbuf_reset_trim(&strbuf, 2048);
/* Only allow flags from config. */
flags = request.flags & (REQUEST_PROXIED | REQUEST_ALLOW_CORS | REQUEST_WANTS_HSTS_HEADER);
next_request = helper.next_request;
}
coro_yield(coro, CONN_CORO_ABORT);
__builtin_unreachable();
```
> 位於 [process_request_coro](https://github.com/lpereira/lwan/blob/b1119340cf43c816e8f9518758a4a9b4515860a8/src/lib/lwan-thread.c#L363-L445) 函式
此函式是讀取並處理 request 的過程,當 `epoll_wait` 監測到連線有資料進來便會 `coro_resume` 至此連線的 `process_request_coro`,當讀取或寫入結束後需等待對方訊息,就可先 `coro_yield` 回 main loop 繼續監聽,當下一次再監聽到跳至此 task 時,就會從上次 yield 的位置繼續執行,就不用擔心重複執行 loop 前面的一些程式碼太多次。
lwan 裡 x86_64 `coro_swapcontext` 與 [minicoro 的 `_mco_switch`](https://github.com/edubart/minicoro/blob/8673ca62ed938c0b436bc2a548f172865f65bf1d/minicoro.h#L688-L740) 作法相似,只有在函式與 `struct` 的定義上有差異。
```c
typedef uintptr_t coro_context[10];
```
lwan 不像 minicoro 一樣建立一個 struct 紀錄各個暫存器的值,而是使用 `uintptr_t` 的陣列存取暫存器的值,`uintptr_t` 與暫存器一樣是看處理器位元數而定,因此可完整紀錄處理器的值。
因此想要實作 Arm64 的原生 coroutine,可參見 [minicoro 針對 aarch64 linux 之 `_mco_switch`](https://github.com/edubart/minicoro/blob/8673ca62ed938c0b436bc2a548f172865f65bf1d/minicoro.h#L1091-L1134) 與 [aapcs64](https://github.com/ARM-software/abi-aa/blob/2982a9f3b512a5bfdc9e3fea5d3b298f9165c36b/aapcs64/aapcs64.rst) 裡針對暫存器與程式呼叫規範的介紹
### Arm64 架構下的原生 Coroutine
在實作原生 Coroutine 下,接著參考 [lwan-coro.c](https://github.com/lpereira/lwan/blob/master/src/lib/lwan-coro.c) 與 [minicoro](https://github.com/edubart/minicoro) 的設計風格,為保留 lwan 的程式風格,多半還是以專案本身的設計手法為主。
在原生 coroutine 實作上,就是將暫存器的資訊存入記憶體,等需要時再將其從記憶體取出。
Arm64 一共有 30 個暫存器 (不包括 stack pointer)
- r0 到 r7 : 參數與回傳值使用的暫存器,當呼叫函式時將參數放入此暫存器,回傳執也利用這些暫存器傳遞。
- r8 : 間接結果暫存器(indirect result register),可透過紀錄位址來間接回傳結果,例如回傳值為大型資料結構。
- r9 到 r15 : Caller 使用的一般目的暫存器,即呼叫函式者。
- r16、r17 : IP (intra-procedure-call) 暫存器
- r18 : 為平台 ABI 保留的暫存器。
- r19 到 r28 : Callee 使用的一般目的暫存器,即被呼叫者。
- r29 : Frame pointer,指向這次 stack 儲存的基底位址。
- r30 : link register,儲存返回位址。
除了暫存器類型還有資料形態需注意(以 r0 暫存器為例):
整數:
- `w0` : 為 32-bit 整數形態。
- `x0` : 為 64-bit 整數形態。
浮點數:
- `b0` : 為 8-bit 浮點數形態。
- `h0` : 為 16-bit 浮點數形態。
- `s0` : 為 32-bit 浮點數形態。
- `d0` : 為 64-bit 浮點數形態。
- `q0` : 為 128-bit 浮點數形態。
向量 `V0` : 為 128-bit,可根據浮點數的代號從此向量中取部份的值。
參考 [minicoro](https://github.com/edubart/minicoro) 與 lwan-coro 中 x86-64 asm,需要保存的暫存器可能就是 x9 到 x15、x19 到 x30,還有 x0 與 x1 ,總共 22 個暫存器。
指定的架構為 aarch64,因此在 [lwan-coro.h](https://github.com/lpereira/lwan/blob/master/src/lib/lwan-coro.h) `coro_context` 宣告時新增架構判斷
```c
#elif defined(__aarch64__)
typedef uintptr_t coro_context[22];
```
#### `coro_swapcontext`
接下來就是 [lwan-coro.c](https://github.com/lpereira/lwan/blob/master/src/lib/lwan-coro.c) `coro_swapcontext`,首先要把 `sp` 與 `x30` 的值取出,我們放在 `x10` 與 `x11`。
```c
mov x10, sp
mov x11, x30
```
接下來使用 `stp` 指令一次將兩個暫存器存入 `coro_context` 的兩個區域。
以第一行為例,當使用 `stp` 指令存入,`x8`、`x9` 的存放位置會是 `x0[2]` 與 `x0[3]`。
```c
stp x8, x9, [x0, #(1*16)]
stp x10, x11, [x0, #(2*16)]
stp x12, x13, [x0, #(3*16)]
stp x14, x15, [x0, #(4*16)]
stp x19, x20, [x0, #(5*16)]
stp x21, x22, [x0, #(6*16)]
stp x23, x24, [x0, #(7*16)]
stp x25, x26, [x0, #(8*16)]
stp x27, x28, [x0, #(9*16)]
stp x29, x30, [x0, #(10*16)]
stp x0, x1, [x0, #(0*16)]
```
而取出指令為 `ldr` ,用法與 `stp` 正好相反
```c
ldp x8, x9, [x1, #(1*16)]
ldp x10, x11, [x1, #(2*16)]
ldp x12, x13, [x1, #(3*16)]
ldp x14, x15, [x1, #(4*16)]
ldp x19, x20, [x1, #(5*16)]
ldp x21, x22, [x1, #(6*16)]
ldp x23, x24, [x1, #(7*16)]
ldp x25, x26, [x1, #(8*16)]
ldp x27, x28, [x1, #(9*16)]
ldp x29, x30, [x1, #(10*16)]
ldp x0, x1, [x1, #(0*16)]
```
注意在取出暫存器時,`x1` 必須在最後一個取出,否則會導致後續位址錯誤!
最後是設定 `sp`,結束後跳至 `x11`。
```c
mov sp, x10
br x11
```
#### coro 初始化
在一開始 `malloc` 時是沒有 context 存入 `coro_context` 的,所以要針對一些暫存器做初始化
```c
#elif defined(__aarch64__)
coro->context[19/* x28 */] = (uintptr_t) data;
coro->context[0 /* x0 */] = (uintptr_t) coro;
coro->context[1 /* x1 */] = (uintptr_t) func;
coro->context[5 /* lr */] = (uintptr_t) coro_entry_point_arm64;
uintptr_t rsp = (uintptr_t)stack + CORO_STACK_SIZE;
#define STACK_PTR 4
coro->context[STACK_PTR] = rsp & ~0xful;
```
上面是個 4 個初始化數值是為了第一次跳躍而準備的,在第一次呼叫 `coro_entry_point_arm64` 時會將 `x28` 的數值放入 `x2`(第 3 個參數)再呼叫 `coro_entry_point`。
```c
mov x2, x28
bl coro_entry_point
```
`coro_entry_point` 會呼叫 `coro_yield`,主要任務是設置 `yield_value`。
而 stack 在 `coro_new` 時就會 mmap 配置一個 stack 空間。
```c
void *stack = mmap(NULL, CORO_STACK_SIZE, PROT_READ | PROT_WRITE,
MAP_STACK | MAP_ANON | MAP_PRIVATE, -1, 0);
if (UNLIKELY(stack == MAP_FAILED))
return NULL;
coro = lwan_aligned_alloc(sizeof(*coro), 64);
if (UNLIKELY(!coro)) {
munmap(stack, CORO_STACK_SIZE);
return NULL;
}
coro->stack = stack;
```
最後 `coro->context[STACK_PTR] = rsp & ~0xful` 就是對齊記憶體位址。
記憶體開銷分析
- ucontext
![](https://hackmd.io/_uploads/S1GCXbjwh.png)
總 heap 開銷為 1.3 MB
- Arm64 原生 Coroutine
![](https://hackmd.io/_uploads/ByW1N-jP2.png)
總 heap 開銷為 904.3 KB
[cserv](https://github.com/sysprog21/cserv) 也運用 coroutine 於事件驅動的高效網頁伺服器。