# Chapter 12: Concurrent Programming 閱讀筆記

本文旨在記錄 *Computer Systems: A Programmer's Perspective* (CS:APP) 一書第十二章閱讀心得,該書是 CMU 的計算機系統概論的教材 (難度相當於台灣的大學高年級),該書的簡體中文翻譯名稱為《深入理解計算機系統》。

CS:APP 亦是 [Linux Kernel Internals 2024 Spring](https://wiki.csie.ncku.edu.tw/linux/schedule) 課程指定教材,本文並一同收錄於 [Linux Kernel Internals 2024 Spring Collections](https://hackmd.io/@Kuanch/linux2024-collection)。

本章一開始就介紹了三種常見的並行方法 (Process、I/O Multiplexing、Thread),並在以下三節分別介紹。

## 12.1 Concurrent Programming with Processes

```c
/*
 * echoserverp.c - A concurrent echo server based on processes
 */
/* $begin echoserverpmain */
#include "csapp.h"
void echo(int connfd);

void sigchld_handler(int sig) //line:conc:echoserverp:handlerstart
{
    while (waitpid(-1, 0, WNOHANG) > 0)
        ;
    return;
} //line:conc:echoserverp:handlerend

int main(int argc, char **argv)
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    Signal(SIGCHLD, sigchld_handler);
    listenfd = Open_listenfd(argv[1]);
    while (1) {
        clientlen = sizeof(struct sockaddr_storage);
        connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
        if (Fork() == 0) {
            Close(listenfd); /* Child closes its listening socket */
            echo(connfd);    /* Child services client */ //line:conc:echoserverp:echofun
            Close(connfd);   /* Child closes connection with client */ //line:conc:echoserverp:childclose
            exit(0);         /* Child exits */
        }
        Close(connfd); /* Parent closes connected socket (important!) */ //line:conc:echoserverp:parentclose
    }
}
```

使用 Process 撰寫並行程式的優缺點很明顯;優點是每個行程都有獨立運作的記憶體區塊,撰寫時幾乎不需注意保護,而缺點就是行程間溝通之 IPC 開銷巨大,且必然涉入核心操作。

## 12.2 Concurrent Programming with I/O Multiplexing

```c
/*
 * echoservers.c - A concurrent echo server based on select
 */
/* $begin echoserversmain */
#include "csapp.h"

typedef struct { /* Represents a pool of connected descriptors */ //line:conc:echoservers:beginpool
    int maxfd;        /* Largest descriptor in read_set */
    fd_set read_set;  /* Set of all active descriptors */
    fd_set ready_set; /* Subset of descriptors ready for reading */
    int nready;       /* Number of ready descriptors from select */
    int maxi;         /* Highwater index into client array */
    int clientfd[FD_SETSIZE];    /* Set of active descriptors */
    rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
} pool; //line:conc:echoservers:endpool
/* $end echoserversmain */
void init_pool(int listenfd, pool *p);
void add_client(int connfd, pool *p);
void check_clients(pool *p);
/* $begin echoserversmain */

int byte_cnt = 0; /* Counts total bytes received by server */

int main(int argc, char **argv)
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    static pool pool;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    listenfd = Open_listenfd(argv[1]);
    init_pool(listenfd, &pool); //line:conc:echoservers:initpool

    while (1) {
        /* Wait for listening/connected descriptor(s) to become ready */
        pool.ready_set = pool.read_set;
        pool.nready = Select(pool.maxfd+1, &pool.ready_set, NULL, NULL, NULL);

        /* If listening descriptor ready, add new client to pool */
        if (FD_ISSET(listenfd, &pool.ready_set)) { //line:conc:echoservers:listenfdready
            clientlen = sizeof(struct sockaddr_storage);
            connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); //line:conc:echoservers:accept
            add_client(connfd, &pool); //line:conc:echoservers:addclient
        }

        /* Echo a text line from each ready connected descriptor */
        check_clients(&pool); //line:conc:echoservers:checkclients
    }
}
/* $end echoserversmain */

/* $begin init_pool */
void init_pool(int listenfd, pool *p)
{
    /* Initially, there are no connected descriptors */
    int i;
    p->maxi = -1;                   //line:conc:echoservers:beginempty
    for (i=0; i< FD_SETSIZE; i++)
        p->clientfd[i] = -1;        //line:conc:echoservers:endempty

    /* Initially, listenfd is only member of select read set */
    p->maxfd = listenfd;            //line:conc:echoservers:begininit
    FD_ZERO(&p->read_set);
    FD_SET(listenfd, &p->read_set); //line:conc:echoservers:endinit
}
/* $end init_pool */

/* $begin add_client */
void add_client(int connfd, pool *p)
{
    int i;
    p->nready--;
    for (i = 0; i < FD_SETSIZE; i++)  /* Find an available slot */
        if (p->clientfd[i] < 0) {
            /* Add connected descriptor to the pool */
            p->clientfd[i] = connfd;                 //line:conc:echoservers:beginaddclient
            Rio_readinitb(&p->clientrio[i], connfd); //line:conc:echoservers:endaddclient

            /* Add the descriptor to descriptor set */
            FD_SET(connfd, &p->read_set); //line:conc:echoservers:addconnfd

            /* Update max descriptor and pool highwater mark */
            if (connfd > p->maxfd) //line:conc:echoservers:beginmaxfd
                p->maxfd = connfd; //line:conc:echoservers:endmaxfd
            if (i > p->maxi)       //line:conc:echoservers:beginmaxi
                p->maxi = i;       //line:conc:echoservers:endmaxi
            break;
        }
    if (i == FD_SETSIZE) /* Couldn't find an empty slot */
        app_error("add_client error: Too many clients");
}
/* $end add_client */

/* $begin check_clients */
void check_clients(pool *p)
{
    int i, connfd, n;
    char buf[MAXLINE];
    rio_t *rio;

    for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) {
        connfd = p->clientfd[i];
        /* Read through the pooled buffer itself, not a by-value copy:
           whatever state Rio_readlineb updates in a local copy would be
           thrown away when this iteration ends, so any input buffered
           beyond the first line would never reach the next call.
           (Deliberate change from the textbook listing.) */
        rio = &p->clientrio[i];

        /* If the descriptor is ready, echo a text line from it.
           Empty slots hold -1 (see init_pool/add_client), so every
           non-negative value, including 0, is a live descriptor. */
        if ((connfd >= 0) && (FD_ISSET(connfd, &p->ready_set))) {
            p->nready--;
            if ((n = Rio_readlineb(rio, buf, MAXLINE)) != 0) {
                byte_cnt += n; //line:conc:echoservers:beginecho
                printf("Server received %d (%d total) bytes on fd %d\n",
                       n, byte_cnt, connfd);
                Rio_writen(connfd, buf, n); //line:conc:echoservers:endecho
            }

            /* EOF detected, remove descriptor from pool */
            else {
                Close(connfd); //line:conc:echoservers:closeconnfd
                FD_CLR(connfd, &p->read_set); //line:conc:echoservers:beginremove
                p->clientfd[i] = -1;          //line:conc:echoservers:endremove
            }
        }
    }
}
/* $end check_clients */
```

I/O Multiplexing 的各邏輯流都在同一行程內於 user space 切換,不需行程或執行緒的 context switch (`select`、`read` 等本身仍是系統呼叫),故較有效率,但問題是程式的大小與複雜度膨脹快速,單是比較 Process 程式之行數就能夠看出來。

## 12.3 Concurrent Programming with Threads

```c
/*
 * echoservert.c - A concurrent echo server using threads
 */
/* $begin echoservertmain */
#include "csapp.h"

void echo(int connfd);
void *thread(void *vargp);

int main(int argc, char **argv)
{
    int listenfd, *connfdp;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    pthread_t tid;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    listenfd = Open_listenfd(argv[1]);

    while (1) {
        clientlen=sizeof(struct sockaddr_storage);
        connfdp = Malloc(sizeof(int)); //line:conc:echoservert:beginmalloc
        *connfdp = Accept(listenfd, (SA *) &clientaddr, &clientlen); //line:conc:echoservert:endmalloc
        Pthread_create(&tid, NULL, thread, connfdp);
    }
}

/* Thread routine */
void *thread(void *vargp)
{
    int connfd = *((int *)vargp);
    Pthread_detach(pthread_self()); //line:conc:echoservert:detach
    Free(vargp);                    //line:conc:echoservert:free
    echo(connfd);
    Close(connfd);
    return NULL;
}
/* $end echoservertmain */
```

Thread 並行程式大概是權衡後較常被使用的方案,本章後續也幾乎都是基於 Thread 之方案;其優點是 switch 與溝通之成本較 Process 並行便宜許多,且程式篇幅與複雜度也較 I/O Multiplexing 更小;缺點是需要注意執行緒之記憶體管理與保護。