---
tags: linux2025
---
# [2025q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 10 週測驗題
:::info
目的: 檢驗學員對 [I/O 模型](https://hackmd.io/@sysprog/linux-io-model)、[CPU 排程器](https://hackmd.io/@sysprog/linux-scheduler)和〈[Concurrency Primer](https://github.com/sysprog21/concurrency-primer)〉的認知
:::
==[作答表單: 測驗 `1`, `2`](https://docs.google.com/forms/d/e/1FAIpQLSf4PAgAbOoQcHooCeubnZ81qrhIM8O2lpn0X-dSNneo7yl0lw/viewform?usp=dialog)==
==[作答表單: 測驗 `3`](https://docs.google.com/forms/d/e/1FAIpQLSeqPC0HXVb_SKBhdCzIqSaltaB0BwT-hj2XzH3zTORYQFnGTw/viewform?usp=dialog)==
### 測驗 `1`
考慮 `strtrim` 函式的作用是從給定的 C 字串中移除前導與尾端的空白字元,並回傳修剪 (trim) 後的子字串起始位址。它會直接修改原始字串內容(in-place),將尾端的空白字元以 `\0` 結束字元截斷,並將指標移到第一個非空白字元,供後續使用。以下字元會被當作空白處理:
| 字元 | 意義 | 十六進位 |
|------|------------|----------|
| `' '` | 空白鍵 | `0x20` |
| `'\t'` | tab 鍵 | `0x09` |
| `'\n'` | 換行字元 (LF) | `0x0A` |
| `'\r'` | 斷行符號 (CR) | `0x0D` |
| `'\v'` | 垂直定位字元 | `0x0B` |
| `'\f'` | 換頁字元 | `0x0C` |
> 在 Windows 系統中,每行結尾使用 CRLF 作為斷行符號;而在 Linux 則僅使用 LF 字元。因此,相較於 Linux,Windows 中的每行會多出一個 CR 字元。CR 原本的意思是 Carriage Return,對應鍵盤上的 Enter 鍵,LF 意思是 Line Feed,表示實際的換行動作。macOS 目前也與 Linux 一樣,使用 LF 作為行尾字元。
應用情境
- 處理使用者輸入:避免輸入欄前後的空格影響後續操作 (如登入、搜尋)
- 解析設定檔:行尾或行首常有空白,需先清除
- 處理網路協定標頭或資料行:HTTP headers、CSV 欄位等
我們利用[第 3 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz3)提及的 SWAR 技巧,來實作 `strtrim` 函式 (只處理 ASCII,部分程式碼遮蔽)
```c
/* ASCII whitespace check */
static inline bool is_ascii_space(uint8_t c)
{
return c == ' ' || (c - '\t' <= '\r' - '\t');
}
/* Count leading ASCII space characters in a 64-bit word */
static inline int count_leading_spaces(uint64_t word)
{
for (int i = 0; i < AAAA; i++) {
uint8_t c = (word >> (BBBB)) & 0xFF;
if (!is_ascii_space(c))
return i;
}
return 8;
}
static char *strtrim(char *str)
{
uint8_t *s = (uint8_t *) str;
/* Fast skip leading whitespace using SWAR if 8-byte aligned */
for (; ((uintptr_t) s & CCCC) == 0; s += 8) {
uint64_t w = *(uint64_t *) s;
int n = count_leading_spaces(w);
if (n < 8) {
s += n;
break;
}
}
while (*s && is_ascii_space(*s))
s++;
if (!*s)
return (char *) s;
uint8_t *end = s + strlen((char *) s);
while (end > s && is_ascii_space(*(DDDD)))
end--;
*end = '\0';
return (char *) s;
}
```
考慮 [client.c](https://gist.github.com/jserv/e76914f2cd0fe8ff19f66c6149f60220) 是我們打造的網路聊天客戶端程式碼,對應的巨集定義如下:
```c
#define SERVER_ADDR "127.0.0.1"
#define SERVER_PORT 7788
```
而 `chat.c` 則是網路聊天的伺服器端程式碼,定義如下 (忽略標頭檔):
```c
/* Server configuration constants */
#define BACKLOG 128 /* Maximum pending connections */
#define MAX_EVENTS 64 /* Maximum epoll events per wait */
#define MAX_CLIENTS 1024 /* Maximum concurrent clients */
#define NICK_MAXLEN 32 /* Maximum nickname length */
enum { CHAT_OK = 0, CHAT_ERR = -1 };
/* store client information */
typedef struct {
int fd;
char nick[NICK_MAXLEN];
} client_t;
/* manage server state */
typedef struct {
int fd;
client_t *clients[MAX_CLIENTS];
} server_t;
```
主函式和事件主迴圈如下: (部分程式碼遮蔽)
```c
/* Allocates memory with error handling */
static inline void *my_alloc(size_t size)
{
void *ptr = malloc(size);
if (!ptr) {
perror("malloc");
exit(EXIT_FAILURE);
}
return ptr;
}
int main(void)
{
/* Initialize chat server */
server_t server = {.fd = 0, .clients = {NULL}};
if (chat_listen(&server, SERVER_ADDR, SERVER_PORT) != CHAT_OK) {
fprintf(stderr, "Failed to listen on %s:%d\n", SERVER_ADDR,
SERVER_PORT);
return CHAT_ERR;
}
/* Create epoll instance */
int epollfd = epoll_create1(0);
if (epollfd < 0) {
perror("epoll_create1");
return CHAT_ERR;
}
/* Add server socket to epoll */
struct epoll_event ev = {.events = EPOLLIN, .data.fd = server.fd};
if (FFFF < 0) {
return CHAT_ERR;
}
char buf[256];
/* Main event loop */
for (;;) {
/* Epoll event array for handling socket events */
struct epoll_event events[MAX_EVENTS];
int nfds = GGGG;
if (nfds < 0) {
return CHAT_ERR;
}
/* Process each epoll event */
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
if (fd == server.fd) {
/* Handle new client connection */
int client_fd = chat_accept(&server);
if (client_fd < 0)
continue;
/* Initialize new client */
client_t *client = my_alloc(sizeof(client_t));
client->fd = client_fd;
snprintf(client->nick, NICK_MAXLEN, "anon:%d", client_fd);
server.clients[client_fd] = client;
/* Send welcome message */
snprintf(
buf, sizeof(buf),
"Server\r\nWelcome %s! Use /nick to set a nickname\r\n",
client->nick);
if (write(client_fd, buf, strlen(buf)) < 0)
perror("write welcome");
/* Broadcast join message */
snprintf(buf, sizeof(buf), "%s joined\n", client->nick);
broadcast_message(&server, buf, client_fd, 1);
/* Add client to epoll */
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client_fd;
if (HHHH < 0) {
return CHAT_ERR;
}
} else {
/* Handle client data */
ssize_t nread = read(fd, buf, sizeof(buf) - 1);
if (nread <= 0) {
handle_disconnect(&server, epollfd, fd, buf);
} else {
handle_client_message(&server, epollfd, fd, buf, nread);
}
}
}
}
return 0;
}
```
對應的輔助函式如下:
```c
/* Sets a file descriptor to non-blocking mode
* @fd - File descriptor
* Returns: CHAT_OK on success, CHAT_ERR on failure
*/
static int set_nonblocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("fcntl");
return CHAT_ERR;
}
return CHAT_OK;
}
/* Initializes server socket for listening
* @server - Server structure
* @host - Host address
* @port - Port number
* Returns: CHAT_OK on success, CHAT_ERR on failure
*/
static int chat_listen(server_t *server, const char *host, int port)
{
struct addrinfo hints = {
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM,
.ai_flags = AI_PASSIVE,
};
char port_str[6];
snprintf(port_str, sizeof(port_str), "%d", port);
struct addrinfo *res;
if (getaddrinfo(host, port_str, &hints, &res)) {
perror("getaddrinfo");
return CHAT_ERR;
}
int fd = -1;
for (struct addrinfo *rp = res; rp; rp = rp->ai_next) {
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd < 0)
continue;
if (0 > setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1},
sizeof(int)) ||
bind(fd, rp->ai_addr, rp->ai_addrlen) < 0 ||
listen(fd, BACKLOG) < 0) {
close(fd);
continue;
}
break;
}
freeaddrinfo(res);
if (fd < 0) {
perror("bind/listen");
return CHAT_ERR;
}
server->fd = fd;
return set_nonblocking(fd);
}
/* Accepts a new client connection
* Returns: Client file descriptor on success, CHAT_ERR on failure
*/
static int chat_accept(server_t *server)
{
int fd = accept(server->fd, NULL, NULL);
if (fd < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
perror("accept");
return CHAT_ERR;
}
return set_nonblocking(fd) == CHAT_OK ? fd : CHAT_ERR;
}
/* Sends a message to a single client
* @client - Client to send to
* @msg - Message to send
* @len - Message length
*/
static void send_to_client(client_t *client, const char *msg, int len)
{
if (write(client->fd, msg, len) < 0)
perror("write");
}
/* Broadcasts a message to all clients except the sender
* @server - Server structure
* @buf - Message content
* @sender_fd - Sender's file descriptor
* @server_info - Flag for server-generated messages
*/
static void broadcast_message(server_t *server,
const char *buf,
int sender_fd,
int server_info)
{
char msg[256];
int len = server_info ? snprintf(msg, sizeof(msg), "Server\r\n%s", buf)
: snprintf(msg, sizeof(msg), "%s\r\n%s",
server->clients[sender_fd]->nick, buf);
/* Iterate through client array to send message */
for (int i = 0; i < MAX_CLIENTS; i++) {
client_t *client = server->clients[i];
if (client && i != sender_fd)
send_to_client(client, msg, len);
}
}
static void handle_disconnect(server_t *server, int epollfd, int fd, char *buf)
{
client_t *client = server->clients[fd];
snprintf(buf, 256, "%s left\n", client->nick);
broadcast_message(server, buf, fd, 1);
EEEE;
close(fd);
free(client);
server->clients[fd] = NULL;
}
static void handle_client_message(server_t *server,
int epollfd,
int fd,
char *buf,
ssize_t nread)
{
buf[nread] = '\0';
client_t *client = server->clients[fd];
if (!strncmp(buf, "/quit", 5) || !strncmp(buf, "/exit", 5)) {
handle_disconnect(server, epollfd, fd, buf);
} else if (!strncmp(buf, "/nick", 5)) {
const char *nick = strtrim(buf + 5);
strncpy(client->nick, nick, NICK_MAXLEN - 1);
client->nick[NICK_MAXLEN - 1] = '\0';
} else {
broadcast_message(server, buf, fd, 0);
}
}
```
補完程式碼,使其運作符合預期。作答規範:
* `AAAA` 和 `CCCC` 為十進位數值
* `BBBB` 和 `DDDD` 為合法表示式,不得出現乘法和除法運算子
* `EEEE`, `FFFF`, `GGGG`, `HHHH` 皆為以 `epoll_` 開頭的函式呼叫
* `EEEE`, `FFFF`, `HHHH` 包含 `EPOLL_` 開頭的巨集
* GGGG 包含 `MAX_EVENTS`
* 使用一致的程式碼縮排風格,以最精簡的方式撰寫,不包含表示式前後的空白字元,皆不得出現 `;` 或 `?` 或 `;` 字元
:::success
延伸問題:
* 解釋上方程式碼運作原理
* 藉由在客戶端引入對話模擬 (如 [ELIZA](https://zh.wikipedia.org/zh-tw/ELIZA%E6%95%88%E6%87%89)),量化上方程式碼,並提出針對其並行處理能力的改進
:::
---
### 測驗 `2`
延續[第 6 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz6)的測驗 `3`。在處理大規模資料集時,常見的需求是快速判斷某個元素是否存在於一個龐大的集合中,這就是成員資格查詢 (membership query)。傳統的資料結構面臨挑戰:
* 雜湊表 (Hash Table): 雖然平均查詢時間為 $O(1)$,但需要儲存元素本身或其標識,空間開銷可能非常大
* 串列/陣列 (List/Array): 無序串列查詢效率低 ($O(n)$);有序結構(如平衡樹、排序陣列)查詢效率尚可 ($O(\log{n})$),但仍需儲存元素,且插入和維護成本較高
當應用場景滿足以下條件時,Bloom Filter 提供截然不同的解決方案:
1. 可以容忍一定的誤報 (False Positives):即查詢結果為「可能存在」,但元素實際上不在集合中
2. 絕不允許漏報 (No False Negatives):即查詢結果為「絕對不存在」時,元素一定不在集合中
3. 對空間效率和查詢速度有極高要求
Bloom Filter 正是藉由犧牲絕對的準確性(允許誤報),換取極佳的空間和時間效率。
Bloom Filter 由以下二部分組成:
1. 位元陣列 (Bit Array): 一個長度為 $m$ 的位元陣列,所有位元初始值均為 0
2. k 個雜湊函數: $k$ 個獨立的雜湊函數 $h_1, h_2, \dots, h_k$。每個函數將輸入的鍵 (key) 均勻地映射到 $[0, m-1]$ 的範圍內。理想情況下,這些雜湊函數的結果應相互獨立
關鍵操作:
* 添加元素 (Add / Insertion):
1. 對於要添加的元素 $x$,使用 $k$ 個雜湊函數分別計算其雜湊值:$h_1(x), h_2(x), \dots, h_k(x)$。這些雜湊值對應位元陣列中的 $k$ 個索引位置。
2. 將位元陣列中這 $k$ 個索引位置的位元設定為 1。 (注意:即使某個位元原本已經是 1,也保持為 1)。
* 查詢元素 (Query / Check):
1. 對於要查詢的元素 $y$,同樣使用 $k$ 個雜湊函數計算其雜湊值:$h_1(y), h_2(y), \dots, h_k(y)$。
2. 檢查位元陣列中這 $k$ 個索引位置的位元:
* 如果所有位元都為 1: 則判斷元素 $y$ 可能存在 (Possibly Present) 於集合中。這是可能產生誤報的情況。
* 如果至少有一個位元為 0: 則判斷元素 $y$ 絕對不存在 (Definitely Not Present) 於集合中。這是 Bloom Filter 不會有漏報的保證。
誤報率 (False Positive Probability) 是理解 Bloom Filter 性能和進行參數設計的關鍵。我們的目標是計算當查詢一個從未被添加過的元素時,被誤判為存在的機率,即誤報率 ($P_{fp}$)。
假設 (理想情況):
1. 均勻性 (Uniformity): 每個雜湊函數都將任意鍵均勻映射到位元陣列的 $[0, m-1]$ 範圍內,即映射到任何一個特定位置的機率是 $\frac{1}{m}$
2. 獨立性 (Independence): $k$ 個雜湊函數的輸出結果是相互獨立的
推導步驟:
1. 單個位元保持為 0 的機率 (單次雜湊):
對於位元陣列中的某一個特定位元,當插入一個鍵時,被某個特定的雜湊函數 $h_i$ 選中的機率是 $\frac{1}{m}$。因此,這個位元不被 $h_i$ 選中的機率是 $(1 - \frac{1}{m})$。
2. 單個位元保持為 0 的機率 (k 次雜湊, 單個鍵):
由於有 $k$ 個獨立的雜湊函數,這個特定位元在插入單個鍵(經過 $k$ 次雜湊)後仍然保持為 0 的機率是:
$P(\text{bit is 0 after 1 key}) = (1 - \frac{1}{m})^k$
3. 單個位元保持為 0 的機率 (插入 N 個鍵):
假設我們獨立地插入 $N$ 個不同的鍵。對於某一個特定位元,它在插入 $N$ 個鍵後仍然保持為 0 的機率是(每次插入的 $k$ 次雜湊都不改變此位元):
$$
P_0 = \left(1 - \frac{1}{m}\right)^{kn}
$$
4. 使用近似簡化:
當位元陣列長度 $m$ 相對於 $k N$ 很大時,$1/m$ 是一個很小的值。根據極限 $\lim_{x \to \infty} (1 - 1/x)^x = e^{-1}$,我們可以得到近似:
$(1 - \frac{1}{m}) \approx e^{-1/m}$
這個近似在 $m$ 越大時越精確。
5. 近似單個位元保持為 0 的機率:
將上述近似代入第 3 步的公式:
$$
P_0 \approx (e^{-1/m})^{kN} = e^{-kN/m}
$$
6. 單個位元被設為 1 的機率:
在插入 $N$ 個鍵後,某個特定位元被設為 1 的機率是:
$$
P_1 = 1 - P_0 \approx 1 - e^{-kN/m}
$$
7. 誤報率 ($P_{fp}$) 的計算:
考慮一個從未被插入集合的元素 $y$。查詢 $y$ 時發生誤報,意味著 $y$ 對應的 $k$ 個雜湊位置 $h_1(y), \dots, h_k(y)$ 上的位元恰好都為 1。
為了簡化計算,我們做一個關鍵的(近似)假設:對於一個不在集合中的元素 $y$,其 $k$ 個雜湊位置上的位元狀態是相互獨立的。
在該獨立性假設下,誤報率 $P_{fp}$(也常用 $f$ 表示)約等於單個位元為 1 的機率的 $k$ 次方:
$$
f = (P_1)^k \approx (1 - e^{-kN/m})^k
$$
參數最佳化:尋找最優的 k
給定預計插入的元素數量 $N$ 和位元陣列大小 $m$,我們希望選擇一個最佳的雜湊函數個數 $k$,使得誤報率 $f(k) = (1 - e^{-kN/m})^k$ 最小。
藉由對 $\ln(f(k))$ 求導並令導數為 0,可以解得使誤報率最小的最優雜湊函數個數 $k_{opt}$ 滿足:
$\frac{k_{opt} N}{m} = \ln 2$
即:
$k_{opt} = \frac{m}{N} \ln 2$
由於 $k$ 必須是整數,實際應用中通常取 $k = \text{round}(\frac{m}{N} \ln 2)$。
反向計算:給定 N 和期望誤報率 f,計算 m 和 k
1. 計算 k: 當 $k$ 取最優值時,誤報率 $f \approx (1/2)^k$。因此:
$k_{opt} \approx \log_2(1/f) = -\log_2(f)$。
2. 計算 m: 從 $k_{opt} = \frac{m}{N} \ln 2$ 得到 $m = \frac{N k_{opt}}{\ln 2}$。代入 $k_{opt}$:
$m \approx \frac{N (-\log_2(f))}{\ln 2} = -N \frac{\ln(f)}{(\ln 2)^2}$
整理以上可之:
* 所需位元數 $m \ge \frac{N \ln(1/f)}{(\ln 2)^2} \approx 1.44 N \log_2(1/f)$
* 雜湊函數個數 $k = \text{round}(\frac{m}{N} \ln 2) \approx \text{round}(\log_2(1/f))$
Bloom filter 的優點:
* 空間效率極高: 只需 $m$ 個位元,無需儲存元素本身
* 時間效率高: 插入和查詢操作時間複雜度均為 O(k),與集合元素總數 $N$ 無關
* 無漏報 (No False Negatives): 查詢結果為「不存在」時絕對可靠。
* 可合併性: 兩個參數 ($m, k$) 相同的 Bloom Filter 可藉由按位或 (Bitwise OR) 操作合併,代表兩個集合的並集
Bloom filter 的缺點:
* 存在誤報 (False Positives): 查詢結果為「可能存在」時可能不準確。誤報率 $f$ 受 $m, N, k$ 影響
* 無法安全刪除元素 (標準 Bloom Filter): 直接將位元從 1 改為 0 可能影響其他元素,導致漏報。(可用 Counting Bloom Filter 改進,但增加空間開銷)
* 需要預估元素數量 N: 為獲得理想誤報率,需提前估計 $N$ 以合理設定 $m$ 和 $k$。若實際 $N$ 遠超預期,誤報率會急劇上升。(可用 Scalable Bloom Filter 改進)
理論上 Bloom Filter 需要 $k$ 個獨立且均勻的雜湊函數,在實際硬中,產生 $k$ 個高品質且完全獨立的雜湊函數成本較高。常見的解決方案有:
- [ ] 方法一:使用單一高品質雜湊函數配合不同種子/參數
Linux 核心的 eBPF Bloom Filter 實作採用此方法。它使用核心的 `jhash` 函數,並為產生第 $i$ 個雜湊值時提供不同的種子 (`bloom->hash_seed + i`)。
```c
/* Linux Kernel BPF Bloom Filter hash generation (示意) */
static u32 hash(struct bpf_bloom_filter *bloom, const void *value,
u32 value_size, u32 i)
{
u32 h;
// 使用 jhash 函數,種子 = filter的基礎種子 + 雜湊函數索引 i
h = jhash(value, value_size, bloom->hash_seed + i);
// 將 32 位雜湊值 h 映射到位元陣列索引 (使用掩碼)
return h & bloom->bitset_mask;
}
```
- [ ] 方法二:使用單一雜湊函數進行雙重雜湊或衍生 (如 FNV-1a 範例)
以下是用 FNV-1a 雜湊函數和簡化版雙重雜湊策略的具體實作範例:
* 參數設定:
* 使用 $k = 4$ 個雜湊函式產生 4 個位元索引。
* 位元陣列長度設為 $m = 2^{27} = 134,217,728$。
* 雜湊函數 (64 位元 FNV-1a 變體):
```c
#define hash(key, keylen) ({ \
uint64_t h = 14695981039346656037ULL; /* FNV offset basis */ \
const char *data = (const char *)key; \
for (size_t i = 0; i < keylen; i++) { \
h = (h ^ data[i]) * 1099511628211ULL; /* FNV prime */ \
} \
h; \
})
```
* 索引產生方式 (簡化版雙重雜湊): 首先計算一次基礎雜湊值 `hbase = hash(key, keylen)`,然後產生 $k$ 個索引:
```c
// hbase 是 64 位元的 FNV-1a 雜湊結果
// filter->size 應為 m (即 2^27)
// 產生第 i 個索引 (i 從 0 到 k-1)
indices[i] = (hbase + i * (hbase >> 32)) & (filter->size - 1);
```
這裡利用 64 位雜湊值的高 32 位作為步長因子,與基礎雜湊值組合後藉由位與操作(因為 $m$ 是 2 的冪)映射到索引範圍。
* 理論誤判率計算: 假設此 Bloom Filter 插入 $n = 1,000,000$ 個元素:
$$
\frac{kn}{m} = \frac{4 \times 1,000,000}{2^{27}} = \frac{4,000,000}{134,217,728} \approx 0.0298
$$
根據誤判率公式 $f \approx (1 - e^{-kn/m})^k$:
$$
f \approx (1 - e^{-0.0298})^k = (1 - 0.9706)^4 = (0.0294)^4
$$
$$
f \approx 7.4 \times 10^{-7}
$$
→ 理論誤判率約為 $7.4 \times 10^{-5} \%$ 或 0.000074%。
FNV-1a 與索引衍生的實務考量與限制:
1. 雜湊偏態: FNV-1a 雜湊函數雖然計算速度快,但對於具有相似字首的字串(例如 "item-001", "item-002")容易產生相似的雜湊值或碰撞,其輸出分佈的均勻性可能不如 MurmurHash 或 xxHash 等現代雜湊函數,這會影響位元陣列中位元設定的均勻性。
2. 索引衍生獨立性不足: 這種藉由單一基礎雜湊值 `hbase` 推導產生 $k$ 個索引的方法(包括簡化版雙重雜湊),其產生的 `indices[0], indices[1], ..., indices[k-1]` 之間存在較強的相關性,並非真正獨立。這違反了理論推導中「位元設定為獨立事件」的關鍵假設,可能導致實際的誤判率高於理論預估值。
數學推導中使用的「雜湊函數獨立」和「位元狀態獨立」的假設在現實中通常是近似滿足的。
* 使用同一個雜湊算法配合不同種子或藉由衍生方式產生的 $k$ 個值並非嚴格獨立
* 位元陣列中不同位元的狀態也非完全獨立
然而:
1. 實用性: 只要基礎雜湊函數具有良好的均勻分佈性與雪崩效應(輸入微小變化導致輸出大幅隨機變化),那麼即使產生 $k$ 個值的方法存在一定的依賴性,基於獨立性假設推導出的誤判率公式 $f \approx (1 - e^{-kN/m})^k$ 仍然是一個非常有用的性能估計與參數設計指南。
2. 雜湊品質是關鍵: 無論如何產生 $k$ 個值,基礎雜湊函數的品質(均勻性、低碰撞率)對於 Bloom Filter 的實際性能至關重要。選擇高品質的雜湊函數是改進 Bloom Filter 性能的重要一步。
3. 權衡: 這體現了 Bloom Filter 設計中的權衡:追求理論上的完美獨立性可能代價過高,而實用的近似方法在效率和效果之間取得了平衡。認識到理論與實踐的差距,並了解所用方法的局限性很重要。
Linux 核心在 eBPF (extended Berkeley Packet Filter) 子系統中,藉由 `BPF_MAP_TYPE_BLOOM_FILTER` 類型的 eBPF map,為核心空間和使用者空間的 eBPF 程序提供了使用 Bloom Filter 的能力。
參數計算與實作 ([kernel/bpf/bloom_filter.c](https://github.com/torvalds/linux/blob/master/kernel/bpf/bloom_filter.c))
核心程式碼 `bloom_filter_map_alloc` 負責創建 Bloom Filter map。關鍵步驟是根據使用者提供的 `max_entries` ($N$) 和 `nr_hash_funcs` ($k$) 計算所需的位元陣列大小 $m$ (`nr_bits`):
```c
static struct bpf_map *bloom_filter_map_alloc(union bpf_attr *attr)
{
// ... (獲取 N = attr->max_entries 和 k = attr->nr_hash_funcs) ...
u64 nr_bits; // 對應 m
// ...
/* 根據 N 和 k 計算最佳 m (nr_bits) */
/* 使用公式: m = N * k / ln(2),近似 1/ln(2) ≈ 7/5 */
if (check_mul_overflow(attr->max_entries, nr_hash_funcs, &nr_bits) ||
/* 計算 nr_bits ≈ (N * k / 5) * 7 */
check_mul_overflow(nr_bits / 5, (u32)7, &nr_bits) ||
nr_bits > (1UL << 31)) {
/* 溢出或過大,限制 m 最大為 U32_MAX */
// ... (設定 bitset_bytes, bitset_mask)
} else {
/* 確保最小尺寸,並向上取整到 2 的冪次方 */
if (nr_bits <= BITS_PER_LONG)
nr_bits = BITS_PER_LONG;
else
nr_bits = roundup_pow_of_two(nr_bits);
// ... (設定 bitset_bytes, bitset_mask)
}
// ... (分配記憶體) ...
// ... (初始化隨機種子 bloom->hash_seed) ...
}
```
程式碼分析:
1. 計算目標: 根據使用者輸入的 $N$ 和 $k$,計算出一個能使誤報率接近最小的位元陣列大小 $m$。
2. 公式應用與近似: 核心程式碼直接用 $m = \frac{N k}{\ln 2}$ 的關係,並使用 $\frac{1}{\ln 2} \approx \frac{7}{5}$ 進行定點近似。
3. 邊界處理與效率改進:處理計算溢出,確保最小尺寸,並將 $m$ 取整到 2 的冪次方,以使用高效的位與操作進行索引映射。
4. 隨機種子: 每個 Bloom Filter 實例使用隨機的 `hash_seed`,配合 `jhash` 產生 $k$ 個不同的雜湊值。
本次測驗中,採用 FNV-1a 的 64 位元變種。
Bloom filter 適合高效能並行場景的考量是:
1. 新增與查詢操作都是 lock-free
2. 不會移除元素
然而,這些操作在多核處理器或多執行緒環境下,就會遇到幾個重要的問題:
1. Data Race 風險:假設二個執行緒同時對 `filter->data[index]` 做 bitwise-OR 操作或查詢,若未使用 atomics,則會出現下列典型的競爭情形:
```c
filter->data[index] |= bit;
```
這可能會導致其中一個數值被覆蓋,產生 bit flip 丟失問題,進而導致查詢誤判機率上升,甚至出現 false negative(理論上不該出現)。
2. Cache coherence / False Sharing: 由於多執行緒可能同時寫入鄰近的 `uint64_t` 區塊,若 `data[]` 未妥善對齊,則可能導致 CPU 快取行為效率低下。若二個執行緒操作相鄰但不同的 `data[i]`,這些可能落在同一 cache line,導致 false sharing;每次更新位元,會造成整個 cache line 的 write invalidation,增加資料匯流排負擔與記憶體同步延遲
> 延伸閱讀: [並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics)
需要留意的是,儘管使用 atomic RMW,但不足以完全解決 contention:若多執行緒集中寫入相同區段 (高 collision key),仍會造成 atomic cache ping-pong,於是需要改進雜湊函數的數值分布。另外,雖然使用 `_Alignas(64)` 已有助於對齊,但若結構為 `struct { _Atomic uint64_t data[n]; }`,仍需考慮整體記憶體佈局 (memory layout) 和頁面分佈。
考慮以下 concurrent bloom filter,其標頭檔: (`bloom.h`)
```c
#pragma once
#include <stddef.h>
/** Opaque Bloom filter structure */
typedef struct bloom_internal bloom_t;
/** Function type for allocating memory */
typedef void *(*bloom_alloc_t)(size_t);
/** Function type for deallocating memory */
typedef void (*bloom_dealloc_t)(void *, size_t);
/*------------------------------*/
/* Construction and destruction */
/*------------------------------*/
/**
* @brief Create a Bloom filter with default size.
* @param alloc Optional allocator. If NULL, a default allocator is used.
* @return Pointer to a new Bloom filter or NULL on failure.
*/
bloom_t *bloom_new(bloom_alloc_t alloc);
/**
* @brief Create a Bloom filter with specified size in bits.
* @param size Number of bits. Must be a power of two.
* @param alloc Optional allocator. If NULL, a default allocator is used.
* @return Pointer to a new Bloom filter or NULL on failure.
*/
bloom_t *bloom_new_with_size(size_t size, bloom_alloc_t alloc);
/**
* @brief Destroy a Bloom filter and free resources.
* @param filter Pointer to a pointer to a Bloom filter. Will be set to NULL.
* @param dealloc Optional deallocator. If NULL, a default deallocator is used.
*/
void bloom_destroy(bloom_t **filter, bloom_dealloc_t dealloc);
/*------------------*/
/* Filter operations */
/*------------------*/
/**
* @brief Clear all bits in the Bloom filter.
* @param filter Bloom filter instance.
*/
void bloom_clear(bloom_t *filter);
/**
* @brief Add a key to the Bloom filter.
* @param filter Bloom filter instance.
* @param key Pointer to the key.
* @param keylen Length of the key in bytes.
*/
void bloom_add(bloom_t *filter, const void *key, size_t keylen);
/**
* @brief Test whether a key may be present in the Bloom filter.
* @param filter Bloom filter instance.
* @param key Pointer to the key.
* @param keylen Length of the key in bytes.
* @return 1 if possibly present, 0 if definitely not present.
*/
int bloom_test(bloom_t *filter, const void *key, size_t keylen);
/*--------------------------*/
/* Default memory allocators */
/*--------------------------*/
/**
* @brief Default allocator using malloc.
* @param size Number of bytes to allocate.
* @return Pointer to allocated memory.
*/
void *bloom_alloc(size_t size);
/**
* @brief Default deallocator using free.
* @param ptr Pointer to memory to free.
* @param size Number of bytes to free (ignored by default).
*/
void bloom_free(void *ptr, size_t size);
```
效能評比程式可見 [bench.c](https://gist.github.com/jserv/042517212164c32d9998aafa77ee740a)
對應的 C 程式:
```c
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "bloom.h"
struct bloom_internal {
size_t size; /* Number of bits in the filter */
_Atomic uint64_t *data; /* Word-aligned, atomic bit array */
};
/* Create a new Bloom filter with default size (2^27 bits) */
bloom_t *bloom_new(bloom_alloc_t allocator)
{
return bloom_new_with_size(1ULL << 27, allocator);
}
/* Create a new Bloom filter with specified size in bits */
bloom_t *bloom_new_with_size(size_t size, bloom_alloc_t allocator)
{
/* Round up to nearest 64-bit word */
size_t words = (size + 63) / 64;
size_t bytes = words * sizeof(uint64_t);
bloom_t *filter = allocator(sizeof(bloom_t) + bytes);
if (!filter)
return NULL;
filter->size = size;
filter->data = (_Atomic uint64_t *) ((char *) filter + sizeof(bloom_t));
/* Initialize to zero */
for (size_t i = 0; i < words; i++) {
atomic_store(&filter->data[i], 0);
}
return filter;
}
/* Destroy the Bloom filter and set pointer to NULL */
void bloom_destroy(bloom_t **filter, bloom_dealloc_t deallocator)
{
if (filter && *filter) {
deallocator(*filter, sizeof(bloom_t) + (((*filter)->size + 63) / 64) *
sizeof(uint64_t));
*filter = NULL;
}
}
/* Clear all bits in the filter (requires external synchronization) */
void bloom_clear(bloom_t *filter)
{
size_t words = (filter->size + 63) / 64;
for (size_t i = 0; i < words; i++) {
atomic_store(&filter->data[i], 0);
}
}
/* Set a bit at the given key position */
static void set(bloom_t *filter, size_t key)
{
size_t index = key / 64;
uint64_t bit = 1ULL << (key % 64);
IIII;
}
/* Test if a bit is set at the given key position */
static int get(bloom_t *filter, size_t key)
{
size_t index = key / 64;
uint64_t bit = 1ULL << (key % 64);
return (JJJJ & bit) != 0;
}
/* Add a key to the Bloom filter */
void bloom_add(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
/* Use double hashing for 4 indices */
for (int i = 0; i < 4; i++) {
size_t h = (hbase + i * (hbase >> 32)) &
(filter->size - 1); /* Assumes size is power of 2 */
set(filter, h);
}
}
/* Test if a key is likely in the Bloom filter */
int bloom_test(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
for (int i = 0; i < 4; i++) {
size_t h = (hbase + i * (hbase >> 32)) & (filter->size - 1);
if (!get(filter, h))
return 0;
}
return 1;
}
/* Default allocator using calloc for zeroed memory */
void *bloom_alloc(size_t size)
{
return calloc(1, size);
}
/* Default deallocator */
void bloom_free(void *ptr, size_t size)
{
(void) size;
free(ptr);
}
```
補完程式碼,使其運作符合預期。作答規範:
* `IIII` 和 `JJJJ` 為 `atomic_` 開頭的函式呼叫
* 使用一致的程式碼縮排風格,以最精簡的方式撰寫,不包含表示式前後的空白字元,皆不得出現 `;` 或 `?` 或 `;` 字元
---
### 測驗 `3`
我們想精確測量 GNU/Linux 上行程和執行緒 (NPTL) 的 context switch(上下文切換)成本,其基本原理是,利用 [pipe](https://man7.org/linux/man-pages/man2/pipe.2.html) 系統呼叫雙向通訊機制造成二個執行單位 (process 或 thread) 之間交替等待與喚醒,並量測整體來回的時間,進而估算一次 context switch 的平均耗時。
> 延伸閱讀: [Arm-Linux](https://wiki.csie.ncku.edu.tw/embedded/arm-linux)
測量 context switch 前,我們準備以下程式碼:
```c
#define _GNU_SOURCE
#include <math.h>
#include <poll.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#define N_ITERATIONS 1000
#define N_REPETITIONS 10
#define PIPE_TIMEOUT_MS 1000
typedef struct {
double avg, min, max, stddev;
} stat_t;
typedef struct {
int iterations, pipe_read_fd, pipe_write_fd, cpu_id, priority;
pthread_barrier_t *barrier;
} thread_args_t;
int set_affinity(int cpu_id)
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset);
return sched_setaffinity(0, sizeof(cpuset), &cpuset);
}
int set_realtime_priority(int priority)
{
struct sched_param param = {.sched_priority = priority};
return sched_setscheduler(0, SCHED_FIFO, ¶m);
}
bool check_root()
{
return geteuid() == 0;
}
void compute_stats(const double *times, int n, stat_t *stats)
{
double sum = 0.0, sum_sq = 0.0;
stats->min = stats->max = times[0];
for (int i = 0; i < n; i++) {
sum += times[i];
sum_sq += times[i] * times[i];
if (times[i] < stats->min)
stats->min = times[i];
if (times[i] > stats->max)
stats->max = times[i];
}
stats->avg = sum / n;
stats->stddev = sqrt((sum_sq / n) - stats->avg * stats->avg);
}
```
以下程式會建立二個 `pipe`:
- `pipe1`: A 寫入、B 載入
- `pipe2`: B 寫入、A 載入
這構成 A ↔ B 之間一來一回的同步通訊機制,無論是行程抑或執行緒。
彼此等待與喚醒造成 context switch
- A 寫入 pipe1 → B 等待 pipe1 資料 → 喚醒並載入
- B 寫入 pipe2 → A 等待 pipe2 資料 → 喚醒並載入
- 如此重複若干次
每次來回交替,會造成二次 context switch (即 A → B 和 B → A)。
> 對照[第 3 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz3)的測驗三
針對行程的測試流程
- 親代行程 (A) 綁定 CPU~0~,設定 `SCHED_FIFO` 優先權 95
- 子行程 (B) 綁定 CPU~1~,設定 `SCHED_FIFO` 優先權 94
- 親代行程記錄時間,在每次來回後讀出 pipe2 的資料
- 結束後計算「平均每次 context switch = 來回時間 / (2 * iterations)」
對應的原始程式碼 (部分遮蔽):
```c
double measure_context_switch(int iterations,
int pipe1[2],
int pipe2[2],
pid_t pid)
{
char token = 'x';
struct timespec start, end;
if (pid == 0) {
set_affinity(THIS_ISNT_VALID1);
set_realtime_priority(THIS_ISNT_VALID2);
close(pipe1[1]);
close(pipe2[0]);
for (int i = 0; i < iterations; i++) {
char buf;
poll((struct pollfd[]){{pipe1[0], POLLIN}}, THIS_ISNT_VALID3, PIPE_TIMEOUT_MS);
if (read(pipe1[0], &buf, 1) != 1)
exit(EXIT_FAILURE);
if (write(pipe2[1], &token, 1) != 1)
exit(EXIT_FAILURE);
}
close(pipe1[0]);
close(pipe2[1]);
exit(EXIT_SUCCESS);
}
set_affinity(IGNORE_THIS1);
set_realtime_priority(IGNORE_THIS2);
close(pipe1[0]);
close(pipe2[1]);
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
if (write(pipe1[1], &token, 1) != 1)
exit(EXIT_FAILURE);
char buf;
poll((struct pollfd[]){{pipe2[0], POLLIN}}, IGNORE_THIS3, PIPE_TIMEOUT_MS);
if (read(pipe2[0], &buf, 1) != 1)
exit(EXIT_FAILURE);
}
clock_gettime(CLOCK_MONOTONIC, &end);
close(pipe1[1]);
close(pipe2[0]);
wait(NULL);
return ((end.tv_sec - start.tv_sec) * 1e6 +
(end.tv_nsec - start.tv_nsec) / 1e3) /
(2.0 * iterations);
}
```
關鍵操作:
- 使用 `poll()` 等待對方寫入資料,確保有 blocking,進而觸發 context switch
- 使用 `SCHED_FIFO` 保證完全不被 NORMAL/OTHER 等非即時行程干擾
- CPU affinity 保證切換不受處理器核遷移 (CPU migration) 影響
執行緒測試流程:
- 主執行緒與副執行緒分別固定在 CPU~0~ / CPU~1~,使用 [pthread_barrier](https://man7.org/linux/man-pages/man3/pthread_barrier_wait.3p.html) 同步起跑
- 也是透過二條 pipe 傳遞訊號與資料,一來一回代表二次 context switch
對應的原始程式碼 (部分遮蔽):
```c
void *thread_func(void *arg)
{
thread_args_t *args = arg;
char token = 'x';
set_affinity(args->cpu_id);
set_realtime_priority(args->priority);
if (args->cpu_id == 0)
if (write(args->pipe_write_fd, &token, 1) != 1)
exit(EXIT_FAILURE);
pthread_barrier_wait(args->barrier);
for (int i = 0; i < args->iterations; i++) {
char buf;
poll((struct pollfd[]){{args->pipe_read_fd, POLLIN}}, IGNORE_THIS4,
PIPE_TIMEOUT_MS);
if (read(args->pipe_read_fd, &buf, 1) != 1)
exit(EXIT_FAILURE);
if (write(args->pipe_write_fd, &token, 1) != 1)
exit(EXIT_FAILURE);
}
return NULL;
}
double measure_thread_switch(int iterations)
{
int pipe1[2], pipe2[2];
pthread_t t1, t2;
pthread_barrier_t barrier;
thread_args_t a1 = {iterations, 0, 0, 0, 95, &barrier},
a2 = {iterations, 0, 0, 1, 94, &barrier};
if (pipe(pipe1) == -1 || pipe(pipe2) == -1)
exit(EXIT_FAILURE);
pthread_barrier_init(&barrier, NULL, 2);
a1.pipe_read_fd = pipe1[0], a1.pipe_write_fd = pipe2[1];
a2.pipe_read_fd = pipe2[0], a2.pipe_write_fd = pipe1[1];
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
pthread_create(&t1, NULL, thread_func, &a1);
pthread_create(&t2, NULL, thread_func, &a2);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
clock_gettime(CLOCK_MONOTONIC, &end);
close(pipe1[0]);
close(pipe1[1]);
close(pipe2[0]);
close(pipe2[1]);
pthread_barrier_destroy(&barrier);
return ((end.tv_sec - start.tv_sec) * 1e6 +
(end.tv_nsec - start.tv_nsec) / 1e3) /
(2.0 * iterations);
}
```
實作手法整理:
| 設計要素 | 原因 |
|-----------------|------|
| `pipe + poll()` | 強制 thread/process block,保證產生 context switch,而非 busy-wait 或同步競爭 |
| `SCHED_FIFO` | 避免被其他一般行程干擾,確保測量只涵蓋必要切換成本 |
| `CPU affinity` | 排除 CPU 排程器中遷移導致的 TLB flush、熱區 (hot region) 切換等雜訊 |
| `fork()` 與 `pthread_create()` | 分別製造行程/執行緒的互動情境,便於比較差異 |
主程式:
```c
int main(int argc, char *argv[])
{
int iters = N_ITERATIONS;
int reps = N_REPETITIONS;
if (!check_root())
fprintf(stderr, "Not running as root: RT scheduling may fail\n");
double *results = malloc(reps * sizeof(double));
stat_t stats;
for (int i = 0; i < reps; i++) {
int pipe1[2], pipe2[2];
if (pipe(pipe1) == -1 || pipe(pipe2) == -1)
exit(EXIT_FAILURE);
pid_t pid = fork();
if (pid == -1)
exit(EXIT_FAILURE);
results[i] = measure_context_switch(iters, pipe1, pipe2, pid);
}
compute_stats(results, reps, &stats);
printf("Process context switch (%d iters x %d reps):\n", iters, reps);
printf(" Avg: %.3f us, Min: %.3f us, Max: %.3f us, Stddev: %.3f us\n",
stats.avg, stats.min, stats.max, stats.stddev);
for (int i = 0; i < reps; i++) {
results[i] = measure_thread_switch(iters);
}
compute_stats(results, reps, &stats);
printf("Thread context switch (%d iters x %d reps):\n", iters, reps);
printf(" Avg: %.3f us, Min: %.3f us, Max: %.3f us, Stddev: %.3f us\n",
stats.avg, stats.min, stats.max, stats.stddev);
free(results);
return 0;
```
本程式檔名為 `ctx.c`,編譯方式:
```shell
$ gcc -Wall -O2 -o ctx ctx.c -lm -lpthread
```
執行時務必要用超級使用者權限:
```shell
$ sudo ./ctx
```
在 AMD Ryzen Threadripper 2990WX 32-Core (64) @ 3.00 GHz 測試,參考輸出:
```
Process context switch (1000 iters x 10 reps):
Avg: 3.903 us, Min: 3.790 us, Max: 4.479 us, Stddev: 0.196 us
Thread context switch (1000 iters x 10 reps):
Avg: 3.957 us, Min: 3.932 us, Max: 3.991 us, Stddev: 0.017 us
```
行程和執行緒的 context switch 切換成本為何如此接近?二者相差非常小,幾乎在誤差範圍內。
在 Linux 中,無論是 process 抑或 thread,其實都是 `task_struct`,亦即 Linux 核心用以管理排程的基本單元。差異僅在於:
- process:有獨立的 `mm_struct`,即自身的虛擬記憶體空間
- thread:與其他執行緒共享 `mm_struct`,也就是與其他執行緒共享同一定址空間 (address space)
從核心的 CPU 排程器 (如 CFS 或 EEVDF) 角度來看,切換的是 `task_struct` 實體,而非高階語意上的 "process vs thread" (把你書架上的[恐龍書](https://gholk.github.io/ptt/M.1528439323.A.5B7.html)移開,不要弄髒書架)。這說明何以二者的 context switch 成本在 Linux 上幾乎一樣。
> [學不好作業系統,不能怪你](https://www.facebook.com/groups/system.software2025/posts/973639714972830/)
無論是透過 `fork()` 建立的子行程還是透過 `pthread_create()` 建立的執行緒,Linux 都以相同的結構(`task_struct`)管理它們,且使用相同的排程機制。因此,context switch 的基本開銷:儲存/載入暫存器、切換 kernel stack、切換分頁表 (page table) 等動作,本質相同。
由於執行緒的切換過程中,不需更換分頁表,看似會有較低的切換成本,但現代 Linux 在同 NUMA node、TLB cache 表現良好的情況下,行程中的分頁表重新載入的開銷極低,且現代中央處理器於 TLB 切換已有顯著改進,因此我們不易看出行程和執行緒切換的成本差異。
注意,許多教材或討論提及的執行緒存在的優勢,並非來自後者 context switch 成本較低 (幾乎相同),而是來自:
* 可共享 address space → 不需 IPC 可直接存取同一份資料
* 可共享 file descriptor、heap、stack (部分) 等資源
* 更容易建立大量輕量單位(例如 10 萬個 thread vs process)
作答規範:
* IGNORE_THIS1 到 IGNORE_THIS4 和 THIS_ISNT_VALID1 到 THIS_ISNT_VALID3 皆為十進位數值
:::success
延伸題目:
* 以 eBPF 進行上述測量,亦即取得行程和執行緒切換的成本
* 在多個 NUMA 節點的環境,上述程式碼能否精準測量?若無法,有哪些干擾因素、又如何排除?
:::