# linux2021: meyr :::info 4 個字母的帳號名稱! 請收下我的膝蓋 :notes: jserv ::: ## δ-1 解釋上述程式碼運作原理並指出實作缺失 ### 1. 運作原理 因為多個thread存取,同一個資源queue(first in first out),所以必須使用lock機制來避免資料不同步的問題。 但是一般是使用一個lock來保護整個queue,造成push和pop容易卡住的問題。而使用兩個lock可以提升效能,但是必須注意同時存取同一個資源的問題。 當con_init()執行之後,queue會被init成下面的樣子。 其中first, last都指向dummy node. ![](https://i.imgur.com/FJ3LKpP.png) last_mutex是來保護last pointer,所以在執行con_push()後我沒會將new node接在dummy node之後。並且update last node。 ![](https://i.imgur.com/2XQTkZP.png) 所以動作為 1. queue->last->next = node; // 把last接上new node 2. queue->last = node; // 更新last pointer 詳細code如下: ```cpp int con_push(con_queue_t *restrict queue, void *restrict new_element) { /* Prepare new node */ node_t *node = _con_node_init(new_element); if (!node) return Q_ERROR; /* Add to queue with lock */ mtx_lock(queue->last_mutex); queue->last->next = node; queue->last = node; mtx_unlock(queue->last_mutex); return Q_OK; } ``` 其中first_mutex是來保護first pointer,所以在執行con_pop()中,在critical section中,我們只會針對first pointer做存取。我打算把dummy後面的第一個node pop出去。 如果當queue為empty,new_header會指向nullptr.則unlock之後返回null. ![](https://i.imgur.com/Abu7llC.png) 當queue裡面有node的時候,更新node->next為new_header->next。 ![](https://i.imgur.com/fTVKtkx.png) 但是當queue只有一個node的時候(除了dummy node),移除node會需要把last pointer更新到dummy node上面。理想上,應該只要存取first pointer變成,一次要存取兩個pointer。 ![](https://i.imgur.com/MFl8WFZ.png) 這邊有點卡住而且卡很久,因為本來預設一個lock只鎖定一個pointer,如果要在一個function使用兩個lock感覺會有很多問題。例如會產生deadlock.而且這個問題不解決,勢必會影響接下來的lock-free實作。而且使用兩個lock勢必會發生同時修改同一個node的情況。 **後來想想也許用一個lock保護整個queue會比較簡單一點,雖然效能會比較低落。** 參考了這個[網站](https://zacard.net/2021/01/19/java-linked-queue/) **把要pop出來的node當常新的dummy node,可以巧妙的避開修改last pinter的問題。** ![](https://i.imgur.com/kQk6TAt.png) 修改後的程式碼: ```cpp void *con_pop(con_queue_t *queue) { mtx_lock(queue->first_mutex); node_t *node = queue->first; /* Node to be removed */ node_t *new_header = queue->first->next; /* become the first in the queue */ /* Queue is empty */ if (!new_header) { mtx_unlock(queue->first_mutex); return NULL; } /* Queue not empty: retrieve data and rewire */ void *return_value = new_header->value; node = new_header; mtx_unlock(queue->first_mutex); /* Free removed node and return */ free(node); return return_value; } ``` ## δ-2 以 lock-free 程式設計 改寫上述程式碼 參考這個網站[簡化概念下的lock-free編程](https://zhuanlan.zhihu.com/p/53012280) lock-free的重點是,要不斷的判斷目前設定的pointer到底有沒有變動,如果沒有就用CAS(compare and swap)來更新現在的pointer. 以con_push()為例。不斷判斷last 是不是等於 queue->last,如果是,表示沒人改到queue->last,就把queue->last設為new node。如果last 不等於queue->last,表示有人改過,則修改last為新的queue->last,重新連結last->next = node,繼續上述的判斷。 ![](https://i.imgur.com/GP3yAPu.png) :::warning 已充分考慮 ABA 問題嗎? :notes: jserv ::: ```cpp int con_push(con_queue_t *restrict queue, void *restrict new_element) { /* Prepare new node */ node_t *node = _con_node_init(new_element); if (!node) return Q_ERROR; /* Add to queue without lock */ while(true) { Node *last = queue->last; last->next = node; if(queue->last.compare_exchange_weak(last, node)) break; } return Q_OK; } ``` 同理,con_pop()也是一樣。判斷node和queue->first一不一樣,一樣表示沒人動到queue->first,則進行交換。 ```cpp void *con_pop(con_queue_t *queue) { while(true) { node_t *node = queue->first; /* Node to be removed */ node_t *new_header = queue->first->next; /* become the first in the queue */ /* Queue is empty */ if (!new_header) return NULL; /* Queue not empty: retrieve data and rewire */ void *return_value = new_header->value; if(queue->first.compare_exchange_weak(node, new_header)) break; } /* Free removed node and return */ free(node); return return_value; } ``` --- ## ζ-1 解釋上述程式碼運作原理 ![](https://i.imgur.com/lV9RDoR.png) 使用listenfd來接收任何ip address連到這台主機的port 1922,當和client連線成功之後,就建立connfd和targetfd,並使用pipeline來達到這兩者的通訊。 使用poll同時監測兩個FD(connfd和targetfd),當一有data進來就使用zero-copy的splice把data頒給對方。 使用poll當中有個重要的data struct pollfd,來表現出要監測那個FD和監測那些events. ```cpp struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ }; ``` 根據man page,events是你想監測的事件。revents是kernel回應給你,叫起因為被poll所block住的event。 其中: POLLIN : 有data可以讀取。 POLLPRI : 有特別的狀況。 POLLOUT : 可以寫入。 當中的錯誤訊息event一定會被回報到revents,像是POLLERR, POLLHUP, or POLLNVAL. poll的API如下: ```cpp int poll(struct pollfd *fds, nfds_t nfds, int timeout); ``` nfds_t nfds : 為輸入的fds陣列長度。 timeout : 等待多少ms後跳出。return value = 0. 所以以下的程式碼就是等待connfd和targetfd直到有events進來,如果是POLLIN則把data搬到對方去。 ```cpp // wait 1000ms or event coming while ((ret = poll(polls, 2, 1000)) != -1) { if (ret == 0) // timeout continue; int from_client = polls[0].revents & POLLIN; if (from_client) move(cl_fd, target_fd, fds); else move(target_fd, cl_fd, fds); } ``` --- ## ζ-2 以 epoll 系統呼叫改寫程式碼,並設計實驗來驗證 proxy 程式碼的效率 ###### tags: `linux2021`