作答表單: 測驗 1
測驗 1
Visualization of cache-optimized matrix multiplication
考慮以下程式,我們以 32 位元浮點數 (float) 為資料型別,實作一個 64×64 方塊 (tile) 為單位的矩陣乘法 (GEMM): (unoptimized.c
)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define TILE_SIZE 64
static inline void mm_tile(float *A,
float *B,
float *C,
size_t stride_a,
size_t stride_b,
size_t stride_c)
{
for (size_t i = 0; i < TILE_SIZE; i++) {
for (size_t j = 0; j < TILE_SIZE; j++) {
const size_t ic = i * stride_c + j;
for (size_t k = 0; k < TILE_SIZE; k++) {
const size_t ia = i * stride_a + k;
const size_t ib = k * stride_b + j;
C[ic] += A[ia] * B[ib];
}
}
}
}
static inline void mm_edge(float *A,
float *B,
float *C,
size_t stride_a,
size_t stride_b,
size_t stride_c,
size_t tile_m,
size_t tile_n,
size_t tile_p)
{
for (size_t i = 0; i < tile_m; i++) {
for (size_t j = 0; j < tile_p; j++) {
const size_t ic = i * stride_c + j;
for (size_t k = 0; k < tile_n; k++) {
const size_t ia = i * stride_a + k;
const size_t ib = k * stride_b + j;
C[ic] += A[ia] * B[ib];
}
}
}
}
void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p)
{
const size_t inm = m - m % TILE_SIZE;
const size_t inn = n - n % TILE_SIZE;
const size_t inp = p - p % TILE_SIZE;
memset(C, 0, m * p * sizeof(float));
for (size_t i = 0; i < m; i += TILE_SIZE) {
for (size_t j = 0; j < p; j += TILE_SIZE) {
const size_t ic = i * p + j;
for (size_t k = 0; k < n; k += TILE_SIZE) {
const size_t ia = i * n + k;
const size_t ib = k * p + j;
if (i < inm && j < inp && k < inn) {
mm_tile(A + ia, B + ib, C + ic, n, p, p);
} else {
const size_t tile_m =
(i + TILE_SIZE > m) ? (m - i) : TILE_SIZE;
const size_t tile_n =
(k + TILE_SIZE > n) ? (n - k) : TILE_SIZE;
const size_t tile_p =
(j + TILE_SIZE > p) ? (p - j) : TILE_SIZE;
mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n,
tile_p);
}
}
}
}
}
void fill_rand(float *arr, size_t size)
{
for (size_t i = 0; i < size; i++)
arr[i] = (float) (rand() % 10);
}
void print_mat(const float *mat, size_t m, size_t n)
{
for (size_t i = 0; i < m; i++) {
for (size_t j = 0; j < n; j++) {
printf("%.2f", mat[i * n + j]);
if (j < n - 1)
printf(", ");
}
printf("\n");
}
printf("---\n");
}
int parse_int(const char *str)
{
char *end;
long val = strtol(str, &end, 10);
if (*end || str == end) {
fprintf(stderr, "Invalid integer: '%s'\n", str);
exit(1);
}
return (int) val;
}
int main(int argc, char *argv[])
{
if (argc != 4) {
fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]);
return 1;
}
srand(314);
size_t m = parse_int(argv[1]);
size_t n = parse_int(argv[2]);
size_t p = parse_int(argv[3]);
float *A = malloc(m * n * sizeof(float));
float *B = malloc(n * p * sizeof(float));
float *C = malloc(m * p * sizeof(float));
fill_rand(A, m * n);
fill_rand(B, n * p);
#ifdef VALIDATE
print_mat(A, m, n);
print_mat(B, n, p);
#endif
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
mm(A, B, C, m, n, p);
clock_gettime(CLOCK_MONOTONIC, &end);
#ifndef VALIDATE
double elapsed =
(end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
printf("%.9f\n", elapsed);
#endif
#ifdef VALIDATE
print_mat(C, m, p);
#endif
free(A);
free(B);
free(C);
return 0;
}
程式解說:
mm_tile
:專責處理恰為 64×64×64 的完整子矩陣乘法。外層二重迴圈掃描 tile 上的 (i, j),最內層迴圈累加 k 維,計算 C[i,j] += A[i,k] * B[k,j]
。指標位移以 stride_*
參數決定,可對應原矩陣的實際 row 長
mm_edge
:當 tile 位於矩陣邊緣、維度不足 64 時呼叫。它與 mm_tile
相同,但三個 tile 尺寸 (tile_m
, tile_n
, tile_p
) 在執行期決定,因此迴圈邊界無法在編譯期展開,導致編譯器難以向量化
mm
:外層三重 tile 迴圈 (i, j, k) 將整體運算拆分為小方塊;先將 C 清零,再逐塊呼叫 mm_tile
或 mm_edge
矩陣皆採 row-major 佈局。mm_tile
的 A 存取模式 (ia = i*stride_a + k
) 連續;然而 B 的存取 (ib = k*stride_b + j
) 固定 j、遞增 k,實際上沿著 column 方向跨 stride_b (= p) 位移 ── 對 row-major 矩陣而言屬高度跳躍式存取,快取命中率差,也妨礙 SIMD 向量化。
直接把矩陣複製到新緩衝區並以零值實體填充至 64 的倍數,再對 B 做一次轉置,往往比目前的程式碼更快,原因包含:
- 可避免對每個 tile 執行大小判斷,消除分支與函式呼叫開銷。雖然這些分支在大量完整 tile 時可被分支預測有效隱藏,但在邊緣區域 (特別是 m、n、p 只要有一邊非 64 的整數倍) 仍需大量
mm_edge
呼叫
mm_edge
的迴圈上限為變數,編譯器無法把迴圈攤平成固定 64 步,也難以套用最佳化如完全向量化或內迴圈展開;反之,zero padding 讓所有 tile 尺寸定值,可讓 CPU pipeline、SIMD、prefetcher 維持穩定節奏
- 轉置 B 之後,最內層
k
迴圈讀取 B 的位址會變成連續 (stride = 1),大幅降低 TLB 與快取未命中。現行程式在 k
方向每步跳 p 個 float,對 64×64 tile 而言一次 k 迴圈就跨越 256 KB (若 p≈1024),遠超 L1D cache 容量
- 在現代處理器上,將少量零值複製到對齊的新緩衝區成本相對極小,且一經填充後可反覆重用,從而降低存取成本
main.c
在 unoptimized.c
的基礎之上,提出以下關鍵改動:
程式初始化階段
main.c
先將三個維度都向上補齊到 64 的倍數 (ALIGN_UP
),再把 A 以 row-major 的方式進行 zero padding,並把 B 轉置後再 zero padding 到新緩衝區。藉由 aligned_alloc
以 64-byte 邊界對齊 (MEM_ALIGNMENT
),隨後所有 tile 都是完整的 64×64,省去原本在內層迴圈檢查邊界的分支。
轉置後的 B 使 mm_tile
讀取 B 時沿著連續位址 ((tj+j)*stride_b + k
) 迭代 k。與 unoptimized.c
中每次跳 p
個元素的欄存取相比,快取與 TLB 命中顯著改善。A 仍保持 row-major ,因此在 k 迴圈也是一維連續掃描。
新的 mm_tile
把 64×64 再切成 8×8 micro-tile (MICRO_TILE
);內層把 8×8 的局部積累放進 32 個暫存器陣列 sum
。這種 register blocking 減少寫回頻率,讓編譯器容易產生 SIMD 指令,同時使較小的工作單元適合佈署到執行緒池。
init_thread_pool
先建立 N_CORES
道執行緒並用 pthread_setaffinity_np
綁定至實體處理器核。每個 64×64 輸出 tile 封裝成 task_t
後丟進鎖保護的佇列;工作執行緒以 condition variable 等待並取出任務,呼叫 mm_tile
完成計算,最後以 _Atomic
計數器回報進度。相較於在主執行緒巢狀三重迴圈的單核處理器執行,該佇列模型能在 A、B 重用程度高的情況下讓 N_CORES
個處理器核平行運算,且排程成本固定,遠小於每 tile 重新建立執行緒的開銷。
由於已 zero padding,mm()
不再需要邊界判斷,也不必呼叫 mm_edge
。熱區迴圈完全由編譯期常數控制,配合前述微分塊,分支預測失誤幾乎不發生,而 unoptimized.c
在 m、n、p 任一邊非 64 的倍數時會大量執行變動上限的迴圈與檢查,導致指令路徑分裂與向量化受阻。
因此,main.c
在多核系統上對大型矩陣(特別是 n, m, p ≫ 64)可取得顯著記憶體/指令級提升。
changes.diff 是從 unoptimized.c
到 main.c
的修改。請補完程式碼,使其運作符合預期。書寫規範:
AAAA
包含 atomic_
開頭的函式
BBBB
和 CCCC
包含 pthread_
開頭的函式,注意 conditional variable 的使用
- 均不包含空白字元,也沒有
;?:
等字元
以下程式用來驗證 main.c
和 unoptimized.c
(編譯時要指定 -DVALIDATE
):
import numpy as np
import subprocess
import sys
def run_exec(exe, m, n, p):
"""Run the executable with dimensions and capture its output."""
cmd = [f"./{exe}", str(m), str(n), str(p)]
cp = subprocess.run(cmd, capture_output=True, text=True, check=True)
return cp.stdout
def parse_matrices(output):
"""Split the output at '---' and convert each block to a NumPy array."""
blocks = []
curr = []
for line in output.splitlines():
if line.strip() == '---':
if curr:
blocks.append(curr); curr = []
elif line.strip():
curr.append(line)
if len(blocks) < 3:
raise RuntimeError(f"Expected 3 matrices, found {len(blocks)} blocks")
return [to_array(blocks[i]) for i in range(3)]
def to_array(lines):
rows = []
for row in lines:
vals = [float(x) for x in row.split(',') if x.strip()]
rows.append(vals)
return np.array(rows, dtype=float)
def print_mat(mat, name):
"""Print a matrix in 2-decimals, comma-separated format."""
r, c = mat.shape
print(f"\nMatrix {name} ({r}×{c}):")
for i in range(r):
row = ", ".join(f"{mat[i,j]:.2f}" for j in range(c))
print(" " + row)
def validate(A, B, C, tol=1e-5):
D = A.dot(B)
bad = np.argwhere(~np.isclose(D, C, atol=tol))
if bad.size == 0:
return True, None, None
diffs = D - C
info = [(i+1, j+1, D[i,j], C[i,j], diffs[i,j]) for i,j in bad]
return False, info, D
def main():
exe = "main"
for size in range(2, 129):
print(f"Testing {size}×{size}×{size}...", flush=True)
out = run_exec(exe, size, size, size)
A, B, C = parse_matrices(out)
ok, mismatches, D = validate(A, B, C)
if not ok:
print(f"\n❌ Mismatch at size {size}:")
print_mat(A, "A")
print_mat(B, "B")
print_mat(C, "C (from C program)")
print_mat(D, "D = A @ B (Python)")
print("\nFirst few differing entries:")
for r, c, comp, exp, diff in mismatches[:10]:
print(f" row {r}, col {c}: computed={comp}, expected={exp}, diff={diff}")
sys.exit(1)
else:
print(" ✓ OK")
print("\n✅ All sizes 2–128 validated successfully.")
if __name__ == "__main__":
main()
參考執行輸出: