Try   HackMD

2025q1 第 10 週測驗題

目的: 檢驗學員對 I/O 模型CPU 排程器和〈Concurrency Primer〉的認知

作答表單: 測驗 1, 2
作答表單: 測驗 3

測驗 1

考慮 strtrim 函式的作用是從給定的 C 字串中移除前導與尾端的空白字元,並回傳修剪 (trim) 後的子字串起始位址。它會直接修改原始字串內容(in-place),將尾端的空白字元以 \0 結束字元截斷,並將指標移到第一個非空白字元,供後續使用。以下字元會被當作空白處理:

字元 意義 十六進位
' ' 空白鍵 0x20
'\t' tab 鍵 0x09
'\n' 換行字元 (LF) 0x0A
'\r' 斷行符號 (CR) 0x0D
'\v' 垂直定位字元 0x0B
'\f' 換頁字元 0x0C

在 Windows 系統中,每行結尾使用 CRLF 作為斷行符號;而在 Linux 則僅使用 LF 字元。因此,相較於 Linux,Windows 中的每行會多出一個 CR 字元。CR 原本的意思是 Carriage Return,對應鍵盤上的 Enter 鍵,LF 意思是 Line Feed,表示實際的換行動作。macOS 目前也與 Linux 一樣,使用 LF 作為行尾字元。

應用情境

  • 處理使用者輸入:避免輸入欄前後的空格影響後續操作 (如登入、搜尋)
  • 解析設定檔:行尾或行首常有空白,需先清除
  • 處理網路協定標頭或資料行:HTTP headers、CSV 欄位等

我們利用第 3 週測驗題提及的 SWAR 技巧,來實作 strtrim 函式 (只處理 ASCII,部分程式碼遮蔽)

/* ASCII whitespace check */
static inline bool is_ascii_space(uint8_t c)
{
    return c == ' ' || (c - '\t' <= '\r' - '\t');
}

/* Count leading ASCII space characters in a 64-bit word */
static inline int count_leading_spaces(uint64_t word)
{
    for (int i = 0; i < AAAA; i++) {
        uint8_t c = (word >> (BBBB)) & 0xFF;
        if (!is_ascii_space(c))
            return i;
    }
    return 8;
}

static char *strtrim(char *str)
{
    uint8_t *s = (uint8_t *) str;

    /* Fast skip leading whitespace using SWAR if 8-byte aligned */
    for (; ((uintptr_t) s & CCCC) == 0; s += 8) {
        uint64_t w = *(uint64_t *) s;
        int n = count_leading_spaces(w);
        if (n < 8) {
            s += n;
            break;
        }
    }

    while (*s && is_ascii_space(*s))
        s++;

    if (!*s)
        return (char *) s;

    uint8_t *end = s + strlen((char *) s);
    while (end > s && is_ascii_space(*(DDDD)))
        end--;

    *end = '\0';
    return (char *) s;
}

考慮 client.c 是我們打造的網路聊天客戶端程式碼,對應的巨集定義如下:

#define SERVER_ADDR "127.0.0.1"
#define SERVER_PORT 7788

chat.c 則是網路聊天的伺服器端程式碼,定義如下 (忽略標頭檔):

/* Server configuration constants */
#define BACKLOG 128      /* Maximum pending connections */
#define MAX_EVENTS 64    /* Maximum epoll events per wait */
#define MAX_CLIENTS 1024 /* Maximum concurrent clients */
#define NICK_MAXLEN 32   /* Maximum nickname length */
    
enum { CHAT_OK = 0, CHAT_ERR = -1 };
        
/* store client information */
typedef struct {
    int fd;
    char nick[NICK_MAXLEN];
} client_t;

/* manage server state */
typedef struct {
    int fd;
    client_t *clients[MAX_CLIENTS];
} server_t;

主函式和事件主迴圈如下: (部分程式碼遮蔽)

/* Allocates memory with error handling */
static inline void *my_alloc(size_t size)
{
    void *ptr = malloc(size);
    if (!ptr) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    return ptr;
}

int main(void)
{
    /* Initialize chat server */
    server_t server = {.fd = 0, .clients = {NULL}};
    if (chat_listen(&server, SERVER_ADDR, SERVER_PORT) != CHAT_OK) {
        fprintf(stderr, "Failed to listen on %s:%d\n", SERVER_ADDR,
                SERVER_PORT);
        return CHAT_ERR;
    }

    /* Create epoll instance */
    int epollfd = epoll_create1(0);
    if (epollfd < 0) {
        perror("epoll_create1");
        return CHAT_ERR;
    }

    /* Add server socket to epoll */
    struct epoll_event ev = {.events = EPOLLIN, .data.fd = server.fd};
    if (FFFF < 0) {
        return CHAT_ERR;
    }

    char buf[256];
    /* Main event loop */
    for (;;) {
        /* Epoll event array for handling socket events */
        struct epoll_event events[MAX_EVENTS];
        int nfds = GGGG;
        if (nfds < 0) {
            return CHAT_ERR;
        }

        /* Process each epoll event */
        for (int i = 0; i < nfds; i++) {
            int fd = events[i].data.fd;
            if (fd == server.fd) {
                /* Handle new client connection */
                int client_fd = chat_accept(&server);
                if (client_fd < 0)
                    continue;

                /* Initialize new client */
                client_t *client = my_alloc(sizeof(client_t));
                client->fd = client_fd;
                snprintf(client->nick, NICK_MAXLEN, "anon:%d", client_fd);
                server.clients[client_fd] = client;

                /* Send welcome message */
                snprintf(
                    buf, sizeof(buf),
                    "Server\r\nWelcome %s! Use /nick to set a nickname\r\n",
                    client->nick);
                if (write(client_fd, buf, strlen(buf)) < 0)
                    perror("write welcome");

                /* Broadcast join message */
                snprintf(buf, sizeof(buf), "%s joined\n", client->nick);
                broadcast_message(&server, buf, client_fd, 1);

                /* Add client to epoll */
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client_fd;
                if (HHHH < 0) {
                    return CHAT_ERR;
                }
            } else {
                /* Handle client data */
                ssize_t nread = read(fd, buf, sizeof(buf) - 1);
                if (nread <= 0) {
                    handle_disconnect(&server, epollfd, fd, buf);
                } else {
                    handle_client_message(&server, epollfd, fd, buf, nread);
                }
            }
        }
    }
    return 0;
}

對應的輔助函式如下:

/* Sets a file descriptor to non-blocking mode
 * @fd - File descriptor
 * Returns: CHAT_OK on success, CHAT_ERR on failure
 */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl");
        return CHAT_ERR;
    }
    return CHAT_OK;
}

/* Initializes server socket for listening
 * @server - Server structure
 * @host - Host address
 * @port - Port number
 * Returns: CHAT_OK on success, CHAT_ERR on failure
 */
static int chat_listen(server_t *server, const char *host, int port)
{
    struct addrinfo hints = {
        .ai_family = AF_INET,
        .ai_socktype = SOCK_STREAM,
        .ai_flags = AI_PASSIVE,
    };
    char port_str[6];
    snprintf(port_str, sizeof(port_str), "%d", port);

    struct addrinfo *res;
    if (getaddrinfo(host, port_str, &hints, &res)) {
        perror("getaddrinfo");
        return CHAT_ERR;
    }

    int fd = -1;
    for (struct addrinfo *rp = res; rp; rp = rp->ai_next) {
        fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (fd < 0)
            continue;
        if (0 > setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1},
                           sizeof(int)) ||
            bind(fd, rp->ai_addr, rp->ai_addrlen) < 0 ||
            listen(fd, BACKLOG) < 0) {
            close(fd);
            continue;
        }
        break;
    }
    freeaddrinfo(res);

    if (fd < 0) {
        perror("bind/listen");
        return CHAT_ERR;
    }

    server->fd = fd;
    return set_nonblocking(fd);
}

/* Accepts a new client connection
 * Returns: Client file descriptor on success, CHAT_ERR on failure
 */
static int chat_accept(server_t *server)
{
    int fd = accept(server->fd, NULL, NULL);
    if (fd < 0) {
        if (errno != EAGAIN && errno != EWOULDBLOCK)
            perror("accept");
        return CHAT_ERR;
    }
    return set_nonblocking(fd) == CHAT_OK ? fd : CHAT_ERR;
}

/* Sends a message to a single client
 * @client - Client to send to
 * @msg - Message to send
 * @len - Message length
 */
static void send_to_client(client_t *client, const char *msg, int len)
{
    if (write(client->fd, msg, len) < 0)
        perror("write");
}

/* Broadcasts a message to all clients except the sender
 * @server - Server structure
 * @buf - Message content
 * @sender_fd - Sender's file descriptor
 * @server_info - Flag for server-generated messages
 */
static void broadcast_message(server_t *server,
                              const char *buf,
                              int sender_fd,
                              int server_info)
{
    char msg[256];
    int len = server_info ? snprintf(msg, sizeof(msg), "Server\r\n%s", buf)
                          : snprintf(msg, sizeof(msg), "%s\r\n%s",
                                     server->clients[sender_fd]->nick, buf);

    /* Iterate through client array to send message */
    for (int i = 0; i < MAX_CLIENTS; i++) {
        client_t *client = server->clients[i];
        if (client && i != sender_fd)
            send_to_client(client, msg, len);
    }
}

static void handle_disconnect(server_t *server, int epollfd, int fd, char *buf)
{
    client_t *client = server->clients[fd];
    snprintf(buf, 256, "%s left\n", client->nick);
    broadcast_message(server, buf, fd, 1);
    EEEE;
    close(fd);
    free(client);
    server->clients[fd] = NULL;
}

static void handle_client_message(server_t *server,
                                  int epollfd,
                                  int fd,
                                  char *buf,
                                  ssize_t nread)
{
    buf[nread] = '\0';
    client_t *client = server->clients[fd];

    if (!strncmp(buf, "/quit", 5) || !strncmp(buf, "/exit", 5)) {
        handle_disconnect(server, epollfd, fd, buf);
    } else if (!strncmp(buf, "/nick", 5)) {
        const char *nick = strtrim(buf + 5);
        strncpy(client->nick, nick, NICK_MAXLEN - 1);
        client->nick[NICK_MAXLEN - 1] = '\0';
    } else {
        broadcast_message(server, buf, fd, 0);
    }
}

補完程式碼,使其運作符合預期。作答規範:

  • AAAACCCC 為十進位數值
  • BBBBDDDD 為合法表示式,不得出現乘法和除法運算子
  • EEEE, FFFF, GGGG, HHHH 皆為以 epoll_ 開頭的函式呼叫
  • EEEE, FFFF, HHHH 包含 EPOLL_ 開頭的巨集
  • GGGG 包含 MAX_EVENTS
  • 使用一致的程式碼縮排風格,以最精簡的方式撰寫,不包含表示式前後的空白字元,皆不得出現 ;?; 字元

延伸問題:

  • 解釋上方程式碼運作原理
  • 藉由在客戶端引入對話模擬 (如 ELIZA),量化上方程式碼,並提出針對其並行處理能力的改進

測驗 2

延續第 6 週測驗題的測驗 3。在處理大規模資料集時,常見的需求是快速判斷某個元素是否存在於一個龐大的集合中,這就是成員資格查詢 (membership query)。傳統的資料結構面臨挑戰:

  • 雜湊表 (Hash Table): 雖然平均查詢時間為
    O(1)
    ,但需要儲存元素本身或其標識,空間開銷可能非常大
  • 串列/陣列 (List/Array): 無序串列查詢效率低 (
    O(n)
    );有序結構(如平衡樹、排序陣列)查詢效率尚可 (
    O(logn)
    ),但仍需儲存元素,且插入和維護成本較高

當應用場景滿足以下條件時,Bloom Filter 提供截然不同的解決方案:

  1. 可以容忍一定的誤報 (False Positives):即查詢結果為「可能存在」,但元素實際上不在集合中
  2. 絕不允許漏報 (No False Negatives):即查詢結果為「絕對不存在」時,元素一定不在集合中
  3. 對空間效率和查詢速度有極高要求

Bloom Filter 正是藉由犧牲絕對的準確性(允許誤報),換取極佳的空間和時間效率。

Bloom Filter 由以下二部分組成:

  1. 位元陣列 (Bit Array): 一個長度為
    m
    的位元陣列,所有位元初始值均為 0
  2. k 個雜湊函數:
    k
    個獨立的雜湊函數
    h1,h2,,hk
    。每個函數將輸入的鍵 (key) 均勻地映射到
    [0,m1]
    的範圍內。理想情況下,這些雜湊函數的結果應相互獨立

關鍵操作:

  • 添加元素 (Add / Insertion):
    1. 對於要添加的元素
      x
      ,使用
      k
      個雜湊函數分別計算其雜湊值:
      h1(x),h2(x),,hk(x)
      。這些雜湊值對應位元陣列中的
      k
      個索引位置。
    2. 將位元陣列中這
      k
      個索引位置的位元設定為 1。 (注意:即使某個位元原本已經是 1,也保持為 1)。
  • 查詢元素 (Query / Check):
    1. 對於要查詢的元素
      y
      ,同樣使用
      k
      個雜湊函數計算其雜湊值:
      h1(y),h2(y),,hk(y)
    2. 檢查位元陣列中這
      k
      個索引位置的位元:
      • 如果所有位元都為 1: 則判斷元素
        y
        可能存在 (Possibly Present) 於集合中。這是可能產生誤報的情況。
      • 如果至少有一個位元為 0: 則判斷元素
        y
        絕對不存在 (Definitely Not Present) 於集合中。這是 Bloom Filter 不會有漏報的保證。

誤報率 (False Positive Probability) 是理解 Bloom Filter 性能和進行參數設計的關鍵。我們的目標是計算當查詢一個從未被添加過的元素時,被誤判為存在的機率,即誤報率 (

Pfp)。

假設 (理想情況):

  1. 均勻性 (Uniformity): 每個雜湊函數都將任意鍵均勻映射到位元陣列的
    [0,m1]
    範圍內,即映射到任何一個特定位置的機率是
    1m
  2. 獨立性 (Independence):
    k
    個雜湊函數的輸出結果是相互獨立的

推導步驟:

  1. 單個位元保持為 0 的機率 (單次雜湊):
    對於位元陣列中的某一個特定位元,當插入一個鍵時,被某個特定的雜湊函數
    hi
    選中的機率是
    1m
    。因此,這個位元不被
    hi
    選中的機率是
    (11m)
  2. 單個位元保持為 0 的機率 (k 次雜湊, 單個鍵):
    由於有
    k
    個獨立的雜湊函數,這個特定位元在插入單個鍵(經過
    k
    次雜湊)後仍然保持為 0 的機率是:
    P(bit is 0 after 1 key)=(11m)k
  3. 單個位元保持為 0 的機率 (插入 N 個鍵):
    假設我們獨立地插入
    N
    個不同的鍵。對於某一個特定位元,它在插入
    N
    個鍵後仍然保持為 0 的機率是(每次插入的
    k
    次雜湊都不改變此位元):
    P0=(11m)kn
  4. 使用近似簡化:
    當位元陣列長度
    m
    相對於
    kN
    很大時,
    1/m
    是一個很小的值。根據極限
    limx(11/x)x=e1
    ,我們可以得到近似:
    (11m)e1/m

    這個近似在
    m
    越大時越精確。
  5. 近似單個位元保持為 0 的機率:
    將上述近似代入第 3 步的公式:
    P0(e1/m)kN=ekN/m
  6. 單個位元被設為 1 的機率:
    在插入
    N
    個鍵後,某個特定位元被設為 1 的機率是:
    P1=1P01ekN/m
  7. 誤報率 (
    Pfp
    ) 的計算:
    考慮一個從未被插入集合的元素
    y
    。查詢
    y
    時發生誤報,意味著
    y
    對應的
    k
    個雜湊位置
    h1(y),,hk(y)
    上的位元恰好都為 1。
    為了簡化計算,我們做一個關鍵的(近似)假設:對於一個不在集合中的元素
    y
    ,其
    k
    個雜湊位置上的位元狀態是相互獨立的。
    在該獨立性假設下,誤報率
    Pfp
    (也常用
    f
    表示)約等於單個位元為 1 的機率的
    k
    次方:
    f=(P1)k(1ekN/m)k

參數最佳化:尋找最優的 k

給定預計插入的元素數量

N 和位元陣列大小
m
,我們希望選擇一個最佳的雜湊函數個數
k
,使得誤報率
f(k)=(1ekN/m)k
最小。

藉由對

ln(f(k)) 求導並令導數為 0,可以解得使誤報率最小的最優雜湊函數個數
kopt
滿足:
koptNm=ln2

即:

kopt=mNln2

由於

k 必須是整數,實際應用中通常取
k=round(mNln2)

反向計算:給定 N 和期望誤報率 f,計算 m 和 k

  1. 計算 k: 當
    k
    取最優值時,誤報率
    f(1/2)k
    。因此:
    koptlog2(1/f)=log2(f)
  2. 計算 m: 從
    kopt=mNln2
    得到
    m=Nkoptln2
    。代入
    kopt

    mN(log2(f))ln2=Nln(f)(ln2)2

整理以上可之:

  • 所需位元數
    mNln(1/f)(ln2)21.44Nlog2(1/f)
  • 雜湊函數個數
    k=round(mNln2)round(log2(1/f))

Bloom filter 的優點:

  • 空間效率極高: 只需
    m
    個位元,無需儲存元素本身
  • 時間效率高: 插入和查詢操作時間複雜度均為 O(k),與集合元素總數
    N
    無關
  • 無漏報 (No False Negatives): 查詢結果為「不存在」時絕對可靠。
  • 可合併性: 兩個參數 (
    m,k
    ) 相同的 Bloom Filter 可藉由按位或 (Bitwise OR) 操作合併,代表兩個集合的並集

Bloom filter 的缺點:

  • 存在誤報 (False Positives): 查詢結果為「可能存在」時可能不準確。誤報率
    f
    m,N,k
    影響
  • 無法安全刪除元素 (標準 Bloom Filter): 直接將位元從 1 改為 0 可能影響其他元素,導致漏報。(可用 Counting Bloom Filter 改進,但增加空間開銷)
  • 需要預估元素數量 N: 為獲得理想誤報率,需提前估計
    N
    以合理設定
    m
    k
    。若實際
    N
    遠超預期,誤報率會急劇上升。(可用 Scalable Bloom Filter 改進)

理論上 Bloom Filter 需要

k 個獨立且均勻的雜湊函數,在實際硬中,產生
k
個高品質且完全獨立的雜湊函數成本較高。常見的解決方案有:

  • 方法一:使用單一高品質雜湊函數配合不同種子/參數

Linux 核心的 eBPF Bloom Filter 實作採用此方法。它使用核心的 jhash 函數,並為產生第

i 個雜湊值時提供不同的種子 (bloom->hash_seed + i)。

/* Linux Kernel BPF Bloom Filter hash generation (示意) */
static u32 hash(struct bpf_bloom_filter *bloom, const void *value,
        u32 value_size, u32 i)
{
    u32 h;
    // 使用 jhash 函數,種子 = filter的基礎種子 + 雜湊函數索引 i
    h = jhash(value, value_size, bloom->hash_seed + i);
    // 將 32 位雜湊值 h 映射到位元陣列索引 (使用掩碼)
    return h & bloom->bitset_mask;
}
  • 方法二:使用單一雜湊函數進行雙重雜湊或衍生 (如 FNV-1a 範例)

以下是用 FNV-1a 雜湊函數和簡化版雙重雜湊策略的具體實作範例:

  • 參數設定:

    • 使用
      k=4
      個雜湊函式產生 4 個位元索引。
    • 位元陣列長度設為
      m=227=134,217,728
  • 雜湊函數 (64 位元 FNV-1a 變體):

    ​​​​#define hash(key, keylen) ({ \
    ​​​​    uint64_t h = 14695981039346656037ULL; /* FNV offset basis */ \
    ​​​​    const char *data = (const char *)key; \
    ​​​​    for (size_t i = 0; i < keylen; i++) { \
    ​​​​        h = (h ^ data[i]) * 1099511628211ULL; /* FNV prime */ \
    ​​​​    } \
    ​​​​    h; \
    ​​​​})
    
  • 索引產生方式 (簡化版雙重雜湊): 首先計算一次基礎雜湊值 hbase = hash(key, keylen),然後產生

    k 個索引:

    ​​​​// hbase 是 64 位元的 FNV-1a 雜湊結果
    ​​​​// filter->size 應為 m (即 2^27)
    ​​​​// 產生第 i 個索引 (i 從 0 到 k-1)
    ​​​​indices[i] = (hbase + i * (hbase >> 32)) & (filter->size - 1);
    

    這裡利用 64 位雜湊值的高 32 位作為步長因子,與基礎雜湊值組合後藉由位與操作(因為

    m 是 2 的冪)映射到索引範圍。

  • 理論誤判率計算: 假設此 Bloom Filter 插入

    n=1,000,000 個元素:
    knm=4×1,000,000227=4,000,000134,217,7280.0298

    根據誤判率公式
    f(1ekn/m)k

    f(1e0.0298)k=(10.9706)4=(0.0294)4

    f7.4×107

    → 理論誤判率約為
    7.4×105%
    或 0.000074%。

FNV-1a 與索引衍生的實務考量與限制:

  1. 雜湊偏態: FNV-1a 雜湊函數雖然計算速度快,但對於具有相似字首的字串(例如 "item-001", "item-002")容易產生相似的雜湊值或碰撞,其輸出分佈的均勻性可能不如 MurmurHash 或 xxHash 等現代雜湊函數,這會影響位元陣列中位元設定的均勻性。
  2. 索引衍生獨立性不足: 這種藉由單一基礎雜湊值 hbase 推導產生
    k
    個索引的方法(包括簡化版雙重雜湊),其產生的 indices[0], indices[1], ..., indices[k-1] 之間存在較強的相關性,並非真正獨立。這違反了理論推導中「位元設定為獨立事件」的關鍵假設,可能導致實際的誤判率高於理論預估值。

數學推導中使用的「雜湊函數獨立」和「位元狀態獨立」的假設在現實中通常是近似滿足的。

  • 使用同一個雜湊算法配合不同種子或藉由衍生方式產生的
    k
    個值並非嚴格獨立
  • 位元陣列中不同位元的狀態也非完全獨立

然而:

  1. 實用性: 只要基礎雜湊函數具有良好的均勻分佈性與雪崩效應(輸入微小變化導致輸出大幅隨機變化),那麼即使產生
    k
    個值的方法存在一定的依賴性,基於獨立性假設推導出的誤判率公式
    f(1ekN/m)k
    仍然是一個非常有用的性能估計與參數設計指南。
  2. 雜湊品質是關鍵: 無論如何產生
    k
    個值,基礎雜湊函數的品質(均勻性、低碰撞率)對於 Bloom Filter 的實際性能至關重要。選擇高品質的雜湊函數是改進 Bloom Filter 性能的重要一步。
  3. 權衡: 這體現了 Bloom Filter 設計中的權衡:追求理論上的完美獨立性可能代價過高,而實用的近似方法在效率和效果之間取得了平衡。認識到理論與實踐的差距,並了解所用方法的局限性很重要。

Linux 核心在 eBPF (extended Berkeley Packet Filter) 子系統中,藉由 BPF_MAP_TYPE_BLOOM_FILTER 類型的 eBPF map,為核心空間和使用者空間的 eBPF 程序提供了使用 Bloom Filter 的能力。

參數計算與實作 (kernel/bpf/bloom_filter.c)

核心程式碼 bloom_filter_map_alloc 負責創建 Bloom Filter map。關鍵步驟是根據使用者提供的 max_entries (

N) 和 nr_hash_funcs (
k
) 計算所需的位元陣列大小
m
(nr_bits):

static struct bpf_map *bloom_filter_map_alloc(union bpf_attr *attr)
{
    // ... (獲取 N = attr->max_entries 和 k = attr->nr_hash_funcs) ...
	u64 nr_bits; // 對應 m
    // ...

    /* 根據 N 和 k 計算最佳 m (nr_bits) */
    /* 使用公式: m = N * k / ln(2),近似 1/ln(2) ≈ 7/5 */
	if (check_mul_overflow(attr->max_entries, nr_hash_funcs, &nr_bits) ||
	    /* 計算 nr_bits ≈ (N * k / 5) * 7 */
	    check_mul_overflow(nr_bits / 5, (u32)7, &nr_bits) ||
	    nr_bits > (1UL << 31)) {
		/* 溢出或過大,限制 m 最大為 U32_MAX */
		// ... (設定 bitset_bytes, bitset_mask)
	} else {
	    /* 確保最小尺寸,並向上取整到 2 的冪次方 */
		if (nr_bits <= BITS_PER_LONG)
			nr_bits = BITS_PER_LONG;
		else
			nr_bits = roundup_pow_of_two(nr_bits);
		// ... (設定 bitset_bytes, bitset_mask)
	}
    // ... (分配記憶體) ...
    // ... (初始化隨機種子 bloom->hash_seed) ...
}

程式碼分析:

  1. 計算目標: 根據使用者輸入的
    N
    k
    ,計算出一個能使誤報率接近最小的位元陣列大小
    m
  2. 公式應用與近似: 核心程式碼直接用
    m=Nkln2
    的關係,並使用
    1ln275
    進行定點近似。
  3. 邊界處理與效率改進:處理計算溢出,確保最小尺寸,並將
    m
    取整到 2 的冪次方,以使用高效的位與操作進行索引映射。
  4. 隨機種子: 每個 Bloom Filter 實例使用隨機的 hash_seed,配合 jhash 產生
    k
    個不同的雜湊值。

本次測驗中,採用 FNV-1a 的 64 位元變種。

Bloom filter 適合高效能並行場景的考量是:

  1. 新增與查詢操作都是 lock-free
  2. 不會移除元素

然而,這些操作在多核處理器或多執行緒環境下,就會遇到幾個重要的問題:

  1. Data Race 風險:假設二個執行緒同時對 filter->data[index] 做 bitwise-OR 操作或查詢,若未使用 atomics,則會出現下列典型的競爭情形:
    ​​​​filter->data[index] |= bit;
    
    這可能會導致其中一個數值被覆蓋,產生 bit flip 丟失問題,進而導致查詢誤判機率上升,甚至出現 false negative(理論上不該出現)。
  2. Cache coherence / False Sharing: 由於多執行緒可能同時寫入鄰近的 uint64_t 區塊,若 data[] 未妥善對齊,則可能導致 CPU 快取行為效率低下。若二個執行緒操作相鄰但不同的 data[i],這些可能落在同一 cache line,導致 false sharing;每次更新位元,會造成整個 cache line 的 write invalidation,增加資料匯流排負擔與記憶體同步延遲

延伸閱讀: 並行程式設計: Atomics 操作

需要留意的是,儘管使用 atomic RMW,但不足以完全解決 contention:若多執行緒集中寫入相同區段 (高 collision key),仍會造成 atomic cache ping-pong,於是需要改進雜湊函數的數值分布。另外,雖然使用 _Alignas(64) 已有助於對齊,但若結構為 struct { _Atomic uint64_t data[n]; },仍需考慮整體記憶體佈局 (memory layout) 和頁面分佈。

考慮以下 concurrent bloom filter,其標頭檔: (bloom.h)

#pragma once

#include <stddef.h>

/** Opaque Bloom filter structure */
typedef struct bloom_internal bloom_t;

/** Function type for allocating memory */
typedef void *(*bloom_alloc_t)(size_t);

/** Function type for deallocating memory */
typedef void (*bloom_dealloc_t)(void *, size_t);

/*------------------------------*/
/* Construction and destruction */
/*------------------------------*/

/**
 * @brief Create a Bloom filter with default size.
 * @param alloc Optional allocator. If NULL, a default allocator is used.
 * @return Pointer to a new Bloom filter or NULL on failure.
 */
bloom_t *bloom_new(bloom_alloc_t alloc);

/**
 * @brief Create a Bloom filter with specified size in bits.
 * @param size Number of bits. Must be a power of two.
 * @param alloc Optional allocator. If NULL, a default allocator is used.
 * @return Pointer to a new Bloom filter or NULL on failure.
 */
bloom_t *bloom_new_with_size(size_t size, bloom_alloc_t alloc);

/**
 * @brief Destroy a Bloom filter and free resources.
 * @param filter Pointer to a pointer to a Bloom filter. Will be set to NULL.
 * @param dealloc Optional deallocator. If NULL, a default deallocator is used.
 */
void bloom_destroy(bloom_t **filter, bloom_dealloc_t dealloc);

/*------------------*/
/* Filter operations */
/*------------------*/

/**
 * @brief Clear all bits in the Bloom filter.
 * @param filter Bloom filter instance.
 */
void bloom_clear(bloom_t *filter);

/**
 * @brief Add a key to the Bloom filter.
 * @param filter Bloom filter instance.
 * @param key Pointer to the key.
 * @param keylen Length of the key in bytes.
 */
void bloom_add(bloom_t *filter, const void *key, size_t keylen);

/**
 * @brief Test whether a key may be present in the Bloom filter.
 * @param filter Bloom filter instance.
 * @param key Pointer to the key.
 * @param keylen Length of the key in bytes.
 * @return 1 if possibly present, 0 if definitely not present.
 */
int bloom_test(bloom_t *filter, const void *key, size_t keylen);

/*--------------------------*/
/* Default memory allocators */
/*--------------------------*/

/**
 * @brief Default allocator using malloc.
 * @param size Number of bytes to allocate.
 * @return Pointer to allocated memory.
 */
void *bloom_alloc(size_t size);

/**
 * @brief Default deallocator using free.
 * @param ptr Pointer to memory to free.
 * @param size Number of bytes to free (ignored by default).
 */
void bloom_free(void *ptr, size_t size);

效能評比程式可見 bench.c

對應的 C 程式:

#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include "bloom.h"

struct bloom_internal {
    size_t size;            /* Number of bits in the filter */
    _Atomic uint64_t *data; /* Word-aligned, atomic bit array */
};

/* Create a new Bloom filter with default size (2^27 bits) */
bloom_t *bloom_new(bloom_alloc_t allocator)
{
    return bloom_new_with_size(1ULL << 27, allocator);
}

/* Create a new Bloom filter with specified size in bits */
bloom_t *bloom_new_with_size(size_t size, bloom_alloc_t allocator)
{
    /* Round up to nearest 64-bit word */
    size_t words = (size + 63) / 64;
    size_t bytes = words * sizeof(uint64_t);
    bloom_t *filter = allocator(sizeof(bloom_t) + bytes);
    if (!filter)
        return NULL;
    filter->size = size;
    filter->data = (_Atomic uint64_t *) ((char *) filter + sizeof(bloom_t));
    /* Initialize to zero */
    for (size_t i = 0; i < words; i++) {
        atomic_store(&filter->data[i], 0);
    }
    return filter;
}

/* Destroy the Bloom filter and set pointer to NULL */
void bloom_destroy(bloom_t **filter, bloom_dealloc_t deallocator)
{
    if (filter && *filter) {
        deallocator(*filter, sizeof(bloom_t) + (((*filter)->size + 63) / 64) *
                                                   sizeof(uint64_t));
        *filter = NULL;
    }
}

/* Clear all bits in the filter (requires external synchronization) */
void bloom_clear(bloom_t *filter)
{
    size_t words = (filter->size + 63) / 64;
    for (size_t i = 0; i < words; i++) {
        atomic_store(&filter->data[i], 0);
    }
}

/* Set a bit at the given key position */
static void set(bloom_t *filter, size_t key)
{
    size_t index = key / 64;
    uint64_t bit = 1ULL << (key % 64);
    IIII;
}

/* Test if a bit is set at the given key position */
static int get(bloom_t *filter, size_t key)
{
    size_t index = key / 64;
    uint64_t bit = 1ULL << (key % 64);
    return (JJJJ & bit) != 0;
}

/* Add a key to the Bloom filter */
void bloom_add(bloom_t *filter, const void *key, size_t keylen)
{
    uint64_t hbase = hash(key, keylen);
    /* Use double hashing for 4 indices */
    for (int i = 0; i < 4; i++) {
        size_t h = (hbase + i * (hbase >> 32)) &
                   (filter->size - 1); /* Assumes size is power of 2 */
        set(filter, h);
    }
}

/* Test if a key is likely in the Bloom filter */
int bloom_test(bloom_t *filter, const void *key, size_t keylen)
{
    uint64_t hbase = hash(key, keylen);
    for (int i = 0; i < 4; i++) {
        size_t h = (hbase + i * (hbase >> 32)) & (filter->size - 1);
        if (!get(filter, h))
            return 0;
    }
    return 1;
}

/* Default allocator using calloc for zeroed memory */
void *bloom_alloc(size_t size)
{
    return calloc(1, size);
}

/* Default deallocator */
void bloom_free(void *ptr, size_t size)
{
    (void) size;
    free(ptr);
}

補完程式碼,使其運作符合預期。作答規範:

  • IIIIJJJJatomic_ 開頭的函式呼叫
  • 使用一致的程式碼縮排風格,以最精簡的方式撰寫,不包含表示式前後的空白字元,皆不得出現 ;?; 字元

測驗 3

我們想精確測量 GNU/Linux 上行程和執行緒 (NPTL) 的 context switch(上下文切換)成本,其基本原理是,利用 pipe 系統呼叫雙向通訊機制造成二個執行單位 (process 或 thread) 之間交替等待與喚醒,並量測整體來回的時間,進而估算一次 context switch 的平均耗時。

延伸閱讀: Arm-Linux

測量 context switch 前,我們準備以下程式碼:

#define _GNU_SOURCE
#include <math.h>
#include <poll.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>

#define N_ITERATIONS 1000
#define N_REPETITIONS 10                     
#define PIPE_TIMEOUT_MS 1000

typedef struct {
    double avg, min, max, stddev;
} stat_t;

typedef struct {
    int iterations, pipe_read_fd, pipe_write_fd, cpu_id, priority;
    pthread_barrier_t *barrier;
} thread_args_t;

int set_affinity(int cpu_id)
{
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu_id, &cpuset);
    return sched_setaffinity(0, sizeof(cpuset), &cpuset);
}

int set_realtime_priority(int priority)
{
    struct sched_param param = {.sched_priority = priority};
    return sched_setscheduler(0, SCHED_FIFO, &param);
}

bool check_root()
{
    return geteuid() == 0;
}

void compute_stats(const double *times, int n, stat_t *stats)
{
    double sum = 0.0, sum_sq = 0.0;
    stats->min = stats->max = times[0];
    for (int i = 0; i < n; i++) {
        sum += times[i];
        sum_sq += times[i] * times[i];
        if (times[i] < stats->min)
            stats->min = times[i];
        if (times[i] > stats->max)
            stats->max = times[i];
    }
    stats->avg = sum / n;
    stats->stddev = sqrt((sum_sq / n) - stats->avg * stats->avg);
}

以下程式會建立二個 pipe

  • pipe1: A 寫入、B 載入
  • pipe2: B 寫入、A 載入

這構成 A ↔ B 之間一來一回的同步通訊機制,無論是行程抑或執行緒。

彼此等待與喚醒造成 context switch

  • A 寫入 pipe1 → B 等待 pipe1 資料 → 喚醒並載入
  • B 寫入 pipe2 → A 等待 pipe2 資料 → 喚醒並載入
  • 如此重複若干次

每次來回交替,會造成二次 context switch (即 A → B 和 B → A)。

對照第 3 週測驗題的測驗三

針對行程的測試流程

  • 親代行程 (A) 綁定 CPU0,設定 SCHED_FIFO 優先權 95
  • 子行程 (B) 綁定 CPU1,設定 SCHED_FIFO 優先權 94
  • 親代行程記錄時間,在每次來回後讀出 pipe2 的資料
  • 結束後計算「平均每次 context switch = 來回時間 / (2 * iterations)」

對應的原始程式碼 (部分遮蔽):

double measure_context_switch(int iterations,
                              int pipe1[2],
                              int pipe2[2],
                              pid_t pid)
{
    char token = 'x';
    struct timespec start, end;
    if (pid == 0) {
        set_affinity(THIS_ISNT_VALID1);
        set_realtime_priority(THIS_ISNT_VALID2);
        close(pipe1[1]);
        close(pipe2[0]);
        for (int i = 0; i < iterations; i++) {
            char buf;
            poll((struct pollfd[]){{pipe1[0], POLLIN}}, THIS_ISNT_VALID3, PIPE_TIMEOUT_MS);
            if (read(pipe1[0], &buf, 1) != 1)
                exit(EXIT_FAILURE);
            if (write(pipe2[1], &token, 1) != 1)
                exit(EXIT_FAILURE);
        }
        close(pipe1[0]);
        close(pipe2[1]);
        exit(EXIT_SUCCESS);
    }
    set_affinity(IGNORE_THIS1);
    set_realtime_priority(IGNORE_THIS2);
    close(pipe1[0]);
    close(pipe2[1]);
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        if (write(pipe1[1], &token, 1) != 1)
            exit(EXIT_FAILURE);
        char buf;
        poll((struct pollfd[]){{pipe2[0], POLLIN}}, IGNORE_THIS3, PIPE_TIMEOUT_MS);
        if (read(pipe2[0], &buf, 1) != 1)
            exit(EXIT_FAILURE);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    close(pipe1[1]);
    close(pipe2[0]);
    wait(NULL);
    return ((end.tv_sec - start.tv_sec) * 1e6 +
            (end.tv_nsec - start.tv_nsec) / 1e3) /
           (2.0 * iterations);
}

關鍵操作:

  • 使用 poll() 等待對方寫入資料,確保有 blocking,進而觸發 context switch
  • 使用 SCHED_FIFO 保證完全不被 NORMAL/OTHER 等非即時行程干擾
  • CPU affinity 保證切換不受處理器核遷移 (CPU migration) 影響

執行緒測試流程:

  • 主執行緒與副執行緒分別固定在 CPU0 / CPU1,使用 pthread_barrier 同步起跑
  • 也是透過二條 pipe 傳遞訊號與資料,一來一回代表二次 context switch

對應的原始程式碼 (部分遮蔽):

void *thread_func(void *arg)
{
    thread_args_t *args = arg;
    char token = 'x';
    set_affinity(args->cpu_id);
    set_realtime_priority(args->priority);
    if (args->cpu_id == 0)
        if (write(args->pipe_write_fd, &token, 1) != 1)
            exit(EXIT_FAILURE);
    pthread_barrier_wait(args->barrier);
    for (int i = 0; i < args->iterations; i++) {
        char buf;
        poll((struct pollfd[]){{args->pipe_read_fd, POLLIN}}, IGNORE_THIS4,
             PIPE_TIMEOUT_MS);
        if (read(args->pipe_read_fd, &buf, 1) != 1)
            exit(EXIT_FAILURE);
        if (write(args->pipe_write_fd, &token, 1) != 1)
            exit(EXIT_FAILURE);
    }
    return NULL;
}

double measure_thread_switch(int iterations)
{
    int pipe1[2], pipe2[2];
    pthread_t t1, t2;
    pthread_barrier_t barrier;
    thread_args_t a1 = {iterations, 0, 0, 0, 95, &barrier},
                  a2 = {iterations, 0, 0, 1, 94, &barrier};
    if (pipe(pipe1) == -1 || pipe(pipe2) == -1)
        exit(EXIT_FAILURE);
    pthread_barrier_init(&barrier, NULL, 2);
    a1.pipe_read_fd = pipe1[0], a1.pipe_write_fd = pipe2[1];
    a2.pipe_read_fd = pipe2[0], a2.pipe_write_fd = pipe1[1];
    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    pthread_create(&t1, NULL, thread_func, &a1);
    pthread_create(&t2, NULL, thread_func, &a2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    clock_gettime(CLOCK_MONOTONIC, &end);
    close(pipe1[0]);
    close(pipe1[1]);
    close(pipe2[0]);
    close(pipe2[1]);
    pthread_barrier_destroy(&barrier);
    return ((end.tv_sec - start.tv_sec) * 1e6 +
            (end.tv_nsec - start.tv_nsec) / 1e3) /
           (2.0 * iterations);
}

實作手法整理:

設計要素 原因
pipe + poll() 強制 thread/process block,保證產生 context switch,而非 busy-wait 或同步競爭
SCHED_FIFO 避免被其他一般行程干擾,確保測量只涵蓋必要切換成本
CPU affinity 排除 CPU 排程器中遷移導致的 TLB flush、熱區 (hot region) 切換等雜訊
fork()pthread_create() 分別製造行程/執行緒的互動情境,便於比較差異

主程式:

int main(int argc, char *argv[])
{
    int iters = N_ITERATIONS;
    int reps = N_REPETITIONS;

    if (!check_root())
        fprintf(stderr, "Not running as root: RT scheduling may fail\n");

    double *results = malloc(reps * sizeof(double));
    stat_t stats;

    for (int i = 0; i < reps; i++) {
        int pipe1[2], pipe2[2];
        if (pipe(pipe1) == -1 || pipe(pipe2) == -1)
            exit(EXIT_FAILURE);
        pid_t pid = fork();
        if (pid == -1)
            exit(EXIT_FAILURE);
        results[i] = measure_context_switch(iters, pipe1, pipe2, pid);
    }
    compute_stats(results, reps, &stats);
    printf("Process context switch (%d iters x %d reps):\n", iters, reps);
    printf("  Avg: %.3f us, Min: %.3f us, Max: %.3f us, Stddev: %.3f us\n",
           stats.avg, stats.min, stats.max, stats.stddev);

    for (int i = 0; i < reps; i++) {
        results[i] = measure_thread_switch(iters);
    }
    compute_stats(results, reps, &stats);
    printf("Thread context switch (%d iters x %d reps):\n", iters, reps);
    printf("  Avg: %.3f us, Min: %.3f us, Max: %.3f us, Stddev: %.3f us\n",
           stats.avg, stats.min, stats.max, stats.stddev);
    free(results);
     return 0;

本程式檔名為 ctx.c,編譯方式:

$ gcc -Wall -O2 -o ctx ctx.c -lm -lpthread

執行時務必要用超級使用者權限:

$ sudo ./ctx

在 AMD Ryzen Threadripper 2990WX 32-Core (64) @ 3.00 GHz 測試,參考輸出:

Process context switch (1000 iters x 10 reps):
  Avg: 3.903 us, Min: 3.790 us, Max: 4.479 us, Stddev: 0.196 us
Thread context switch (1000 iters x 10 reps):
  Avg: 3.957 us, Min: 3.932 us, Max: 3.991 us, Stddev: 0.017 us

行程和執行緒的 context switch 切換成本為何如此接近?二者相差非常小,幾乎在誤差範圍內。

在 Linux 中,無論是 process 抑或 thread,其實都是 task_struct,亦即 Linux 核心用以管理排程的基本單元。差異僅在於:

  • process:有獨立的 mm_struct,即自身的虛擬記憶體空間
  • thread:與其他執行緒共享 mm_struct,也就是與其他執行緒共享同一定址空間 (address space)

從核心的 CPU 排程器 (如 CFS 或 EEVDF) 角度來看,切換的是 task_struct 實體,而非高階語意上的 "process vs thread" (把你書架上的恐龍書移開,不要弄髒書架)。這說明何以二者的 context switch 成本在 Linux 上幾乎一樣。

學不好作業系統,不能怪你

無論是透過 fork() 建立的子行程還是透過 pthread_create() 建立的執行緒,Linux 都以相同的結構(task_struct)管理它們,且使用相同的排程機制。因此,context switch 的基本開銷:儲存/載入暫存器、切換 kernel stack、切換分頁表 (page table) 等動作,本質相同。

由於執行緒的切換過程中,不需更換分頁表,看似會有較低的切換成本,但現代 Linux 在同 NUMA node、TLB cache 表現良好的情況下,行程中的分頁表重新載入的開銷極低,且現代中央處理器於 TLB 切換已有顯著改進,因此我們不易看出行程和執行緒切換的成本差異。

注意,許多教材或討論提及的執行緒存在的優勢,並非來自後者 context switch 成本較低 (幾乎相同),而是來自:

  • 可共享 address space → 不需 IPC 可直接存取同一份資料
  • 可共享 file descriptor、heap、stack (部分) 等資源
  • 更容易建立大量輕量單位(例如 10 萬個 thread vs process)

作答規範:

  • IGNORE_THIS1 到 IGNORE_THIS4 和 THIS_ISNT_VALID1 到 THIS_ISNT_VALID3 皆為十進位數值

延伸題目:

  • 以 eBPF 進行上述測量,亦即取得行程和執行緒切換的成本
  • 在多個 NUMA 節點的環境,上述程式碼能否精準測量?若無法,有哪些干擾因素、又如何排除?