contributed by < jwang0306
>
提示: 參考實驗程式碼: test_epoll_lt_and_et
狀態機是為了紀錄維護 request 的狀態
timer 是為了控制每個 task 的執行時間,執行一個 task 後會將他所花時間加上去。我們維護一個以 priority queue 來實做的 timer 結構,目的是為了每次都可以找出最小的 key ,也就是執行時間最早超過的 task ,然後在下一個 event loop 處理掉。
不用搶 lock 有機會提昇效能。 mutex 會慢的原因主要就是因為 lock contention ,一旦發生搶 lock 的情況,thread 就會進入 kernel space 睡覺等待,其中會涉及 context switch,這樣的 kernel-user space 的時間開銷是造成它慢的主因;相較之下, lock-free 的等待方式就是透過 CAS 不斷的 spin ,而萬一一旦這整個 spin 的計算開銷很大的話,那也是有可能比 mutex 更慢的。一般來說在 critical section 不大的時候, lock-free 有機會帶來加速。我將實做並比較:
- lock-based task queue
- lock-free per-thread task queue
http-parse-sample.py
運作機制,以及 eBPF 程式在 Linux 核心內部分析封包的優勢為何?$ cat /etc/os-release
NAME="Ubuntu"
VERSION="20.04 LTS (Focal Fossa)"
$ cat /proc/version
Linux version 5.4.0-26-generic (buildd@lcy01-amd64-029) \
(gcc version 9.3.0 (Ubuntu 9.3.0-10ubuntu2))
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 39 bits physical, 48 bits virtual
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 2
Core(s) per socket: 2
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 78
Model name: Intel(R) Core(TM) i7-6500U CPU @ 2.50GHz
Stepping: 3
CPU MHz: 3013.336
CPU max MHz: 3100.0000
CPU min MHz: 400.0000
BogoMIPS: 5199.98
Virtualization: VT-x
L1d cache: 64 KiB
L1i cache: 64 KiB
L2 cache: 512 KiB
L3 cache: 4 MiB
NUMA node0 CPU(s): 0-3
// event loop
while (1) {
events = epoll_wait();
for 1 to events {
if (fd == listen_fd) {
accept_conn(); // non-blocking
} else {
if (not interest)
continue;
do_request();
}
}
}
EPOLLONESHOT
struct epoll_event
參數之一,因為可能會有新的資料進來使 fd 可以讀取,如果這時候這個 fd 被分給另外一個執行續就會壞掉,因此要重新放回 epoll-list 重來。event.data.ptr
struct epoll_event
提供了這樣的一塊 pointer ,可以用來裝你要的資訊(自行定義),像是 file descriptor 、 status… 等等Thread pool 裡的執行緒負責執行 task queue 裡面的連線請求, event loop 會一直將任務放進去 task queue ,同時透過 accept
接受新的 connection request (放到 epoll event list 等待下一輪 event loop 開始)。
// event loop
while (1) {
events = epoll_wait();
for 1 to events {
if (fd == listen_fd) {
accept_conn();
} else {
if (not interest)
continue;
// queueing task into threadpool
thpool_enq(do_request);
}
}
}
因此想法就是把 do_request()
整個丟給其他 thread 去做,但是中間涉及的 data race 就要自己想辦法來解決(像是 timer 的維護)。實做上就是用 lock 來保護一切共享資源,也就是從 work queue 拿出或放進任務的時候。
但我看了 sysprog/server-framework 的實做,它不管是 accept 的 new connection 請求還是一般的 read/write 請求,都被丟到 thread pool 去執行了 (甚至連 event loop 都丟到 thread pool 執行,反覆丟進去來達到無窮迴圈的效果):
static void on_data(struct Reactor *reactor, int fd)
{
if (fd == _server_(reactor)->srvfd) {
/*listening socket. accept connections. */
Async.run(_server_(reactor)->async,
(void (*)(void *)) accept_async, reactor);
} else if (_protocol_(reactor, fd) && _protocol_(reactor, fd)->on_data) {
_server_(reactor)->idle[fd] = 0;
/* clients, forward on */
Async.run(_server_(reactor)->async,
(void (*)(void *)) async_on_data,
&(_server_(reactor)->server_map[fd]));
}
}
哪種方法比較好?我決定直接去看 Nginx 怎麼做的。
其實原本 single-thread I/O 已經夠快了,在絕大多情況下。因此問題就是,什麼時候會需要 multi-thread ,什麼樣的 task 交給其他 thread 執行會比較好呢?對此我參照 Nginx 的想法:
At the moment, offloading to thread pools is implemented only for three essential operations: the
read()
syscall on most operating systems,sendfile()
on Linux, andaio_write()
on Linux which is used when writing some temporary files such as those for the cache. We will continue to test and benchmark the implementation, and we may offload other operations to the thread pools in future releases if there’s a clear benefit.
他們認為使用 thread pool 是為了解決 blocking 的問題。特別是在一些潛在會發生 blocking 的 call :
read()
sendfile()
aio_write()
在某些情況下,使用 thread pool 會快上很多的,會說是某些情況是因為「原本就很快」了,只有在大量涉及 hard disk 的資源去讀寫的時候才會造成嚴重的 blocking ,一般情況下,都只是簡單的 memory copy and paste ,不需要 multi-thread 來處理速度也可以很快,硬要的話有時候還會造成負擔。
有了初步了解,再細讀一下 Nginx 的 source code ,不難發現其實 Nginx 只有一條 main thread 負責處理 accept 請求,然後上述幾個跟 file 操作相關的 system call 才丟給其他 thread 去做,因此我認為只將 do_request()
丟到 thread pool 去是比較接近 Nginx 作法的,我打算暫時採用這個方法,雖然得因此把 timer 上鎖,造成額外的 overhead 。
但我覺得很神奇的是,很久很久以前 Nginx 也有把 timer 的操作加上 lock ,但是現在沒有了,我怎麼也找不到它把 timer 的 lock 給拔掉的 commit 紀錄。
所以 Nginx 增減 timer 的地方可能不太一樣。我讀著 source code ,推測可能的方式如下:
sendfile
丟給 thread pool ,threads 執行完 sendfile
以後把任務給放到 ngx_thread_pool_done
這個結構裡,然後呼叫 ngx_notify
通知 main thread 來執行 ngx_thread_pool_handler
ngx_thread_pool_handler
會執行 ngx_thread_pool_done
裡面的 task ,也就是呼叫 event->handler
, handler 裡面會呼叫 finalize_request
(或其他),將 timer 處理掉如果真的是這樣的話,那確實 timer 不需要加上 lock ,因為從頭到尾都只有 main thread 在操作。不曉得我的想法有沒有錯?希望老師能夠給我提示。
這正是 Reactor pattern 的精髓,以單執行緒處理 mainloop,不過 timer 是另外的狀況,要思考 reentrancy
最穩妥的方法,就是使用 mutex 和 condvar 。 Nginx 的 thread pool 也是使用這樣的組合。但是 Nginx 貌似是有額外進行優化的,會搶在 thread 進入休眠狀態之前就取得 lock ,降低沒有搶到 lock 的 overhead 。
thpool_t
structure/* define a task node */
typedef struct {
void (*function)(void *);
void *arg;
} task_t;
/* define a task queue */
typedef struct {
task_t *buffer; // ring-buffer task queue
int size; // size of queue
int in; // index of pushing a task
int out; // index of popping a task
int task_count; // number of tasks in queue
} taskqueue_t;
/* define a thread pool */
typedef struct {
pthread_t *threads;
taskqueue_t *queue;
int thread_count;
int is_stopped;
} thpool_t;
worker_thread_cycle
& perform_task
pthread_cond_wait
讓 thread 睡在該處等sched_yield
會將 CPU 使用權排給其他 thread ,讓大家都有一個機會worker_thread_cycle
無窮迴圈外還有一個 perform_task
,原本要設計成如果有 signal 來中斷迴圈的話,跳出來後還能夠把剩下的工作給做完,未來預計會補上 signal 的部份。static void perform_tasks(thpool_t *thpool)
{
task_t *task = NULL;
do {
pthread_mutex_lock(&(thpool->lock));
while (queue_is_empty(thpool))
pthread_cond_wait(&(thpool->cond), &(thpool->lock));
/* grab a task from queue */
task = queue_deq(thpool);
pthread_mutex_unlock(&(thpool->lock));
/* preform the task */
if (task)
(task->function)(task->arg);
} while (task);
}
static void *worker_thread_cycle(void *arg)
{
while (1) {
perform_tasks(arg);
sched_yield();
}
perform_tasks(arg);
return 0;
}
queue_add
:pthread_cond_signal
依序叫醒方才睡著的 threadvoid queue_add(thpool_t *thpool, void (*task)(void *), void *arg)
{
pthread_mutex_lock(&(thpool->lock));
...
if (thpool->queue->task_count == 1)
pthread_cond_signal(&(thpool->cond));
pthread_mutex_unlock(&(thpool->lock));
}
實做看看 lock-free 的版本,參照 LFThpool 的想法,每個 thread 分配一個 workqueue ,這樣 producer 和 consumer 就都各只有一個,要避免 race condition 相對容易,用到一點點 atomic operation 就好。
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval);
bool __sync_val_compare_and_swap (type *ptr, type oldval, type newval);
type __sync_fetch_and_add (type *ptr, type value);
type __sync_add_and_fetch (type *ptr, type value);
lf_thpool_t
structuretask_count
會有同時有 producer (event loop) 與 consumer (thread) 修改,標明 volatile
以抑制編譯器對其進行最佳化 (例如避免刪去部分變數指派和狀態更新的操作)typedef struct {
task_t *buffer; // workqueue
pthread_t thr; // the actual thread
int id; // thread id
int size; // size of queue
unsigned int in; // index of pushing a task
unsigned int out; // index of popping a task
volatile int task_count; // number of tasks in queue
} thread_t;
typedef struct {
thread_t *threads;
int thread_count;
int is_stopped;
} lf_thpool_t;
lf_thpool_enq
: round robin & dispatchround_robin_schedule
平均分配到每一個 workqueuedispatch_task
分配,並使用 atomic operation 避免 task_count
的 data racevoid lf_thpool_enq(lf_thpool_t *lf_thpool, void (*task)(void *), void *arg)
{
thread_t *thread = round_robin_schedule(lf_thpool);
dispatch_task(thread, task, arg);
}
thread_t *round_robin_schedule(lf_thpool_t *lf_thpool)
{
static int cur_thread_index = -1;
cur_thread_index = (cur_thread_index + 1) % THREAD_COUNT;
return lf_thpool->threads + cur_thread_index;
}
int dispatch_task(thread_t *thread, void (*task)(void *), void *arg)
{
(thread->buffer + thread->in)->function = task;
(thread->buffer + thread->in)->arg = arg;
__sync_fetch_and_add(&(thread->task_count), 1);
thread->in = (thread->in + 1) % thread->size;
return 1;
}
lf_thpool_deq
task_count
data racetask_t *lf_thpool_deq(thread_t *thread)
{
if (!__sync_val_compare_and_swap(&(thread->task_count), 0, 0))
return NULL;
int out_offset = thread->out;
thread->out = (thread->out + 1) % thread->size;
__sync_fetch_and_sub(&(thread->task_count), 1);
return thread->buffer + out_offset;
}
worker_thread_cycle
& perform_task
pthread_self()
把當前的 thread 找出來static void perform_tasks(thread_t *thread)
{
task_t *task = NULL;
do {
task = lf_thpool_deq(thread);
if (task)
(task->function)(task->arg);
} while (task);
}
static void *worker_thread_cycle(void *arg)
{
lf_thpool_t *lf_thpool = arg;
thread_t *thread = get_cur_thread(lf_thpool);
while (1) {
perform_tasks(thread);
sched_yield();
}
perform_tasks(thread);
return 0;
}
TODO:
__atomic
builtin我首先以 htstress 對三種實做進行壓力測試,皆使用以下參數:
$ ./htstress -n 10000 -c 500 -t 4 -f http://localhost:8081/
其中 concurrency level 從 1~500 。
可以看到 baseline (single-thread at all) 表現最差,再者是 lock-based thread pool ,表現最好的是 lock-free thread pool 。
multi-threaded 之下, main thread 與 worker thread 可能同時用到 request ,造成 data race:
handle_expire_timer
裡面執行的 http_close_conn
,將 request 記憶體給釋放的時候do_request
卻同時正在處理該 request此問題尚未解決,目前無想法
Nginx 雖然是 single-threaded event loop ,但是它可以有多個 process ,在開始執行的時候會進行 fork ,然後讓每個 process 都跑一個 event loop 。
這是 prefork 技巧,在 NPTL 尚未完善時,對於 Linux 核心有更好的效能表現
參考了 lear 與 Nginx 的實做,我將原本的單行程修改成多行程的版本。但是暫時把 thread pool 給停用,因為不曉得在多行程之下它該扮演怎樣的角色?我需要再研究一下,況且 timer 的 qruority queue 操作需要 lock 與否的問題我還沒解決。
在 Linux Applications Performance: Introduction 展現多種實作手法:
應可解答上述的問題。
master_process_cycle
sigwait
來暫停,等待結束信號spawn_process
single_process_cycle
single_process_cycle
worker_init
process_events_and_timers
process_events
- epoll wait loophandle_expired_timers
- 處理超時的 requestvoid master_process_cycle(int listenfd)
{
int worker_pid[worker_processes];
for (int i = 0; i < worker_processes; ++i) {
worker_pid[i] = spawn_process(listenfd);
if (worker_pid[i] <= 0) {
perror("error during worker creation");
break;
}
}
...
// pause and wait for signal
}
int spawn_process(int listenfd)
{
int pid = fork();
switch (pid) {
case -1:
perror("run master process fork error");
exit(EXIT_FAILURE);
case 0:
single_process_cycle(listenfd);
exit(EXIT_SUCCESS);
default:
return pid;
}
}
void single_process_cycle(int listenfd)
{
int listenfd = worker_init();
/* event loop */
while (1) {
process_events_and_timers(listenfd);
}
}
int worker_init()
{
event_init();
timer_init();
int listenfd = open_listenfd(PORT);
request_init(listenfd);
return listenfd;
}
void process_events_and_timers(int listenfd)
{
process_events(listenfd);
handle_expired_timers();
}
多行程還是能再細分,像是 listen socket 的類型與分配,接下來仔細探討一下,在 socket 有無設定 SO_REUSEPORT
的不同。
SO_REUSEPORT
SO_REUSEPORT
SO_REUSEPORT
雖然解決了 worker process 的 load balancing 問題,但若是以等候理論來探討,它或許會增加 max latency 。就像我們總是覺得,為什麼隔壁排的隊伍總是前進比我快?以下的影片有詳細的解說:
假設有三條等候線,那麼要剛好去到最快的那條線的機率只有
如果在 epoll-and-accept 的設置下, Linux kernel 喚醒 process 的行為是 FIFO 的話,那或許使用一條 accept queue 就是最好的解法,太可惜了。
以單執行緒為準,一樣透過 htstress 來測試 concurrency 1 ~ 100 :
$ ./htstress -n 10000 -c 100 -t 4 http://localhost:8081/
for i in {1..100}; do
./htstress -n 100000 -c $i -t 4 http://localhost:8081/
done
效能首度超越 NGINX 預設組態,賀!
thread pool 的 context switch 和 signaling 有一定的代價。下一步來引入 tasklet (但不使用 thread pool)
好 jwang0306
雖然會印出錯誤,但是 SIGPIPE
其實這是原本專案就有處理過的 ( SIG_IGN
它,否則 server 會直接關掉),所以收到此信號會印出 error 並回傳 -1 。目前不知道更好的忽略 SIGPIPE
方法。
[ERROR] (src/http.c:32: errno: Broken pipe) errno == 32
[ERROR] (src/http.c:33: errno: Connection reset by peer) errno == 104
[ERROR] (src/http.c:253: errno: Resource temporarily unavailable) rc != 0
這個例外的發生原因是因為 read
的時候沒有完整讀取 client 的訊息,導致 parsing 時出錯,另外也可以在使用 ApacheBench 壓力測試時看到以下錯誤:
Failed requests:
(Connect: 0, Receive: 0, Length: 1227, Exceptions: 0)
在 do_request
裡面, buffer 滿的時候把 index 調回來,再進行 read
把剩下的讀完:
...
do_read:
plast = &r->buf[r->last % MAX_BUF];
remain_size =
MIN(MAX_BUF - (r->last - r->pos) - 1, MAX_BUF - r->last % MAX_BUF);
n = read(fd, plast, remain_size);
assert(r->last - r->pos < MAX_BUF && "request buffer overflow!");
if (n == 0) /* EOF */
goto err;
if (n < 0) {
if (errno != EAGAIN) {
log_err("read err, and errno = %d", errno);
goto err;
}
break;
}
r->last += n;
assert(r->last - r->pos < MAX_BUF && "request buffer overflow!");
if (r->last % MAX_BUF == 0) {
goto do_read;
}
...
我傾向於這個做法,這也是普遍最常看到的。與剛剛的邏輯一樣,差別在 buffer 滿的時候就擴充它:
...
do_read:
plast = &r->buf[r->last];
remain_size =
MIN(r->buf_size - (r->last - r->pos) - 1, r->buf_size - r->last);
n = read(fd, plast, remain_size);
assert(r->last - r->pos < MAX_BUF && "request buffer overflow!");
if (n == 0) /* EOF */
goto err;
if (n < 0) {
if (errno != EAGAIN) {
log_err("read err, and errno = %d", errno);
goto err;
}
break;
}
r->last += n;
assert(r->last - r->pos < MAX_BUF && "request buffer overflow!");
if (r->last == r->buf_size) {
r->buf_size *= 2;
if (r->buf_size > MAX_BUF)
r->buf_size = MAX_BUF;
r->buf = (char *) realloc(r->buf, r->buf_size);
goto do_read;
}
...
(註:此 MAX_BUF
已經不是原本的意義了,我把它作為最大能夠擴充的值,設定為 8MB 。)
為了避免浪費記憶體 ,所以每筆請求都傳送完後,就讓 index 從來開始:
if (!out->keep_alive) {
debug("no keep_alive! ready to close");
free(out);
goto close;
} else {
r->last = 0;
r->pos = 0;
}
另一個想法來自某次的小考,利用 mmap 來將兩段 virtual memory 給映射進 physical memory ,達成頭尾連接的效果:
read
的行為就是只能讀到 buffer 滿為止,經過上述的處理後,就不怕超出邊界,因為超出去也是從頭開始。
static inline void init_http_request(http_request_t *r,
int fd,
int epfd,
char *root)
{
...
// Check that the requested size is a multiple of a page
if (MAX_BUF % getpagesize() != 0) {
perror("Requested size is not a multiple of the page size");
}
// Create an anonymous file backed by memory
if ((r->mem_fd = memfd_create("req_buf", 0)) == -1)
perror("init_http_request: memfd_create error");
// Set buffer size
if (ftruncate(r->mem_fd, MAX_BUF) != 0)
perror("Could not set size of anonymous file");
// Ask mmap for a good address
if ((r->buf = mmap(NULL, 2 * MAX_BUF, PROT_NONE,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) == MAP_FAILED)
perror("Could not allocate virtual memory");
// Mmap first region
if (mmap(r->buf, MAX_BUF, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED,
r->mem_fd, 0) == MAP_FAILED)
perror("Could not map buffer into virtual memory");
// Mmap second region, with exact address
if (mmap(r->buf + MAX_BUF, MAX_BUF, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_FIXED, r->mem_fd, 0) == MAP_FAILED)
perror("Could not map buffer into virtual memory");
}
如此一來也不需要每次都 mod 檢查邊界,只要在 index 超過邊界的時候把它拉回來就好,時機點選在 do_request
一開始:
if (r->last >= MAX_BUF) {
r->last -= MAX_BUF;
r->pos -= MAX_BUF;
}
這個解法看似很理想,它也確實優化了 circular buffer ,但其實也是要看情況的。使用 keep-alive request 像是 ApacheBench 的時候,效果很好,但是沒有 keep-alive 像是 htstress 的時候就會很差。這是因為頻繁的進行 mmap
是耗費成本的,要將 page faults 和 TLB misses 也考量進去。
因此使用 htstress 時,每一筆請求都會進行一次 http_request_init
,頻繁呼叫 mmap
,而 keep-alive request 就還好,因為就是固定的那些 concurrency connection 不停發起多次請求,呼叫 mmap
次數相對少,也就沒有造成什麼影響。