Try  HackMD Logo HackMD

# Linux 核心專題: 並行程式設計相關測驗題
執行人: TonyLinX

Reviewed by eleanorLYJ

不知道為什麼使用了 lock free 的方式,卻效果更差了

可能是 false sharing 的問題? 你是否使用過 padding 或 alignment 等方式解決?

Reviewed by fcu-D0812998

排程演算法:要把哪些任務分配給哪個 ring buffer 很重要。

Reviewed by HeLunWu0317

TODO: 重作第 12 週測驗題

enqueue()好像沒有對 queue full 的情況下做預防?

TODO: 重作第 15 週測驗題

包含延伸問題

TODO: 重作第 12 週測驗題

著重 lock-based vs. lock-free + thread pool
包含延伸問題
期末專題解說影片
Github

運作原理 (unoptimized.c)

以下程式是實作一個 64×64 方塊 (tile) 為單位的矩陣乘法 (GEMM): (unoptimized.c)

$$C_{i,j} = \sum_{k=0}^{n-1} A_{i,k}\, B_{k,j}$$

Tile-based GEMM 示例

以 $4 \times 4$ 的矩陣為例,若我們設定 tile(也是 micro tile)大小為 $2 \times 2$,則矩陣會被劃分成 $2 \times 2$ 的 tile,如下所示:

$$
\begin{aligned}
C_{00} &= A_{00}B_{00} + A_{01}B_{10} &\quad C_{01} &= A_{00}B_{01} + A_{01}B_{11} \\
C_{10} &= A_{10}B_{00} + A_{11}B_{10} &\quad C_{11} &= A_{10}B_{01} + A_{11}B_{11}
\end{aligned}
$$

其中,$A_{ij}$ 表示矩陣 $A$ 中第 $i$ 行第 $j$ 欄的 tile,tile 的尺寸為 $2 \times 2$。

條件式選擇:mm_tile vs mm_edge

在實際程式運行中,對於每個 tile 區塊,會先檢查該區塊是否為完整 tile:

  • 若是完整的 tile,便可直接使用固定大小的 mm_tile 進行矩陣運算。
  • 若不是完整的 tile,則為了避免越界,會使用支援任意大小的 mm_edge 進行矩陣運算。

舉例來說,若欲計算 $3 \times 3$ 矩陣乘上 $3 \times 3$ 矩陣(即 $m = n = p = 3$),在最後一行與最後一列上便無法構成 $2 \times 2$ 的完整 tile,因此這些區域將交由 mm_edge 處理。

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define TILE_SIZE 64

/*
 * mm_tile() - multiply-accumulate one full TILE_SIZE x TILE_SIZE tile:
 * C += A * B, where each operand is a tile embedded in a larger
 * row-major matrix with the given row strides.
 *
 * The loop order is i-k-j (instead of the naive i-j-k): the element
 * A[i][k] is hoisted into a scalar, and the innermost loop then walks
 * both B row k and C row i with unit stride, which is far friendlier
 * to the cache and to compiler auto-vectorization.
 *
 * NOTE: this reorders the floating-point accumulation relative to the
 * i-j-k form; for the integer-valued inputs used here (rand() % 10)
 * the result is bit-identical.
 */
static inline void mm_tile(const float *restrict A,
                           const float *restrict B,
                           float *restrict C,
                           size_t stride_a,
                           size_t stride_b,
                           size_t stride_c)
{
    for (size_t i = 0; i < TILE_SIZE; i++) {
        float *c_row = C + i * stride_c;
        for (size_t k = 0; k < TILE_SIZE; k++) {
            const float a_ik = A[i * stride_a + k];
            const float *b_row = B + k * stride_b;
            for (size_t j = 0; j < TILE_SIZE; j++)
                c_row[j] += a_ik * b_row[j];
        }
    }
}

/*
 * mm_edge() - general (possibly partial) tile multiply-accumulate:
 * C[0..tile_m)[0..tile_p) += A[0..tile_m)[0..tile_n) * B[0..tile_n)[0..tile_p).
 * Handles the boundary tiles whose dimensions are smaller than
 * TILE_SIZE; the stride arguments give each operand's row pitch in
 * the enclosing row-major matrix.
 */
static inline void mm_edge(float *A,
                           float *B,
                           float *C,
                           size_t stride_a,
                           size_t stride_b,
                           size_t stride_c,
                           size_t tile_m,
                           size_t tile_n,
                           size_t tile_p)
{
    for (size_t row = 0; row < tile_m; row++) {
        /* Hoist the row base pointers out of the inner loops. */
        const float *a_row = A + row * stride_a;
        float *c_row = C + row * stride_c;
        for (size_t col = 0; col < tile_p; col++) {
            /* Accumulate in a scalar, then store once per output. */
            float acc = c_row[col];
            for (size_t k = 0; k < tile_n; k++)
                acc += a_row[k] * B[k * stride_b + col];
            c_row[col] = acc;
        }
    }
}

/*
 * mm() - tiled matrix multiplication: C = A * B.
 * A is m x n, B is n x p, C is m x p, all row-major.
 *
 * The iteration space is walked in TILE_SIZE steps; full interior
 * tiles go to the fixed-size mm_tile() kernel, and the ragged tiles
 * on the right/bottom edges (when a dimension is not a multiple of
 * TILE_SIZE) go to the variable-size mm_edge() kernel.
 */
void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p)
{
    /* Largest multiples of TILE_SIZE not exceeding each dimension:
     * tile origins below these bounds in all three axes are complete. */
    const size_t inm = m - m % TILE_SIZE;
    const size_t inn = n - n % TILE_SIZE;
    const size_t inp = p - p % TILE_SIZE;

    /* Both kernels accumulate (+=), so C must start zeroed. */
    memset(C, 0, m * p * sizeof(float));

    for (size_t i = 0; i < m; i += TILE_SIZE) {
        for (size_t j = 0; j < p; j += TILE_SIZE) {
            /* Flat offset of tile (i, j) in C. */
            const size_t ic = i * p + j;
            for (size_t k = 0; k < n; k += TILE_SIZE) {
                const size_t ia = i * n + k;
                const size_t ib = k * p + j;

                if (i < inm && j < inp && k < inn) {
                    mm_tile(A + ia, B + ib, C + ic, n, p, p);
                } else {
                    /* Clamp the tile extents at the matrix edges. */
                    const size_t tile_m =
                        (i + TILE_SIZE > m) ? (m - i) : TILE_SIZE;
                    const size_t tile_n =
                        (k + TILE_SIZE > n) ? (n - k) : TILE_SIZE;
                    const size_t tile_p =
                        (j + TILE_SIZE > p) ? (p - j) : TILE_SIZE;
                    mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n,
                            tile_p);
                }
            }
        }
    }
}

/* Fill arr[0..size-1] with pseudo-random integer values in [0, 9]. */
void fill_rand(float *arr, size_t size)
{
    float *const end = arr + size;
    while (arr != end)
        *arr++ = (float) (rand() % 10);
}

/* Print an m x n row-major matrix, comma-separated, two decimals,
 * one row per line, followed by a "---" separator. */
void print_mat(const float *mat, size_t m, size_t n)
{
    for (size_t row = 0; row < m; row++) {
        const float *r = mat + row * n;
        for (size_t col = 0; col < n; col++) {
            printf("%.2f", r[col]);
            if (col + 1 < n)
                fputs(", ", stdout);
        }
        putchar('\n');
    }
    fputs("---\n", stdout);
}

/*
 * parse_int() - parse str as a base-10 integer.
 * On any error (trailing garbage, empty string, out-of-range value)
 * prints a diagnostic and exits; on success returns the value.
 *
 * Fixes over the naive version: errno is reset before strtol so a
 * stale ERANGE cannot leak in, ERANGE from strtol is detected, and
 * values outside [INT_MIN, INT_MAX] are rejected instead of being
 * silently truncated by the cast.
 */
int parse_int(const char *str)
{
    char *end;
    errno = 0;
    long val = strtol(str, &end, 10);
    if (*end || str == end || errno == ERANGE || val < INT_MIN ||
        val > INT_MAX) {
        fprintf(stderr, "Invalid integer: '%s'\n", str);
        exit(1);
    }
    return (int) val;
}

/*
 * Benchmark driver: mm(A, B, C) on random m x n and n x p matrices.
 * Prints elapsed seconds (or the matrices themselves when compiled
 * with -DVALIDATE).
 *
 * Fixes: dimensions are validated as strictly positive before being
 * widened to size_t (a negative parse result would otherwise wrap to
 * a huge size_t), and all malloc results are checked before use.
 */
int main(int argc, char *argv[])
{
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]);
        return 1;
    }

    srand(314);

    int mi = parse_int(argv[1]);
    int ni = parse_int(argv[2]);
    int pi = parse_int(argv[3]);
    if (mi <= 0 || ni <= 0 || pi <= 0) {
        fprintf(stderr, "Matrix dimensions must be positive\n");
        return 1;
    }
    size_t m = (size_t) mi;
    size_t n = (size_t) ni;
    size_t p = (size_t) pi;

    float *A = malloc(m * n * sizeof(float));
    float *B = malloc(n * p * sizeof(float));
    float *C = malloc(m * p * sizeof(float));
    if (!A || !B || !C) {
        fprintf(stderr, "Out of memory\n");
        free(A);
        free(B);
        free(C);
        return 1;
    }

    fill_rand(A, m * n);
    fill_rand(B, n * p);

#ifdef VALIDATE
    print_mat(A, m, n);
    print_mat(B, n, p);
#endif

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    mm(A, B, C, m, n, p);
    clock_gettime(CLOCK_MONOTONIC, &end);

#ifndef VALIDATE
    double elapsed =
        (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("%.9f\n", elapsed);
#endif

#ifdef VALIDATE
    print_mat(C, m, p);
#endif

    free(A);
    free(B);
    free(C);

    return 0;
}

運作原理 (main.c)

Zero padding

ALIGN_UP

在 main.c 中,將矩陣的尺寸(如 m, n, p)補齊成 TILE_SIZE = 64 的倍數,以避免邊界不完整的 tile 出現,進而提升記憶體對齊與計算效率。程式中使用了以下巨集:

#define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))

技巧 :

  • ((x) + TILE_SIZE - 1):確保只要 x 不是 64 的倍數,就會進位到下一個倍數
  • & ~(TILE_SIZE - 1):把最低 6 bits 清 0,讓結果剛好是 64 的倍數

目標 :

  • 省去原本在內層迴圈檢查邊界的分支
  • 讓所有迴圈上限變成編譯期常數,幫助編譯器展開與向量化
pad_mat 與 pad_t_mat

將原始矩陣複製到新的、經過 zero-padding 的記憶體區塊中,大小為 TILE_SIZE(64)的倍數。
pad_mat():針對 A 矩陣的補齊
使用 aligned_alloc(MEM_ALIGNMENT, ...) 分配對齊的記憶體(如 64-byte),透過 memset 全部填成 0,由於 A 矩陣 是 row-major 所以不需要轉置,直接透過 memcpy 把原本第 i 行的 c 個 float 複製到新 buffer 中第 i 行開頭,右邊剩下的自動保留為 0(前面用 memset)。

/*
 * pad_mat() - copy the r x c row-major matrix src into a freshly
 * allocated, zero-padded padr x padc buffer (row i of src lands at
 * the start of row i of dst; the padding stays zero from the memset).
 * Returns NULL on allocation failure instead of crashing in memset.
 *
 * NOTE: aligned_alloc requires the size to be a multiple of the
 * alignment; padr/padc are TILE_SIZE-aligned here, so
 * padr * padc * sizeof(float) is a multiple of MEM_ALIGNMENT (64) —
 * verify if those constants ever change. Caller owns (frees) dst.
 */
float *pad_mat(const float *src, size_t r, size_t c, size_t padr, size_t padc)
{
    float *dst = aligned_alloc(MEM_ALIGNMENT, padr * padc * sizeof(float));
    if (!dst)
        return NULL;
    memset(dst, 0, padr * padc * sizeof(float));
    for (size_t i = 0; i < r; i++)
        memcpy(dst + i * padc, src + i * c, c * sizeof(float));
    return dst;
}

pad_t_mat():針對 B 矩陣的轉置與補齊
使用雙層迴圈進行轉置與寫入,把原本的 B[i][j] 被寫到 B_T[j][i]

/*
 * pad_t_mat() - transpose the r x c row-major matrix src into a
 * freshly allocated, zero-padded buffer: src[i][j] is written to
 * dst[j][i], with dst rows of length padr (the transposed matrix is
 * padc x padr in layout terms). Padding stays zero from the memset.
 * Returns NULL on allocation failure, matching pad_mat().
 * Caller owns (frees) dst.
 */
float *pad_t_mat(const float *src, size_t r, size_t c, size_t padr, size_t padc)
{
    float *dst = aligned_alloc(MEM_ALIGNMENT, padr * padc * sizeof(float));
    if (!dst)
        return NULL;
    memset(dst, 0, padr * padc * sizeof(float));
    for (size_t i = 0; i < r; i++)
        for (size_t j = 0; j < c; j++)
            dst[j * padr + i] = src[i * c + j];
    return dst;
}

Thread pool 初始化階段

init_thread_pool

建立指定數量的 worker threads,並設定 CPU affinity:
透過 pthread_create 建立工作執行緒,並將每個 thread 綁定到指定的 CPU core 上(pthread_setaffinity_np)。

for (size_t i = 0; i < num_threads; i++) {
    pthread_create(&pool->threads[i], NULL, worker_thread, pool);
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(i % N_CORES, &cpuset);
    pthread_setaffinity_np(pool->threads[i], sizeof(cpu_set_t), &cpuset);
}

矩陣運算

mm

把整個矩陣乘法任務,以 tile 為單位拆分成小任務(task_t),並丟給 thread pool 執行。這是一種 Non-blocking Dispatch 的方法,也就是把工作派出去的時候,主程式不會停下來等這個工作做完,而是馬上繼續下一件事。當所有工作丟完後,主線程呼叫 wait_for_completion() 等待所有 tile 被算完。

/*
 * mm() - threadpool version: split the multiplication into one task
 * per TILE_SIZE x TILE_SIZE output tile and dispatch them to the
 * pool without blocking; then wait for all tiles to finish.
 *
 * Assumes m, n, p were already padded up to multiples of TILE_SIZE
 * (see ALIGN_UP / pad_mat), so every tile is complete, and that B was
 * transposed by pad_t_mat — hence row j of B here is column j of the
 * original, with row stride n.
 */
void mm(float *A,
        float *B,
        float *C,
        size_t m,
        size_t n,
        size_t p,
        threadpool_t *pool)
{
    for (size_t i = 0; i < m; i += TILE_SIZE) {
        for (size_t j = 0; j < p; j += TILE_SIZE) {
            task_t task = {
                .A = A + i * n,       /* flat offset of row-block i of A */
                .B = B + j * n,      /* flat offset of row j of B^T (= col j of B) */
                .C = C + i * p + j, /* flat offset of output tile (i, j) */
                .stride_a = n,
                .stride_b = n,
                .stride_c = p,
                .n_k = n,
            };
            enqueue(pool, task);
        }
    }
    wait_for_completion(pool);
}
enqueue

任務佇列實際上是一個由 queue_node_t 組成的單向 linked list。當有新的任務進來時會加入尾端,並以 pthread_cond_signal() 將等待 not_empty 條件變數的 thread 喚醒,通知有新任務可取。

/*
 * enqueue() - append one task to the pool's linked-list queue and
 * wake one sleeping worker. The queue is unbounded (no "full" state);
 * tasks_remaining is bumped under the lock so a worker that drains
 * the queue cannot observe an inconsistent count.
 *
 * Fix: the malloc result is now checked — on OOM we fail loudly
 * instead of dereferencing NULL.
 */
void enqueue(threadpool_t *pool, task_t task)
{
    queue_node_t *node = malloc(sizeof *node);
    if (!node) {
        fprintf(stderr, "enqueue: out of memory\n");
        exit(1);
    }
    *node = (queue_node_t){.task = task, .next = NULL};

    pthread_mutex_lock(&pool->lock);
    atomic_fetch_add(&pool->tasks_remaining, 1);
    if (pool->tail)
        pool->tail->next = node;
    else
        pool->head = node;
    pool->tail = node;

    /* A new task arrived: wake one worker thread. */
    pthread_cond_signal(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
}
worker_thread

當 thread 被建立後,會執行 worker_thread 函式。若 thread pool 沒有要被 shutdown,thread 會先取得鎖,原因是它可能要讀取和修改 queue 裡面的內容,這些動作必須保證只有一個 thread 在操作。當拿到鎖後,會檢查有沒有任務或 thread pool 是否要關閉,如果沒有任務的話就釋放鎖並等待別人喚醒(pthread_cond_wait(&pool->not_empty, &pool->lock));如果有任務就取得任務,釋放鎖給其他 thread 使用。任務內容就是調用 mm_tile 計算一個 tile,完成後減少 tasks_remaining;若所有任務還未完成,先完成的 thread 會回到 thread pool 繼續等待,直到有新工作被喚醒。當所有任務皆完成,此 thread 會調用 pthread_cond_broadcast(&pool->all_done) 讓主執行緒能被喚醒。

/*
 * worker_thread() - pool worker loop: sleep until a task is queued,
 * pop it under the lock, run the tile kernel, and account for it.
 *
 * BUGFIX: the all_done broadcast is now issued while holding
 * pool->lock. Broadcasting without the mutex races with
 * wait_for_completion(): the waiter can check tasks_remaining > 0,
 * then this thread decrements the counter and broadcasts before the
 * waiter reaches pthread_cond_wait — the wakeup is lost and the main
 * thread sleeps forever. Signaling under the same mutex the waiter
 * holds closes that window.
 */
void *worker_thread(void *arg)
{
    threadpool_t *pool = arg;
    while (!atomic_load(&pool->shutdown)) {
        pthread_mutex_lock(&pool->lock);
        while (!pool->head && !atomic_load(&pool->shutdown))
            pthread_cond_wait(&pool->not_empty, &pool->lock);

        /* Woken by the shutdown broadcast: exit without taking work. */
        if (atomic_load(&pool->shutdown)) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }

        /* Pop the head node while still holding the queue lock. */
        queue_node_t *node = pool->head;
        pool->head = node->next;
        if (!pool->head)
            pool->tail = NULL;
        pthread_mutex_unlock(&pool->lock);

        mm_tile(&node->task);
        free(node);

        /* One task done; atomic keeps the counter thread-safe. */
        atomic_fetch_sub(&pool->tasks_remaining, 1);

        if (atomic_load(&pool->tasks_remaining) == 0) {
            pthread_mutex_lock(&pool->lock);
            pthread_cond_broadcast(&pool->all_done);
            pthread_mutex_unlock(&pool->lock);
        }
    }
    return NULL;
}

工作完成與喚醒機制

wait_for_completion

主函式在 mm 中 enqueue 完全部工作後,呼叫 wait_for_completion:

/*
 * wait_for_completion() - block the caller until every queued task
 * has finished (tasks_remaining reaches 0).
 *
 * Canonical condition-variable wait: the predicate is re-checked in a
 * loop under the mutex, so spurious wakeups are harmless. Pairs with
 * the all_done broadcast issued by the worker that completes the last
 * task.
 */
void wait_for_completion(threadpool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    while (atomic_load(&pool->tasks_remaining) > 0)
        pthread_cond_wait(&pool->all_done, &pool->lock);
    pthread_mutex_unlock(&pool->lock);
}

釋放資源

destroy_thread_pool

使用 atomic_store(&pool->shutdown, true) 將 shutdown 旗標設為 true。由於這個變數是被多個 thread 共用的,因此使用 atomic 操作可以避免資料競爭,並確保每個 worker thread 都能正確且同步地感知「要結束了」的訊號。
有些 worker 可能正在 pthread_cond_wait(&not_empty) 上等待(因為 queue 暫時是空的),這時呼叫 pthread_cond_broadcast(&not_empty) 將它們全部喚醒。這樣它們會醒來後發現 shutdown == true,並跳出工作迴圈結束自己。
使用 pthread_join 等待所有 thread 都完成並返回。這一步確保 worker threads 不會在後面我們開始釋放記憶體的時候還在執行,避免 use-after-free。最後釋放掉 pool->threads 所使用的記憶體,並呼叫 pthread_mutex_destroy 與 pthread_cond_destroy 釋放互斥鎖與條件變數。

效能缺失的部分

main.c

thread_pool

在原本的 main.c 中,thread pool 的實作是 lock-based:使用 pthread_mutex_t 搭配 pthread_cond_t 來保護與同步任務佇列(linked list 實作)。此設計雖然能正確進行矩陣運算,但是每次工作執行緒在取任務(worker_thread)與主執行緒在新增任務(enqueue)時都需進入相同的臨界區,大量執行緒同時競爭此鎖會造成等待,導致效能缺失。以下程式為 main.c 中 enqueue 與 worker_thread 的設計,使用同一把鎖 pool->lock 保護佇列的增減操作:

void enqueue(threadpool_t *pool, task_t task)
{
    queue_node_t *node = malloc(sizeof(queue_node_t));
    *node = (queue_node_t){.task = task, .next = NULL};

    pthread_mutex_lock(&pool->lock);
    atomic_fetch_add(&pool->tasks_remaining, 1);
    if (pool->tail)
        pool->tail->next = node;
    else
        pool->head = node;
    pool->tail = node;

    //有一條新的任務近來,喚醒一條 worker thread
    pthread_cond_signal(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
}

{
    threadpool_t *pool = arg;
    while (!atomic_load(&pool->shutdown)) {
        pthread_mutex_lock(&pool->lock);
        while (!pool->head && !atomic_load(&pool->shutdown))
            pthread_cond_wait(&pool->not_empty, &pool->lock);

        //如果是被「關閉通知」喚醒,那這條 thread 不該再做事,直接退出:
        if (atomic_load(&pool->shutdown)) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }

        queue_node_t *node = pool->head;
        pool->head = node->next;
        if (!pool->head)
            pool->tail = NULL;
        pthread_mutex_unlock(&pool->lock);

        mm_tile(&node->task);
        free(node);

        //任務數量減 1,使用 atomic 確保 thread-safe
        atomic_fetch_sub(&pool->tasks_remaining, 1);

        if (atomic_load(&pool->tasks_remaining) == 0){
            pthread_mutex_lock(&pool->lock);
            pthread_cond_broadcast(&pool->all_done);
            pthread_mutex_unlock(&pool->lock);
        }
    }
    return NULL;
}

參考 並行程式設計: Lock-Free Programming 這篇文章,可以使用 ring buffer 來避免 main thread 放入工作和 worker thread 取出任務的競爭問題,再透過 semaphore 取代 condition variable。
以下的程式碼使用一個 ring buffer 以及 semaphore 的方式實作出 thread pool:

-typedef struct queue_node_t {
-    task_t task;
-    struct queue_node_t *next;
-} queue_node_t;
-
 typedef struct {
-    queue_node_t *head, *tail;
-    pthread_mutex_t lock;
-    pthread_cond_t not_empty, all_done;
+    task_t *tasks;            // ring buffer of tasks
+    size_t capacity;          // total slots in ring buffer
+    atomic_size_t head;       
+    atomic_size_t tail;       
+    sem_t not_empty;          // counts available tasks
+    pthread_mutex_t done_lock;
+    pthread_cond_t all_done;
     pthread_t *threads;
     size_t num_threads;
     atomic_int tasks_remaining;
 void *worker_thread(void *arg)
 {
     threadpool_t *pool = arg;
-    while (!atomic_load(&pool->shutdown)) {
-        pthread_mutex_lock(&pool->lock);
-        while (!pool->head && !atomic_load(&pool->shutdown))
-            pthread_cond_wait(&pool->not_empty, &pool->lock);
-
-        //如果是被「關閉通知」喚醒,那這條 thread 不該再做事,直接退出:
-        if (atomic_load(&pool->shutdown)) {
-            pthread_mutex_unlock(&pool->lock);
-            return NULL;
-        }
-
-        queue_node_t *node = pool->head;
-        pool->head = node->next;
-        if (!pool->head)
-            pool->tail = NULL;
-        pthread_mutex_unlock(&pool->lock);
+    while (true) {
+        sem_wait(&pool->not_empty);
+        if (atomic_load(&pool->shutdown))
+            break;
 
-        mm_tile(&node->task);
-        free(node);
+        size_t idx = atomic_fetch_add(&pool->head, 1) % pool->capacity;
+        task_t task = pool->tasks[idx];
 
-        //任務數量減 1,使用 atomic 確保 thread-safe
+        mm_tile(&task);
         atomic_fetch_sub(&pool->tasks_remaining, 1);
-
-        /*如果這條 thread 完成了最後一個任務,通知所有
-        在主程式裡等 wait_for_completion() 的 thread,
-        因為有很多thread 可能同時在等待某一個thread完成
-        最後一個任務,所以要用 pthread_cond_broadcast,
-        不像 pthread_cond_signal 只會喚醒一個 thread。
-        */
-        if (atomic_load(&pool->tasks_remaining) == 0)
+        if (atomic_load(&pool->tasks_remaining) == 0) {
+            pthread_mutex_lock(&pool->done_lock);
             pthread_cond_broadcast(&pool->all_done);
+            pthread_mutex_unlock(&pool->done_lock);
+        }
     }
     return NULL;
 }
 void enqueue(threadpool_t *pool, task_t task)
 {
-    queue_node_t *node = malloc(sizeof(queue_node_t));
-    *node = (queue_node_t){.task = task, .next = NULL};
-
-    pthread_mutex_lock(&pool->lock);
+    size_t idx = atomic_fetch_add(&pool->tail, 1) % pool->capacity;
+    pool->tasks[idx] = task;
     atomic_fetch_add(&pool->tasks_remaining, 1);
-    if (pool->tail)
-        pool->tail->next = node;
-    else
-        pool->head = node;
-    pool->tail = node;
-
-    //有一條新的任務近來,喚醒一條 worker thread
-    pthread_cond_signal(&pool->not_empty);
-    pthread_mutex_unlock(&pool->lock);
+    sem_post(&pool->not_empty);
 }

效能比較

下圖是比較 lock-based 以及 lock free 的 throughput,可以觀察到目前只使用一個 ring buffer 的方式,效能會比使用 lock-based 還差。

image

可以發現 lock-free 的效果更差了,雖然本實作採用 lock-free ring buffer,理論上減少了鎖競爭,但所有 worker threads 仍需原子性競爭同一個 head 變數,這會導致在多執行緒下產生嚴重的 cache line ping-pong 現象。一次只能有一個 worker 成功取得任務,其餘則進入休眠,造成 thread utilization 下降,反而讓 lock-free 版本效能不如 lock-based。

Cache line ping-pong 指多核心同時頻繁寫入同一塊 cache line(如 atomic 操作共享變數),導致該 cache line 在各核心間不停搬移,嚴重拖慢效能。這種現象屬於 cache false sharing 的極端情形。

為了解決這個問題,我實作了一個 worker 對應一個 queue 的改版設計,每個 thread 有自己獨立的 ring buffer 和 semaphore,任務透過 round-robin 分發,這樣 worker 只需操作自己的 queue,不會再有一堆 worker 搶一個共同變數的問題。

lockfree_rr.c

 typedef struct {
-    task_t *tasks;            // ring buffer of tasks
-    size_t capacity;          // total slots in ring buffer
-    atomic_size_t head;       // consumer index
-    atomic_size_t tail;       // producer index
-    sem_t not_empty;          // counts available tasks
+    task_t *tasks;       // ring buffer of tasks
+    size_t capacity;     // slots per ring buffer
+    atomic_size_t head;  // consumer index
+    atomic_size_t tail;  // producer index
+    sem_t sem;           // counts available tasks
+} ring_buffer_t;
+
+typedef struct {
+    ring_buffer_t *queues;      // array of per-thread queues
+    pthread_t *threads;         // worker threads
+    size_t num_threads;         // number of workers
+    atomic_size_t next_queue;   // for round-robin dispatch
     pthread_mutex_t done_lock;
     pthread_cond_t all_done;
-    pthread_t *threads;
-    size_t num_threads;
-    atomic_int tasks_remaining;
+    atomic_int tasks_remaining; // across all queues
     _Atomic bool shutdown;
 } threadpool_t;


     }
 }

+typedef struct {
+    threadpool_t *pool;
+    size_t index;
+} worker_arg_t;
+
 void *worker_thread(void *arg)
 {
-    threadpool_t *pool = arg;
+    worker_arg_t *warg = arg;
+    threadpool_t *pool = warg->pool;
+    size_t idx_q = warg->index;
+    ring_buffer_t *queue = &pool->queues[idx_q];
+    free(warg);
+
     while (true) {
-        sem_wait(&pool->not_empty);
+        sem_wait(&queue->sem);
         if (atomic_load(&pool->shutdown))
             break;

-        size_t idx = atomic_fetch_add(&pool->head, 1) % pool->capacity;
-        task_t task = pool->tasks[idx];
+        size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
+        task_t task = queue->tasks[idx];

         mm_tile(&task);
         atomic_fetch_sub(&pool->tasks_remaining, 1);
     *pool = (threadpool_t){
         .num_threads = num_threads,
         .threads = malloc(num_threads * sizeof(pthread_t)),
-        .capacity = capacity,
-        .tasks = malloc(capacity * sizeof(task_t)),
+        .queues = calloc(num_threads, sizeof(ring_buffer_t)),
     };
-    atomic_init(&pool->head, 0);
-    atomic_init(&pool->tail, 0);
-    sem_init(&pool->not_empty, 0, 0);
+    atomic_init(&pool->next_queue, 0);
     pthread_mutex_init(&pool->done_lock, NULL);
     pthread_cond_init(&pool->all_done, NULL);

     for (size_t i = 0; i < num_threads; i++) {
-        pthread_create(&pool->threads[i], NULL, worker_thread, pool);
+        ring_buffer_t *q = &pool->queues[i];
+        q->capacity = capacity;
+        q->tasks = calloc(capacity, sizeof(task_t));
+        atomic_init(&q->head, 0);
+        atomic_init(&q->tail, 0);
+        sem_init(&q->sem, 0, 0);
+
+        worker_arg_t *warg = malloc(sizeof(worker_arg_t));
+        *warg = (worker_arg_t){.pool = pool, .index = i};
+        pthread_create(&pool->threads[i], NULL, worker_thread, warg);
+
         cpu_set_t cpuset;
         CPU_ZERO(&cpuset);
         CPU_SET(i % N_CORES, &cpuset);

 void enqueue(threadpool_t *pool, task_t task)
 {
-    size_t idx = atomic_fetch_add(&pool->tail, 1) % pool->capacity;
-    pool->tasks[idx] = task;
+    size_t qid = atomic_fetch_add(&pool->next_queue, 1) % pool->num_threads;
+    ring_buffer_t *q = &pool->queues[qid];
+
+    size_t idx = atomic_fetch_add(&q->tail, 1) % q->capacity;
+    q->tasks[idx] = task;
     atomic_fetch_add(&pool->tasks_remaining, 1);
-    sem_post(&pool->not_empty);
+    sem_post(&q->sem);
 }

 void wait_for_completion(threadpool_t *pool)
 {
     atomic_store(&pool->shutdown, true);
     for (size_t i = 0; i < pool->num_threads; i++)
-        sem_post(&pool->not_empty); // wake workers
+        sem_post(&pool->queues[i].sem); // wake workers

     for (size_t i = 0; i < pool->num_threads; i++)
         pthread_join(pool->threads[i], NULL);

-    free(pool->tasks);
+    for (size_t i = 0; i < pool->num_threads; i++) {
+        ring_buffer_t *q = &pool->queues[i];
+        free(q->tasks);
+        sem_destroy(&q->sem);
+    }
+    free(pool->queues);
     free(pool->threads);
-    sem_destroy(&pool->not_empty);
     pthread_mutex_destroy(&pool->done_lock);
     pthread_cond_destroy(&pool->all_done);
 }

實驗比較 lockfree vs. lockfree_rr

由下面的實驗發現 lockfree_rr 完成整個矩陣運算的任務所需的時間,比 lockfree 還高。

root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_bench 2048 2048 2048
Time: 0.415489 sec
Time: 0.414018 sec
Time: 0.423264 sec
Time: 0.433853 sec
Time: 0.414566 sec
Time: 0.429556 sec
Time: 0.418634 sec
Time: 0.427258 sec
Time: 0.421059 sec
Time: 0.419739 sec

 Performance counter stats for './build/lockfree_bench 2048 2048 2048' (10 runs):

         7,599,301      cache-misses                                                            ( +-  3.41% )
               134      cs                                                                      ( +-  2.45% )

           0.58900 +- 0.00256 seconds time elapsed  ( +-  0.43% )
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.550715 sec
Time: 0.528759 sec
Time: 0.540224 sec
Time: 0.531338 sec
Time: 0.519645 sec
Time: 0.529487 sec
Time: 0.552652 sec
Time: 0.543132 sec
Time: 0.524136 sec
Time: 0.541093 sec

 Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):

         9,591,320      cache-misses                                                            ( +-  2.58% )
               124      cs                                                                      ( +-  2.88% )

           0.70733 +- 0.00501 seconds time elapsed  ( +-  0.71% )

問題出在,任務是以 round-robin 方式均勻分派,導致即使某些 worker 剛進入休眠狀態(因為 queue 空了),下一筆任務卻很可能馬上又被分配給它,造成頻繁的睡眠 → 喚醒循環。

/*
 * worker_thread() - per-queue worker for the round-robin lock-free
 * pool: each worker owns queues[index] and sleeps on that queue's
 * semaphore; one sem_post per enqueued task guarantees each sem_wait
 * return corresponds to exactly one slot to consume.
 *
 * NOTE(review): sem_wait may return -1 with errno == EINTR on signal
 * delivery; that case is unchecked here and would dequeue a slot that
 * was never posted — confirm signals are masked, or retry on EINTR.
 * NOTE(review): the atomic_fetch_add on head is safe only while this
 * worker is the queue's sole consumer (true in this version, before
 * work stealing is introduced).
 */
void *worker_thread(void *arg)
{
    /* Unpack the heap-allocated arg (pool + own queue index), then free it. */
    worker_arg_t *warg = arg;
    threadpool_t *pool = warg->pool;
    size_t idx_q = warg->index;
    ring_buffer_t *queue = &pool->queues[idx_q];
    free(warg);

    while (true) {
        /* Sleep until a task is posted to this worker's own queue. */
        sem_wait(&queue->sem);
        /* destroy path posts once per worker after setting shutdown. */
        if (atomic_load(&pool->shutdown))
            break;

        /* Claim the next slot; copy the task out before running it. */
        size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
        task_t task = queue->tasks[idx];

        mm_tile(&task);
        atomic_fetch_sub(&pool->tasks_remaining, 1);
        /* Last task overall: wake the main thread under done_lock so
         * the wakeup cannot be lost against wait_for_completion. */
        if (atomic_load(&pool->tasks_remaining) == 0) {
            pthread_mutex_lock(&pool->done_lock);
            pthread_cond_broadcast(&pool->all_done);
            pthread_mutex_unlock(&pool->done_lock);
        }
    }
    return NULL;
}

所以我加入一段 spin-before-sleep 的設計,先嘗試「忙等」(spinning)一段時間,以避免 thread 一有工作就睡、馬上又被喚醒。

 #include <string.h>
 #include <unistd.h>

+#define SPIN_LIMIT 1024
 #define TILE_SIZE 64
 #define MICRO_TILE 8
 #define MEM_ALIGNMENT 64
 #endif

 #define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))
-
+static inline void cpu_relax(void) {
+#if defined(__x86_64__) || defined(__i386__)
+    __asm__ __volatile__("pause");
+#else
+    sched_yield();
+#endif
+}
 typedef struct {
     float *A, *B, *C;
     size_t stride_a, stride_b, stride_c;
     free(warg);

     while (true) {
-        sem_wait(&queue->sem);
-        if (atomic_load(&pool->shutdown))
-            break;
+        int spun = 0;
+        while (spun < SPIN_LIMIT) {
+            /* 嘗試非阻塞地取 semaphore;成功就開始工作 */
+            if (sem_trywait(&queue->sem) == 0)
+                break;
+
+            /* 如果主線程要求關閉,直接跳出去 */
+            if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+                return NULL;

+            cpu_relax();
+            ++spun;
+        }
+
+        if (spun == SPIN_LIMIT) {
+            /* 阻塞直到有工作或收到 shutdown 訊號 */
+            sem_wait(&queue->sem);
+            if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+                return NULL;
+        }

實驗比較有無 spin-before-sleep

由下面的實驗可以觀察到矩陣運所需所需時間從原本 0.5 掉到 0.4。但是跟 lockfree 沒有明顯優勢,還不是很好。

root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.429622 sec
Time: 0.429666 sec
Time: 0.429272 sec
Time: 0.432098 sec
Time: 0.429748 sec
Time: 0.429853 sec
Time: 0.415347 sec
Time: 0.412105 sec
Time: 0.417842 sec
Time: 0.407290 sec

 Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):

         8,476,478      cache-misses                                                            ( +-  3.14% )
               113      cs                                                                      ( +-  4.32% )

           0.59123 +- 0.00465 seconds time elapsed  ( +-  0.79% )
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_bench 2048 2048 2048
Time: 0.409975 sec
Time: 0.425815 sec
Time: 0.426494 sec
Time: 0.416542 sec
Time: 0.416934 sec
Time: 0.419112 sec
Time: 0.417866 sec
Time: 0.420643 sec
Time: 0.418282 sec
Time: 0.433788 sec

 Performance counter stats for './build/lockfree_bench 2048 2048 2048' (10 runs):

         7,957,176      cache-misses                                                            ( +-  2.26% )
               136      cs                                                                      ( +-  2.59% )

           0.58842 +- 0.00255 seconds time elapsed  ( +-  0.43% )

在前面的設計中,我們會讓沒有任務的 worker 進入睡眠,但這樣的做法有一個問題:有些任務的執行時間比較長,如果某些 worker 分到這些重任務,就會忙得要命;而其他沒有任務的 worker 卻早早進入睡眠,這樣就會造成負載不平衡,導致整體效率下降。為了解決這個問題,引入 work stealing 策略。以下的設計讓每個 worker 優先處理自己 queue 的任務,若無任務可取,則進入一段 spin + work stealing 階段,主動嘗試從其他 worker 的 queue 中竊取任務。只有當多次嘗試皆失敗後,才會進入 sleep 狀態等待喚醒,避免頻繁的休眠與喚醒造成效能瓶頸。

+static inline bool try_dequeue_task(ring_buffer_t *q, task_t *out)
+{
+    if (sem_trywait(&q->sem) != 0)      /* 目前沒任務 */
+        return false;
+
+    size_t idx = atomic_fetch_add_explicit(&q->head, 1,
+                                           memory_order_relaxed) % q->capacity;
+    *out = q->tasks[idx];
+    return true;
+}

 void *worker_thread(void *arg)
 {
-    worker_arg_t *warg = arg;
-    threadpool_t *pool = warg->pool;
-    size_t idx_q = warg->index;
-    ring_buffer_t *queue = &pool->queues[idx_q];
+    worker_arg_t  *warg   = arg;
+    threadpool_t  *pool   = warg->pool;
+    size_t         selfID = warg->index;
+    ring_buffer_t *selfQ  = &pool->queues[selfID];
     free(warg);

-    while (true) {
-        int spun = 0;
-        while (spun < SPIN_LIMIT) {
-            if (sem_trywait(&queue->sem) == 0)
-                break;
+    task_t task;  
+
+    for (;;) {
+
+        if (try_dequeue_task(selfQ, &task))
+            goto got_job;
+
+        for (int spin = 0; spin < SPIN_LIMIT; ++spin) {
+
+            for (size_t off = 1; off < pool->num_threads; ++off) {
+                size_t victimID = (selfID + off) % pool->num_threads;
+                ring_buffer_t *vQ = &pool->queues[victimID];
+                if (try_dequeue_task(vQ, &task))
+                    goto got_job;                 
+            }
+
             if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
                 return NULL;

             cpu_relax();
-            ++spun;
         }

-        if (spun == SPIN_LIMIT) {
-            sem_wait(&queue->sem);
-            if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
-                return NULL;
-        }
-        size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
-        task_t task = queue->tasks[idx];
-
+        sem_wait(&selfQ->sem);// 阻塞等待自己 queue 被喚醒
+
+        if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+            return NULL;
+
+        size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1,memory_order_relaxed) % selfQ->capacity;
+        task = selfQ->tasks[idx];
+
+got_job:

         mm_tile(&task);
         atomic_fetch_sub(&pool->tasks_remaining, 1);
         if (atomic_load(&pool->tasks_remaining) == 0) {
             pthread_mutex_lock(&pool->done_lock);

實驗比較有無 work stealing

由下面實驗,可以觀察到有了 work stealing 原本矩陣運算的時間可以下降 0.02 sec,從這個設計開始 lockfree_rr 明顯優於 lockfree 的版本,但是還是輸 lock-based。

root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.393714 sec
Time: 0.389105 sec
Time: 0.384914 sec
Time: 0.398160 sec
Time: 0.391590 sec
Time: 0.385851 sec
Time: 0.400985 sec
Time: 0.386179 sec
Time: 0.380247 sec
Time: 0.396653 sec

 Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):

         8,959,584      cache-misses                                                            ( +-  3.23% )
               149      cs                                                                      ( +-  3.55% )

           0.55866 +- 0.00290 seconds time elapsed  ( +-  0.52% 
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/main_bench 2048 2048 2048
Time: 0.365671 sec
Time: 0.372024 sec
Time: 0.365962 sec
Time: 0.370060 sec
Time: 0.366794 sec
Time: 0.371532 sec
Time: 0.368807 sec
Time: 0.374896 sec
Time: 0.371446 sec
Time: 0.375241 sec

 Performance counter stats for './build/main_bench 2048 2048 2048' (10 runs):

         6,682,440      cache-misses                                                            ( +-  4.49% )
               137      cs                                                                      ( +-  1.89% )

           0.53518 +- 0.00230 seconds time elapsed  ( +-  0.43% )

這問題出在,原本的 work stealing 設計中,worker 每次僅從其他 queue 中竊取一個任務,當任務不平均時,容易導致某些 thread 不斷嘗試偷任務、造成高頻率的 stealing 行為。為了減少竊取次數,我設計了 batch stealing 機制 steal_batch,讓一個 thread 一次能偷取 STEAL_CHUNK(這裡根據實驗設為 3)個任務,並暫存在steal_buf[] 中。worker thread 會依序執行 steal_buf 中的任務,直到消化完,再重新檢查自己 queue 或再次發動 stealing。
steal_batch 的具體設計是先讀取 headtail 來計算剩餘任務數量,如果剩下的任務數量不夠 STEAL_CHUNK,就直接返回 false,不執行偷取。如果有足夠的任務,會先透過 CAS 操作(atomic_compare_exchange_strong_explicit)檢查是否有另外一個 worker 同時在偷同一個人的任務,如果成功,代表這段區域我搶到了,就可以安全地從 tasks[(head + k) % capacity] 中讀取任務,並且在複製的過程中要減少被偷的 worker 擁有的任務量 (sem_trywait)。若 CAS 失敗,表示 head 已被其他 thread 偷走(或自己偷了),此時不 retry(因為無法保證是否還有足夠任務),而是直接去嘗試偷其他 worker 的任務。

-typedef struct {
+typedef struct{
     task_t *tasks;       // ring buffer of tasks
     size_t capacity;     // slots per ring buffer
     atomic_size_t head;  // consumer index
         return false;

     size_t idx = atomic_fetch_add_explicit(&q->head, 1,
-                                            memory_order_relaxed) % q->capacity;
+                                           memory_order_relaxed) % q->capacity;
     *out = q->tasks[idx];
     return true;
 }
+
+static bool steal_batch(ring_buffer_t *q, task_t *buf, size_t *n_stolen)
+{
+    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
+    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
+
+    size_t available = tail - head;
+    if (available <= STEAL_CHUNK)
+        return false;                      /* 不夠就別偷 */
+
+    size_t new_head = head + STEAL_CHUNK;
+
+    /* CAS 把 head 往前推 – claim 這一段 */
+    if (atomic_compare_exchange_strong_explicit(
+            &q->head, &head, new_head,
+            memory_order_acquire, memory_order_relaxed))
+    {
+        for (size_t k = 0; k < STEAL_CHUNK; ++k){
+            buf[k] = q->tasks[(head + k) % q->capacity];
+            sem_trywait(&q->sem);
+        }
+
+
+        *n_stolen = STEAL_CHUNK;
+        return true;
+    }
+    return false;
+}
+
 static inline void mm_tile(const task_t *task)
 {
     for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) {
     ring_buffer_t *selfQ  = &pool->queues[selfID];
     free(warg);

-    task_t task;
-
+    task_t task;
+    task_t steal_buf[STEAL_CHUNK]; // 偷到的任務暫存在這
+    size_t steal_n = 0, steal_pos = 0;
     for (;;) {
+        /* 先吃完前面偷到的任務 */
+        if (steal_pos < steal_n) {
+            mm_tile(&steal_buf[steal_pos++]);
+            atomic_fetch_sub(&pool->tasks_remaining, 1);
+            if (atomic_load(&pool->tasks_remaining) == 0) {
+                pthread_mutex_lock(&pool->done_lock);
+                pthread_cond_broadcast(&pool->all_done);
+                pthread_mutex_unlock(&pool->done_lock);
+            }
+            continue;
+        }

+        /* 試著從自己 queue 拿任務 */
         if (try_dequeue_task(selfQ, &task))
             goto got_job;

+        /* busy-wait + work stealing */
         for (int spin = 0; spin < SPIN_LIMIT; ++spin) {
-
-            /* 逐個掃描其他 queue */
             for (size_t off = 1; off < pool->num_threads; ++off) {
                 size_t victimID = (selfID + off) % pool->num_threads;
                 ring_buffer_t *vQ = &pool->queues[victimID];

-                if (try_dequeue_task(vQ, &task))
-                    goto got_job;                 /* 成功偷到 → 做事 */
+                if (steal_batch(vQ, steal_buf, &steal_n)) {
+                    steal_pos = 0;
+                    goto continue_loop;
+                }
             }

-            if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+            if (atomic_load(&pool->shutdown))
                 return NULL;

             cpu_relax();
         }

+        /* sleep 等自己 queue 的任務來 */
         sem_wait(&selfQ->sem);
-
-        if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+        if (atomic_load(&pool->shutdown))
             return NULL;

-        size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1,memory_order_relaxed) % selfQ->capacity;
+        size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1, memory_order_relaxed) % selfQ->capacity;
         task = selfQ->tasks[idx];

-got_job:
+    got_job:
         mm_tile(&task);
-
         atomic_fetch_sub(&pool->tasks_remaining, 1);
         if (atomic_load(&pool->tasks_remaining) == 0) {
             pthread_mutex_lock(&pool->done_lock);
             pthread_cond_broadcast(&pool->all_done);
             pthread_mutex_unlock(&pool->done_lock);
         }
+
+    continue_loop:
+        continue;
     }
     return NULL;
 }

實驗比較不同 STEAL CHUNK size

根據下圖可以發現,當一次竊取數量為 3 時,throughput 最高。

image

實驗比較 lock-based vs. lock-free vs. lock-free-rr

root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/main_bench 2048 2048 2048
Time: 0.367595 sec
Time: 0.377324 sec
Time: 0.371927 sec
Time: 0.374560 sec
Time: 0.374375 sec
Time: 0.369202 sec
Time: 0.369010 sec
Time: 0.367379 sec
Time: 0.370429 sec
Time: 0.367065 sec

 Performance counter stats for './build/main_bench 2048 2048 2048' (10 runs):

         7,338,196      cache-misses                                                            ( +-  4.30% )
               135      cs                                                                      ( +-  4.18% )

           0.53808 +- 0.00263 seconds time elapsed  ( +-  0.49% )
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.340515 sec
Time: 0.344814 sec
Time: 0.343446 sec
Time: 0.335358 sec
Time: 0.333868 sec
Time: 0.337032 sec
Time: 0.346161 sec
Time: 0.344273 sec
Time: 0.335613 sec
Time: 0.343400 sec

 Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):

         8,763,560      cache-misses                                                            ( +-  2.51% )
               125      cs                                                                      ( +-  2.59% )

           0.50839 +- 0.00213 seconds time elapsed  ( +-  0.42% )

由此實驗可以看到,經歷上述的這些調整,使得 lock-free-rr 在完成整個矩陣運算過程,可以比 lock-based 還更加有效率。

image

探討效能瓶頸的區域

我用 perf 觀察整個程式,找出耗費時間最多的地方。從結果可以看到,計算 mm_tile 是最花時間的部分。

perf record -g ./build/lockfree_rr_bench 2048 2048 2048
perf annotate
for (size_t j = 0; j < MICRO_TILE; j++)                                                                                                       ▒
  3.66 │1a8:   movss    (%r12),%xmm1                                                                                                                       ◆
       │     float a = task->A[(ti + i) * task->stride_a + k];                                                                                             ▒
  0.01 │       mov      %rdi,%rdx                                                                                                                          ▒
  0.39 │       lea      0x20(%rax),%rcx                                                                                                                    ▒
  2.55 │       shufps   $0x0,%xmm1,%xmm1                                                                                                                   ▒
       │     sum[i][j] += a * task->B[(tj + j) * task->stride_b + k];                                                                                      ▒
  3.61 │1b9:   movss    (%rdx,%rbx,8),%xmm0                                                                                                                ▒
  1.79 │       movss    (%rdx,%rbx,4),%xmm2                                                                                                                ▒
  4.42 │       add      $0x10,%rax                                                                                                                         ▒
  4.33 │       movss    (%rdx),%xmm3                                                                                                                       ▒
  3.87 │       unpcklps %xmm0,%xmm2                                                                                                                        ▒
  2.32 │       movss    (%rdx,%r13,1),%xmm0                                                                                                                ▒
  4.11 │       add      %r10,%rdx                                                                                                                          ▒
  4.62 │       unpcklps %xmm3,%xmm0                                                                                                                        ▒
  3.75 │       movlhps  %xmm2,%xmm0                                                                                                                        ▒
 10.60 │       mulps    %xmm1,%xmm0                                                                                                                        ▒
 22.86 │       addps    -0x10(%rax),%xmm0                                                                                                                  ▒
  7.79 │       movaps   %xmm0,-0x10(%rax) 

回顧 lockfree_rr.c 的 mm_tile 可以發現,目前的 mm_tile 並不支援 SIMD 架構,還是取單一個值進行計算。

for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) {
        for (size_t tj = 0; tj < TILE_SIZE; tj += MICRO_TILE) {
            float sum[MICRO_TILE][MICRO_TILE] = {0};
            for (size_t k = 0; k < task->n_k; k++) {
                for (size_t i = 0; i < MICRO_TILE; i++) {
                    float a = task->A[(ti + i) * task->stride_a + k];
                    for (size_t j = 0; j < MICRO_TILE; j++)
                        sum[i][j] += a * task->B[(tj + j) * task->stride_b + k];
                }
            }
            for (size_t i = 0; i < MICRO_TILE; i++) {
                for (size_t j = 0; j < MICRO_TILE; j++)
                    task->C[(ti + i) * task->stride_c + (tj + j)] = sum[i][j];
            }
        }
    }

參考 0xff07 HackMD 筆記,我們可以將計算核心改寫為支援 SIMD 的架構。由於 MICRO_TILE = 8,意味著我們的 micro-tile 為 8×8。這使我們可以利用 AVX 指令集的 __m256 向量型別,一次處理 8 個 float。結合手動 loop unrolling(展開 8 個 row),即可實作出一次更新 64 個浮點數的方式。

我們做的是:

$C = A \times B$(但 $B$ 是轉置後的,記憶體中實際存放的是 $B^T$)

計算公式為:

$C[i][j] = \sum_{k=0}^{K-1} A[i][k] \cdot B[j][k]$

每一次 8×8 micro-tile 矩陣乘法的視覺化如下:

A =                                           B^T =                                         C =
[ a00 a01 a02 a03 a04 a05 a06 a07 ... a0k ]   [ b00 b01 b02 b03 b04 b05 b06 b07 ... b0k ]   [ c00 c01 c02 c03 c04 c05 c06 c07 ]
[ a10 a11 a12 a13 a14 a15 a16 a17 ... a1k ]   [ b10 b11 b12 b13 b14 b15 b16 b17 ... b1k ]   [ c10 c11 c12 c13 c14 c15 c16 c17 ]
[ a20 a21 a22 a23 a24 a25 a26 a27 ... a2k ]   [ b20 b21 b22 b23 b24 b25 b26 b27 ... b2k ]   [ c20 c21 c22 c23 c24 c25 c26 c27 ]
[ a30 a31 a32 a33 a34 a35 a36 a37 ... a3k ]   [ b30 b31 b32 b33 b34 b35 b36 b37 ... b3k ]   [ c30 c31 c32 c33 c34 c35 c36 c37 ]
[ a40 a41 a42 a43 a44 a45 a46 a47 ... a4k ]   [ b40 b41 b42 b43 b44 b45 b46 b47 ... b4k ]   [ c40 c41 c42 c43 c44 c45 c46 c47 ]
[ a50 a51 a52 a53 a54 a55 a56 a57 ... a5k ]   [ b50 b51 b52 b53 b54 b55 b56 b57 ... b5k ]   [ c50 c51 c52 c53 c54 c55 c56 c57 ]
[ a60 a61 a62 a63 a64 a65 a66 a67 ... a6k ]   [ b60 b61 b62 b63 b64 b65 b66 b67 ... b6k ]   [ c60 c61 c62 c63 c64 c65 c66 c67 ]
[ a70 a71 a72 a73 a74 a75 a76 a77 ... a7k ]   [ b70 b71 b72 b73 b74 b75 b76 b77 ... b7k ]   [ c70 c71 c72 c73 c74 c75 c76 c77 ]

下面的計算方式可以觀察到,若要計算出 C 矩陣的第一列 (c00 ~ c07),a00 ~ a0k 會被重複使用 8 次,所以我們可以用 __m256 a0 = _mm256_set1_ps() 存 8 個一樣的 a 值,然後分別與 b00、b10、…、b70 去計算 k = 0 下 c00 ~ c07 的部分結果,再透過 for loop 將所有 k 的結果累加,即為 c00 ~ c07 的最終值。其他列的計算方式也跟第一列一模一樣,所以我們可以透過手動 loop unrolling 一次計算 8 列。這樣的方式可以從過去一次只計算 1 個數值,優化成一次計算 64 個數值,大幅增加效能。

c00	= a00·b00 + a01·b01 + a02·b02 + a03·b03 + a04·b04 + a05·b05 + a06·b06 + a07·b07 + ... + a0k·b0k
c01	= a00·b10 + a01·b11 + a02·b12 + a03·b13 + a04·b14 + a05·b15 + a06·b16 + a07·b17 + ... + a0k·b1k
c02	= a00·b20 + a01·b21 + a02·b22 + a03·b23 + a04·b24 + a05·b25 + a06·b26 + a07·b27 + ... + a0k·b2k
c03	= a00·b30 + a01·b31 + a02·b32 + a03·b33 + a04·b34 + a05·b35 + a06·b36 + a07·b37 + ... + a0k·b3k
c04	= a00·b40 + a01·b41 + a02·b42 + a03·b43 + a04·b44 + a05·b45 + a06·b46 + a07·b47 + ... + a0k·b4k
c05	= a00·b50 + a01·b51 + a02·b52 + a03·b53 + a04·b54 + a05·b55 + a06·b56 + a07·b57 + ... + a0k·b5k
c06	= a00·b60 + a01·b61 + a02·b62 + a03·b63 + a04·b64 + a05·b65 + a06·b66 + a07·b67 + ... + a0k·b6k
c07	= a00·b70 + a01·b71 + a02·b72 + a03·b73 + a04·b74 + a05·b75 + a06·b76 + a07·b77 + ... + a0k·b7k


c10	= a10·b00 + a11·b01 + a12·b02 + a13·b03 + a14·b04 + a15·b05 + a16·b06 + a17·b07 + ... + a1k·b0k
c11	= a10·b10 + a11·b11 + a12·b12 + a13·b13 + a14·b14 + a15·b15 + a16·b16 + a17·b17 + ... + a1k·b1k
c12	= a10·b20 + a11·b21 + a12·b22 + a13·b23 + a14·b24 + a15·b25 + a16·b26 + a17·b27 + ... + a1k·b2k
c13	= a10·b30 + a11·b31 + a12·b32 + a13·b33 + a14·b34 + a15·b35 + a16·b36 + a17·b37 + ... + a1k·b3k
c14	= a10·b40 + a11·b41 + a12·b42 + a13·b43 + a14·b44 + a15·b45 + a16·b46 + a17·b47 + ... + a1k·b4k
c15	= a10·b50 + a11·b51 + a12·b52 + a13·b53 + a14·b54 + a15·b55 + a16·b56 + a17·b57 + ... + a1k·b5k
c16	= a10·b60 + a11·b61 + a12·b62 + a13·b63 + a14·b64 + a15·b65 + a16·b66 + a17·b67 + ... + a1k·b6k
c17	= a10·b70 + a11·b71 + a12·b72 + a13·b73 + a14·b74 + a15·b75 + a16·b76 + a17·b77 + ... + a1k·b7k

 static inline void mm_tile(const task_t *task)
 {
-    printf("task->n_k:%ld\n ",task->n_k);
     for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) {
         for (size_t tj = 0; tj < TILE_SIZE; tj += MICRO_TILE) {
+
+            /* c[v] accumulates row v of the current 8x8 micro-tile of C;
+             * each __m256 holds the 8 floats of that row. */
+            __m256 c[MICRO_TILE];
+            for (int v = 0; v < MICRO_TILE; ++v) c[v] = _mm256_setzero_ps();
+
+            for (size_t k = 0; k < task->n_k; ++k) {
+                /* 1. broadcast A[ti..ti+7][k] - one scalar per output row */
+                __m256 a0 = _mm256_set1_ps(task->A[(ti + 0) * task->stride_a + k]);
+                __m256 a1 = _mm256_set1_ps(task->A[(ti + 1) * task->stride_a + k]);
+                __m256 a2 = _mm256_set1_ps(task->A[(ti + 2) * task->stride_a + k]);
+                __m256 a3 = _mm256_set1_ps(task->A[(ti + 3) * task->stride_a + k]);
+                __m256 a4 = _mm256_set1_ps(task->A[(ti + 4) * task->stride_a + k]);
+                __m256 a5 = _mm256_set1_ps(task->A[(ti + 5) * task->stride_a + k]);
+                __m256 a6 = _mm256_set1_ps(task->A[(ti + 6) * task->stride_a + k]);
+                __m256 a7 = _mm256_set1_ps(task->A[(ti + 7) * task->stride_a + k]);
+
+                /* 2. for this fixed k, pick out 8 columns of B (B is stored
+                 * transposed, so consecutive columns sit stride_b apart) */
+                const float *baseB = task->B + k; 
+                const size_t sb = task->stride_b;
                 
-                        sum[i][j] += a * task->B[(tj + j) * task->stride_b + k];
-                    }
-                        
-                }
-            }
-            for (size_t i = 0; i < MICRO_TILE; i++) {
-                for (size_t j = 0; j < MICRO_TILE; j++)
-                    task->C[(ti + i) * task->stride_c + (tj + j)] = sum[i][j];
+                // listed in reverse because _mm256_set_ps takes the highest
+                // lane first: its arguments are b[7]..b[0], not b[0]..b[7]
+                __m256 b = _mm256_set_ps(
+                    baseB[(tj + 7) * sb],
+                    baseB[(tj + 6) * sb],
+                    baseB[(tj + 5) * sb],
+                    baseB[(tj + 4) * sb],
+                    baseB[(tj + 3) * sb],
+                    baseB[(tj + 2) * sb],
+                    baseB[(tj + 1) * sb],
+                    baseB[(tj + 0) * sb]);   
+
+                /* 3. FMA accumulate: c[v] += a_v * b for all 8 rows */
+                c[0] = _mm256_fmadd_ps(a0, b, c[0]);
+                c[1] = _mm256_fmadd_ps(a1, b, c[1]);
+                c[2] = _mm256_fmadd_ps(a2, b, c[2]);
+                c[3] = _mm256_fmadd_ps(a3, b, c[3]);
+                c[4] = _mm256_fmadd_ps(a4, b, c[4]);
+                c[5] = _mm256_fmadd_ps(a5, b, c[5]);
+                c[6] = _mm256_fmadd_ps(a6, b, c[6]);
+                c[7] = _mm256_fmadd_ps(a7, b, c[7]);
             }
+
+            /* 4. store the finished 8x8 micro-tile back into C */
+            for (int v = 0; v < 8; ++v)
+                _mm256_storeu_ps(&task->C[(ti + v) * task->stride_c + tj], c[v]);
         }
     }
 }
+

看實驗成果,可以發現改成支援 SIMD 架構後,throughput 大幅度地提升。

image

記錄問題

  • main.c pthread_cond_broadcast(&pool->all_done) 的部分應該在呼叫之前要先取得對應 mutex lock,避免 lost wake-up 的問題。 chage.diff

    Calling pthread_cond_signal() or pthread_cond_broadcast() when the thread does not hold the mutex lock associated with the condition can lead to lost wake-up bugs. Oracle Solaris Multithreaded Programming Guide. “Lost Wake-Up Problem”

    The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal(). — POSIX.1-2017, The Open Group Base Specification, Issue 7 (POSIX.1-2017).

TODO: 重作第 11 週測驗題

包含延伸問題