--- tags: linux2025 --- # [2025q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 10 週測驗題 :::info 目的: 檢驗學員對 [I/O 模型](https://hackmd.io/@sysprog/linux-io-model)、[CPU 排程器](https://hackmd.io/@sysprog/linux-scheduler)和〈[Concurrency Primer](https://github.com/sysprog21/concurrency-primer)〉的認知 ::: ==[作答表單: 測驗 `1`, `2`](https://docs.google.com/forms/d/e/1FAIpQLSf4PAgAbOoQcHooCeubnZ81qrhIM8O2lpn0X-dSNneo7yl0lw/viewform?usp=dialog)== ==[作答表單: 測驗 `3`](https://docs.google.com/forms/d/e/1FAIpQLSeqPC0HXVb_SKBhdCzIqSaltaB0BwT-hj2XzH3zTORYQFnGTw/viewform?usp=dialog)== ### 測驗 `1` 考慮 `strtrim` 函式的作用是從給定的 C 字串中移除前導與尾端的空白字元,並回傳修剪 (trim) 後的子字串起始位址。它會直接修改原始字串內容(in-place),將尾端的空白字元以 `\0` 結束字元截斷,並將指標移到第一個非空白字元,供後續使用。以下字元會被當作空白處理: | 字元 | 意義 | 十六進位 | |------|------------|----------| | `' '` | 空白鍵 | `0x20` | | `'\t'` | tab 鍵 | `0x09` | | `'\n'` | 換行字元 (LF) | `0x0A` | | `'\r'` | 斷行符號 (CR) | `0x0D` | | `'\v'` | 垂直定位字元 | `0x0B` | | `'\f'` | 換頁字元 | `0x0C` | > 在 Windows 系統中,每行結尾使用 CRLF 作為斷行符號;而在 Linux 則僅使用 LF 字元。因此,相較於 Linux,Windows 中的每行會多出一個 CR 字元。CR 原本的意思是 Carriage Return,對應鍵盤上的 Enter 鍵,LF 意思是 Line Feed,表示實際的換行動作。macOS 目前也與 Linux 一樣,使用 LF 作為行尾字元。 應用情境 - 處理使用者輸入:避免輸入欄前後的空格影響後續操作 (如登入、搜尋) - 解析設定檔:行尾或行首常有空白,需先清除 - 處理網路協定標頭或資料行:HTTP headers、CSV 欄位等 我們利用[第 3 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz3)提及的 SWAR 技巧,來實作 `strtrim` 函式 (只處理 ASCII,部分程式碼遮蔽) ```c /* ASCII whitespace check */ static inline bool is_ascii_space(uint8_t c) { return c == ' ' || (c - '\t' <= '\r' - '\t'); } /* Count leading ASCII space characters in a 64-bit word */ static inline int count_leading_spaces(uint64_t word) { for (int i = 0; i < AAAA; i++) { uint8_t c = (word >> (BBBB)) & 0xFF; if (!is_ascii_space(c)) return i; } return 8; } static char *strtrim(char *str) { uint8_t *s = (uint8_t *) str; /* Fast skip leading whitespace using SWAR if 8-byte aligned */ for (; ((uintptr_t) s & CCCC) == 0; s += 8) { uint64_t w = *(uint64_t *) s; int n = count_leading_spaces(w); if (n < 8) { s += n; break; } } while (*s && is_ascii_space(*s)) s++; if (!*s) return (char *) s; uint8_t *end = s + strlen((char *) s); while (end > s && is_ascii_space(*(DDDD))) end--; *end = '\0'; return (char *) s; } ``` 考慮 [client.c](https://gist.github.com/jserv/e76914f2cd0fe8ff19f66c6149f60220) 是我們打造的網路聊天客戶端程式碼,對應的巨集定義如下: ```c #define SERVER_ADDR "127.0.0.1" #define SERVER_PORT 7788 ``` 而 `chat.c` 則是網路聊天的伺服器端程式碼,定義如下 (忽略標頭檔): ```c /* Server configuration constants */ #define BACKLOG 128 /* Maximum pending connections */ #define MAX_EVENTS 64 /* Maximum epoll events per wait */ #define MAX_CLIENTS 1024 /* Maximum concurrent clients */ #define NICK_MAXLEN 32 /* Maximum nickname length */ enum { CHAT_OK = 0, CHAT_ERR = -1 }; /* store client information */ typedef struct { int fd; char nick[NICK_MAXLEN]; } client_t; /* manage server state */ typedef struct { int fd; client_t *clients[MAX_CLIENTS]; } server_t; ``` 主函式和事件主迴圈如下: (部分程式碼遮蔽) ```c /* Allocates memory with error handling */ static inline void *my_alloc(size_t size) { void *ptr = malloc(size); if (!ptr) { perror("malloc"); exit(EXIT_FAILURE); } return ptr; } int main(void) { /* Initialize chat server */ server_t server = {.fd = 0, .clients = {NULL}}; if (chat_listen(&server, SERVER_ADDR, SERVER_PORT) != CHAT_OK) { fprintf(stderr, "Failed to listen on %s:%d\n", SERVER_ADDR, SERVER_PORT); return CHAT_ERR; } /* Create epoll instance */ int epollfd = epoll_create1(0); if (epollfd < 0) { perror("epoll_create1"); return CHAT_ERR; } /* Add server socket to epoll */ struct epoll_event ev = {.events = EPOLLIN, .data.fd = server.fd}; if (FFFF < 0) { return CHAT_ERR; } char buf[256]; /* Main event loop */ for (;;) { /* Epoll event array for handling socket events */ struct epoll_event events[MAX_EVENTS]; int nfds = GGGG; if (nfds < 0) { return CHAT_ERR; } /* Process each epoll event */ for (int i = 0; i < nfds; i++) { int fd = events[i].data.fd; if (fd == server.fd) { /* Handle new client connection */ int client_fd = chat_accept(&server); if (client_fd < 0) continue; /* Initialize new client */ client_t *client = my_alloc(sizeof(client_t)); client->fd = client_fd; snprintf(client->nick, NICK_MAXLEN, "anon:%d", client_fd); server.clients[client_fd] = client; /* Send welcome message */ snprintf( buf, sizeof(buf), "Server\r\nWelcome %s! Use /nick to set a nickname\r\n", client->nick); if (write(client_fd, buf, strlen(buf)) < 0) perror("write welcome"); /* Broadcast join message */ snprintf(buf, sizeof(buf), "%s joined\n", client->nick); broadcast_message(&server, buf, client_fd, 1); /* Add client to epoll */ ev.events = EPOLLIN | EPOLLET; ev.data.fd = client_fd; if (HHHH < 0) { return CHAT_ERR; } } else { /* Handle client data */ ssize_t nread = read(fd, buf, sizeof(buf) - 1); if (nread <= 0) { handle_disconnect(&server, epollfd, fd, buf); } else { handle_client_message(&server, epollfd, fd, buf, nread); } } } } return 0; } ``` 對應的輔助函式如下: ```c /* Sets a file descriptor to non-blocking mode * @fd - File descriptor * Returns: CHAT_OK on success, CHAT_ERR on failure */ static int set_nonblocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("fcntl"); return CHAT_ERR; } return CHAT_OK; } /* Initializes server socket for listening * @server - Server structure * @host - Host address * @port - Port number * Returns: CHAT_OK on success, CHAT_ERR on failure */ static int chat_listen(server_t *server, const char *host, int port) { struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM, .ai_flags = AI_PASSIVE, }; char port_str[6]; snprintf(port_str, sizeof(port_str), "%d", port); struct addrinfo *res; if (getaddrinfo(host, port_str, &hints, &res)) { perror("getaddrinfo"); return CHAT_ERR; } int fd = -1; for (struct addrinfo *rp = res; rp; rp = rp->ai_next) { fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (fd < 0) continue; if (0 > setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) || bind(fd, rp->ai_addr, rp->ai_addrlen) < 0 || listen(fd, BACKLOG) < 0) { close(fd); continue; } break; } freeaddrinfo(res); if (fd < 0) { perror("bind/listen"); return CHAT_ERR; } server->fd = fd; return set_nonblocking(fd); } /* Accepts a new client connection * Returns: Client file descriptor on success, CHAT_ERR on failure */ static int chat_accept(server_t *server) { int fd = accept(server->fd, NULL, NULL); if (fd < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) perror("accept"); return CHAT_ERR; } return set_nonblocking(fd) == CHAT_OK ? fd : CHAT_ERR; } /* Sends a message to a single client * @client - Client to send to * @msg - Message to send * @len - Message length */ static void send_to_client(client_t *client, const char *msg, int len) { if (write(client->fd, msg, len) < 0) perror("write"); } /* Broadcasts a message to all clients except the sender * @server - Server structure * @buf - Message content * @sender_fd - Sender's file descriptor * @server_info - Flag for server-generated messages */ static void broadcast_message(server_t *server, const char *buf, int sender_fd, int server_info) { char msg[256]; int len = server_info ? snprintf(msg, sizeof(msg), "Server\r\n%s", buf) : snprintf(msg, sizeof(msg), "%s\r\n%s", server->clients[sender_fd]->nick, buf); /* Iterate through client array to send message */ for (int i = 0; i < MAX_CLIENTS; i++) { client_t *client = server->clients[i]; if (client && i != sender_fd) send_to_client(client, msg, len); } } static void handle_disconnect(server_t *server, int epollfd, int fd, char *buf) { client_t *client = server->clients[fd]; snprintf(buf, 256, "%s left\n", client->nick); broadcast_message(server, buf, fd, 1); EEEE; close(fd); free(client); server->clients[fd] = NULL; } static void handle_client_message(server_t *server, int epollfd, int fd, char *buf, ssize_t nread) { buf[nread] = '\0'; client_t *client = server->clients[fd]; if (!strncmp(buf, "/quit", 5) || !strncmp(buf, "/exit", 5)) { handle_disconnect(server, epollfd, fd, buf); } else if (!strncmp(buf, "/nick", 5)) { const char *nick = strtrim(buf + 5); strncpy(client->nick, nick, NICK_MAXLEN - 1); client->nick[NICK_MAXLEN - 1] = '\0'; } else { broadcast_message(server, buf, fd, 0); } } ``` 補完程式碼,使其運作符合預期。作答規範: * `AAAA` 和 `CCCC` 為十進位數值 * `BBBB` 和 `DDDD` 為合法表示式,不得出現乘法和除法運算子 * `EEEE`, `FFFF`, `GGGG`, `HHHH` 皆為以 `epoll_` 開頭的函式呼叫 * `EEEE`, `FFFF`, `HHHH` 包含 `EPOLL_` 開頭的巨集 * GGGG 包含 `MAX_EVENTS` * 使用一致的程式碼縮排風格,以最精簡的方式撰寫,不包含表示式前後的空白字元,皆不得出現 `;` 或 `?` 或 `;` 字元 :::success 延伸問題: * 解釋上方程式碼運作原理 * 藉由在客戶端引入對話模擬 (如 [ELIZA](https://zh.wikipedia.org/zh-tw/ELIZA%E6%95%88%E6%87%89)),量化上方程式碼,並提出針對其並行處理能力的改進 ::: --- ### 測驗 `2` 延續[第 6 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz6)的測驗 `3`。在處理大規模資料集時,常見的需求是快速判斷某個元素是否存在於一個龐大的集合中,這就是成員資格查詢 (membership query)。傳統的資料結構面臨挑戰: * 雜湊表 (Hash Table): 雖然平均查詢時間為 $O(1)$,但需要儲存元素本身或其標識,空間開銷可能非常大 * 串列/陣列 (List/Array): 無序串列查詢效率低 ($O(n)$);有序結構(如平衡樹、排序陣列)查詢效率尚可 ($O(\log{n})$),但仍需儲存元素,且插入和維護成本較高 當應用場景滿足以下條件時,Bloom Filter 提供截然不同的解決方案: 1. 可以容忍一定的誤報 (False Positives):即查詢結果為「可能存在」,但元素實際上不在集合中 2. 絕不允許漏報 (No False Negatives):即查詢結果為「絕對不存在」時,元素一定不在集合中 3. 對空間效率和查詢速度有極高要求 Bloom Filter 正是藉由犧牲絕對的準確性(允許誤報),換取極佳的空間和時間效率。 Bloom Filter 由以下二部分組成: 1. 位元陣列 (Bit Array): 一個長度為 $m$ 的位元陣列,所有位元初始值均為 0 2. k 個雜湊函數: $k$ 個獨立的雜湊函數 $h_1, h_2, \dots, h_k$。每個函數將輸入的鍵 (key) 均勻地映射到 $[0, m-1]$ 的範圍內。理想情況下,這些雜湊函數的結果應相互獨立 關鍵操作: * 添加元素 (Add / Insertion): 1. 對於要添加的元素 $x$,使用 $k$ 個雜湊函數分別計算其雜湊值:$h_1(x), h_2(x), \dots, h_k(x)$。這些雜湊值對應位元陣列中的 $k$ 個索引位置。 2. 將位元陣列中這 $k$ 個索引位置的位元設定為 1。 (注意:即使某個位元原本已經是 1,也保持為 1)。 * 查詢元素 (Query / Check): 1. 對於要查詢的元素 $y$,同樣使用 $k$ 個雜湊函數計算其雜湊值:$h_1(y), h_2(y), \dots, h_k(y)$。 2. 檢查位元陣列中這 $k$ 個索引位置的位元: * 如果所有位元都為 1: 則判斷元素 $y$ 可能存在 (Possibly Present) 於集合中。這是可能產生誤報的情況。 * 如果至少有一個位元為 0: 則判斷元素 $y$ 絕對不存在 (Definitely Not Present) 於集合中。這是 Bloom Filter 不會有漏報的保證。 誤報率 (False Positive Probability) 是理解 Bloom Filter 性能和進行參數設計的關鍵。我們的目標是計算當查詢一個從未被添加過的元素時,被誤判為存在的機率,即誤報率 ($P_{fp}$)。 假設 (理想情況): 1. 均勻性 (Uniformity): 每個雜湊函數都將任意鍵均勻映射到位元陣列的 $[0, m-1]$ 範圍內,即映射到任何一個特定位置的機率是 $\frac{1}{m}$ 2. 獨立性 (Independence): $k$ 個雜湊函數的輸出結果是相互獨立的 推導步驟: 1. 單個位元保持為 0 的機率 (單次雜湊): 對於位元陣列中的某一個特定位元,當插入一個鍵時,被某個特定的雜湊函數 $h_i$ 選中的機率是 $\frac{1}{m}$。因此,這個位元不被 $h_i$ 選中的機率是 $(1 - \frac{1}{m})$。 2. 單個位元保持為 0 的機率 (k 次雜湊, 單個鍵): 由於有 $k$ 個獨立的雜湊函數,這個特定位元在插入單個鍵(經過 $k$ 次雜湊)後仍然保持為 0 的機率是: $P(\text{bit is 0 after 1 key}) = (1 - \frac{1}{m})^k$ 3. 單個位元保持為 0 的機率 (插入 N 個鍵): 假設我們獨立地插入 $N$ 個不同的鍵。對於某一個特定位元,它在插入 $N$ 個鍵後仍然保持為 0 的機率是(每次插入的 $k$ 次雜湊都不改變此位元): $$ P_0 = \left(1 - \frac{1}{m}\right)^{kn} $$ 4. 使用近似簡化: 當位元陣列長度 $m$ 相對於 $k N$ 很大時,$1/m$ 是一個很小的值。根據極限 $\lim_{x \to \infty} (1 - 1/x)^x = e^{-1}$,我們可以得到近似: $(1 - \frac{1}{m}) \approx e^{-1/m}$ 這個近似在 $m$ 越大時越精確。 5. 近似單個位元保持為 0 的機率: 將上述近似代入第 3 步的公式: $$ P_0 \approx (e^{-1/m})^{kN} = e^{-kN/m} $$ 6. 單個位元被設為 1 的機率: 在插入 $N$ 個鍵後,某個特定位元被設為 1 的機率是: $$ P_1 = 1 - P_0 \approx 1 - e^{-kN/m} $$ 7. 誤報率 ($P_{fp}$) 的計算: 考慮一個從未被插入集合的元素 $y$。查詢 $y$ 時發生誤報,意味著 $y$ 對應的 $k$ 個雜湊位置 $h_1(y), \dots, h_k(y)$ 上的位元恰好都為 1。 為了簡化計算,我們做一個關鍵的(近似)假設:對於一個不在集合中的元素 $y$,其 $k$ 個雜湊位置上的位元狀態是相互獨立的。 在該獨立性假設下,誤報率 $P_{fp}$(也常用 $f$ 表示)約等於單個位元為 1 的機率的 $k$ 次方: $$ f = (P_1)^k \approx (1 - e^{-kN/m})^k $$ 參數最佳化:尋找最優的 k 給定預計插入的元素數量 $N$ 和位元陣列大小 $m$,我們希望選擇一個最佳的雜湊函數個數 $k$,使得誤報率 $f(k) = (1 - e^{-kN/m})^k$ 最小。 藉由對 $\ln(f(k))$ 求導並令導數為 0,可以解得使誤報率最小的最優雜湊函數個數 $k_{opt}$ 滿足: $\frac{k_{opt} N}{m} = \ln 2$ 即: $k_{opt} = \frac{m}{N} \ln 2$ 由於 $k$ 必須是整數,實際應用中通常取 $k = \text{round}(\frac{m}{N} \ln 2)$。 反向計算:給定 N 和期望誤報率 f,計算 m 和 k 1. 計算 k: 當 $k$ 取最優值時,誤報率 $f \approx (1/2)^k$。因此: $k_{opt} \approx \log_2(1/f) = -\log_2(f)$。 2. 計算 m: 從 $k_{opt} = \frac{m}{N} \ln 2$ 得到 $m = \frac{N k_{opt}}{\ln 2}$。代入 $k_{opt}$: $m \approx \frac{N (-\log_2(f))}{\ln 2} = -N \frac{\ln(f)}{(\ln 2)^2}$ 整理以上可之: * 所需位元數 $m \ge \frac{N \ln(1/f)}{(\ln 2)^2} \approx 1.44 N \log_2(1/f)$ * 雜湊函數個數 $k = \text{round}(\frac{m}{N} \ln 2) \approx \text{round}(\log_2(1/f))$ Bloom filter 的優點: * 空間效率極高: 只需 $m$ 個位元,無需儲存元素本身 * 時間效率高: 插入和查詢操作時間複雜度均為 O(k),與集合元素總數 $N$ 無關 * 無漏報 (No False Negatives): 查詢結果為「不存在」時絕對可靠。 * 可合併性: 兩個參數 ($m, k$) 相同的 Bloom Filter 可藉由按位或 (Bitwise OR) 操作合併,代表兩個集合的並集 Bloom filter 的缺點: * 存在誤報 (False Positives): 查詢結果為「可能存在」時可能不準確。誤報率 $f$ 受 $m, N, k$ 影響 * 無法安全刪除元素 (標準 Bloom Filter): 直接將位元從 1 改為 0 可能影響其他元素,導致漏報。(可用 Counting Bloom Filter 改進,但增加空間開銷) * 需要預估元素數量 N: 為獲得理想誤報率,需提前估計 $N$ 以合理設定 $m$ 和 $k$。若實際 $N$ 遠超預期,誤報率會急劇上升。(可用 Scalable Bloom Filter 改進) 理論上 Bloom Filter 需要 $k$ 個獨立且均勻的雜湊函數,在實際硬中,產生 $k$ 個高品質且完全獨立的雜湊函數成本較高。常見的解決方案有: - [ ] 方法一:使用單一高品質雜湊函數配合不同種子/參數 Linux 核心的 eBPF Bloom Filter 實作採用此方法。它使用核心的 `jhash` 函數,並為產生第 $i$ 個雜湊值時提供不同的種子 (`bloom->hash_seed + i`)。 ```c /* Linux Kernel BPF Bloom Filter hash generation (示意) */ static u32 hash(struct bpf_bloom_filter *bloom, const void *value, u32 value_size, u32 i) { u32 h; // 使用 jhash 函數,種子 = filter的基礎種子 + 雜湊函數索引 i h = jhash(value, value_size, bloom->hash_seed + i); // 將 32 位雜湊值 h 映射到位元陣列索引 (使用掩碼) return h & bloom->bitset_mask; } ``` - [ ] 方法二:使用單一雜湊函數進行雙重雜湊或衍生 (如 FNV-1a 範例) 以下是用 FNV-1a 雜湊函數和簡化版雙重雜湊策略的具體實作範例: * 參數設定: * 使用 $k = 4$ 個雜湊函式產生 4 個位元索引。 * 位元陣列長度設為 $m = 2^{27} = 134,217,728$。 * 雜湊函數 (64 位元 FNV-1a 變體): ```c #define hash(key, keylen) ({ \ uint64_t h = 14695981039346656037ULL; /* FNV offset basis */ \ const char *data = (const char *)key; \ for (size_t i = 0; i < keylen; i++) { \ h = (h ^ data[i]) * 1099511628211ULL; /* FNV prime */ \ } \ h; \ }) ``` * 索引產生方式 (簡化版雙重雜湊): 首先計算一次基礎雜湊值 `hbase = hash(key, keylen)`,然後產生 $k$ 個索引: ```c // hbase 是 64 位元的 FNV-1a 雜湊結果 // filter->size 應為 m (即 2^27) // 產生第 i 個索引 (i 從 0 到 k-1) indices[i] = (hbase + i * (hbase >> 32)) & (filter->size - 1); ``` 這裡利用 64 位雜湊值的高 32 位作為步長因子,與基礎雜湊值組合後藉由位與操作(因為 $m$ 是 2 的冪)映射到索引範圍。 * 理論誤判率計算: 假設此 Bloom Filter 插入 $n = 1,000,000$ 個元素: $$ \frac{kn}{m} = \frac{4 \times 1,000,000}{2^{27}} = \frac{4,000,000}{134,217,728} \approx 0.0298 $$ 根據誤判率公式 $f \approx (1 - e^{-kn/m})^k$: $$ f \approx (1 - e^{-0.0298})^k = (1 - 0.9706)^4 = (0.0294)^4 $$ $$ f \approx 7.4 \times 10^{-7} $$ → 理論誤判率約為 $7.4 \times 10^{-5} \%$ 或 0.000074%。 FNV-1a 與索引衍生的實務考量與限制: 1. 雜湊偏態: FNV-1a 雜湊函數雖然計算速度快,但對於具有相似字首的字串(例如 "item-001", "item-002")容易產生相似的雜湊值或碰撞,其輸出分佈的均勻性可能不如 MurmurHash 或 xxHash 等現代雜湊函數,這會影響位元陣列中位元設定的均勻性。 2. 索引衍生獨立性不足: 這種藉由單一基礎雜湊值 `hbase` 推導產生 $k$ 個索引的方法(包括簡化版雙重雜湊),其產生的 `indices[0], indices[1], ..., indices[k-1]` 之間存在較強的相關性,並非真正獨立。這違反了理論推導中「位元設定為獨立事件」的關鍵假設,可能導致實際的誤判率高於理論預估值。 數學推導中使用的「雜湊函數獨立」和「位元狀態獨立」的假設在現實中通常是近似滿足的。 * 使用同一個雜湊算法配合不同種子或藉由衍生方式產生的 $k$ 個值並非嚴格獨立 * 位元陣列中不同位元的狀態也非完全獨立 然而: 1. 實用性: 只要基礎雜湊函數具有良好的均勻分佈性與雪崩效應(輸入微小變化導致輸出大幅隨機變化),那麼即使產生 $k$ 個值的方法存在一定的依賴性,基於獨立性假設推導出的誤判率公式 $f \approx (1 - e^{-kN/m})^k$ 仍然是一個非常有用的性能估計與參數設計指南。 2. 雜湊品質是關鍵: 無論如何產生 $k$ 個值,基礎雜湊函數的品質(均勻性、低碰撞率)對於 Bloom Filter 的實際性能至關重要。選擇高品質的雜湊函數是改進 Bloom Filter 性能的重要一步。 3. 權衡: 這體現了 Bloom Filter 設計中的權衡:追求理論上的完美獨立性可能代價過高,而實用的近似方法在效率和效果之間取得了平衡。認識到理論與實踐的差距,並了解所用方法的局限性很重要。 Linux 核心在 eBPF (extended Berkeley Packet Filter) 子系統中,藉由 `BPF_MAP_TYPE_BLOOM_FILTER` 類型的 eBPF map,為核心空間和使用者空間的 eBPF 程序提供了使用 Bloom Filter 的能力。 參數計算與實作 ([kernel/bpf/bloom_filter.c](https://github.com/torvalds/linux/blob/master/kernel/bpf/bloom_filter.c)) 核心程式碼 `bloom_filter_map_alloc` 負責創建 Bloom Filter map。關鍵步驟是根據使用者提供的 `max_entries` ($N$) 和 `nr_hash_funcs` ($k$) 計算所需的位元陣列大小 $m$ (`nr_bits`): ```c static struct bpf_map *bloom_filter_map_alloc(union bpf_attr *attr) { // ... (獲取 N = attr->max_entries 和 k = attr->nr_hash_funcs) ... u64 nr_bits; // 對應 m // ... /* 根據 N 和 k 計算最佳 m (nr_bits) */ /* 使用公式: m = N * k / ln(2),近似 1/ln(2) ≈ 7/5 */ if (check_mul_overflow(attr->max_entries, nr_hash_funcs, &nr_bits) || /* 計算 nr_bits ≈ (N * k / 5) * 7 */ check_mul_overflow(nr_bits / 5, (u32)7, &nr_bits) || nr_bits > (1UL << 31)) { /* 溢出或過大,限制 m 最大為 U32_MAX */ // ... (設定 bitset_bytes, bitset_mask) } else { /* 確保最小尺寸,並向上取整到 2 的冪次方 */ if (nr_bits <= BITS_PER_LONG) nr_bits = BITS_PER_LONG; else nr_bits = roundup_pow_of_two(nr_bits); // ... (設定 bitset_bytes, bitset_mask) } // ... (分配記憶體) ... // ... (初始化隨機種子 bloom->hash_seed) ... } ``` 程式碼分析: 1. 計算目標: 根據使用者輸入的 $N$ 和 $k$,計算出一個能使誤報率接近最小的位元陣列大小 $m$。 2. 公式應用與近似: 核心程式碼直接用 $m = \frac{N k}{\ln 2}$ 的關係,並使用 $\frac{1}{\ln 2} \approx \frac{7}{5}$ 進行定點近似。 3. 邊界處理與效率改進:處理計算溢出,確保最小尺寸,並將 $m$ 取整到 2 的冪次方,以使用高效的位與操作進行索引映射。 4. 隨機種子: 每個 Bloom Filter 實例使用隨機的 `hash_seed`,配合 `jhash` 產生 $k$ 個不同的雜湊值。 本次測驗中,採用 FNV-1a 的 64 位元變種。 Bloom filter 適合高效能並行場景的考量是: 1. 新增與查詢操作都是 lock-free 2. 不會移除元素 然而,這些操作在多核處理器或多執行緒環境下,就會遇到幾個重要的問題: 1. Data Race 風險:假設二個執行緒同時對 `filter->data[index]` 做 bitwise-OR 操作或查詢,若未使用 atomics,則會出現下列典型的競爭情形: ```c filter->data[index] |= bit; ``` 這可能會導致其中一個數值被覆蓋,產生 bit flip 丟失問題,進而導致查詢誤判機率上升,甚至出現 false negative(理論上不該出現)。 2. Cache coherence / False Sharing: 由於多執行緒可能同時寫入鄰近的 `uint64_t` 區塊,若 `data[]` 未妥善對齊,則可能導致 CPU 快取行為效率低下。若二個執行緒操作相鄰但不同的 `data[i]`,這些可能落在同一 cache line,導致 false sharing;每次更新位元,會造成整個 cache line 的 write invalidation,增加資料匯流排負擔與記憶體同步延遲 > 延伸閱讀: [並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics) 需要留意的是,儘管使用 atomic RMW,但不足以完全解決 contention:若多執行緒集中寫入相同區段 (高 collision key),仍會造成 atomic cache ping-pong,於是需要改進雜湊函數的數值分布。另外,雖然使用 `_Alignas(64)` 已有助於對齊,但若結構為 `struct { _Atomic uint64_t data[n]; }`,仍需考慮整體記憶體佈局 (memory layout) 和頁面分佈。 考慮以下 concurrent bloom filter,其標頭檔: (`bloom.h`) ```c #pragma once #include <stddef.h> /** Opaque Bloom filter structure */ typedef struct bloom_internal bloom_t; /** Function type for allocating memory */ typedef void *(*bloom_alloc_t)(size_t); /** Function type for deallocating memory */ typedef void (*bloom_dealloc_t)(void *, size_t); /*------------------------------*/ /* Construction and destruction */ /*------------------------------*/ /** * @brief Create a Bloom filter with default size. * @param alloc Optional allocator. If NULL, a default allocator is used. * @return Pointer to a new Bloom filter or NULL on failure. */ bloom_t *bloom_new(bloom_alloc_t alloc); /** * @brief Create a Bloom filter with specified size in bits. * @param size Number of bits. Must be a power of two. * @param alloc Optional allocator. If NULL, a default allocator is used. * @return Pointer to a new Bloom filter or NULL on failure. */ bloom_t *bloom_new_with_size(size_t size, bloom_alloc_t alloc); /** * @brief Destroy a Bloom filter and free resources. * @param filter Pointer to a pointer to a Bloom filter. Will be set to NULL. * @param dealloc Optional deallocator. If NULL, a default deallocator is used. */ void bloom_destroy(bloom_t **filter, bloom_dealloc_t dealloc); /*------------------*/ /* Filter operations */ /*------------------*/ /** * @brief Clear all bits in the Bloom filter. * @param filter Bloom filter instance. */ void bloom_clear(bloom_t *filter); /** * @brief Add a key to the Bloom filter. * @param filter Bloom filter instance. * @param key Pointer to the key. * @param keylen Length of the key in bytes. */ void bloom_add(bloom_t *filter, const void *key, size_t keylen); /** * @brief Test whether a key may be present in the Bloom filter. * @param filter Bloom filter instance. * @param key Pointer to the key. * @param keylen Length of the key in bytes. * @return 1 if possibly present, 0 if definitely not present. */ int bloom_test(bloom_t *filter, const void *key, size_t keylen); /*--------------------------*/ /* Default memory allocators */ /*--------------------------*/ /** * @brief Default allocator using malloc. * @param size Number of bytes to allocate. * @return Pointer to allocated memory. */ void *bloom_alloc(size_t size); /** * @brief Default deallocator using free. * @param ptr Pointer to memory to free. * @param size Number of bytes to free (ignored by default). */ void bloom_free(void *ptr, size_t size); ``` 效能評比程式可見 [bench.c](https://gist.github.com/jserv/042517212164c32d9998aafa77ee740a) 對應的 C 程式: ```c #include <stdatomic.h> #include <stdint.h> #include <stdlib.h> #include <string.h> #include "bloom.h" struct bloom_internal { size_t size; /* Number of bits in the filter */ _Atomic uint64_t *data; /* Word-aligned, atomic bit array */ }; /* Create a new Bloom filter with default size (2^27 bits) */ bloom_t *bloom_new(bloom_alloc_t allocator) { return bloom_new_with_size(1ULL << 27, allocator); } /* Create a new Bloom filter with specified size in bits */ bloom_t *bloom_new_with_size(size_t size, bloom_alloc_t allocator) { /* Round up to nearest 64-bit word */ size_t words = (size + 63) / 64; size_t bytes = words * sizeof(uint64_t); bloom_t *filter = allocator(sizeof(bloom_t) + bytes); if (!filter) return NULL; filter->size = size; filter->data = (_Atomic uint64_t *) ((char *) filter + sizeof(bloom_t)); /* Initialize to zero */ for (size_t i = 0; i < words; i++) { atomic_store(&filter->data[i], 0); } return filter; } /* Destroy the Bloom filter and set pointer to NULL */ void bloom_destroy(bloom_t **filter, bloom_dealloc_t deallocator) { if (filter && *filter) { deallocator(*filter, sizeof(bloom_t) + (((*filter)->size + 63) / 64) * sizeof(uint64_t)); *filter = NULL; } } /* Clear all bits in the filter (requires external synchronization) */ void bloom_clear(bloom_t *filter) { size_t words = (filter->size + 63) / 64; for (size_t i = 0; i < words; i++) { atomic_store(&filter->data[i], 0); } } /* Set a bit at the given key position */ static void set(bloom_t *filter, size_t key) { size_t index = key / 64; uint64_t bit = 1ULL << (key % 64); IIII; } /* Test if a bit is set at the given key position */ static int get(bloom_t *filter, size_t key) { size_t index = key / 64; uint64_t bit = 1ULL << (key % 64); return (JJJJ & bit) != 0; } /* Add a key to the Bloom filter */ void bloom_add(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); /* Use double hashing for 4 indices */ for (int i = 0; i < 4; i++) { size_t h = (hbase + i * (hbase >> 32)) & (filter->size - 1); /* Assumes size is power of 2 */ set(filter, h); } } /* Test if a key is likely in the Bloom filter */ int bloom_test(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); for (int i = 0; i < 4; i++) { size_t h = (hbase + i * (hbase >> 32)) & (filter->size - 1); if (!get(filter, h)) return 0; } return 1; } /* Default allocator using calloc for zeroed memory */ void *bloom_alloc(size_t size) { return calloc(1, size); } /* Default deallocator */ void bloom_free(void *ptr, size_t size) { (void) size; free(ptr); } ``` 補完程式碼,使其運作符合預期。作答規範: * `IIII` 和 `JJJJ` 為 `atomic_` 開頭的函式呼叫 * 使用一致的程式碼縮排風格,以最精簡的方式撰寫,不包含表示式前後的空白字元,皆不得出現 `;` 或 `?` 或 `;` 字元 --- ### 測驗 `3` 我們想精確測量 GNU/Linux 上行程和執行緒 (NPTL) 的 context switch(上下文切換)成本,其基本原理是,利用 [pipe](https://man7.org/linux/man-pages/man2/pipe.2.html) 系統呼叫雙向通訊機制造成二個執行單位 (process 或 thread) 之間交替等待與喚醒,並量測整體來回的時間,進而估算一次 context switch 的平均耗時。 > 延伸閱讀: [Arm-Linux](https://wiki.csie.ncku.edu.tw/embedded/arm-linux) 測量 context switch 前,我們準備以下程式碼: ```c #define _GNU_SOURCE #include <math.h> #include <poll.h> #include <pthread.h> #include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <sys/wait.h> #define N_ITERATIONS 1000 #define N_REPETITIONS 10 #define PIPE_TIMEOUT_MS 1000 typedef struct { double avg, min, max, stddev; } stat_t; typedef struct { int iterations, pipe_read_fd, pipe_write_fd, cpu_id, priority; pthread_barrier_t *barrier; } thread_args_t; int set_affinity(int cpu_id) { cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(cpu_id, &cpuset); return sched_setaffinity(0, sizeof(cpuset), &cpuset); } int set_realtime_priority(int priority) { struct sched_param param = {.sched_priority = priority}; return sched_setscheduler(0, SCHED_FIFO, &param); } bool check_root() { return geteuid() == 0; } void compute_stats(const double *times, int n, stat_t *stats) { double sum = 0.0, sum_sq = 0.0; stats->min = stats->max = times[0]; for (int i = 0; i < n; i++) { sum += times[i]; sum_sq += times[i] * times[i]; if (times[i] < stats->min) stats->min = times[i]; if (times[i] > stats->max) stats->max = times[i]; } stats->avg = sum / n; stats->stddev = sqrt((sum_sq / n) - stats->avg * stats->avg); } ``` 以下程式會建立二個 `pipe`: - `pipe1`: A 寫入、B 載入 - `pipe2`: B 寫入、A 載入 這構成 A ↔ B 之間一來一回的同步通訊機制,無論是行程抑或執行緒。 彼此等待與喚醒造成 context switch - A 寫入 pipe1 → B 等待 pipe1 資料 → 喚醒並載入 - B 寫入 pipe2 → A 等待 pipe2 資料 → 喚醒並載入 - 如此重複若干次 每次來回交替,會造成二次 context switch (即 A → B 和 B → A)。 > 對照[第 3 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz3)的測驗三 針對行程的測試流程 - 親代行程 (A) 綁定 CPU~0~,設定 `SCHED_FIFO` 優先權 95 - 子行程 (B) 綁定 CPU~1~,設定 `SCHED_FIFO` 優先權 94 - 親代行程記錄時間,在每次來回後讀出 pipe2 的資料 - 結束後計算「平均每次 context switch = 來回時間 / (2 * iterations)」 對應的原始程式碼 (部分遮蔽): ```c double measure_context_switch(int iterations, int pipe1[2], int pipe2[2], pid_t pid) { char token = 'x'; struct timespec start, end; if (pid == 0) { set_affinity(THIS_ISNT_VALID1); set_realtime_priority(THIS_ISNT_VALID2); close(pipe1[1]); close(pipe2[0]); for (int i = 0; i < iterations; i++) { char buf; poll((struct pollfd[]){{pipe1[0], POLLIN}}, THIS_ISNT_VALID3, PIPE_TIMEOUT_MS); if (read(pipe1[0], &buf, 1) != 1) exit(EXIT_FAILURE); if (write(pipe2[1], &token, 1) != 1) exit(EXIT_FAILURE); } close(pipe1[0]); close(pipe2[1]); exit(EXIT_SUCCESS); } set_affinity(IGNORE_THIS1); set_realtime_priority(IGNORE_THIS2); close(pipe1[0]); close(pipe2[1]); clock_gettime(CLOCK_MONOTONIC, &start); for (int i = 0; i < iterations; i++) { if (write(pipe1[1], &token, 1) != 1) exit(EXIT_FAILURE); char buf; poll((struct pollfd[]){{pipe2[0], POLLIN}}, IGNORE_THIS3, PIPE_TIMEOUT_MS); if (read(pipe2[0], &buf, 1) != 1) exit(EXIT_FAILURE); } clock_gettime(CLOCK_MONOTONIC, &end); close(pipe1[1]); close(pipe2[0]); wait(NULL); return ((end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec - start.tv_nsec) / 1e3) / (2.0 * iterations); } ``` 關鍵操作: - 使用 `poll()` 等待對方寫入資料,確保有 blocking,進而觸發 context switch - 使用 `SCHED_FIFO` 保證完全不被 NORMAL/OTHER 等非即時行程干擾 - CPU affinity 保證切換不受處理器核遷移 (CPU migration) 影響 執行緒測試流程: - 主執行緒與副執行緒分別固定在 CPU~0~ / CPU~1~,使用 [pthread_barrier](https://man7.org/linux/man-pages/man3/pthread_barrier_wait.3p.html) 同步起跑 - 也是透過二條 pipe 傳遞訊號與資料,一來一回代表二次 context switch 對應的原始程式碼 (部分遮蔽): ```c void *thread_func(void *arg) { thread_args_t *args = arg; char token = 'x'; set_affinity(args->cpu_id); set_realtime_priority(args->priority); if (args->cpu_id == 0) if (write(args->pipe_write_fd, &token, 1) != 1) exit(EXIT_FAILURE); pthread_barrier_wait(args->barrier); for (int i = 0; i < args->iterations; i++) { char buf; poll((struct pollfd[]){{args->pipe_read_fd, POLLIN}}, IGNORE_THIS4, PIPE_TIMEOUT_MS); if (read(args->pipe_read_fd, &buf, 1) != 1) exit(EXIT_FAILURE); if (write(args->pipe_write_fd, &token, 1) != 1) exit(EXIT_FAILURE); } return NULL; } double measure_thread_switch(int iterations) { int pipe1[2], pipe2[2]; pthread_t t1, t2; pthread_barrier_t barrier; thread_args_t a1 = {iterations, 0, 0, 0, 95, &barrier}, a2 = {iterations, 0, 0, 1, 94, &barrier}; if (pipe(pipe1) == -1 || pipe(pipe2) == -1) exit(EXIT_FAILURE); pthread_barrier_init(&barrier, NULL, 2); a1.pipe_read_fd = pipe1[0], a1.pipe_write_fd = pipe2[1]; a2.pipe_read_fd = pipe2[0], a2.pipe_write_fd = pipe1[1]; struct timespec start, end; clock_gettime(CLOCK_MONOTONIC, &start); pthread_create(&t1, NULL, thread_func, &a1); pthread_create(&t2, NULL, thread_func, &a2); pthread_join(t1, NULL); pthread_join(t2, NULL); clock_gettime(CLOCK_MONOTONIC, &end); close(pipe1[0]); close(pipe1[1]); close(pipe2[0]); close(pipe2[1]); pthread_barrier_destroy(&barrier); return ((end.tv_sec - start.tv_sec) * 1e6 + (end.tv_nsec - start.tv_nsec) / 1e3) / (2.0 * iterations); } ``` 實作手法整理: | 設計要素 | 原因 | |-----------------|------| | `pipe + poll()` | 強制 thread/process block,保證產生 context switch,而非 busy-wait 或同步競爭 | | `SCHED_FIFO` | 避免被其他一般行程干擾,確保測量只涵蓋必要切換成本 | | `CPU affinity` | 排除 CPU 排程器中遷移導致的 TLB flush、熱區 (hot region) 切換等雜訊 | | `fork()` 與 `pthread_create()` | 分別製造行程/執行緒的互動情境,便於比較差異 | 主程式: ```c int main(int argc, char *argv[]) { int iters = N_ITERATIONS; int reps = N_REPETITIONS; if (!check_root()) fprintf(stderr, "Not running as root: RT scheduling may fail\n"); double *results = malloc(reps * sizeof(double)); stat_t stats; for (int i = 0; i < reps; i++) { int pipe1[2], pipe2[2]; if (pipe(pipe1) == -1 || pipe(pipe2) == -1) exit(EXIT_FAILURE); pid_t pid = fork(); if (pid == -1) exit(EXIT_FAILURE); results[i] = measure_context_switch(iters, pipe1, pipe2, pid); } compute_stats(results, reps, &stats); printf("Process context switch (%d iters x %d reps):\n", iters, reps); printf(" Avg: %.3f us, Min: %.3f us, Max: %.3f us, Stddev: %.3f us\n", stats.avg, stats.min, stats.max, stats.stddev); for (int i = 0; i < reps; i++) { results[i] = measure_thread_switch(iters); } compute_stats(results, reps, &stats); printf("Thread context switch (%d iters x %d reps):\n", iters, reps); printf(" Avg: %.3f us, Min: %.3f us, Max: %.3f us, Stddev: %.3f us\n", stats.avg, stats.min, stats.max, stats.stddev); free(results); return 0; ``` 本程式檔名為 `ctx.c`,編譯方式: ```shell $ gcc -Wall -O2 -o ctx ctx.c -lm -lpthread ``` 執行時務必要用超級使用者權限: ```shell $ sudo ./ctx ``` 在 AMD Ryzen Threadripper 2990WX 32-Core (64) @ 3.00 GHz 測試,參考輸出: ``` Process context switch (1000 iters x 10 reps): Avg: 3.903 us, Min: 3.790 us, Max: 4.479 us, Stddev: 0.196 us Thread context switch (1000 iters x 10 reps): Avg: 3.957 us, Min: 3.932 us, Max: 3.991 us, Stddev: 0.017 us ``` 行程和執行緒的 context switch 切換成本為何如此接近?二者相差非常小,幾乎在誤差範圍內。 在 Linux 中,無論是 process 抑或 thread,其實都是 `task_struct`,亦即 Linux 核心用以管理排程的基本單元。差異僅在於: - process:有獨立的 `mm_struct`,即自身的虛擬記憶體空間 - thread:與其他執行緒共享 `mm_struct`,也就是與其他執行緒共享同一定址空間 (address space) 從核心的 CPU 排程器 (如 CFS 或 EEVDF) 角度來看,切換的是 `task_struct` 實體,而非高階語意上的 "process vs thread" (把你書架上的[恐龍書](https://gholk.github.io/ptt/M.1528439323.A.5B7.html)移開,不要弄髒書架)。這說明何以二者的 context switch 成本在 Linux 上幾乎一樣。 > [學不好作業系統,不能怪你](https://www.facebook.com/groups/system.software2025/posts/973639714972830/) 無論是透過 `fork()` 建立的子行程還是透過 `pthread_create()` 建立的執行緒,Linux 都以相同的結構(`task_struct`)管理它們,且使用相同的排程機制。因此,context switch 的基本開銷:儲存/載入暫存器、切換 kernel stack、切換分頁表 (page table) 等動作,本質相同。 由於執行緒的切換過程中,不需更換分頁表,看似會有較低的切換成本,但現代 Linux 在同 NUMA node、TLB cache 表現良好的情況下,行程中的分頁表重新載入的開銷極低,且現代中央處理器於 TLB 切換已有顯著改進,因此我們不易看出行程和執行緒切換的成本差異。 注意,許多教材或討論提及的執行緒存在的優勢,並非來自後者 context switch 成本較低 (幾乎相同),而是來自: * 可共享 address space → 不需 IPC 可直接存取同一份資料 * 可共享 file descriptor、heap、stack (部分) 等資源 * 更容易建立大量輕量單位(例如 10 萬個 thread vs process) 作答規範: * IGNORE_THIS1 到 IGNORE_THIS4 和 THIS_ISNT_VALID1 到 THIS_ISNT_VALID3 皆為十進位數值 :::success 延伸題目: * 以 eBPF 進行上述測量,亦即取得行程和執行緒切換的成本 * 在多個 NUMA 節點的環境,上述程式碼能否精準測量?若無法,有哪些干擾因素、又如何排除? :::