contributed by <rwe0214
> & <winonecheng
>
Linux
, rwe0214
, NCKU
struct Async
結構下新增一個 int atom_lock
成員,並將它初始化成 0
async.c
裡的 mutex_lock 改成 atom_operation 風格,如下程式碼:
- pthread_mutex_lock(&async->lock);
+ while(__sync_fetch_and_add(&(async->atom_lock), 1)!=0){;}
...(critical section)...
- pthread_mutex_unlock(&async->lock);
+ __sync_fetch_and_and(&(async->atom_lock), 0);
$ ./test-async
# signal finish at (218.213214) ms
# elapsed time: (225.651327) ms
$ ./test-async
# signal finish at (90.613301) ms
# elapsed time: (92.631035) ms
雖然以時間上來看,atom_operation 比 mutex_lock 表現來的優異。但是我並不確定這個結果可以確保在每個情況下都是如此。因為在試著撰寫 atom_lock 的時候有遇到以上的寫法但表現卻遜色於 mutex_lock 的情況@@,所以有點懷疑自己寫的 atom_operation 並不是正確的。Willy Chiu
ab -c 32 -n 100 http://localhost:8080/
並執行 ./httpd
來做效能分析Concurrency Level: 32
Time taken for tests: 16.135 seconds
Complete requests: 100
Failed requests: 0
Total transferred: 9900 bytes
HTML transferred: 1300 bytes
Requests per second: 6.20 [#/sec] (mean)
Time per request: 5163.348 [ms] (mean)
Time per request: 161.355 [ms] (mean, across all concurrent requests)
Transfer rate: 0.60 [Kbytes/sec] received
Concurrency Level: 32
Time taken for tests: 15.225 seconds
Complete requests: 100
Failed requests: 0
Total transferred: 9900 bytes
HTML transferred: 1300 bytes
Requests per second: 6.57 [#/sec] (mean)
Time per request: 4871.896 [ms] (mean)
Time per request: 152.247 [ms] (mean, across all concurrent requests)
Transfer rate: 0.64 [Kbytes/sec] received
這個執行結果完全和我預期的不同…Willy Chiu
推測因為這裡的 critical section 是一段程式碼而不是單一個 integer 或變數,所以 atomic operation 的程式碼可能會使 thread 陷入 busy waiting 的狀態,因而效能提昇不起來。Willy Chiu
表示有 lock contention
參照 Mergesort 效能分析,使用 mutrace 來找出問題點
server-framework : 用 C99 撰寫、具備物件導向程式設計風格的伺服器開發框架,架構包含以下三個部分:
async
: a native POSIX thread pool
pipe
(作為喚醒 thread 的訊號) 以及 mutex
(管理 task queue,避免 work thread 同時對同一 task 進行讀取)reactor
: reactor pattern implementation using callbacks
epoll
系統呼叫,來實做 Reactor patternprotocol-server
: server construction library
static inline ssize_t write_wrapper(int fd, const void *buf, size_t count)
{
ssize_t s;
if ((s = write(fd, buf, count)) < count) perror("write");
return s;
}
#undef write
#define write write_wrapper
因為舊有的 write() 是相當據風險的,它並不會確保資料的寫入是完整的,換句話說,雖然 write() 有提供一個 argument 是讓使用者指定寫入的大小,但是卻沒有保證一定會寫完指定的大小。
這在網路傳輸裡,如果連線不穩定導致漏送封包資料便不完整而出錯,此處重寫則是確保一定 write() 完整的大小,否則便預先印出錯誤提醒。
而在 CS:APP 一書中有提到如何實做一個充分確保 write 能安全寫完指定大小的方式。
struct Async {
/** the task queue - MUST be first in the struct */
pthread_mutex_t lock; /**< a mutex for data integrity */
struct AsyncTask * volatile tasks; /**< active tasks */
struct AsyncTask * volatile pool; /**< a task node pool */
struct AsyncTask ** volatile pos; /**< the position for new tasks */
/** The pipe used for thread wakeup */
struct {
int in; /**< read incoming data (opaque data), used for wakeup */
int out; /**< write opaque data (single byte),
used for wakeup signaling */
} pipe;
int count; /**< the number of initialized threads */
unsigned run : 1; /**< the running flag */
/** the thread pool */
pthread_t threads[];
};
當中我覺得很特別的地方是 *tasks
和 *pool
,*tasks
是一個待進行的 task-list,而 *pool
則是 *tasks
中完成的 task-node 所串接起來的 free-task-list,如果接收到新的 task 時,會優先從 *pool
中取一個 free-task-node ,去接收新的 task,除非 *pool
為空,否則不會另外新增一個 task-node。
如此一來便可以省下創建 node 記憶體的時間,而 **pos
則是利用 pointer to pointer 去修改上述的 list 結構,簡而言之就是接收新 task 的 address。
static int async_run(async_p async, void (*task)(void *), void *arg)
{
struct AsyncTask *c; /* the container, storing the task */
if (!async || !task) return -1;
// 利用 mutex 避免 work thread 同時對同一 task 進行讀取
pthread_mutex_lock(&(async->lock));
... /* get a container from the pool of grab a new container */
pthread_mutex_unlock(&async->lock);
/* wake up any sleeping threads
* any activated threads will ask to require the mutex
* as soon as we write.
* we need to unlock before we write, or we will have excess
* context switches.
*/
// 在 pipe 寫入 1 byte,喚醒 thread
write(async->pipe.out, c->task, 1);
return 0;
}
static void *worker_thread_cycle(void *_async)
{
/* setup signal and thread's local-storage async variable. */
struct Async *async = _async;
char sig_buf;
/* pause for signal for as long as we're active. */
while (async->run && (read(async->pipe.in, &sig_buf, 1) >= 0)) {
perform_tasks(async);
// causes the calling thread to relinquish the CPU
sched_yield();
}
// 執行 Work queue 中剩餘未被執行的 tasks
perform_tasks(async);
return 0;
}
在
perform_tasks
中頻繁的進行 mutex lock 和 unlock,會對效能有所影響,或許可以在這上面進行改善。ChengCheng
需先理解 Reactor Pattern (在此系統中以 epoll
實做)
Resources : 從系統中任何可以提供輸入或 consume 輸出的資源。如: client 的 request 請求。
Synchronous Event Demultiplexer : 用單一個 event loop 去阻擋所有 resource,並傳送 resource 給 dispatch。如:epoll 將收到的 client request 放到 work queue 等待執行,而不會被 blocking。
Dispatcher : 分派從 demultiplexer 來的 resource 給相關的 request handler。如:將 client 待處理的 request 丟進 work queue ,並喚醒 thread 處理這些 request。函式 async_run( ) 即是呈現此功能。
Request Handler : 定義如何應用 requset handler 和他相關的資源。如:worker thread 去處理 client request
epoll
// 建立 epoll 物件並回傳其 fd
int epoll_create(int size)
int epoll_create1(int flags)
// 依照 op 去新增或移除監聽的 fd 或更改 fd 的監聽選項
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
// 等待已註冊之 event 被觸發或 timeout, 並回傳 ready fd 的數量
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
events
所指向的記憶體空間有被觸發的那些 eventset_fd_polling()
: 將 request
送入 epoll
之中
int set_fd_polling(int queue, int fd, int action, long milliseconds)
{
struct epoll_event chevent;
chevent.data.fd = fd;
// 這邊為 epoll 的系統呼叫參數來表達不同的 epoll 狀態
// EPOLLET : 將 epoll 設為 Edge Triggered
chevent.events = EPOLLOUT | EPOLLIN |
EPOLLET | EPOLLERR |
EPOLLRDHUP | EPOLLHUP;
...
// 對 request fd 執行 op 操作
return epoll_ctl(queue, action, fd, &chevent);
}
reator_review()
: 將 epoll 中的工作拿出來執行
int reactor_review(struct Reactor *reactor){
...
// 此 Marco 呼叫 epoll_wait(),並回傳有幾個工作要執行
int active_count = _WAIT_FOR_EVENTS_;
if (active_count < 0) return -1;
if (active_count > 0) {
for (int i = 0; i < active_count; i++) {
if (_EVENTERROR_())
/*errors are hendled as disconnections (on_close)*/
reator_close(reactor, _GETFD_(i));
}else{
/*no error, then it is an active event*/
// 以下的部分是在 ready queue 等待服務的事件抓出來執行
if (_EVEBTREADY_(i) && reactor->on_ready)
reactor->on_ready(reactor, _GETFD_(i));
if (_EVENTDATA_(i) && reactor->on_data)
reactor->on_data(reactor, _GETFD_(i));
}
}
}
return active_count;
}
epoll 這部分較難理解,和我一樣對於 I/O event 以及 epoll 完全不了解的新手,可以先看這篇 Linux IO模式及 select、poll、epoll详解,能大致理解 epoll 是在做甚麼事情。ChengCheng