server-framewrok

contributed by <rwe0214> & <winonecheng>

tags: Linux, rwe0214, NCKU

預期目標

程式碼

async.c

  • 嘗試使用 atom_operation 取代 mutex_lock 並比較效能
    • struct Async 結構下新增一個 int atom_lock 成員,並將它初始化成 0
    • 接著把 async.c 裡的 mutex_lock 改成 atom_operation 風格,如下程式碼:
- pthread_mutex_lock(&async->lock); + while(__sync_fetch_and_add(&(async->atom_lock), 1)!=0){;} ...(critical section)... - pthread_mutex_unlock(&async->lock); + __sync_fetch_and_and(&(async->atom_lock), 0);
  • 執行結果 (只擷取時間部份)
  1. mutex_lock:
$ ./test-async
# signal finish at (218.213214) ms
# elapsed time: (225.651327) ms
  1. atom_operation:
$ ./test-async
# signal finish at (90.613301) ms
# elapsed time: (92.631035) ms

雖然以時間上來看,atom_operation 比 mutex_lock 表現來的優異。但是我並不確定這個結果可以確保在每個情況下都是如此。因為在試著撰寫 atom_lock 的時候有遇到以上的寫法但表現卻遜色於 mutex_lock 的情況@@,所以有點懷疑自己寫的 atom_operation 並不是正確的。Willy Chiu

  • 執行結果 ( 更新 )
    ab -c 32 -n 100 http://localhost:8080/ 並執行 ./httpd 來做效能分析
  1. mutex_lock:
Concurrency Level:      32
Time taken for tests:   16.135 seconds
Complete requests:      100
Failed requests:        0
Total transferred:      9900 bytes
HTML transferred:       1300 bytes
Requests per second:    6.20 [#/sec] (mean)
Time per request:       5163.348 [ms] (mean)
Time per request:       161.355 [ms] (mean, across all concurrent requests)
Transfer rate:          0.60 [Kbytes/sec] received
  1. atom_operation:
Concurrency Level:      32
Time taken for tests:   15.225 seconds
Complete requests:      100
Failed requests:        0
Total transferred:      9900 bytes
HTML transferred:       1300 bytes
Requests per second:    6.57 [#/sec] (mean)
Time per request:       4871.896 [ms] (mean)
Time per request:       152.247 [ms] (mean, across all concurrent requests)
Transfer rate:          0.64 [Kbytes/sec] received

這個執行結果完全和我預期的不同Willy Chiu

推測因為這裡的 critical section 是一段程式碼而不是單一個 integer 或變數,所以 atomic operation 的程式碼可能會使 thread 陷入 busy waiting 的狀態,因而效能提昇不起來。Willy Chiu

表示有 lock contention
參照 Mergesort 效能分析,使用 mutrace 來找出問題點

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

技術原理 - 整合2017及2016作業內容

Framework 架構介紹

server-framework : 用 C99 撰寫、具備物件導向程式設計風格的伺服器開發框架,架構包含以下三個部分:

  • async: a native POSIX thread pool

    • 結合 pipe(作為喚醒 thread 的訊號) 以及 mutex(管理 task queue,避免 work thread 同時對同一 task 進行讀取)
  • reactor: reactor pattern implementation using callbacks

    • 利用 epoll 系統呼叫,來實做 Reactor pattern
    • Reactor pattern 處理一個 application 收到從一個或多個 client 同時送 request 的情況
  • protocol-server: server construction library

    • 管理有關 server 運作的所有事情,包括 thread pool, process forking, accepting new connections, setting up the initial protocol for new connections, and user space socket writing buffers

運作流程

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

async

  • 在 async.c 的第一個函式便重新定義了 write(),如下:
    • 對應 CS:APP3e 第十章: 10.5 用 RIO 包健壯地讀寫( page 626 )。
static inline ssize_t write_wrapper(int fd, const void *buf, size_t count) { ssize_t s; if ((s = write(fd, buf, count)) < count) perror("write"); return s; } #undef write #define write write_wrapper

因為舊有的 write() 是相當據風險的,它並不會確保資料的寫入是完整的,換句話說,雖然 write() 有提供一個 argument 是讓使用者指定寫入的大小,但是卻沒有保證一定會寫完指定的大小。
這在網路傳輸裡,如果連線不穩定導致漏送封包資料便不完整而出錯,此處重寫則是確保一定 write() 完整的大小,否則便預先印出錯誤提醒。
而在 CS:APP 一書中有提到如何實做一個充分確保 write 能安全寫完指定大小的方式。

  • Async async_p 結構體簡介
struct Async { /** the task queue - MUST be first in the struct */ pthread_mutex_t lock; /**< a mutex for data integrity */ struct AsyncTask * volatile tasks; /**< active tasks */ struct AsyncTask * volatile pool; /**< a task node pool */ struct AsyncTask ** volatile pos; /**< the position for new tasks */ /** The pipe used for thread wakeup */ struct { int in; /**< read incoming data (opaque data), used for wakeup */ int out; /**< write opaque data (single byte), used for wakeup signaling */ } pipe; int count; /**< the number of initialized threads */ unsigned run : 1; /**< the running flag */ /** the thread pool */ pthread_t threads[]; };

當中我覺得很特別的地方是 *tasks*pool*tasks 是一個待進行的 task-list,而 *pool 則是 *tasks 中完成的 task-node 所串接起來的 free-task-list,如果接收到新的 task 時,會優先從 *pool 中取一個 free-task-node ,去接收新的 task,除非 *pool 為空,否則不會另外新增一個 task-node。
如此一來便可以省下創建 node 記憶體的時間,而 **pos 則是利用 pointer to pointer 去修改上述的 list 結構,簡而言之就是接收新 task 的 address。

  • Reactor 的 Client 工作會將處理 request 的工作丟入Work queue,並且在 pipe 寫入 1 byte 紀錄
static int async_run(async_p async, void (*task)(void *), void *arg) { struct AsyncTask *c; /* the container, storing the task */ if (!async || !task) return -1; // 利用 mutex 避免 work thread 同時對同一 task 進行讀取 pthread_mutex_lock(&(async->lock)); ... /* get a container from the pool of grab a new container */ pthread_mutex_unlock(&async->lock); /* wake up any sleeping threads * any activated threads will ask to require the mutex * as soon as we write. * we need to unlock before we write, or we will have excess * context switches. */ // 在 pipe 寫入 1 byte,喚醒 thread write(async->pipe.out, c->task, 1); return 0; }
  • Worker thread 若發現 pipe 有紀錄,則會將該 1 byte 取出並執行 Work queue 中對應的 request
static void *worker_thread_cycle(void *_async) { /* setup signal and thread's local-storage async variable. */ struct Async *async = _async; char sig_buf; /* pause for signal for as long as we're active. */ while (async->run && (read(async->pipe.in, &sig_buf, 1) >= 0)) { perform_tasks(async); // causes the calling thread to relinquish the CPU sched_yield(); } // 執行 Work queue 中剩餘未被執行的 tasks perform_tasks(async); return 0; }

perform_tasks 中頻繁的進行 mutex lock 和 unlock,會對效能有所影響,或許可以在這上面進行改善。ChengCheng

reactor

  • 需先理解 Reactor Pattern (在此系統中以 epoll 實做)

  • Reactor Structure :

    • Resources : 從系統中任何可以提供輸入或 consume 輸出的資源。如: client 的 request 請求。

    • Synchronous Event Demultiplexer : 用單一個 event loop 去阻擋所有 resource,並傳送 resource 給 dispatch。如:epoll 將收到的 client request 放到 work queue 等待執行,而不會被 blocking。

    • Dispatcher : 分派從 demultiplexer 來的 resource 給相關的 request handler。如:將 client 待處理的 request 丟進 work queue ,並喚醒 thread 處理這些 request。函式 async_run( ) 即是呈現此功能。

    • Request Handler : 定義如何應用 requset handler 和他相關的資源。如:worker thread 去處理 client request

  • epoll

    ​​​​// 建立 epoll 物件並回傳其 fd ​​​​int epoll_create(int size) ​​​​int epoll_create1(int flags) ​​​​// 依照 op 去新增或移除監聽的 fd 或更改 fd 的監聽選項 ​​​​int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) ​​​​// 等待已註冊之 event 被觸發或 timeout, 並回傳 ready fd 的數量 ​​​​int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
    • 呼叫 epoll_wait 後,會一直 block 住,直到有 event 發生或 timeout 時,回傳 I/O ready 的 fd 數量,並在參數 events 所指向的記憶體空間有被觸發的那些 event
    • set_fd_polling() : 將 request 送入 epoll 之中
    ​​​​int set_fd_polling(int queue, int fd, int action, long milliseconds) ​​​​{ ​​​​ struct epoll_event chevent; ​​​​ chevent.data.fd = fd; ​​​​ // 這邊為 epoll 的系統呼叫參數來表達不同的 epoll 狀態 ​​​​ // EPOLLET : 將 epoll 設為 Edge Triggered ​​​​ chevent.events = EPOLLOUT | EPOLLIN | ​​​​ EPOLLET | EPOLLERR | ​​​​ EPOLLRDHUP | EPOLLHUP; ​​​​ ... ​​​​ // 對 request fd 執行 op 操作 ​​​​ return epoll_ctl(queue, action, fd, &chevent); ​​​​}
    • reator_review() : 將 epoll 中的工作拿出來執行
int reactor_review(struct Reactor *reactor){ ... // 此 Marco 呼叫 epoll_wait(),並回傳有幾個工作要執行 int active_count = _WAIT_FOR_EVENTS_; if (active_count < 0) return -1; if (active_count > 0) { for (int i = 0; i < active_count; i++) { if (_EVENTERROR_()) /*errors are hendled as disconnections (on_close)*/ reator_close(reactor, _GETFD_(i)); }else{ /*no error, then it is an active event*/ // 以下的部分是在 ready queue 等待服務的事件抓出來執行 if (_EVEBTREADY_(i) && reactor->on_ready) reactor->on_ready(reactor, _GETFD_(i)); if (_EVENTDATA_(i) && reactor->on_data) reactor->on_data(reactor, _GETFD_(i)); } } } return active_count; }

epoll 這部分較難理解,和我一樣對於 I/O event 以及 epoll 完全不了解的新手,可以先看這篇 Linux IO模式及 select、poll、epoll详解,能大致理解 epoll 是在做甚麼事情。ChengCheng

protocol-server