---
tags: linux2025
---
# [2025q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 12 週測驗題
:::info
目的: 檢驗學員對 [CS:APP 第 6 章](https://hackmd.io/@sysprog/CSAPP-ch6)和[並行程式設計](https://hackmd.io/@sysprog/concurrency)的認知
:::
==[作答表單: 測驗 `1`](https://docs.google.com/forms/d/e/1FAIpQLSdh9It1dzoxI3pC8lNJQmz84USbPxQOO88Aa0oTLOKpJDnASg/viewform?usp=dialog)==
### 測驗 `1`
> [Visualization of cache-optimized matrix multiplication](https://x.com/Hesamation/status/1920141361531040152)
考慮以下程式,我們以 32 位元浮點數 (float) 為資料型別,實作一個 64×64 方塊 (tile) 為單位的矩陣乘法 ([GEMM](https://en.wikipedia.org/wiki/Basic_Linear_Algebra_Subprograms#Level_3)): (`unoptimized.c`)
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define TILE_SIZE 64
static inline void mm_tile(float *A,
float *B,
float *C,
size_t stride_a,
size_t stride_b,
size_t stride_c)
{
for (size_t i = 0; i < TILE_SIZE; i++) {
for (size_t j = 0; j < TILE_SIZE; j++) {
const size_t ic = i * stride_c + j;
for (size_t k = 0; k < TILE_SIZE; k++) {
const size_t ia = i * stride_a + k;
const size_t ib = k * stride_b + j;
C[ic] += A[ia] * B[ib];
}
}
}
}
static inline void mm_edge(float *A,
float *B,
float *C,
size_t stride_a,
size_t stride_b,
size_t stride_c,
size_t tile_m,
size_t tile_n,
size_t tile_p)
{
for (size_t i = 0; i < tile_m; i++) {
for (size_t j = 0; j < tile_p; j++) {
const size_t ic = i * stride_c + j;
for (size_t k = 0; k < tile_n; k++) {
const size_t ia = i * stride_a + k;
const size_t ib = k * stride_b + j;
C[ic] += A[ia] * B[ib];
}
}
}
}
void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p)
{
const size_t inm = m - m % TILE_SIZE;
const size_t inn = n - n % TILE_SIZE;
const size_t inp = p - p % TILE_SIZE;
memset(C, 0, m * p * sizeof(float));
for (size_t i = 0; i < m; i += TILE_SIZE) {
for (size_t j = 0; j < p; j += TILE_SIZE) {
const size_t ic = i * p + j;
for (size_t k = 0; k < n; k += TILE_SIZE) {
const size_t ia = i * n + k;
const size_t ib = k * p + j;
if (i < inm && j < inp && k < inn) {
mm_tile(A + ia, B + ib, C + ic, n, p, p);
} else {
const size_t tile_m =
(i + TILE_SIZE > m) ? (m - i) : TILE_SIZE;
const size_t tile_n =
(k + TILE_SIZE > n) ? (n - k) : TILE_SIZE;
const size_t tile_p =
(j + TILE_SIZE > p) ? (p - j) : TILE_SIZE;
mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n,
tile_p);
}
}
}
}
}
void fill_rand(float *arr, size_t size)
{
for (size_t i = 0; i < size; i++)
arr[i] = (float) (rand() % 10);
}
void print_mat(const float *mat, size_t m, size_t n)
{
for (size_t i = 0; i < m; i++) {
for (size_t j = 0; j < n; j++) {
printf("%.2f", mat[i * n + j]);
if (j < n - 1)
printf(", ");
}
printf("\n");
}
printf("---\n");
}
int parse_int(const char *str)
{
char *end;
long val = strtol(str, &end, 10);
if (*end || str == end) {
fprintf(stderr, "Invalid integer: '%s'\n", str);
exit(1);
}
return (int) val;
}
int main(int argc, char *argv[])
{
if (argc != 4) {
fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]);
return 1;
}
srand(314);
size_t m = parse_int(argv[1]);
size_t n = parse_int(argv[2]);
size_t p = parse_int(argv[3]);
float *A = malloc(m * n * sizeof(float));
float *B = malloc(n * p * sizeof(float));
float *C = malloc(m * p * sizeof(float));
fill_rand(A, m * n);
fill_rand(B, n * p);
#ifdef VALIDATE
print_mat(A, m, n);
print_mat(B, n, p);
#endif
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
mm(A, B, C, m, n, p);
clock_gettime(CLOCK_MONOTONIC, &end);
#ifndef VALIDATE
double elapsed =
(end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
printf("%.9f\n", elapsed);
#endif
#ifdef VALIDATE
print_mat(C, m, p);
#endif
free(A);
free(B);
free(C);
return 0;
}
```
程式解說:
* `mm_tile`:專責處理恰為 64×64×64 的完整子矩陣乘法。外層二重迴圈掃描 tile 上的 (i, j),最內層迴圈累加 k 維,計算 `C[i,j] += A[i,k] * B[k,j]`。指標位移以 `stride_*` 參數決定,可對應原矩陣的實際 row 長
* `mm_edge`:當 tile 位於矩陣邊緣、維度不足 64 時呼叫。它與 `mm_tile` 相同,但三個 tile 尺寸 (`tile_m`, `tile_n`, `tile_p`) 在執行期決定,因此迴圈邊界無法在編譯期展開,導致編譯器難以向量化
* `mm`:外層三重 tile 迴圈 (i, j, k) 將整體運算拆分為小方塊;先將 C 清零,再逐塊呼叫 `mm_tile` 或 `mm_edge`
矩陣皆採 [row-major](https://en.wikipedia.org/wiki/Row-_and_column-major_order) 佈局。`mm_tile` 的 A 存取模式 (`ia = i*stride_a + k`) 連續;然而 B 的存取 (`ib = k*stride_b + j`) 固定 j、遞增 k,實際上沿著 column 方向跨 stride\_b (= p) 位移 ── 對 row-major 矩陣而言屬高度跳躍式存取,快取命中率差,也妨礙 SIMD 向量化。
直接把矩陣複製到新緩衝區並以零值實體填充至 64 的倍數,再對 B 做一次轉置,往往比目前的程式碼更快,原因包含:
* 可避免對每個 tile 執行大小判斷,消除分支與函式呼叫開銷。雖然這些分支在大量完整 tile 時可被分支預測有效隱藏,但在邊緣區域 (特別是 m、n、p 只要有一邊非 64 的整數倍) 仍需大量 `mm_edge` 呼叫
* `mm_edge` 的迴圈上限為變數,編譯器無法把迴圈攤平成固定 64 步,也難以套用最佳化如完全向量化或內迴圈展開;反之,zero padding 讓所有 tile 尺寸定值,可讓 CPU pipeline、SIMD、prefetcher 維持穩定節奏
* 轉置 B 之後,最內層 `k` 迴圈讀取 B 的位址會變成連續 (stride = 1),大幅降低 TLB 與快取未命中。現行程式在 `k` 方向每步跳 p 個 float,對 64×64 tile 而言一次 k 迴圈就跨越 256 KB (若 p≈1024),遠超 L1D cache 容量
* 在現代處理器上,將少量零值複製到對齊的新緩衝區成本相對極小,且一經填充後可反覆重用,從而降低存取成本
`main.c` 在 `unoptimized.c` 的基礎之上,提出以下關鍵改動:
* zero padding 與轉置矩陣
* micro-tile
* 工作佇列式 [thread pool](https://hackmd.io/@sysprog/concurrency-thread-pool)
* [CPU affinity](https://en.wikipedia.org/wiki/Processor_affinity) 設定
程式初始化階段
`main.c` 先將三個維度都向上補齊到 64 的倍數 (`ALIGN_UP`),再把 A 以 row-major 的方式進行 zero padding,並把 B 轉置後再 zero padding 到新緩衝區。藉由 `aligned_alloc` 以 64-byte 邊界對齊 (`MEM_ALIGNMENT`),隨後所有 tile 都是完整的 64×64,省去原本在內層迴圈檢查邊界的分支。
轉置後的 B 使 `mm_tile` 讀取 B 時沿著連續位址 (`(tj+j)*stride_b + k`) 迭代 k。與 `unoptimized.c` 中每次跳 `p` 個元素的欄存取相比,快取與 TLB 命中顯著改善。A 仍保持 row-major ,因此在 k 迴圈也是一維連續掃描。
新的 `mm_tile` 把 64×64 再切成 8×8 micro-tile (`MICRO_TILE`);內層把 8×8 的局部積累放進 32 個暫存器陣列 `sum`。這種 register blocking 減少寫回頻率,讓編譯器容易產生 SIMD 指令,同時使較小的工作單元適合佈署到執行緒池。
`init_thread_pool` 先建立 `N_CORES` 道執行緒並用 `pthread_setaffinity_np` 綁定至實體處理器核。每個 64×64 輸出 tile 封裝成 `task_t` 後丟進鎖保護的佇列;工作執行緒以 condition variable 等待並取出任務,呼叫 `mm_tile` 完成計算,最後以 `_Atomic` 計數器回報進度。相較於在主執行緒巢狀三重迴圈的單核處理器執行,該佇列模型能在 A、B 重用程度高的情況下讓 `N_CORES` 個處理器核平行運算,且排程成本固定,遠小於每 tile 重新建立執行緒的開銷。
由於已 zero padding,`mm()` 不再需要邊界判斷,也不必呼叫 `mm_edge`。熱區迴圈完全由編譯期常數控制,配合前述微分塊,分支預測失誤幾乎不發生,而 `unoptimized.c` 在 m、n、p 任一邊非 64 的倍數時會大量執行變動上限的迴圈與檢查,導致指令路徑分裂與向量化受阻。
因此,`main.c` 在多核系統上對大型矩陣(特別是 n, m, p ≫ 64)可取得顯著記憶體/指令級提升。
[changes.diff](https://gist.github.com/jserv/278cf8b489a7ba5cd510b02b9f6dac19) 是從 `unoptimized.c` 到 `main.c` 的修改。請補完程式碼,使其運作符合預期。書寫規範:
* `AAAA` 包含 `atomic_` 開頭的函式
* `BBBB` 和 `CCCC` 包含 `pthread_` 開頭的函式,注意 conditional variable 的使用
* 均不包含空白字元,也沒有 `;?:` 等字元
以下程式用來驗證 `main.c` 和 `unoptimized.c` (編譯時要指定 `-DVALIDATE`):
```python
#!/usr/bin/env python3
import numpy as np
import subprocess
import sys
def run_exec(exe, m, n, p):
"""Run the executable with dimensions and capture its output."""
cmd = [f"./{exe}", str(m), str(n), str(p)]
cp = subprocess.run(cmd, capture_output=True, text=True, check=True)
return cp.stdout
def parse_matrices(output):
"""Split the output at '---' and convert each block to a NumPy array."""
blocks = []
curr = []
for line in output.splitlines():
if line.strip() == '---':
if curr:
blocks.append(curr); curr = []
elif line.strip():
curr.append(line)
if len(blocks) < 3:
raise RuntimeError(f"Expected 3 matrices, found {len(blocks)} blocks")
return [to_array(blocks[i]) for i in range(3)]
def to_array(lines):
rows = []
for row in lines:
vals = [float(x) for x in row.split(',') if x.strip()]
rows.append(vals)
return np.array(rows, dtype=float)
def print_mat(mat, name):
"""Print a matrix in 2-decimals, comma-separated format."""
r, c = mat.shape
print(f"\nMatrix {name} ({r}×{c}):")
for i in range(r):
row = ", ".join(f"{mat[i,j]:.2f}" for j in range(c))
print(" " + row)
def validate(A, B, C, tol=1e-5):
D = A.dot(B)
bad = np.argwhere(~np.isclose(D, C, atol=tol))
if bad.size == 0:
return True, None, None
diffs = D - C
info = [(i+1, j+1, D[i,j], C[i,j], diffs[i,j]) for i,j in bad]
return False, info, D
def main():
exe = "main"
for size in range(2, 129):
print(f"Testing {size}×{size}×{size}...", flush=True)
out = run_exec(exe, size, size, size)
A, B, C = parse_matrices(out)
ok, mismatches, D = validate(A, B, C)
if not ok:
print(f"\n❌ Mismatch at size {size}:")
# dump full matrices
print_mat(A, "A")
print_mat(B, "B")
print_mat(C, "C (from C program)")
print_mat(D, "D = A @ B (Python)")
# then list the individual mismatches
print("\nFirst few differing entries:")
for r, c, comp, exp, diff in mismatches[:10]:
print(f" row {r}, col {c}: computed={comp}, expected={exp}, diff={diff}")
sys.exit(1)
else:
print(" ✓ OK")
print("\n✅ All sizes 2–128 validated successfully.")
if __name__ == "__main__":
main()
```
參考執行輸出:
```
...
Testing 126×126×126...
✓ OK
Testing 127×127×127...
✓ OK
Testing 128×128×128...
✓ OK
✅ All sizes 2–128 validated successfully.
```
:::success
延伸問題:
1. 解釋上述程式碼運作原理
2. 指出上述程式碼效能的缺失並予以改進
3. 提出 lock-free 的解決方案
:::