# SPSC FIFO QUEUE 問題 > 程式碼: [SPSC](https://gist.github.com/Andrewtangtang/9c7a2d6aafbcc6d7bd0db6bd4fc3d032) ## 會不會遇到 ABA 問題 先釐清一下什麼是 ABA 問題,ABA 問題是在並行程式當中,假設有 Thread A 以及 Thread B 都會對於相同的共享變數 `v` 作修改,假設 Thread A 事先檢查並獲取了 `v` 的舊值 `v1` ,而在檢查到真正更新變數 `v` 的這個區間 Thread B 對於共享變數修改為 `v2` 後又改為 `v1`,那麼 Thread A 在對於變數更新使用 CAS 更新變數仍然會正常執行因為 CAS 的比較結果(預期值`v1` 目前 `v` 的數值也為 `v1` )只會針對變數的數值作比較不會檢測到中間曾經有狀態的改變,這樣就會產生 ABA 問題,因為數值相同並不一定保證當前變數的狀態仍然是沒有被更動的狀態,舉例來說鏈結串列的節點的鏈結者被動,並不會修改當前節點指標的數值。 ### SPSC FIFO QUEUE 會不會遇到 ABA 問題 我認為這段程式碼不會遇到 ABA 問題。對於 producer 和 consumer 而言,雙方共享的狀態包含三個變數:`read_count`、`write_count` 以及 `spsc_fifo`(也就是實際的 ring buffer)。其中,consumer 在讀取資料前,會先透過 `atomic_load` 搭配 `memory_order_acquire` 讀取 `write_count`,確保讀取的是 producer 最新寫入資料後所更新的值。這個 acquire 動作也保證,在此之後對 `fifo_buffer` 的讀取操作,不會被重新排序到這個讀取之前,從而避免讀到未完成寫入的資料。 在完成資料複製後,consumer 會透過 `atomic_store` 搭配 `memory_order_release` 將更新後的 `read_count` 寫回。這個 release 操作確保,在這次 store 之前所做的所有讀取(例如從 buffer 中的 memcpy)都已經完成,並對 producer 可見,讓同步正確建立。 :::warning 注意 memory order,本例採 acquire/release ::: ```c spsc_usize spsc_fifo_read(struct spsc_fifo *fifo, spsc_byte *dest, spsc_usize max_bytes) { spsc_usize write_count = atomic_load_explicit(&fifo->write_count, memory_order_acquire); spsc_usize read_count = atomic_load_explicit(&fifo->read_count, memory_order_relaxed); spsc_usize available = write_count - read_count; if (max_bytes > available) { max_bytes = available; } if (max_bytes == 0) { return 0; } spsc_usize read_index = read_count & fifo->mask; spsc_usize first_chunk = min_usize(max_bytes, fifo->capacity - read_index); memcpy(dest, fifo->buffer + read_index, first_chunk); if (first_chunk < max_bytes) { memcpy(dest + first_chunk, fifo->buffer, max_bytes - first_chunk); } atomic_store_explicit(&fifo->read_count, read_count + max_bytes, memory_order_release); return max_bytes; } ``` 對於 producer 而言,在計算還有多少空間可以寫入時,因為 `read_count` 是由 consumer 所更新,因此必須先透過 `atomic_load` 搭配 `memory_order_acquire` 讀取 `read_count`,以確保讀取到的是 consumer 已經釋放過的最新值。這樣才能正確計算出 ring buffer 中剩餘的可寫空間。接著,在完成對 `fifo_buffer` 的資料寫入後,會使用 `atomic_store` 並搭配 `memory_order_release` 更新 `write_count`,以確保所有的資料寫入操作都已在這個 store 執行之前完成,從而讓 consumer 能夠安全地讀取這些資料。 ```c spsc_usize spsc_fifo_write(struct spsc_fifo *fifo, const spsc_byte *src, spsc_usize bytes) { /* Load counters with appropriate memory ordering */ spsc_usize read_count = atomic_load_explicit(&fifo->read_count, memory_order_acquire); spsc_usize write_count = atomic_load_explicit(&fifo->write_count, memory_order_relaxed); /* Calculate available space */ spsc_usize available = fifo->capacity - (write_count - read_count); if (bytes > available) { bytes = available; } if (bytes == 0) { return 0; } /* Calculate ring buffer index and handle wrapping */ spsc_usize write_index = write_count & fifo->mask; spsc_usize first_chunk = min_usize(bytes, fifo->capacity - write_index); /* Copy data (potentially in two chunks due to ring buffer wrapping) */ memcpy(fifo->buffer + write_index, src, first_chunk); if (first_chunk < bytes) { memcpy(fifo->buffer, src + first_chunk, bytes - first_chunk); } /* Update write counter with release ordering to signal completion */ atomic_store_explicit(&fifo->write_count, write_count + bytes, memory_order_release); return bytes; } ``` 此外,由於 `read_count` 和 `write_count` 在整個讀寫過程中都是以單調遞增的方式紀錄資料位置,且各自僅由一個執行緒負責修改(`read_count` 由 consumer 修改,`write_count` 由 producer 修改),因此不會出現 ABA 問題(值雖相同但狀態已改變的情況)。至於 `ring_buffer` 本身可能遇到的問題是:當 consumer 計算出 `read_index` 並確定要讀取的資料範圍後,如果 producer 在此期間寫入了超過 buffer 容量大小的資料,可能會覆蓋掉 consumer 尚未完成讀取的區段,導致讀到非預期的內容。然而,這段程式碼已設計了防範機制,在 producer 嘗試寫入資料前,會先計算剩餘空間,若空間不足則不會進行寫入操作,因此可有效避免覆寫尚未被讀取的資料: ```c /* Calculate available space */ spsc_usize available = fifo->capacity - (write_count - read_count); if (bytes > available) { bytes = available; } if (bytes == 0) { return 0; } ``` ## 程式碼改進空間 :::warning 注意看 [concurrent-programs](https://github.com/sysprog21/concurrent-programs) ::: ### 目前程式碼的設計理解 在改善前,先理解當前程式碼的設計理念。 #### 使用 Cache Line Padding 避免 False Sharing 目前的程式碼嘗試透過 cache line padding 的方式,避免 producer 與 consumer 在同一條 cache line 上操作不同變數所導致的 false sharing 現象。具體來說,`write_count`(由 producer 操作)與 `read_count`(由 consumer 操作)之間加入了 padding,希望讓它們位於不同的 cache line 上,避免彼此更新時觸發不必要的快取同步。然而,目前的排版如下: ```c struct spsc_fifo { /* Queue metadata */ spsc_usize capacity; spsc_usize mask; spsc_byte *buffer; /* Producer's write counter */ _Atomic(spsc_usize) write_count; char pad1[CACHE_LINE_SIZE - sizeof(_Atomic(spsc_usize))]; /* Consumer's read counter */ _Atomic(spsc_usize) read_count; char pad2[CACHE_LINE_SIZE - sizeof(_Atomic(spsc_usize))]; }; ``` 這樣的 padding 設計雖然能避免 `write_count` 和 `read_count` 的衝突,但實際上 `write_count` 仍可能與上方的 `Queue metadata` 落在同一條 cache line 上。這表示,當 producer 更新 `write_count` 時,有可能導致 consumer 存取 metadata(如計算 index 所需的 `mask` 或 `capacity`)時遇到 cache invalidation。 我認為由於 metadata 在執行過程中通常不會被頻繁修改,因此可以將 metadata 本身也單獨置於一條 cache line 中,完全與 `write_count` 和 `read_count` 隔離,最大化 cache locality 。 以下是改進後的排版設計: ```c struct spsc_fifo { /* Queue metadata - occupy first cache line */ spsc_usize capacity; /* 8 bytes */ spsc_usize mask; /* 8 bytes */ spsc_byte *buffer; /* 8 bytes */ /* pad out to exactly 64 bytes */ char meta_pad[ CACHE_LINE_SIZE - ((sizeof(spsc_usize) + sizeof(spsc_usize) + sizeof(spsc_byte *)) % CACHE_LINE_SIZE) ]; /* Producer's write counter - on its own cache line */ _Atomic(spsc_usize) write_count; char pad1[CACHE_LINE_SIZE - sizeof(_Atomic(spsc_usize))]; /* Consumer's read counter - on its own cache line */ _Atomic(spsc_usize) read_count; char pad2[CACHE_LINE_SIZE - sizeof(_Atomic(spsc_usize))]; }; ``` 透過這種方式可以確保了 `write_count` 和 `read_count` 不會互相干擾,也避免了 metadata 的存取被意外影響到。 ## 用 io_uring 來解釋 (變形的) spsc 運作 在 io\_uring 裡,kernel 與使用者之間不再透過傳統的系統呼叫(如 `read()` / `write()`)來進行資料複製,而是透過一塊雙方都可以直接存取的共享記憶體(shared memory)來溝通。這塊共享記憶體是透過 `mmap()` 建立的,讓 user-space 可以直接填寫 I/O 請求(Submission Queue, SQ),而 kernel 也可以直接將結果寫回(Completion Queue, CQ)。 ### 1. 映射 shared memory([`memmap.c`](https://elixir.bootlin.com/linux/v6.16-rc4/source/io_uring/memmap.c) ) 當 user 呼叫 `mmap()` 時,kernel 會進入 `io_uring_mmap()` 函式,根據 `pgoff`(page offset)參數判斷要映射哪個區塊: * `IORING_OFF_SQ_RING`:Submission ring,包含 `head`、`tail`、`ring_mask`、`ring_entries` 等 metadata,還有一個 array 用來記錄 SQE 的索引順序。 * `IORING_OFF_CQ_RING`:Completion ring,用來同步完成事件(CQE)的讀寫。 * `IORING_OFF_SQES`:這是 SQE 陣列本體,user 就是直接在這個區段填入每個操作的參數(如 opcode、fd、addr 等)。 在 kernel 端,這些區塊的實體記憶體頁會先透過 `alloc_pages()` 配出來(如果是 kernel 提供的區塊),或透過 `pin_user_pages_fast()` 將使用者傳進來的緩衝區固定,以保證這段記憶體在執行期間不會被移動或回收。接著,這些實體頁會透過 `vm_insert_pages()` 一頁一頁插入到使用者的 VMA(virtual memory area)中。這樣一來,user 就可以像操作連續記憶體空間一樣,直接對映射進來的記憶體讀寫。 以下是一個 user 映射 SQ ring 的範例: ```c void *sq_ring = mmap(NULL, sq_ring_size, PROT_READ | PROT_WRITE, MAP_SHARED, ring_fd, IORING_OFF_SQ_RING); ``` 這段程式碼代表:將 `ring_fd` 所代表的 io\_uring 資源中,Submission Ring 區塊映射到自己的虛擬記憶體空間中,並設為可讀可寫、與 kernel 共享。 * `NULL`:請 kernel 選一個合適的虛擬地址。 * `sq_ring_size`:映射的記憶體大小。 * `PROT_READ | PROT_WRITE`:允許這塊記憶體被讀寫。 * `MAP_SHARED`:共享映射,修改會同步到 kernel 端。 * `ring_fd`:由 `io_uring_setup()` 回傳的 file descriptor。 * `IORING_OFF_SQ_RING`:指定要映射哪個區塊,這裡是 SQ ring。 完成這段後,`sq_ring` 就是一個指向共享記憶體的指標,可以直接讀寫裡面的 `tail`、`head`、array 等欄位,不需要透過系統呼叫與 kernel 通訊。 ### 2. SQ 請求的過程 當使用者要提交一筆新的 I/O 請求時,必須遵守 io\_uring 設計中的記憶體同步規則,確保資料對 kernel 是可見且有序的。根據原始碼註解 ``` * A note on the read/write ordering memory barriers that are matched between the application and kernel side. * * Likewise, the application must use an appropriate smp_wmb() before * writing the SQ tail (ordering SQ entry stores with the tail store), * which pairs with smp_load_acquire in io_get_sqring (smp_store_release * to store the tail will do). And it needs a barrier ordering the SQ * head load before writing new SQ entries (smp_load_acquire to read * head will do). * * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after* * updating the SQ tail; a full memory barrier smp_mb() is needed * between. * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens * from data shared between the kernel and application. This is done both * for ordering purposes, but also to ensure that once a value is loaded from * data that the application could potentially modify, it remains stable. ``` 使用者應先透過 `smp_load_acquire()` 讀取 `sq.head`,以確認 kernel 已經處理到哪一筆請求,確保自己接下來填寫的 SQE 不會覆蓋尚未處理的資料。接著,使用者會在 SQE 陣列(由 `mmap()` 映射進來的共享記憶體)中填入新的請求內容。填寫完成後,需執行 `smp_wmb()`,以保證所有欄位的寫入都在更新 `tail` 之前完成,避免 kernel 提前讀到尚未初始化完畢的請求。接下來,使用 `smp_store_release()` 將更新後的 `sq.tail` 寫入共享記憶體,這個動作代表使用者正式提交了一筆新的請求,讓 kernel 開始觀察到它的存在。最後,使用者在填寫完 SQE、更新完 `tail` 之後,會透過 `io_uring_enter()` 系統呼叫顯式通知 kernel 有新的請求需要處理(若啟用 `SQPOLL`,則由 kernel 自行輪詢,不需要這步)。 #### io_submit_sqes() 系統呼叫 `io_uring_enter()` 接下來會進入函式 `io_submit_sqes()`。這個函式會先呼叫 `io_sqring_entries()` 來判斷目前還有多少筆尚未被 kernel 處理的請求。 ```c static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; unsigned int entries; /* make sure SQ entry isn't read before tail */ entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; return min(entries, ctx->sq_entries); } ``` 這裡的 `smp_load_acquire(&rings->sq.tail)` 會讀取使用者寫入的 `tail` 值,並搭配 acquire barrier,確保 tail 前面填寫的 SQE 內容對 kernel 是可見。再搭配 `ctx->cached_sq_head`(表示 kernel 目前已經處理到哪),就能計算出待處理的 SQE 數量。 接著,根據前面計算出的待處理數量,初始化內部的提交狀態(`io_submit_state_start()`),並依序從 SQ ring 中逐筆讀取請求,將其消費(consume)並提交到內部的排程隊列中: ```c unsigned int entries = io_sqring_entries(ctx); unsigned int left; int ret; if (unlikely(!entries)) return 0; /* make sure SQ entry isn't read before tail */ ret = left = min(nr, entries); io_get_task_refs(left); io_submit_state_start(&ctx->submit_state, left); do { const struct io_uring_sqe *sqe; struct io_kiocb *req; if (unlikely(!io_alloc_req(ctx, &req))) break; if (unlikely(!io_get_sqe(ctx, &sqe))) { io_req_add_to_cache(req, ctx); break; } if (unlikely(io_submit_sqe(ctx, req, sqe)) && !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) { left--; break; } } while (--left); ``` 在每次迴圈中,kernel 會執行以下步驟: 1. 配置一筆新的 `io_kiocb` 結構,作為 kernel 內部對應的請求; 2. 透過 `io_get_sqe()` 從共享記憶體中擷取一筆 SQE(這些 SQE 是由 user-space 透過 `mmap()` 提前填寫好的) 3. 呼叫 `io_submit_sqe()` 將 SQE 解析並提交至內部 I/O 處理流程中。 若其中一筆提交失敗,且未設置 `IORING_SETUP_SUBMIT_ALL`,則 kernel 會提前中止這批提交流程;否則就會持續處理剩餘的請求。 當這一批 SQE 處理完後,系統會進行收尾階段: ```c io_submit_state_end(ctx); // flush 尚未提交的請求 io_commit_sqring(ctx); // 將 cached_sq_head 回寫至 user-space 的 sq.head ``` ```c static void io_commit_sqring(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; /* * Ensure any loads from the SQEs are done at this point, * since once we write the new head, the application could * write new data to them. */ smp_store_release(&rings->sq.head, ctx->cached_sq_head); } ``` 此處的 `ctx->cached_sq_head` 是 kernel 端自行維護的 SQ head 快取,用來追蹤當前已處理到哪筆 SQE。為了效能考量,這個值並不會在每次處理請求後立即同步至共享記憶體,而是等到整批處理完成後,再透過 `smp_store_release()` 將更新後的值寫回 user-space 可見的 `sq.head`,以確保記憶體操作順序的正確性。這樣就完整結束一次 batched SQ 提交的過程。 #### 為何不保證 lock-free 雖然從架構設計上來看,io\_uring 的 SQ(Submission Queue)和 CQ(Completion Queue)確實是以 SPSC(Single Producer Single Consumer)的 lock-free 模型設計 —— user 負責寫 SQ,kernel 負責讀 SQ,彼此操作不同欄位,再透過 memory barrier 確保順序。 但實際上,真正要交換的資料,不只是 SQ/CQ 上的幾個欄位而已,而是牽涉到很多需要共用的狀態,這些都維護在 `io_ring_ctx` 這個核心結構裡。 舉例來說,當使用者透過 `io_uring_enter()` 提交請求時,kernel 需要做的事可能包含: * 分配一筆新的 `io_kiocb`; * 從 SQE 陣列中抓出對應的請求內容; * 更新 `cached_sq_head`,也就是目前 kernel 處理到的位置; * 還可能會處理像 linked request、延遲執行的任務等比較複雜的流程。 這些操作都不是簡單的變數更新,會同時動到多個欄位,而且彼此之間有順序與一致性的要求。如果有兩個執行緒同時呼叫 `io_uring_enter()`,沒有使用鎖的話,很容易出現 race condition,造成資料錯亂或狀態不一致。為了避免這種情況,kernel 在進入 `io_submit_sqes()` 之前,會先透過 `mutex_lock()` 把整個 `ctx` 鎖起來,確保一次只有一個執行緒能進入這段邏輯。也就是下面這段: ```c else if (to_submit) { ret = io_uring_add_tctx_node(ctx); if (unlikely(ret)) goto out; mutex_lock(&ctx->uring_lock); ret = io_submit_sqes(ctx, to_submit); if (ret != to_submit) { mutex_unlock(&ctx->uring_lock); goto out; } if (flags & IORING_ENTER_GETEVENTS) { if (ctx->syscall_iopoll) goto iopoll_locked; if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) (void)io_run_local_work_locked(ctx, min_complete); } mutex_unlock(&ctx->uring_lock); } ``` 除了提交請求的這段,像是在 GETEVENTS 或是 SQPOLL 模式下,kernel 也一樣會使用這 `uring_lock`,來避免 user 和 kernel 端的 polling 執行緒同時操作同一塊結構時發生衝突。