Try   HackMD

2025q1 第 12 週測驗題

目的: 檢驗學員對 CS:APP 第 6 章並行程式設計的認知

作答表單: 測驗 1

測驗 1

Visualization of cache-optimized matrix multiplication

考慮以下程式,我們以 32 位元浮點數 (float) 為資料型別,實作一個 64×64 方塊 (tile) 為單位的矩陣乘法 (GEMM): (unoptimized.c)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define TILE_SIZE 64

static inline void mm_tile(float *A,
                           float *B,
                           float *C,
                           size_t stride_a,
                           size_t stride_b,
                           size_t stride_c)
{
    for (size_t i = 0; i < TILE_SIZE; i++) {
        for (size_t j = 0; j < TILE_SIZE; j++) {
            const size_t ic = i * stride_c + j;
            for (size_t k = 0; k < TILE_SIZE; k++) {
                const size_t ia = i * stride_a + k;
                const size_t ib = k * stride_b + j;
                C[ic] += A[ia] * B[ib];
            }
        }
    }
}

static inline void mm_edge(float *A,
                           float *B,
                           float *C,
                           size_t stride_a,
                           size_t stride_b,
                           size_t stride_c,
                           size_t tile_m,
                           size_t tile_n,
                           size_t tile_p)
{
    for (size_t i = 0; i < tile_m; i++) {
        for (size_t j = 0; j < tile_p; j++) {
            const size_t ic = i * stride_c + j;
            for (size_t k = 0; k < tile_n; k++) {
                const size_t ia = i * stride_a + k;
                const size_t ib = k * stride_b + j;
                C[ic] += A[ia] * B[ib];
            }
        }
    }
}

void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p)
{
    const size_t inm = m - m % TILE_SIZE;
    const size_t inn = n - n % TILE_SIZE;
    const size_t inp = p - p % TILE_SIZE;

    memset(C, 0, m * p * sizeof(float));

    for (size_t i = 0; i < m; i += TILE_SIZE) {
        for (size_t j = 0; j < p; j += TILE_SIZE) {
            const size_t ic = i * p + j;
            for (size_t k = 0; k < n; k += TILE_SIZE) {
                const size_t ia = i * n + k;
                const size_t ib = k * p + j;

                if (i < inm && j < inp && k < inn) {
                    mm_tile(A + ia, B + ib, C + ic, n, p, p);
                } else {
                    const size_t tile_m =
                        (i + TILE_SIZE > m) ? (m - i) : TILE_SIZE;
                    const size_t tile_n =
                        (k + TILE_SIZE > n) ? (n - k) : TILE_SIZE;
                    const size_t tile_p =
                        (j + TILE_SIZE > p) ? (p - j) : TILE_SIZE;
                    mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n,
                            tile_p);
                }
            }
        }
    }
}

void fill_rand(float *arr, size_t size)
{
    for (size_t i = 0; i < size; i++)
        arr[i] = (float) (rand() % 10);
}

void print_mat(const float *mat, size_t m, size_t n)
{
    for (size_t i = 0; i < m; i++) {
        for (size_t j = 0; j < n; j++) {
            printf("%.2f", mat[i * n + j]);
            if (j < n - 1)
                printf(", ");
        }
        printf("\n");
    }
    printf("---\n");
}

int parse_int(const char *str)
{
    char *end;
    long val = strtol(str, &end, 10);
    if (*end || str == end) {
        fprintf(stderr, "Invalid integer: '%s'\n", str);
        exit(1);
    }
    return (int) val;
}

int main(int argc, char *argv[])
{
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]);
        return 1;
    }

    srand(314);
    size_t m = parse_int(argv[1]);
    size_t n = parse_int(argv[2]);
    size_t p = parse_int(argv[3]);

    float *A = malloc(m * n * sizeof(float));
    float *B = malloc(n * p * sizeof(float));
    float *C = malloc(m * p * sizeof(float));

    fill_rand(A, m * n);
    fill_rand(B, n * p);

#ifdef VALIDATE
    print_mat(A, m, n);
    print_mat(B, n, p);
#endif

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    mm(A, B, C, m, n, p);
    clock_gettime(CLOCK_MONOTONIC, &end);

#ifndef VALIDATE
    double elapsed =
        (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("%.9f\n", elapsed);
#endif

#ifdef VALIDATE
    print_mat(C, m, p);
#endif

    free(A);
    free(B);
    free(C);

    return 0;
}

程式解說:

  • mm_tile:專責處理恰為 64×64×64 的完整子矩陣乘法。外層二重迴圈掃描 tile 上的 (i, j),最內層迴圈累加 k 維,計算 C[i,j] += A[i,k] * B[k,j]。指標位移以 stride_* 參數決定,可對應原矩陣的實際 row 長
  • mm_edge:當 tile 位於矩陣邊緣、維度不足 64 時呼叫。它與 mm_tile 相同,但三個 tile 尺寸 (tile_m, tile_n, tile_p) 在執行期決定,因此迴圈邊界無法在編譯期展開,導致編譯器難以向量化
  • mm:外層三重 tile 迴圈 (i, j, k) 將整體運算拆分為小方塊;先將 C 清零,再逐塊呼叫 mm_tilemm_edge

矩陣皆採 row-major 佈局。mm_tile 的 A 存取模式 (ia = i*stride_a + k) 連續;然而 B 的存取 (ib = k*stride_b + j) 固定 j、遞增 k,實際上沿著 column 方向跨 stride_b (= p) 位移 ── 對 row-major 矩陣而言屬高度跳躍式存取,快取命中率差,也妨礙 SIMD 向量化。

直接把矩陣複製到新緩衝區並以零值實體填充至 64 的倍數,再對 B 做一次轉置,往往比目前的程式碼更快,原因包含:

  • 可避免對每個 tile 執行大小判斷,消除分支與函式呼叫開銷。雖然這些分支在大量完整 tile 時可被分支預測有效隱藏,但在邊緣區域 (特別是 m、n、p 只要有一邊非 64 的整數倍) 仍需大量 mm_edge 呼叫
  • mm_edge 的迴圈上限為變數,編譯器無法把迴圈攤平成固定 64 步,也難以套用最佳化如完全向量化或內迴圈展開;反之,zero padding 讓所有 tile 尺寸定值,可讓 CPU pipeline、SIMD、prefetcher 維持穩定節奏
  • 轉置 B 之後,最內層 k 迴圈讀取 B 的位址會變成連續 (stride = 1),大幅降低 TLB 與快取未命中。現行程式在 k 方向每步跳 p 個 float,對 64×64 tile 而言一次 k 迴圈就跨越 256 KB (若 p≈1024),遠超 L1D cache 容量
  • 在現代處理器上,將少量零值複製到對齊的新緩衝區成本相對極小,且一經填充後可反覆重用,從而降低存取成本

main.cunoptimized.c 的基礎之上,提出以下關鍵改動:

程式初始化階段
main.c 先將三個維度都向上補齊到 64 的倍數 (ALIGN_UP),再把 A 以 row-major 的方式進行 zero padding,並把 B 轉置後再 zero padding 到新緩衝區。藉由 aligned_alloc 以 64-byte 邊界對齊 (MEM_ALIGNMENT),隨後所有 tile 都是完整的 64×64,省去原本在內層迴圈檢查邊界的分支。

轉置後的 B 使 mm_tile 讀取 B 時沿著連續位址 ((tj+j)*stride_b + k) 迭代 k。與 unoptimized.c 中每次跳 p 個元素的欄存取相比,快取與 TLB 命中顯著改善。A 仍保持 row-major ,因此在 k 迴圈也是一維連續掃描。

新的 mm_tile 把 64×64 再切成 8×8 micro-tile (MICRO_TILE);內層把 8×8 的局部積累放進 32 個暫存器陣列 sum。這種 register blocking 減少寫回頻率,讓編譯器容易產生 SIMD 指令,同時使較小的工作單元適合佈署到執行緒池。

init_thread_pool 先建立 N_CORES 道執行緒並用 pthread_setaffinity_np 綁定至實體處理器核。每個 64×64 輸出 tile 封裝成 task_t 後丟進鎖保護的佇列;工作執行緒以 condition variable 等待並取出任務,呼叫 mm_tile 完成計算,最後以 _Atomic 計數器回報進度。相較於在主執行緒巢狀三重迴圈的單核處理器執行,該佇列模型能在 A、B 重用程度高的情況下讓 N_CORES 個處理器核平行運算,且排程成本固定,遠小於每 tile 重新建立執行緒的開銷。

由於已 zero padding,mm() 不再需要邊界判斷,也不必呼叫 mm_edge。熱區迴圈完全由編譯期常數控制,配合前述微分塊,分支預測失誤幾乎不發生,而 unoptimized.c 在 m、n、p 任一邊非 64 的倍數時會大量執行變動上限的迴圈與檢查,導致指令路徑分裂與向量化受阻。

因此,main.c 在多核系統上對大型矩陣(特別是 n, m, p ≫ 64)可取得顯著記憶體/指令級提升。

changes.diff 是從 unoptimized.cmain.c 的修改。請補完程式碼,使其運作符合預期。書寫規範:

  • AAAA 包含 atomic_ 開頭的函式
  • BBBBCCCC 包含 pthread_ 開頭的函式,注意 conditional variable 的使用
  • 均不包含空白字元,也沒有 ;?: 等字元

以下程式用來驗證 main.cunoptimized.c (編譯時要指定 -DVALIDATE):

#!/usr/bin/env python3
import numpy as np
import subprocess
import sys

def run_exec(exe, m, n, p):
    """Run the executable with dimensions and capture its output."""
    cmd = [f"./{exe}", str(m), str(n), str(p)]
    cp = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return cp.stdout

def parse_matrices(output):
    """Split the output at '---' and convert each block to a NumPy array."""
    blocks = []
    curr = []
    for line in output.splitlines():
        if line.strip() == '---':
            if curr:
                blocks.append(curr); curr = []
        elif line.strip():
            curr.append(line)
    if len(blocks) < 3:
        raise RuntimeError(f"Expected 3 matrices, found {len(blocks)} blocks")
    return [to_array(blocks[i]) for i in range(3)]

def to_array(lines):
    rows = []
    for row in lines:
        vals = [float(x) for x in row.split(',') if x.strip()]
        rows.append(vals)
    return np.array(rows, dtype=float)

def print_mat(mat, name):
    """Print a matrix in 2-decimals, comma-separated format."""
    r, c = mat.shape
    print(f"\nMatrix {name} ({r}×{c}):")
    for i in range(r):
        row = ", ".join(f"{mat[i,j]:.2f}" for j in range(c))
        print("  " + row)

def validate(A, B, C, tol=1e-5):
    D = A.dot(B)
    bad = np.argwhere(~np.isclose(D, C, atol=tol))
    if bad.size == 0:
        return True, None, None
    diffs = D - C
    info = [(i+1, j+1, D[i,j], C[i,j], diffs[i,j]) for i,j in bad]
    return False, info, D

def main():
    exe = "main"
    for size in range(2, 129):
        print(f"Testing {size}×{size}×{size}...", flush=True)
        out = run_exec(exe, size, size, size)
        A, B, C = parse_matrices(out)
        ok, mismatches, D = validate(A, B, C)
        if not ok:
            print(f"\n❌ Mismatch at size {size}:")
            # dump full matrices
            print_mat(A, "A")
            print_mat(B, "B")
            print_mat(C, "C (from C program)")
            print_mat(D, "D = A @ B (Python)")
            # then list the individual mismatches
            print("\nFirst few differing entries:")
            for r, c, comp, exp, diff in mismatches[:10]:
                print(f"  row {r}, col {c}: computed={comp}, expected={exp}, diff={diff}")
            sys.exit(1)
        else:
            print(" ✓ OK")
    print("\n✅ All sizes 2–128 validated successfully.")

if __name__ == "__main__":
    main()

參考執行輸出:

...
Testing 126×126×126...
 ✓ OK
Testing 127×127×127...
 ✓ OK
Testing 128×128×128...
 ✓ OK

✅ All sizes 2–128 validated successfully.