--- tags: linux2025 --- # [2025q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 12 週測驗題 :::info 目的: 檢驗學員對 [CS:APP 第 6 章](https://hackmd.io/@sysprog/CSAPP-ch6)和[並行程式設計](https://hackmd.io/@sysprog/concurrency)的認知 ::: ==[作答表單: 測驗 `1`](https://docs.google.com/forms/d/e/1FAIpQLSdh9It1dzoxI3pC8lNJQmz84USbPxQOO88Aa0oTLOKpJDnASg/viewform?usp=dialog)== ### 測驗 `1` > [Visualization of cache-optimized matrix multiplication](https://x.com/Hesamation/status/1920141361531040152) 考慮以下程式,我們以 32 位元浮點數 (float) 為資料型別,實作一個 64×64 方塊 (tile) 為單位的矩陣乘法 ([GEMM](https://en.wikipedia.org/wiki/Basic_Linear_Algebra_Subprograms#Level_3)): (`unoptimized.c`) ```c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #define TILE_SIZE 64 static inline void mm_tile(float *A, float *B, float *C, size_t stride_a, size_t stride_b, size_t stride_c) { for (size_t i = 0; i < TILE_SIZE; i++) { for (size_t j = 0; j < TILE_SIZE; j++) { const size_t ic = i * stride_c + j; for (size_t k = 0; k < TILE_SIZE; k++) { const size_t ia = i * stride_a + k; const size_t ib = k * stride_b + j; C[ic] += A[ia] * B[ib]; } } } } static inline void mm_edge(float *A, float *B, float *C, size_t stride_a, size_t stride_b, size_t stride_c, size_t tile_m, size_t tile_n, size_t tile_p) { for (size_t i = 0; i < tile_m; i++) { for (size_t j = 0; j < tile_p; j++) { const size_t ic = i * stride_c + j; for (size_t k = 0; k < tile_n; k++) { const size_t ia = i * stride_a + k; const size_t ib = k * stride_b + j; C[ic] += A[ia] * B[ib]; } } } } void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p) { const size_t inm = m - m % TILE_SIZE; const size_t inn = n - n % TILE_SIZE; const size_t inp = p - p % TILE_SIZE; memset(C, 0, m * p * sizeof(float)); for (size_t i = 0; i < m; i += TILE_SIZE) { for (size_t j = 0; j < p; j += TILE_SIZE) { const size_t ic = i * p + j; for (size_t k = 0; k < n; k += TILE_SIZE) { const size_t ia = i * n + k; const size_t ib = k * p + j; if (i < inm && j < inp && k < inn) { mm_tile(A + ia, B + ib, C + ic, n, p, p); } else { const size_t tile_m = (i + TILE_SIZE > m) ? (m - i) : TILE_SIZE; const size_t tile_n = (k + TILE_SIZE > n) ? (n - k) : TILE_SIZE; const size_t tile_p = (j + TILE_SIZE > p) ? (p - j) : TILE_SIZE; mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n, tile_p); } } } } } void fill_rand(float *arr, size_t size) { for (size_t i = 0; i < size; i++) arr[i] = (float) (rand() % 10); } void print_mat(const float *mat, size_t m, size_t n) { for (size_t i = 0; i < m; i++) { for (size_t j = 0; j < n; j++) { printf("%.2f", mat[i * n + j]); if (j < n - 1) printf(", "); } printf("\n"); } printf("---\n"); } int parse_int(const char *str) { char *end; long val = strtol(str, &end, 10); if (*end || str == end) { fprintf(stderr, "Invalid integer: '%s'\n", str); exit(1); } return (int) val; } int main(int argc, char *argv[]) { if (argc != 4) { fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]); return 1; } srand(314); size_t m = parse_int(argv[1]); size_t n = parse_int(argv[2]); size_t p = parse_int(argv[3]); float *A = malloc(m * n * sizeof(float)); float *B = malloc(n * p * sizeof(float)); float *C = malloc(m * p * sizeof(float)); fill_rand(A, m * n); fill_rand(B, n * p); #ifdef VALIDATE print_mat(A, m, n); print_mat(B, n, p); #endif struct timespec start, end; clock_gettime(CLOCK_MONOTONIC, &start); mm(A, B, C, m, n, p); clock_gettime(CLOCK_MONOTONIC, &end); #ifndef VALIDATE double elapsed = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9; printf("%.9f\n", elapsed); #endif #ifdef VALIDATE print_mat(C, m, p); #endif free(A); free(B); free(C); return 0; } ``` 程式解說: * `mm_tile`:專責處理恰為 64×64×64 的完整子矩陣乘法。外層二重迴圈掃描 tile 上的 (i, j),最內層迴圈累加 k 維,計算 `C[i,j] += A[i,k] * B[k,j]`。指標位移以 `stride_*` 參數決定,可對應原矩陣的實際 row 長 * `mm_edge`:當 tile 位於矩陣邊緣、維度不足 64 時呼叫。它與 `mm_tile` 相同,但三個 tile 尺寸 (`tile_m`, `tile_n`, `tile_p`) 在執行期決定,因此迴圈邊界無法在編譯期展開,導致編譯器難以向量化 * `mm`:外層三重 tile 迴圈 (i, j, k) 將整體運算拆分為小方塊;先將 C 清零,再逐塊呼叫 `mm_tile` 或 `mm_edge` 矩陣皆採 [row-major](https://en.wikipedia.org/wiki/Row-_and_column-major_order) 佈局。`mm_tile` 的 A 存取模式 (`ia = i*stride_a + k`) 連續;然而 B 的存取 (`ib = k*stride_b + j`) 固定 j、遞增 k,實際上沿著 column 方向跨 stride\_b (= p) 位移 ── 對 row-major 矩陣而言屬高度跳躍式存取,快取命中率差,也妨礙 SIMD 向量化。 直接把矩陣複製到新緩衝區並以零值實體填充至 64 的倍數,再對 B 做一次轉置,往往比目前的程式碼更快,原因包含: * 可避免對每個 tile 執行大小判斷,消除分支與函式呼叫開銷。雖然這些分支在大量完整 tile 時可被分支預測有效隱藏,但在邊緣區域 (特別是 m、n、p 只要有一邊非 64 的整數倍) 仍需大量 `mm_edge` 呼叫 * `mm_edge` 的迴圈上限為變數,編譯器無法把迴圈攤平成固定 64 步,也難以套用最佳化如完全向量化或內迴圈展開;反之,zero padding 讓所有 tile 尺寸定值,可讓 CPU pipeline、SIMD、prefetcher 維持穩定節奏 * 轉置 B 之後,最內層 `k` 迴圈讀取 B 的位址會變成連續 (stride = 1),大幅降低 TLB 與快取未命中。現行程式在 `k` 方向每步跳 p 個 float,對 64×64 tile 而言一次 k 迴圈就跨越 256 KB (若 p≈1024),遠超 L1D cache 容量 * 在現代處理器上,將少量零值複製到對齊的新緩衝區成本相對極小,且一經填充後可反覆重用,從而降低存取成本 `main.c` 在 `unoptimized.c` 的基礎之上,提出以下關鍵改動: * zero padding 與轉置矩陣 * micro-tile * 工作佇列式 [thread pool](https://hackmd.io/@sysprog/concurrency-thread-pool) * [CPU affinity](https://en.wikipedia.org/wiki/Processor_affinity) 設定 程式初始化階段 `main.c` 先將三個維度都向上補齊到 64 的倍數 (`ALIGN_UP`),再把 A 以 row-major 的方式進行 zero padding,並把 B 轉置後再 zero padding 到新緩衝區。藉由 `aligned_alloc` 以 64-byte 邊界對齊 (`MEM_ALIGNMENT`),隨後所有 tile 都是完整的 64×64,省去原本在內層迴圈檢查邊界的分支。 轉置後的 B 使 `mm_tile` 讀取 B 時沿著連續位址 (`(tj+j)*stride_b + k`) 迭代 k。與 `unoptimized.c` 中每次跳 `p` 個元素的欄存取相比,快取與 TLB 命中顯著改善。A 仍保持 row-major ,因此在 k 迴圈也是一維連續掃描。 新的 `mm_tile` 把 64×64 再切成 8×8 micro-tile (`MICRO_TILE`);內層把 8×8 的局部積累放進 32 個暫存器陣列 `sum`。這種 register blocking 減少寫回頻率,讓編譯器容易產生 SIMD 指令,同時使較小的工作單元適合佈署到執行緒池。 `init_thread_pool` 先建立 `N_CORES` 道執行緒並用 `pthread_setaffinity_np` 綁定至實體處理器核。每個 64×64 輸出 tile 封裝成 `task_t` 後丟進鎖保護的佇列;工作執行緒以 condition variable 等待並取出任務,呼叫 `mm_tile` 完成計算,最後以 `_Atomic` 計數器回報進度。相較於在主執行緒巢狀三重迴圈的單核處理器執行,該佇列模型能在 A、B 重用程度高的情況下讓 `N_CORES` 個處理器核平行運算,且排程成本固定,遠小於每 tile 重新建立執行緒的開銷。 由於已 zero padding,`mm()` 不再需要邊界判斷,也不必呼叫 `mm_edge`。熱區迴圈完全由編譯期常數控制,配合前述微分塊,分支預測失誤幾乎不發生,而 `unoptimized.c` 在 m、n、p 任一邊非 64 的倍數時會大量執行變動上限的迴圈與檢查,導致指令路徑分裂與向量化受阻。 因此,`main.c` 在多核系統上對大型矩陣(特別是 n, m, p ≫ 64)可取得顯著記憶體/指令級提升。 [changes.diff](https://gist.github.com/jserv/278cf8b489a7ba5cd510b02b9f6dac19) 是從 `unoptimized.c` 到 `main.c` 的修改。請補完程式碼,使其運作符合預期。書寫規範: * `AAAA` 包含 `atomic_` 開頭的函式 * `BBBB` 和 `CCCC` 包含 `pthread_` 開頭的函式,注意 conditional variable 的使用 * 均不包含空白字元,也沒有 `;?:` 等字元 以下程式用來驗證 `main.c` 和 `unoptimized.c` (編譯時要指定 `-DVALIDATE`): ```python #!/usr/bin/env python3 import numpy as np import subprocess import sys def run_exec(exe, m, n, p): """Run the executable with dimensions and capture its output.""" cmd = [f"./{exe}", str(m), str(n), str(p)] cp = subprocess.run(cmd, capture_output=True, text=True, check=True) return cp.stdout def parse_matrices(output): """Split the output at '---' and convert each block to a NumPy array.""" blocks = [] curr = [] for line in output.splitlines(): if line.strip() == '---': if curr: blocks.append(curr); curr = [] elif line.strip(): curr.append(line) if len(blocks) < 3: raise RuntimeError(f"Expected 3 matrices, found {len(blocks)} blocks") return [to_array(blocks[i]) for i in range(3)] def to_array(lines): rows = [] for row in lines: vals = [float(x) for x in row.split(',') if x.strip()] rows.append(vals) return np.array(rows, dtype=float) def print_mat(mat, name): """Print a matrix in 2-decimals, comma-separated format.""" r, c = mat.shape print(f"\nMatrix {name} ({r}×{c}):") for i in range(r): row = ", ".join(f"{mat[i,j]:.2f}" for j in range(c)) print(" " + row) def validate(A, B, C, tol=1e-5): D = A.dot(B) bad = np.argwhere(~np.isclose(D, C, atol=tol)) if bad.size == 0: return True, None, None diffs = D - C info = [(i+1, j+1, D[i,j], C[i,j], diffs[i,j]) for i,j in bad] return False, info, D def main(): exe = "main" for size in range(2, 129): print(f"Testing {size}×{size}×{size}...", flush=True) out = run_exec(exe, size, size, size) A, B, C = parse_matrices(out) ok, mismatches, D = validate(A, B, C) if not ok: print(f"\n❌ Mismatch at size {size}:") # dump full matrices print_mat(A, "A") print_mat(B, "B") print_mat(C, "C (from C program)") print_mat(D, "D = A @ B (Python)") # then list the individual mismatches print("\nFirst few differing entries:") for r, c, comp, exp, diff in mismatches[:10]: print(f" row {r}, col {c}: computed={comp}, expected={exp}, diff={diff}") sys.exit(1) else: print(" ✓ OK") print("\n✅ All sizes 2–128 validated successfully.") if __name__ == "__main__": main() ``` 參考執行輸出: ``` ... Testing 126×126×126... ✓ OK Testing 127×127×127... ✓ OK Testing 128×128×128... ✓ OK ✅ All sizes 2–128 validated successfully. ``` :::success 延伸問題: 1. 解釋上述程式碼運作原理 2. 指出上述程式碼效能的缺失並予以改進 3. 提出 lock-free 的解決方案 :::