--- tags: linux-summer-2021 --- # linux2021: RinHizakura > [測驗題](https://hackmd.io/@sysprog/linux2021-summer/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Flinux2021-summer-quiz?fbclid=IwAR3gWR-9UYlZ7vrcou6_fxIp4QfR2QNB7V4ULpk6s_SVkPPG56XLfEGSqUg) ## 測驗 $\alpha$ 以最簡單的 8 bits 為例,例如向左 rotate `c` 之情形,即先左 shift `c` bits,而高位元的 `c` bits 需要回到低位,即向右 `8-c` 之結果,因此最直覺的答案是 ```cpp (v << c) | (v >> (bits - c)); ``` :::warning 雖然直覺但事實上存在一些問題,請看 [Linux 中的 bit rotation](#Linux-中的-bit-rotation) ::: 但規定不能用到 `bits`,但觀察可知因為數字都是 $2^n$ bits,因此可以用補數方式來代替 `8 - c` ```cpp (v << c) | (v >> ((~c + 1) & mask); ``` 題目沒提到可以用 `~`,但 `~c + 1` 就相當於 `-c`,因此: * LLL = `v >> (-c & mask)` * RRR = `v << (-c & mask)` ### Linux 中的 bit rotation 對照 [include/linux/bitops.h](https://github.com/torvalds/linux/blob/master/include/linux/bitops.h) 也有相似的函數,例如 32 bits 的 left/right rotation。其中,比較有趣的發現是 bit rotation 在 linux 核心的演化。 對於 bit rotation 所提交的 patch,主要是針對一些特殊 input 參數會造成的 undefined behavior 去做了修改,在 [`d7e35df`](https://github.com/torvalds/linux/commit/d7e35dfa2531b53618b9e6edcd8752ce988ac555) 的 patch 以前,rotation 都是以下的形式 ```cpp static inline __u32 rol32(__u32 word, unsigned int shift) { return (word << shift) | (word >> (32 - shift)); } ``` 乍看之下是我題解時說的**最直覺的答案**,不過這邊我們仔細思考會發現 `(32 - shift)` 和 `(-shift) & 31` 實際上是有不同的,差異在於當 `shift` 為 0 時,前者的 `>> 32` 對於 32 bits 的整數在在 C 語言中是 undefined behavior: > The integer promotions are performed on each of the operands. The type of the result is that of the promoted left operand. If the value of the right operand is negative or is greater than or equal to the width of the promoted left operand, the behavior is undefined. 後者則可以產生有明確行為的 `>> 0`。因此 [`d7e35df`](https://github.com/torvalds/linux/commit/d7e35dfa2531b53618b9e6edcd8752ce988ac555) 的 patch 對此進行了修改,使 `shift` 參數的適用範圍變成了 [0, 31],但不確定僅止於修改 `rol32` 的具體原因。 ```cpp static inline __u32 rol32(__u32 word, unsigned int shift) { return (word << shift) | (word >> ((-shift) & 31)); } ``` :::info 總結正確性來說當然是後者更好,但後者在舊的編譯器其實還有轉換成 assembly 時會相較於前者慢的狀況(無法產生對應的 rotation instruction),足以見到程式在考量正確性與執行速度需要思考的更多面向,這也是延伸問題 2 所要求探討的問題。 可以參考 [Poor optimization of portable rotate idiom](https://gcc.gnu.org/bugzilla/show_bug.cgi?id=57157) ::: patch [`ef4d6f6`](https://github.com/torvalds/linux/commit/ef4d6f6b275c498f8e5626c99dbeefdc5027f843#diff-ccef12ab7ed614f48bfce71f238a5feb431a31557779e581d63f701ad2c72914) 進行了更完善的改進,包含了其它不同長度整數的 rotation 中未被修改的 `(bits - shift)` pattern,也對例如 32 bits 的 `shift` 超出 31 時的行為進行了調整: ```cpp static inline __u32 rol32(__u32 word, unsigned int shift) { return (word << (shift & 31)) | (word >> ((-shift) & 31)); } ``` 值得一提的是,雖然這個 `ef4d6f6` patch 為了一致性的考量將所有的 rotation 都設計為類似的模式,但其實並非所有位元的 bit rotation 都要完全遵守的這個模式以滿足想要的正確性,需要考慮 C 語言標準的 integer promotion 所對應的程式行為。例如 8 bits 的 `rol` ```cpp static inline __u8 rol8(__u8 word, unsigned int shift) { return (word << shift) | (word >> (8 - shift)); } ``` 當 `shift == 0` 時,作為 8 bits 非負整數的 `word` 之 `>> 8` 行為並非 undefined behavior,因為要注意到 integer promotion 的影響。根據 C 語言規格書 6.3.1.1: > If an int can represent all values of the original type (as restricted by the width, for a bit-field), the value is converted to an int; otherwise, it is converted to an unsigned int. These are called the integer promotions. All other types are unchanged by the integer promotions. 當 `word` 被 promote 成 int 型態後,右移 8 bits 的語言行為為 well-defined。 ## 測驗 $\beta$ 先考慮最容易的 general 的 align down,只要先除以 `alignment` 取整數,再乘以 `alignment` 即可。而要做到 general 的 align up,先將原數字加上 `alignment - 1`,然後除以 `alignment` 取整數,再乘以 `alignment` 即可。 當 `alignment` 為 $2^n$ 時,由於除以 `alignment` 取整數,再乘以 `alignment` 的這個操作等同於 `& ~(alignment - 1)`。因此此題答案是: * MMM = `(sz + mask) & ~mask` :::success :negative_squared_cross_mark: Wrong answer? 另一個思路是當 alignment 是 $2^n$ 時,我們要知道 $k$ 還差多少可以補到對齊,該值即為對 $k$ 取補數 + 1 之結果。因此: * MMM = `sz + ((~sz + 1) & mask)` ::: ### Linux 中的 align [include/uapi/linux/const.h](https://github.com/torvalds/linux/blob/master/include/uapi/linux/const.h) 中可以看到 ```cpp #define __ALIGN_KERNEL(x, a) __ALIGN_KERNEL_MASK(x, (typeof(x))(a) - 1) #define __ALIGN_KERNEL_MASK(x, mask) (((x) + (mask)) & ~(mask)) ``` :::info 在 patch [`a85cbe6`](https://github.com/torvalds/linux/commit/a85cbe6159ffc973e5702f70a3bd5185f8f3c38d) 以前存在於 include/uapi/linux/kernel.h 路徑,主要是避免 include 之間的間接引用導致的重定義 ::: [include/linux/align.h](https://github.com/torvalds/linux/blob/master/include/linux/align.h) 則有以此定義出來的 macro: ```cpp /* @a is a power of 2 value */ #define ALIGN(x, a) __ALIGN_KERNEL((x), (a)) #define ALIGN_DOWN(x, a) __ALIGN_KERNEL((x) - ((a) - 1), (a)) ``` ## 測驗 $\gamma$ 首先,將 fork 的過程可以用以下的圖理解,以 `NNN == 3` 為例: ```graphviz digraph graphname{ node[shape=none] rankdir=LR A[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">A</td></tr> <tr><td port="port2" border="1">i=0</td></tr> <tr><td port="port3" border="1">i=1</td></tr> <tr><td port="port4" border="1">i=2</td></tr> </table>> ] B[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">B</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=0</td></tr> <tr><td port="port3" border="1">i=1</td></tr> <tr><td port="port4" border="1">i=2</td></tr> </table>> ] C[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">C</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=1</td></tr> <tr><td port="port3" border="1">i=2</td></tr> </table>> ] D[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">D</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=1</td></tr> <tr><td port="port3" border="1">i=2</td></tr> </table>> ] E[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">E</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] F[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">F</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] G[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">G</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] H[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">H</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] A:port2->B:port1 A:port3->C:port1 B:port3->D:port1 A:port4->E:port1:w B:port4->F:port1 D:port3->G:port1 C:port3->H:port1 } ``` * 每一次迴圈會產生 2 倍數量的 process,因此總共有 $2^{NNN}$ 個 process * 圖中每一個 `i` 表示一次 `printf` 呼叫,我們可以統整出 `printf` 數量的式子: $$ NNN + \sum_{i=0}^{NNN-1} (NNN - i) \times 2^i $$ 若不考慮 buffer I/O,可以通過 `setvbuf(stdout, NULL, _IONBF, 0);` 使 printf 不進 buffer I/O 來對照結果。 但是若考慮 buffer I/O,正常的情形是用 line buffering,即遇到換行或者程式結束才輸出結果。此時需要考慮在 fork 的時候,前一個 buffer 的內容會複製到下一個 process 去,所以如下圖所示: * 例如 `E:2` 表示一開始 buffer 就有兩個 `-` * 繼承的算法是從初始的 buffer 數量加上 fork 時的該 process 又向內加入的數量 * 注意是先 fork 再 printf ```graphviz digraph graphname{ node[shape=none] rankdir=LR A[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">A:0</td></tr> <tr><td port="port2" border="1">i=0</td></tr> <tr><td port="port3" border="1">i=1</td></tr> <tr><td port="port4" border="1">i=2</td></tr> </table>> ] B[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">B:0</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=0</td></tr> <tr><td port="port3" border="1">i=1</td></tr> <tr><td port="port4" border="1">i=2</td></tr> </table>> ] C[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">C:1</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=1</td></tr> <tr><td port="port3" border="1">i=2</td></tr> </table>> ] D[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">D:1</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=1</td></tr> <tr><td port="port3" border="1">i=2</td></tr> </table>> ] E[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">E:2</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] F[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">F:2</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] G[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">G:2</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] H[shape = none, label= <<table border="0" cellspacing="0"> <tr><td port="port1" border="1" bgcolor="red">H:2</td></tr> <tr><td port="port2" border="1" bgcolor="gray">i=2</td></tr> </table>> ] A:port2->B:port1 A:port3->C:port1 B:port3->D:port1 A:port4->E:port1:w B:port4->F:port1 D:port3->G:port1 C:port3->H:port1 } ``` 所以整理規則後,考慮 buffer I/O 的輸出數量公式如下 $$ NNN + \sum_{i=0}^{NNN-1} ((NNN - i) \times 2^i )+ \sum_{i=1}^{NNN-1} i \times 2^i $$ 附上一個醜醜的 python 進行測試,可以得到 * `NNN` = 12 ```python def fun(k): sum = k for i in range(0, k): sum += (k - i) * (2**i) for i in range(1, k): sum += i * (2**i) return sum ``` :::warning :warning: 再往下延伸思考,這個式子能保持正確的 `N` 是有極限的! 這是因為 buffer I/O 的 buffer 大小也是有限的,因此除了遇到換行,若 buffer 已經滿了也會造成 flush 的行為發生。例如我們可以設置一個只有 10 個 char 大小的 buffer,觀察 `NNN = 12` 時的結果。 ```char char buffer[10]; setvbuf(stdout, buffer, _IOLBF, 10); ``` ::: ## 測驗 $\delta$ ### 實作 `thrd_` 為 prefix 的相關結構與函數來自 `threads.h` 函式庫,與 `pthread_` 系列的 thread 是 UNIX 標準而後者遵守 ISO C/POSIX 標準。 [ISO C Thread Management (The GNU C Library)](https://www.gnu.org/software/libc/manual/html_node/ISO-C-Thread-Management.html) #### `con_queue_t` ```cpp enum { Q_OK, Q_ERROR }; typedef struct { /* Queue node */ void *value; void *next; } node_t; typedef struct { /* Two lock queue */ node_t *first, *last; mtx_t *first_mutex, *last_mutex; } con_queue_t; ``` 示例為此 MPMC 結構的基本資料結構 * `first`、`last` 指向維護的 queue 結構中的一個節點,從節點可以看到 queue 只是單向的 linked list 且存有一個值 #### `con_init` ```cpp con_queue_t *con_init() { /* Allocate queue */ con_queue_t *queue = malloc(sizeof(con_queue_t)); if (!queue) return NULL; if ((queue->first_mutex = malloc(sizeof(mtx_t))) == NULL) { free(queue); return NULL; } if ((queue->last_mutex = malloc(sizeof(mtx_t))) == NULL) { free(queue->first_mutex); free(queue); return NULL; } ``` 分配 `con_queue_t` 以及 mutex 結構需要的動態空間 ```cpp if (mtx_init(queue->first_mutex, mtx_plain) != thrd_success || mtx_init(queue->last_mutex, mtx_plain) != thrd_success) { con_free(queue); return NULL; } ``` [`mtx_init`](https://en.cppreference.com/w/c/thread/mtx_init) 建立 mutex 物件。其中指定為 [`mtx_plain`](https://en.cppreference.com/w/c/thread/mtx_types) 類型: > *a simple, non-recursive mutex* recursive mutex 又名 [Reentrant mutex](https://en.wikipedia.org/wiki/Reentrant_mutex),總結來說就是允許遞迴呼叫使用的 mutex,thread 被允許對自己有持有權的 mutex 多次的進行 lock,因此該 thread 將不會 block 自己。 > * [Recursive Lock (Mutex) vs Non-Recursive Lock (Mutex)](https://stackoverflow.com/questions/187761/recursive-lock-mutex-vs-non-recursive-lock-mutex) ```cpp node_t *dummy = _con_node_init(NULL); if (!dummy) { con_free(queue); return NULL; } queue->first = queue->last = dummy; return queue; } ``` `_con_node_init` 以給定的值 value 建立一個 dummy node,且初始化時將 first 與 last 都指向此 node。 #### `push_thread` ```cpp int push_thread(void *queue_ptr) { con_queue_t *queue = (con_queue_t *) queue_ptr; /* Push ints into queue */ for (int i = 0; i < NUM; ++i) { int *pushed_value = malloc(sizeof(int)); *pushed_value = i; if (con_push(queue, pushed_value) != Q_OK) printf("Error pushing element %i\n", i); } thrd_exit(0); } ``` 生產者的 thread 會迴圈建立 int pointer 並透過核心的 `con_push` 加入 queue 中 #### `con_push` ```cpp int con_push(con_queue_t *restrict queue, void *restrict new_element) { /* Prepare new node */ node_t *node = _con_node_init(new_element); if (!node) return Q_ERROR; /* Add to queue with lock */ mtx_lock(queue->last_mutex); AAA; queue->last = node; mtx_unlock(queue->last_mutex); return Q_OK; } ``` * `last_mutex` 提供對 `last` 指標節點的互斥。[`mtx_lock`](https://en.cppreference.com/w/c/thread/mtx_lock) 將 thread block 住直到 `last_mutex` 不被其他 thread 持有,取得之並上鎖 * 將建立的 node append 在 linked list 的 最後位置,並且將 last 指向之 * AAA = `queue->last->next = node` * [`mtx_unlock`](https://en.cppreference.com/w/c/thread/mtx_unlock) 釋放 lock 所有權 #### `pop_thread` ```cpp int pop_thread(void *queue_ptr) { con_queue_t *queue = (con_queue_t *) queue_ptr; /* Read values from queue. Break loop on -1 */ while (1) { int *popped_value = con_pop(queue); if (popped_value) { if (*popped_value == -1) { free(popped_value); break; } free(popped_value); } } thrd_exit(0); } ``` 消費者的 thread 會迴圈取出 queue 中的 int pointer,透過核心的 `con_pop` 來達到此一目的。其中,queue 中若出現值為 `-1` 的 int pointer,表示終止程式運行。 #### `con_pop` ```cpp void *con_pop(con_queue_t *queue) { mtx_lock(queue->first_mutex); node_t *node = queue->first; /* Node to be removed */ node_t *new_header = queue->first->next; /* become the first in the queue */ /* Queue is empty */ if (!new_header) { mtx_unlock(queue->first_mutex); return NULL; } /* Queue not empty: retrieve data and rewire */ void *return_value = BBB; CCC; mtx_unlock(queue->first_mutex); /* Free removed node and return */ free(node); return return_value; } ``` * `first_mutex` 提供對 `first` 指標節點的互斥 * 若 `queue->first->next` 並非 NULL,表示 queue 裡的並非只有一個 node (dummy node),則表示 queue 並非空,取出 element * BBB = `new_header->value` :::danger 注意這裡 `node->value` 是錯誤的,正確的值必須是在下一個 node 中而非即將釋放的 node(思考一下 dummy 與新加入 node 的關係)。 ::: * 並且將 `queue->first` 也進行更新 * CCC = `queue->first = new_header` ## 測驗 $\epsilon$ ### 實作 #### `mpool` ```cpp typedef struct { int cnt; /* actual pool count */ int pal; /* pool array length (2^x ceil of cnt) */ int min_pool, max_pool; /* minimum/maximum pool size */ void **ps; /* pools */ int *sizes; /* chunk size for each pool */ void *hs[1]; /* heads for pools' free lists */ } mpool; ``` * `cnt` 是共使用了多少 memory pool 數量 * `pal` 是所有已配置的 memory pool 數量(包含已啟用與未啟用) * `min_pool`, `max_pool` 是最大與最小的 pool 之實際大小 * `**ps` 是一個動態分配的 pointer 陣列,而每個 pointer 將會指向一個 memory pool * `*size` 是動態分配的整數陣列,每個 `idx` 對應的 element 表示 `ps[idx]` 裡的 pool 之分配大小 * `*hs[1]` 是一個 struct hack 技巧,允許配置動態大小的 struct,對應每個大小的 pool,會指向 `**ps` 中的一段空間 * 和 C90 後的 [Zero length array](https://gcc.gnu.org/onlinedocs/gcc/Zero-Length.html) 語法可提供的效果相近 #### `iceil2` ```cpp static unsigned int iceil2(unsigned int x) { x = x - 1; x = x | (x >> 1); x = x | (x >> 2); x = x | (x >> 4); x = x | (x >> 8); x = x | (x >> 16); return XXX; } ``` iceil2 用來取得比數字 `x` 大且最相近的整數 $2^n$。想法為: 對於數字 32 位元的數字 `x`,假設最高位的 1 在第 n 個 bit,如果該 bit 後面都是 0 則即為解,如果不是,只要把後面的 0 都補為 1 然後再加 1 即可。 > 類似想法可參考我的筆記: [Round Up / Down Power-of-2](https://hackmd.io/9tGfpJtLTwyyOzDvlyJS2w#Round-Up--Down-Power-of-2) * XXX = `x + 1` #### `mpool_init` ```cpp mpool *mpool_init(int min2, int max2) { int cnt = max2 - min2 + 1; /* pool array count */ int palen = iceil2(cnt); assert(cnt > 0); mpool *mp = malloc(sizeof(mpool) + (cnt - 1) * sizeof(void *)); void *pools = malloc(palen * sizeof(void **)); int *sizes = malloc(palen * sizeof(int)); if (!mp || !pools || !sizes) return NULL; mp->cnt = cnt; mp->ps = pools; mp->pal = palen; mp->sizes = sizes; mp->min_pool = (1 << min2), mp->max_pool = (1 << max2); memset(sizes, 0, palen * sizeof(int)); memset(pools, 0, palen * sizeof(void *)); memset(mp->hs, 0, cnt * sizeof(void *)); return mp; } ``` 初始化範圍在 $2^{min2}$ and $2^{max2}$ 的 memory pool。 * `mpool *mp = malloc(sizeof(mpool) + (cnt - 1) * sizeof(void *));` 配置 mpool 需要的空間 * 預先分配一定數量(`palen`) 的 memory pool array 和紀錄其對應分配大小的 array * 初始化 `mp` 的成員,包含動態空間的內容(必要,因為需以內容是否為 NULL 判斷是否為空) #### `mpool_new_pool` ```cpp void **mpool_new_pool(unsigned int sz, unsigned int total_sz) { int o = 0; /* o=offset */ void *p = get_mmap(sz > total_sz ? sz : total_sz); if (!p) return NULL; int **pool = (int **) p; assert(pool); assert(sz > sizeof(void *)); void *last = NULL; int lim = (total_sz / sz); for (int i = 0; i < lim; i++) { if (last) assert(last == pool[o]); o = (i * sz) / sizeof(void *); pool[o] = (int *) &pool[o + (sz / sizeof(void *))]; last = pool[o]; } pool[o] = NULL; return p; } ``` `mpool_new_pool` 與 pool 的設計和 `mpool_alloc` 的設計是有關聯的 * 首先,先透過 mmap 得到一塊 anonymous page(在此案例中是 `PAGE_SIZE` 大小) * `lim` 來自對這塊 anonymous page 可以切成多少等份要求的 `sz` 大小的計算 for 迴圈中,計算的 `o` 可以理解為若每次分配的大小 `sz` 相當於是 `k` 個 `void *` (`(sz / void*) == k`),那麼分配的地址其實就依次要是 `pool + i * k`(i 從 0 開始)。 而對於記憶體的配置,該空間被分配前相當於是配置器可以完全去善用的。所以這裡就將尚未分配的空間中儲存下一個要分配的空間之位置,用類似於 linked list 的架構可以讓 `mpool_alloc` 變得相對簡單。 假設一個 `void *` 是 4 bytes,pool 分配的目標 size 是 12 bytes,而 pool 大小是 36 bytes。結構如下圖所示: ```graphviz digraph { node [shape=plaintext, fontcolor=red, fontsize=18]; "Address:" -> "Array:" [color=white]; node [shape=record, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; pointers [label="<f0> A | <f1> A+4 | <f2> A+8 | <f3> A+12 | <f4> A+16 | <f5> A+20 | <f6> A+24| <f7> A+28| <f8> A+32", color=white]; { rankdir = LR rank=same arrays [ shape = none label=<<table border="0" cellborder="1" cellspacing="0"> <tr> <td port="f0" bgcolor="green"><font point-size="18">A[0]</font></td> <td port="f1" bgcolor="green"><font point-size="18">A[1]</font></td> <td port="f2" bgcolor="green"><font point-size="18">A[2]</font></td> <td port="f3" bgcolor="red"><font point-size="18">A[3]</font></td> <td port="f4" bgcolor="red"><font point-size="18">A[4]</font></td> <td port="f5" bgcolor="red"><font point-size="18">A[5]</font></td> <td port="f6" bgcolor="yellow"><font point-size="18">A[6]</font></td> <td port="f7" bgcolor="yellow"><font point-size="18">A[7]</font></td> <td port="f8" bgcolor="yellow"><font point-size="18">A[8]</font></td> </tr> </table>> ] null[shape=plaintext,width=1.5] } { rank=same; "Array:"; arrays } edge [color=red]; arrays:f0:s -> arrays:f3:s; arrays:f3:s -> arrays:f6:s; arrays:f6:s -> null edge [color=blue]; pointers:f0 -> arrays:f0; pointers:f1 -> arrays:f1; pointers:f2 -> arrays:f2; pointers:f3 -> arrays:f3; pointers:f4 -> arrays:f4; pointers:f5 -> arrays:f5; pointers:f6 -> arrays:f6; pointers:f7 -> arrays:f7; pointers:f8 -> arrays:f8; } ``` #### `add_pool` ```cpp static int add_pool(mpool *mp, void *p, int sz) { assert(p); assert(sz > 0); if (mp->cnt == mp->pal) { mp->pal *= 2; /* RAM will exhaust before overflow */ void **nps = realloc(mp->ps, mp->pal * sizeof(void **)); void *nsizes = realloc(mp->sizes, mp->pal * sizeof(int *)); if (!nps || !nsizes) return -1; mp->sizes = nsizes; mp->ps = nps; } mp->ps[mp->cnt] = p; mp->sizes[mp->cnt] = sz; mp->cnt++; return 0; } ``` `add_pool` 將新建立的 pool `p`(為 `sz` 大小)加入 `mp` * 如果預先建立的 pool 數量已經不夠使用(`mp->cnt == mp->pal`),則需要將 pool array 和 size array 都擴大(realloc) #### `mpool_alloc` ```cpp void *mpool_alloc(mpool *mp, int sz) { void **cur; int i, p; assert(mp); if (sz >= mp->max_pool) { cur = get_mmap(sz); /* just mmap it */ if (!cur) return NULL; return cur; } ``` * 透過 `mpool_alloc` 介面要求的大小大於 `mp->max_pool` 時,直接透過 `mmap` 建立一塊可讀可寫的 anonymous page ```cpp long szceil = 0; for (i = 0, p = mp->min_pool;; i++, p *= 2) { if (p > sz) { szceil = p; break; } } assert(szceil > 0); ``` 根據要求的大小尋找適合大小的 pool 以及其 index ```cpp cur = mp->hs[i]; /* get current head */ if (!cur) { /* lazily allocate and initialize pool */ void **pool = mpool_new_pool(szceil, PAGE_SIZE); if (!pool) return NULL; mp->ps[i] = mp->hs[i] = pool; mp->sizes[i] = szceil; cur = mp->hs[i]; } assert(cur); ``` `mp->hs[i]` 應該要指向即將被分配的空間位址。這裡的 if 中的 context 會在某個尺寸的 pool 最初被啟用時執行 * 最開始時 `mp->hs[i]` 為空,先透過 `mpool_new_pool` 去配置 pool 空間 * 並將 `ps[i]` 和 `hs[i]` 指向該空間,`sizes[i]` 也根據大小去設置 * 最後將 cur 指向 pool 的開頭,也就是此次要 allocate 的位址 ```cpp if (!(*cur)) { /* if at end, attach to a new page */ void **np = mpool_new_pool(szceil, PAGE_SIZE); if (!np) return NULL; *cur = &np[0]; assert(*cur); if (add_pool(mp, np, szceil) < 0) return NULL; } assert(*cur > (void *) PAGE_SIZE); mp->hs[i] = *cur; /* set head to next head */ return cur; } ``` * 如果 `cur` 指向的內容(`*cur`)為 NULL,表示這個 pool 已經用完了,則透過 `mpool_new_pool` 配置新的 memory pool * 相對於前一個 if 有明確的位置,這裡的 pool 需要透過 `add_pool` 加到 `mp` 結構中 * 最後更新 `mp->hs[i]` 指向下一個配置的位址,並回傳此次的配置位置 `cur` #### `mpool_repool` ```cpp void mpool_repool(mpool *mp, void *p, int sz) { int i = 0; if (sz > mp->max_pool) { if (munmap(p, sz) == -1) fprintf(stderr, "Failed to unmap %d bytes at %p\n", sz, p); return; } void **ip = (void **) p; YYY; assert(ip); mp->hs[i] = ip; } ``` :::danger 如果沒理解錯誤,`repool` 的用途應是將大小為 `sz` 的指標 `*p` 釋放回 pool 中? ::: * 對於超過 `max_pool` 大小的配置記憶體,實際上沒有特殊的管理,因此直接 `munmap` 即可 * 對於其他大小 1. 首先應該要找到對應的 freelist header 2. 將 freelist header 所指向內容放到要釋放的 `*p` 中,然後再把 freelist header 指向 `*p` :::warning :warning: 答案是給 YYY = `*ip = mp->hs[i];`,但我認為如果考慮行為的正確性,`YYY` 的答案應該沒辦法用一個 statement 概括(?) 我會認為 YYY = ```cpp for (int j = mp->min_pool;; i++, j *= 2) { if (j > sz) break; } *ip = mp->hs[i]; ``` ::: ## 測驗 $\zeta$ ### 實作 #### `move` ```cpp static void move(int in_fd, int out_fd, int pip[2]) { int ret = splice(in_fd, NULL, pip[1], NULL, 8, SPLICE_F_MOVE); if (ret == -1) { perror("splice(1)"); return; } ret = splice(pip[0], NULL, out_fd, NULL, 8, SPLICE_F_MOVE); if (ret == -1) { perror("splice(1)"); return; } } ``` 對於建立好的 pipe(`pip[2]`),[`splice`](https://man7.org/linux/man-pages/man2/splice.2.html) 可以避免一般的 file descriptor 間的資料交換需要在 kernel space 和 user space 間來回的搬動的問題(詳見 [splice 系統呼叫](https://hackmd.io/@sysprog/linux2020-zerocopy#splice-%E7%B3%BB%E7%B5%B1%E5%91%BC%E5%8F%AB)) * `splice` 的其中一個 fd 參數必須是 pipe,所以要透過兩次 splice 移動 `in_fd` 的資料到 `out_fd` #### `proxy` ```cpp static void proxy(int cl_fd, int target_fd) { if (target_fd == -1) return; int fds[2]; if (pipe(fds) == -1) { perror("pipe"); return; } struct pollfd polls[2] = { [1] = {III}, [0] = {JJJ}, }; int ret; while ((ret = poll(polls, 2, 1000)) != -1) { if (ret == 0) continue; int from_client = polls[0].revents & POLLIN; if (from_client) move(cl_fd, target_fd, fds); else move(target_fd, cl_fd, fds); } perror("poll"); } ``` * 透過 [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) 為 `splice` 的調用進行準備 > pipe() creates a pipe, a unidirectional data channel that can be used for interprocess communication. The array `pipefd` is used to return two file descriptors referring to the ends of the pipe. `pipefd[0]` refers to the read end of the pipe. `pipefd[1] refers` to the write end of the pipe. Data written to the write end of the pipe is buffered by the kernel until it is read from the read end of the pipe. * [`poll`](https://man7.org/linux/man-pages/man2/poll.2.html) 監控 `polls` fd 集合的事件發生,其中 timeout 設置為 1000ms * `POLLIN` 表示回傳的事件是要讀取某個 fd,所以 `polls` 的 index 0 對應的 fd 應該是 `cl_fd` :::danger :negative_squared_cross_mark: Wrong answer? * JJJ = `{.fd=cl_fd, .events=POLLIN}` * III = `{.fd=target_fd, .events=POLLIN}` ::: #### `main` ```cpp int main() { ... int listenfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(PORT); int optval = 1; setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); bind(listenfd, (struct sockaddr *) &addr, sizeof(addr)); listen(listenfd, 1); ``` * [`socket`](https://man7.org/linux/man-pages/man2/socket.2.html) 建立一個連接的端點(endpoint),返回的值是該端點的 file descriptor * AF_INET: IPv4 Internet protocols * SOCK_STREAM: 使用TCP(資料流)方式提供可靠、雙向、串流的通信頻道 * 用 [`bind`](https://man7.org/linux/man-pages/man2/bind.2.html) 和 socket 建立綁定,將 `listenfd` 和 `addr` 連繫起來 > Traditionally, this operation is called "assigning a name to a socket". * `struct sockaddr` 結構包含連接 socket 的相關資訊 * `.sin_family`: AF_INET = IPv4 Internet protocols * `.sin_port`: htons 將 PORT(這裡是 1922) 從 unsigned short 數字轉換成 network byte order * `.sin_addr.s_addr`: htonl 將 unsigned integer 數字轉換成 network byte order > [ip](https://man7.org/linux/man-pages/man7/ip.7.html): INADDR_ANY (0.0.0.0) means any address for binding * 利用 [`llisten`](https://man7.org/linux/man-pages/man2/listen.2.html) 去監聽 socket * 第二個參數的 `1` 是連接請求能夠 pending 的最大數量 ```cpp while (1) { int connfd = accept(listenfd, (struct sockaddr *) NULL, NULL); int target_fd = connect_to(argv[1], atoi(argv[2])); if (target_fd >= 0) proxy(connfd, target_fd); close(connfd); } ``` * 對監聽的 socket 之 file descriptor `listfd` 進行 [`accept()`](https://man7.org/linux/man-pages/man2/accept.2.html) system call,獲得在監聽對列中的連接請求 * 第二個參數在 system call 後會填入請求的 client 端之相關資訊,設定為 NULL 則不取得之 * 第三個參數是也因此設為 NULL * 返回新的 socket `connfd` 可以用來跟接受的 client 做 communication * `connect_to` 根據執行時給定的參數(ip 位址和 port)建立對儲存伺服端的 socket * `proxy` 對兩個 fd 之間的資料傳遞進行對應處理