# Linux 核心專題: 並行程式設計相關測驗題
> 執行人: TonyLinX

### Reviewed by `eleanorLYJ`
> 不知道為什麼使用了 lock free 的方式,卻效果更差了

可能是 false sharing 的問題? 你是否使用過 padding 或 alignment 等方式解決?

### Reviewed by `fcu-D0812998`
> 排程演算法:要把哪些任務分配給哪個 ring buffer 很重要。

### Reviewed by `HeLunWu0317`
> TODO: 重作第 12 週測驗題

`enqueue()` 好像沒有對 queue full 的情況下做預防?

## TODO: 重作[第 15 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz15)
> 包含延伸問題

## TODO: 重作[第 12 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz12)
> 著重 lock-based vs. lock-free + thread pool
> 包含延伸問題

[期末專題解說影片](https://youtu.be/orq81p7OMIY)
[Github](https://github.com/TonyLinX/GEMM)

### 運作原理 (`unoptimized.c`)

以下程式 (unoptimized.c) 實作以 64×64 方塊 (tile) 為單位的矩陣乘法 (GEMM):

$$
C_{i,j} \;=\; \sum_{k=0}^{n-1} A_{i,k}\,B_{k,j}
$$

#### Tile-based GEMM 示例

以 $4\times4$ 的矩陣為例,若我們設定 tile(也是 micro tile)大小為 $2\times2$,則矩陣會被劃分成 $2\times2$ 的 tile,如下所示:

$$
\begin{aligned}
C_{00} &= A_{00}B_{00} + A_{01}B_{10} \\
C_{01} &= A_{00}B_{01} + A_{01}B_{11} \\
C_{10} &= A_{10}B_{00} + A_{11}B_{10} \\
C_{11} &= A_{10}B_{01} + A_{11}B_{11}
\end{aligned}
$$

其中,$A_{ij}$ 表示矩陣 $A$ 中第 $i$ 行第 $j$ 欄的 tile,tile 的尺寸為 $2 \times 2$。

#### 條件式選擇:`mm_tile` vs `mm_edge`

在實際程式運行中,對於每個 tile 區塊,會先檢查該區塊是否為完整 tile:
* 若是完整的 tile,便可直接使用固定大小的 `mm_tile` 進行矩陣運算。
* 若不是完整的 tile,則為了避免越界,會使用支援任意大小的 `mm_edge` 進行矩陣運算。

舉例來說,若欲計算 $3\times3\times3$ 的矩陣乘法,在最後一行與最後一列上便無法構成 $2\times2$ 的完整 tile,因此這些區域將交由 `mm_edge` 處理。

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define TILE_SIZE 64

static inline void mm_tile(float *A, float *B, float *C,
                           size_t stride_a, size_t stride_b, size_t stride_c)
{
    for (size_t i = 0; i < TILE_SIZE; i++) {
        for (size_t j = 0; j < TILE_SIZE; j++) {
            const size_t ic = i * stride_c + j;
            for (size_t k = 0; k < TILE_SIZE; k++) {
                const size_t ia = i * stride_a + k;
                const size_t ib = k * stride_b + j;
                C[ic] += A[ia] * B[ib];
            }
        }
    }
}

static inline void mm_edge(float *A, float *B, float *C,
                           size_t stride_a, size_t stride_b, size_t stride_c,
                           size_t tile_m, size_t tile_n, size_t tile_p)
{
    for (size_t i = 0; i < tile_m; i++) {
        for (size_t j = 0; j < tile_p; j++) {
            const size_t ic = i * stride_c + j;
            for (size_t k = 0; k < tile_n; k++) {
                const size_t ia = i * stride_a + k;
                const size_t ib = k * stride_b + j;
                C[ic] += A[ia] * B[ib];
            }
        }
    }
}

void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p)
{
    const size_t inm = m - m % TILE_SIZE;
    const size_t inn = n - n % TILE_SIZE;
    const size_t inp = p - p % TILE_SIZE;

    memset(C, 0, m * p * sizeof(float));
    for (size_t i = 0; i < m; i += TILE_SIZE) {
        for (size_t j = 0; j < p; j += TILE_SIZE) {
            const size_t ic = i * p + j;
            for (size_t k = 0; k < n; k += TILE_SIZE) {
                const size_t ia = i * n + k;
                const size_t ib = k * p + j;
                if (i < inm && j < inp && k < inn) {
                    mm_tile(A + ia, B + ib, C + ic, n, p, p);
                } else {
                    const size_t tile_m = (i + TILE_SIZE > m) ? (m - i) : TILE_SIZE;
                    const size_t tile_n = (k + TILE_SIZE > n) ? (n - k) : TILE_SIZE;
                    const size_t tile_p = (j + TILE_SIZE > p) ? (p - j) : TILE_SIZE;
                    mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n, tile_p);
                }
            }
        }
    }
}

void fill_rand(float *arr, size_t size)
{
    for (size_t i = 0; i < size; i++)
        arr[i] = (float) (rand() % 10);
}

void print_mat(const float *mat, size_t m, size_t n)
{
    for (size_t i = 0; i < m; i++) {
        for (size_t j = 0; j < n; j++) {
            printf("%.2f", mat[i * n + j]);
            if (j < n - 1)
                printf(", ");
        }
        printf("\n");
    }
    printf("---\n");
}

int parse_int(const char *str)
{
    char *end;
    long val = strtol(str, &end, 10);
    if (*end || str == end) {
        fprintf(stderr, "Invalid integer: '%s'\n", str);
        exit(1);
    }
    return (int) val;
}

int main(int argc, char *argv[])
{
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]);
        return 1;
    }
    srand(314);
    size_t m = parse_int(argv[1]);
    size_t n = parse_int(argv[2]);
    size_t p = parse_int(argv[3]);

    float *A = malloc(m * n * sizeof(float));
    float *B = malloc(n * p * sizeof(float));
    float *C = malloc(m * p * sizeof(float));
    fill_rand(A, m * n);
    fill_rand(B, n * p);

#ifdef VALIDATE
    print_mat(A, m, n);
    print_mat(B, n, p);
#endif

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    mm(A, B, C, m, n, p);
    clock_gettime(CLOCK_MONOTONIC, &end);

#ifndef VALIDATE
    double elapsed =
        (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("%.9f\n", elapsed);
#endif

#ifdef VALIDATE
    print_mat(C, m, p);
#endif

    free(A);
    free(B);
    free(C);
    return 0;
}
```

### 運作原理 (`main.c`)

#### Zero padding

##### `ALIGN_UP`

在 main.c 中,將矩陣的尺寸(如 `m`, `n`, `p`)補齊成 `TILE_SIZE = 64` 的倍數,以避免邊界不完整的 tile 出現,進而提升記憶體對齊與計算效率。程式中使用了以下巨集:

```c
#define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))
```

技巧:
* `((x) + TILE_SIZE - 1)`:確保只要 x 不是 64 的倍數,就會進位到下一個倍數
* `& ~(TILE_SIZE - 1)`:把最低 6 bits 清 0,讓結果剛好是 64 的倍數

目標:
* 省去原本在內層迴圈檢查邊界的分支
* 讓所有迴圈上限變成編譯期常數,幫助編譯器展開與向量化
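以 `TILE_SIZE = 64` 為例,可用下面這個小程式(示意用,非專案原始碼)直接驗證 `ALIGN_UP` 的進位行為:已是 64 倍數的值維持不變,其餘則進位到下一個 64 的倍數。

```c
#include <stdio.h>

#define TILE_SIZE 64
#define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))

int main(void)
{
    /* 64 -> 64, 65 -> 128, 2050 -> 2112 */
    printf("%d %d %d\n", ALIGN_UP(64), ALIGN_UP(65), ALIGN_UP(2050));
    return 0;
}
```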
##### `pad_mat` 與 `pad_t_mat`

將原始矩陣複製到新的、經過 zero-padding 的記憶體區塊中,大小為 `TILE_SIZE`(64)的倍數。

`pad_mat()`:針對 A 矩陣的補齊
使用 `aligned_alloc(MEM_ALIGNMENT, ...)` 分配對齊的記憶體(如 64-byte),再透過 `memset` 全部填成 0。由於 A 矩陣是 row-major,不需要轉置,直接透過 `memcpy` 把原本第 i 行的 c 個 float 複製到新 buffer 中第 i 行開頭,右邊剩下的部分則保留為 0(前面已用 memset 清空)。

```c
float *pad_mat(const float *src, size_t r, size_t c, size_t padr, size_t padc)
{
    float *dst = aligned_alloc(MEM_ALIGNMENT, padr * padc * sizeof(float));
    memset(dst, 0, padr * padc * sizeof(float));
    for (size_t i = 0; i < r; i++)
        memcpy(dst + i * padc, src + i * c, c * sizeof(float));
    return dst;
}
```

`pad_t_mat()`:針對 B 矩陣的轉置與補齊
使用雙層迴圈進行轉置與寫入,把原本的 `B[i][j]` 寫到 `B_T[j][i]`。

```c
float *pad_t_mat(const float *src, size_t r, size_t c, size_t padr, size_t padc)
{
    float *dst = aligned_alloc(MEM_ALIGNMENT, padr * padc * sizeof(float));
    memset(dst, 0, padr * padc * sizeof(float));
    for (size_t i = 0; i < r; i++)
        for (size_t j = 0; j < c; j++)
            dst[j * padr + i] = src[i * c + j];
    return dst;
}
```

#### Thread pool 初始化階段

##### `init_thread_pool`

建立指定數量的 worker threads,並設定 CPU affinity:透過 `pthread_create` 建立工作執行緒,並將每個 thread 綁定到指定的 CPU core 上(`pthread_setaffinity_np`)。

```c
for (size_t i = 0; i < num_threads; i++) {
    pthread_create(&pool->threads[i], NULL, worker_thread, pool);
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(i % N_CORES, &cpuset);
    pthread_setaffinity_np(pool->threads[i], sizeof(cpu_set_t), &cpuset);
}
```

#### 矩陣運算

##### `mm`

把整個矩陣乘法任務以 tile 為單位拆分成小任務(`task_t`),並丟給 thread pool 執行。這是一種 non-blocking dispatch 的做法:把工作派出去的時候,主程式不會停下來等這個工作做完,而是馬上繼續派發下一件。當所有工作丟完後,主執行緒呼叫 `wait_for_completion()` 等待所有 tile 被算完。

```c
void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p, threadpool_t *pool)
{
    for (size_t i = 0; i < m; i += TILE_SIZE) {
        for (size_t j = 0; j < p; j += TILE_SIZE) {
            task_t task = {
                .A = A + i * n,      // 一維排列下,A 第 i 列(此 tile 的起始列)的起點
                .B = B + j * n,      // B 已轉置,對應原始 B 第 j 行(B^T 第 j 列)的起點
                .C = C + i * p + j,  // C 中此 tile 左上角的位置
                .stride_a = n,
                .stride_b = n,
                .stride_c = p,
                .n_k = n,
            };
            enqueue(pool, task);
        }
    }
    wait_for_completion(pool);
}
```

##### `enqueue`

任務佇列實際上是一個由 `queue_node_t` 組成的單向 linked list。當有新的任務進來,會將其加入尾端,並以 `pthread_cond_signal()` 喚醒等待在 `not_empty` 條件變數上的 thread,通知有新任務可取。

```c
void enqueue(threadpool_t *pool, task_t task)
{
    queue_node_t *node = malloc(sizeof(queue_node_t));
    *node = (queue_node_t){.task = task, .next = NULL};

    pthread_mutex_lock(&pool->lock);
    atomic_fetch_add(&pool->tasks_remaining, 1);
    if (pool->tail)
        pool->tail->next = node;
    else
        pool->head = node;
    pool->tail = node;

    // 有一個新任務進來,喚醒一條 worker thread
    pthread_cond_signal(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
}
```

##### `worker_thread`

thread 被建立後會執行 `worker_thread` 函式。只要 thread pool 尚未被 `shutdown`,thread 會先取得鎖,原因是它可能要讀取和修改 queue 裡面的內容,這些動作必須保證同一時間只有一個 thread 在操作。

拿到鎖後,會檢查目前有沒有任務,以及 thread pool 是否要關閉:若沒有任務,就釋放鎖並等待被喚醒(`pthread_cond_wait(&pool->not_empty, &pool->lock)`);若有任務,就取出任務並釋放鎖給其他 thread 使用。任務內容是呼叫 `mm_tile` 計算一個 tile,完成後把 `tasks_remaining` 減 1。若還有任務未完成,先做完的 thread 會回到迴圈繼續等待,直到有新工作時再被喚醒;當所有任務皆完成,該 thread 會呼叫 `pthread_cond_broadcast(&pool->all_done)`,讓主執行緒能被喚醒。

```c
void *worker_thread(void *arg)
{
    threadpool_t *pool = arg;
    while (!atomic_load(&pool->shutdown)) {
        pthread_mutex_lock(&pool->lock);
        while (!pool->head && !atomic_load(&pool->shutdown))
            pthread_cond_wait(&pool->not_empty, &pool->lock);

        // 如果是被「關閉通知」喚醒,那這條 thread 不該再做事,直接退出:
        if (atomic_load(&pool->shutdown)) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }

        queue_node_t *node = pool->head;
        pool->head = node->next;
        if (!pool->head)
            pool->tail = NULL;
        pthread_mutex_unlock(&pool->lock);

        mm_tile(&node->task);
        free(node);

        // 任務數量減 1,使用 atomic 確保 thread-safe
        atomic_fetch_sub(&pool->tasks_remaining, 1);
        if (atomic_load(&pool->tasks_remaining) == 0)
            pthread_cond_broadcast(&pool->all_done);
    }
    return NULL;
}
```

#### 工作完成與喚醒機制

##### `wait_for_completion`

主執行緒在 `mm` 裡 `enqueue` 完全部工作後,呼叫 `wait_for_completion`。

```c
void wait_for_completion(threadpool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    while (atomic_load(&pool->tasks_remaining) > 0)
        pthread_cond_wait(&pool->all_done, &pool->lock);
    pthread_mutex_unlock(&pool->lock);
}
```

#### 釋放資源

##### `destroy_thread_pool`

使用 `atomic_store(&pool->shutdown, true)` 將 shutdown 旗標設為 true。由於這個變數是被多個 thread 共用的,使用 atomic 操作可以避免資料競爭,並確保每個 worker thread 都能正確且同步地感知「要結束了」的訊號。

有些 worker 可能正在 `pthread_cond_wait(&not_empty)` 上等待(因為 queue 暫時是空的),這時呼叫 `pthread_cond_broadcast(&not_empty)` 將它們全部喚醒;它們醒來後會發現 `shutdown == true`,並跳出工作迴圈結束自己。

接著使用 `pthread_join` 等待所有 thread 都完成並返回。這一步確保 worker threads 不會在後面開始釋放記憶體時還在執行,避免 use-after-free。最後釋放掉 `pool->threads` 所使用的記憶體,並呼叫 `pthread_mutex_destroy`、`pthread_cond_destroy` 釋放互斥鎖與條件變數。
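依照上述步驟,可以把 `destroy_thread_pool` 重建成如下的示意實作(非專案原始碼,欄位名稱沿用前文 `threadpool_t` 的成員,細節可能與實際程式略有出入;這裡依後文「記錄問題」的建議,在持有 mutex 的情況下才呼叫 `pthread_cond_broadcast`):

```c
void destroy_thread_pool(threadpool_t *pool)
{
    /* 1. 以 atomic 寫入 shutdown 旗標,讓所有 worker 都能看到結束訊號 */
    atomic_store(&pool->shutdown, true);

    /* 2. 喚醒所有可能在 not_empty 上等待的 worker,
     *    它們醒來後會發現 shutdown == true 而離開工作迴圈 */
    pthread_mutex_lock(&pool->lock);
    pthread_cond_broadcast(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);

    /* 3. 等待所有 worker 結束,避免釋放記憶體後仍被存取 (use-after-free) */
    for (size_t i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);

    /* 4. 釋放 threads 陣列與同步物件 */
    free(pool->threads);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->not_empty);
    pthread_cond_destroy(&pool->all_done);
}
```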
### 效能缺失的部分

#### main.c

![thread_pool](https://hackmd.io/_uploads/ByGSgd5Nle.png)

在原本的 main.c 中,thread pool 的實作是 lock-based:使用 `pthread_mutex_t` 搭配 `pthread_cond_t` 來保護與同步任務佇列(linked list 實作)。此設計雖然能正確完成矩陣運算,但工作執行緒取任務(`worker_thread`)與主執行緒新增任務(`enqueue`)時,都必須進入同一個臨界區;大量執行緒同時競爭這把鎖會造成等待,導致效能損失。以下程式為 `main.c` 中 `enqueue` 與 `worker_thread` 的設計,使用同一把鎖 `pool->lock` 保護佇列的增減操作:

```c
void enqueue(threadpool_t *pool, task_t task)
{
    queue_node_t *node = malloc(sizeof(queue_node_t));
    *node = (queue_node_t){.task = task, .next = NULL};

    pthread_mutex_lock(&pool->lock);
    atomic_fetch_add(&pool->tasks_remaining, 1);
    if (pool->tail)
        pool->tail->next = node;
    else
        pool->head =
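            /* 佇列原本是空的:新節點同時會成為 head 與 tail */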
node; pool->tail = node; //有一條新的任務近來,喚醒一條 worker thread pthread_cond_signal(&pool->not_empty); pthread_mutex_unlock(&pool->lock); } ``` ```c { threadpool_t *pool = arg; while (!atomic_load(&pool->shutdown)) { pthread_mutex_lock(&pool->lock); while (!pool->head && !atomic_load(&pool->shutdown)) pthread_cond_wait(&pool->not_empty, &pool->lock); //如果是被「關閉通知」喚醒,那這條 thread 不該再做事,直接退出: if (atomic_load(&pool->shutdown)) { pthread_mutex_unlock(&pool->lock); return NULL; } queue_node_t *node = pool->head; pool->head = node->next; if (!pool->head) pool->tail = NULL; pthread_mutex_unlock(&pool->lock); mm_tile(&node->task); free(node); //任務數量減 1,使用 atomic 確保 thread-safe atomic_fetch_sub(&pool->tasks_remaining, 1); if (atomic_load(&pool->tasks_remaining) == 0){ pthread_mutex_lock(&pool->lock); pthread_cond_broadcast(&pool->all_done); pthread_mutex_unlock(&pool->lock); } } return NULL; } ``` 參考 [並行程式設計: Lock-Free Programming](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-lockfree#Lock-Free-Programming) 這篇文章,可以使用 ring buffer 來避免 main thread 放入工作和 worker thread 取出任務的競爭問題,再透過 semaphore 取代 condition variable。 以下的程式碼使用一個 ring buffer 以及 semaphore 的方式實作出 thread pool: ```diff -typedef struct queue_node_t { - task_t task; - struct queue_node_t *next; -} queue_node_t; - typedef struct { - queue_node_t *head, *tail; - pthread_mutex_t lock; - pthread_cond_t not_empty, all_done; + task_t *tasks; // ring buffer of tasks + size_t capacity; // total slots in ring buffer + atomic_size_t head; + atomic_size_t tail; + sem_t not_empty; // counts available tasks + pthread_mutex_t done_lock; + pthread_cond_t all_done; pthread_t *threads; size_t num_threads; atomic_int tasks_remaining; ``` ```diff void *worker_thread(void *arg) { threadpool_t *pool = arg; - while (!atomic_load(&pool->shutdown)) { - pthread_mutex_lock(&pool->lock); - while (!pool->head && !atomic_load(&pool->shutdown)) - pthread_cond_wait(&pool->not_empty, &pool->lock); - - //如果是被「關閉通知」喚醒,那這條 thread 不該再做事,直接退出: - if (atomic_load(&pool->shutdown)) { - pthread_mutex_unlock(&pool->lock); - return NULL; - } - - queue_node_t *node = pool->head; - pool->head = node->next; - if (!pool->head) - pool->tail = NULL; - pthread_mutex_unlock(&pool->lock); + while (true) { + sem_wait(&pool->not_empty); + if (atomic_load(&pool->shutdown)) + break; - mm_tile(&node->task); - free(node); + size_t idx = atomic_fetch_add(&pool->head, 1) % pool->capacity; + task_t task = pool->tasks[idx]; - //任務數量減 1,使用 atomic 確保 thread-safe + mm_tile(&task); atomic_fetch_sub(&pool->tasks_remaining, 1); - - /*如果這條 thread 完成了最後一個任務,通知所有 - 在主程式裡等 wait_for_completion() 的 thread, - 因為有很多thread 可能同時在等待某一個thread完成 - 最後一個任務,所以要用 pthread_cond_broadcast, - 不像 pthread_cond_signal 只會喚醒一個 thread。 - */ - if (atomic_load(&pool->tasks_remaining) == 0) + if (atomic_load(&pool->tasks_remaining) == 0) { + pthread_mutex_lock(&pool->done_lock); pthread_cond_broadcast(&pool->all_done); + pthread_mutex_unlock(&pool->done_lock); + } } return NULL; } ``` ```diff void enqueue(threadpool_t *pool, task_t task) { - queue_node_t *node = malloc(sizeof(queue_node_t)); - *node = (queue_node_t){.task = task, .next = NULL}; - - pthread_mutex_lock(&pool->lock); + size_t idx = atomic_fetch_add(&pool->tail, 1) % pool->capacity; + pool->tasks[idx] = task; atomic_fetch_add(&pool->tasks_remaining, 1); - if (pool->tail) - pool->tail->next = node; - else - pool->head = node; - pool->tail = node; - - //有一條新的任務近來,喚醒一條 worker thread - pthread_cond_signal(&pool->not_empty); - pthread_mutex_unlock(&pool->lock); + 
sem_post(&pool->not_empty);
 }
```

#### 效能比較

下圖比較 lock-based 與 lock-free 的 throughput,可以觀察到目前只使用一個 ring buffer 的做法,效能反而比 lock-based 還差。

![image](https://hackmd.io/_uploads/Bk7N4MiNll.png)

可以發現 lock-free 的效果更差了:雖然本實作採用 lock-free ring buffer,理論上減少了鎖競爭,但所有 worker threads 仍需以 atomic 操作競爭同一個 `head` 變數,這會導致在多執行緒下產生嚴重的 cache line ping-pong 現象。一次只能有一個 worker 成功取得任務,其餘則進入休眠,造成 thread utilization 下降,反而讓 lock-free 版本效能不如 lock-based。

> Cache line ping-pong 指多個核心頻繁寫入同一塊 cache line(例如對共享變數做 atomic 操作),導致該 cache line 的擁有權在各核心間不停搬移,嚴重拖慢效能。本例是所有執行緒競爭「同一個變數」,屬於 true sharing 造成的 contention;若是「不同變數」剛好落在同一條 cache line 而互相干擾,才稱為 false sharing。兩者成因不同,但都會造成 cache line 在核心間來回搬移。

為了解決這個問題,我實作了一個 worker 對應一個 queue 的改版設計:每個 thread 有自己獨立的 ring buffer 和 semaphore,任務透過 round-robin 分發,這樣 worker 只需操作自己的 queue,不會再有一堆 worker 搶同一個共享變數的問題。

### `lockfree_rr.c`

```diff
 typedef struct {
-    task_t *tasks;       // ring buffer of tasks
-    size_t capacity;     // total slots in ring buffer
-    atomic_size_t head;  // consumer index
-    atomic_size_t tail;  // producer index
-    sem_t not_empty;     // counts available tasks
+    task_t *tasks;       // ring buffer of tasks
+    size_t capacity;     // slots per ring buffer
+    atomic_size_t head;  // consumer index
+    atomic_size_t tail;  // producer index
+    sem_t sem;           // counts available tasks
+} ring_buffer_t;
+
+typedef struct {
+    ring_buffer_t *queues;    // array of per-thread queues
+    pthread_t *threads;       // worker threads
+    size_t num_threads;       // number of workers
+    atomic_size_t next_queue; // for round-robin dispatch
     pthread_mutex_t done_lock;
     pthread_cond_t all_done;
-    pthread_t *threads;
-    size_t num_threads;
-    atomic_int tasks_remaining;
+    atomic_int tasks_remaining; // across all queues
     _Atomic bool shutdown;
 } threadpool_t;
 }
 }
+typedef struct {
+    threadpool_t *pool;
+    size_t index;
+} worker_arg_t;
+
 void *worker_thread(void *arg) {
-    threadpool_t *pool = arg;
+    worker_arg_t *warg = arg;
+    threadpool_t *pool = warg->pool;
+    size_t idx_q = warg->index;
+    ring_buffer_t *queue = &pool->queues[idx_q];
+    free(warg);
+
     while (true) {
-        sem_wait(&pool->not_empty);
+        sem_wait(&queue->sem);
         if (atomic_load(&pool->shutdown))
             break;

-        size_t idx = atomic_fetch_add(&pool->head, 1) % pool->capacity;
-        task_t task = pool->tasks[idx];
+        size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
+        task_t task = queue->tasks[idx];

         mm_tile(&task);
         atomic_fetch_sub(&pool->tasks_remaining, 1);

     *pool = (threadpool_t){
         .num_threads = num_threads,
         .threads = malloc(num_threads * sizeof(pthread_t)),
-        .capacity = capacity,
-        .tasks = malloc(capacity * sizeof(task_t)),
+        .queues = calloc(num_threads, sizeof(ring_buffer_t)),
     };
-    atomic_init(&pool->head, 0);
-    atomic_init(&pool->tail, 0);
-    sem_init(&pool->not_empty, 0, 0);
+    atomic_init(&pool->next_queue, 0);
     pthread_mutex_init(&pool->done_lock, NULL);
     pthread_cond_init(&pool->all_done, NULL);

     for (size_t i = 0; i < num_threads; i++) {
-        pthread_create(&pool->threads[i], NULL, worker_thread, pool);
+        ring_buffer_t *q = &pool->queues[i];
+        q->capacity = capacity;
+        q->tasks = calloc(capacity, sizeof(task_t));
+        atomic_init(&q->head, 0);
+        atomic_init(&q->tail, 0);
+        sem_init(&q->sem, 0, 0);
+
+        worker_arg_t *warg = malloc(sizeof(worker_arg_t));
+        *warg = (worker_arg_t){.pool = pool, .index = i};
+        pthread_create(&pool->threads[i], NULL, worker_thread, warg);
+
         cpu_set_t cpuset;
         CPU_ZERO(&cpuset);
         CPU_SET(i % N_CORES, &cpuset);

 void enqueue(threadpool_t *pool, task_t task) {
-    size_t idx = atomic_fetch_add(&pool->tail, 1) % pool->capacity;
-    pool->tasks[idx] = task;
+    size_t qid = atomic_fetch_add(&pool->next_queue, 1) % pool->num_threads;
+    ring_buffer_t *q = &pool->queues[qid];
+
+    size_t idx =
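+        /* 補充說明:這裡沒有檢查 ring buffer 是否已滿;
+           若同一個 queue 中尚未被取走的任務數達到 capacity,
+           新寫入會覆蓋掉尚未執行的舊 task */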
atomic_fetch_add(&q->tail, 1) % q->capacity; + q->tasks[idx] = task; atomic_fetch_add(&pool->tasks_remaining, 1); - sem_post(&pool->not_empty); + sem_post(&q->sem); } void wait_for_completion(threadpool_t *pool) { atomic_store(&pool->shutdown, true); for (size_t i = 0; i < pool->num_threads; i++) - sem_post(&pool->not_empty); // wake workers + sem_post(&pool->queues[i].sem); // wake workers for (size_t i = 0; i < pool->num_threads; i++) pthread_join(pool->threads[i], NULL); - free(pool->tasks); + for (size_t i = 0; i < pool->num_threads; i++) { + ring_buffer_t *q = &pool->queues[i]; + free(q->tasks); + sem_destroy(&q->sem); + } + free(pool->queues); free(pool->threads); - sem_destroy(&pool->not_empty); pthread_mutex_destroy(&pool->done_lock); pthread_cond_destroy(&pool->all_done); } ``` #### 實驗比較 lockfree vs. lockfree_rr 由下面的實驗發現 `lockfree_rr` 完成整個矩陣運算的任務所需的時間,比 `lockfree` 還高。 ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_bench 2048 2048 2048 Time: 0.415489 sec Time: 0.414018 sec Time: 0.423264 sec Time: 0.433853 sec Time: 0.414566 sec Time: 0.429556 sec Time: 0.418634 sec Time: 0.427258 sec Time: 0.421059 sec Time: 0.419739 sec Performance counter stats for './build/lockfree_bench 2048 2048 2048' (10 runs): 7,599,301 cache-misses ( +- 3.41% ) 134 cs ( +- 2.45% ) 0.58900 +- 0.00256 seconds time elapsed ( +- 0.43% ) ``` ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048 Time: 0.550715 sec Time: 0.528759 sec Time: 0.540224 sec Time: 0.531338 sec Time: 0.519645 sec Time: 0.529487 sec Time: 0.552652 sec Time: 0.543132 sec Time: 0.524136 sec Time: 0.541093 sec Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs): 9,591,320 cache-misses ( +- 2.58% ) 124 cs ( +- 2.88% ) 0.70733 +- 0.00501 seconds time elapsed ( +- 0.71% ) ``` 問題出在,任務是以 round-robin 方式均勻分派,導致即使某些 worker 剛進入休眠狀態(因為 queue 空了),下一筆任務卻很可能馬上又被分配給它,造成頻繁的睡眠 → 喚醒循環。 ```c void *worker_thread(void *arg) { worker_arg_t *warg = arg; threadpool_t *pool = warg->pool; size_t idx_q = warg->index; ring_buffer_t *queue = &pool->queues[idx_q]; free(warg); while (true) { sem_wait(&queue->sem); if (atomic_load(&pool->shutdown)) break; size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity; task_t task = queue->tasks[idx]; mm_tile(&task); atomic_fetch_sub(&pool->tasks_remaining, 1); if (atomic_load(&pool->tasks_remaining) == 0) { pthread_mutex_lock(&pool->done_lock); pthread_cond_broadcast(&pool->all_done); pthread_mutex_unlock(&pool->done_lock); } } return NULL; } ``` 所以我加入一段 spin-before-sleep 的設計,先嘗試「忙等」(spinning)一段時間,以避免 thread 一有工作就睡、馬上又被喚醒。 ```diff #include <string.h> #include <unistd.h> +#define SPIN_LIMIT 1024 #define TILE_SIZE 64 #define MICRO_TILE 8 #define MEM_ALIGNMENT 64 #endif #define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1)) - +static inline void cpu_relax(void) { +#if defined(__x86_64__) || defined(__i386__) + __asm__ __volatile__("pause"); +#else + sched_yield(); +#endif +} typedef struct { float *A, *B, *C; size_t stride_a, stride_b, stride_c; free(warg); while (true) { - sem_wait(&queue->sem); - if (atomic_load(&pool->shutdown)) - break; + int spun = 0; + while (spun < SPIN_LIMIT) { + /* 嘗試非阻塞地取 semaphore;成功就開始工作 */ + if (sem_trywait(&queue->sem) == 0) + break; + + /* 如果主線程要求關閉,直接跳出去 */ + if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) + return NULL; + cpu_relax(); + ++spun; + } + + if (spun == SPIN_LIMIT) { + /* 阻塞直到有工作或收到 shutdown 訊號 */ + 
sem_wait(&queue->sem); + if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) + return NULL; + } ``` #### 實驗比較有無 spin-before-sleep 由下面的實驗可以觀察到矩陣運所需所需時間從原本 0.5 掉到 0.4。但是跟 `lockfree` 沒有明顯優勢,還不是很好。 ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048 Time: 0.429622 sec Time: 0.429666 sec Time: 0.429272 sec Time: 0.432098 sec Time: 0.429748 sec Time: 0.429853 sec Time: 0.415347 sec Time: 0.412105 sec Time: 0.417842 sec Time: 0.407290 sec Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs): 8,476,478 cache-misses ( +- 3.14% ) 113 cs ( +- 4.32% ) 0.59123 +- 0.00465 seconds time elapsed ( +- 0.79% ) ``` ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_bench 2048 2048 2048 Time: 0.409975 sec Time: 0.425815 sec Time: 0.426494 sec Time: 0.416542 sec Time: 0.416934 sec Time: 0.419112 sec Time: 0.417866 sec Time: 0.420643 sec Time: 0.418282 sec Time: 0.433788 sec Performance counter stats for './build/lockfree_bench 2048 2048 2048' (10 runs): 7,957,176 cache-misses ( +- 2.26% ) 136 cs ( +- 2.59% ) 0.58842 +- 0.00255 seconds time elapsed ( +- 0.43% ) ``` 在前面的設計中,我們會讓沒有任務的 worker 進入睡眠,但這樣的做法有一個問題:有些任務的執行時間比較長,如果某些 worker 分到這些重任務,就會忙得要命;而其他沒有任務的 worker 卻早早進入睡眠,這樣就會造成負載不平衡,導致整體效率下降。為了解決這個問題,引入 work stealing 策略。以下的設計讓每個 worker 優先處理自己 queue 的任務,若無任務可取,則進入一段 spin + work stealing 階段,主動嘗試從其他 worker 的 queue 中竊取任務。只有當多次嘗試皆失敗後,才會進入 sleep 狀態等待喚醒,避免頻繁的休眠與喚醒造成效能瓶頸。 ```diff +static inline bool try_dequeue_task(ring_buffer_t *q, task_t *out) +{ + if (sem_trywait(&q->sem) != 0) /* 目前沒任務 */ + return false; + + size_t idx = atomic_fetch_add_explicit(&q->head, 1, + memory_order_relaxed) % q->capacity; + *out = q->tasks[idx]; + return true; +} void *worker_thread(void *arg) { - worker_arg_t *warg = arg; - threadpool_t *pool = warg->pool; - size_t idx_q = warg->index; - ring_buffer_t *queue = &pool->queues[idx_q]; + worker_arg_t *warg = arg; + threadpool_t *pool = warg->pool; + size_t selfID = warg->index; + ring_buffer_t *selfQ = &pool->queues[selfID]; free(warg); - while (true) { - int spun = 0; - while (spun < SPIN_LIMIT) { - if (sem_trywait(&queue->sem) == 0) - break; + task_t task; + + for (;;) { + + if (try_dequeue_task(selfQ, &task)) + goto got_job; + + for (int spin = 0; spin < SPIN_LIMIT; ++spin) { + + for (size_t off = 1; off < pool->num_threads; ++off) { + size_t victimID = (selfID + off) % pool->num_threads; + ring_buffer_t *vQ = &pool->queues[victimID]; + if (try_dequeue_task(vQ, &task)) + goto got_job; + } + if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) return NULL; cpu_relax(); - ++spun; } - if (spun == SPIN_LIMIT) { - sem_wait(&queue->sem); - if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) - return NULL; - } - size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity; - task_t task = queue->tasks[idx]; - + sem_wait(&selfQ->sem);// 阻塞等待自己 queue 被喚醒 + + if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) + return NULL; + + size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1,memory_order_relaxed) % selfQ->capacity; + task = selfQ->tasks[idx]; + +got_job: mm_tile(&task); atomic_fetch_sub(&pool->tasks_remaining, 1); if (atomic_load(&pool->tasks_remaining) == 0) { pthread_mutex_lock(&pool->done_lock); ``` #### 實驗比較有無 work stealing 由下面實驗,可以觀察到有了 work stealing 原本矩陣運算的時間可以下降 0.02 sec,從這個設計開始 lockfree_rr 明顯優於 lockfree 的版本,但是還是輸 lock-based。 ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf 
stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048 Time: 0.393714 sec Time: 0.389105 sec Time: 0.384914 sec Time: 0.398160 sec Time: 0.391590 sec Time: 0.385851 sec Time: 0.400985 sec Time: 0.386179 sec Time: 0.380247 sec Time: 0.396653 sec Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs): 8,959,584 cache-misses ( +- 3.23% ) 149 cs ( +- 3.55% ) 0.55866 +- 0.00290 seconds time elapsed ( +- 0.52% ``` ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/main_bench 2048 2048 2048 Time: 0.365671 sec Time: 0.372024 sec Time: 0.365962 sec Time: 0.370060 sec Time: 0.366794 sec Time: 0.371532 sec Time: 0.368807 sec Time: 0.374896 sec Time: 0.371446 sec Time: 0.375241 sec Performance counter stats for './build/main_bench 2048 2048 2048' (10 runs): 6,682,440 cache-misses ( +- 4.49% ) 137 cs ( +- 1.89% ) 0.53518 +- 0.00230 seconds time elapsed ( +- 0.43% ) ``` 這問題出在,原本的 work stealing 設計中,worker 每次僅從其他 queue 中竊取一個任務,當任務不平均時,容易導致某些 thread 不斷嘗試偷任務、造成高頻率的 stealing 行為。為了減少竊取次數,我設計了 batch stealing 機制 `steal_batch`,讓一個 thread 一次能偷取 `STEAL_CHUNK`(這裡根據實驗設為 3)個任務,並暫存在`steal_buf[]` 中。worker thread 會依序執行 `steal_buf` 中的任務,直到消化完,再重新檢查自己 queue 或再次發動 stealing。 `steal_batch` 的具體設計是先讀取 `head` 與 `tail` 來計算剩餘任務數量,如果剩下的任務數量不夠 `STEAL_CHUNK`,就直接返回 false,不執行偷取。如果有足夠的任務,會先透過 CAS 操作(`atomic_compare_exchange_strong_explicit`)檢查是否有另外一個 worker 同時在偷同一個人的任務,如果成功,代表這段區域我搶到了,就可以安全地從 `tasks[(head + k) % capacity]` 中讀取任務,並且在複製的過程中要減少被偷的 worker 擁有的任務量 (`sem_trywait`)。若 CAS 失敗,表示 `head` 已被其他 thread 偷走(或自己偷了),此時不 retry(因為無法保證是否還有足夠任務),而是直接去嘗試偷其他 worker 的任務。 ```diff -typedef struct { +typedef struct{ task_t *tasks; // ring buffer of tasks size_t capacity; // slots per ring buffer atomic_size_t head; // consumer index return false; size_t idx = atomic_fetch_add_explicit(&q->head, 1, - memory_order_relaxed) % q->capacity; + memory_order_relaxed) % q->capacity; *out = q->tasks[idx]; return true; } + +static bool steal_batch(ring_buffer_t *q, task_t *buf, size_t *n_stolen) +{ + size_t head = atomic_load_explicit(&q->head, memory_order_relaxed); + size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire); + + size_t available = tail - head; + if (available <= STEAL_CHUNK) + return false; /* 不夠就別偷 */ + + size_t new_head = head + STEAL_CHUNK; + + /* CAS 把 head 往前推 – claim 這一段 */ + if (atomic_compare_exchange_strong_explicit( + &q->head, &head, new_head, + memory_order_acquire, memory_order_relaxed)) + { + for (size_t k = 0; k < STEAL_CHUNK; ++k){ + buf[k] = q->tasks[(head + k) % q->capacity]; + sem_trywait(&q->sem); + } + + + *n_stolen = STEAL_CHUNK; + return true; + } + return false; +} + static inline void mm_tile(const task_t *task) { for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) { ring_buffer_t *selfQ = &pool->queues[selfID]; free(warg); - task_t task; - + task_t task; + task_t steal_buf[STEAL_CHUNK]; // 偷到的任務暫存在這 + size_t steal_n = 0, steal_pos = 0; for (;;) { + /* 先吃完前面偷到的任務 */ + if (steal_pos < steal_n) { + mm_tile(&steal_buf[steal_pos++]); + atomic_fetch_sub(&pool->tasks_remaining, 1); + if (atomic_load(&pool->tasks_remaining) == 0) { + pthread_mutex_lock(&pool->done_lock); + pthread_cond_broadcast(&pool->all_done); + pthread_mutex_unlock(&pool->done_lock); + } + continue; + } + /* 試著從自己 queue 拿任務 */ if (try_dequeue_task(selfQ, &task)) goto got_job; + /* busy-wait + work stealing */ for (int spin = 0; spin < SPIN_LIMIT; ++spin) { - - /* 逐個掃描其他 queue */ for (size_t off = 1; off < pool->num_threads; ++off) { size_t victimID = 
(selfID + off) % pool->num_threads; ring_buffer_t *vQ = &pool->queues[victimID]; - if (try_dequeue_task(vQ, &task)) - goto got_job; /* 成功偷到 → 做事 */ + if (steal_batch(vQ, steal_buf, &steal_n)) { + steal_pos = 0; + goto continue_loop; + } } - if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) + if (atomic_load(&pool->shutdown)) return NULL; cpu_relax(); } + /* sleep 等自己 queue 的任務來 */ sem_wait(&selfQ->sem); - - if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed)) + if (atomic_load(&pool->shutdown)) return NULL; - size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1,memory_order_relaxed) % selfQ->capacity; + size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1, memory_order_relaxed) % selfQ->capacity; task = selfQ->tasks[idx]; -got_job: + got_job: mm_tile(&task); - atomic_fetch_sub(&pool->tasks_remaining, 1); if (atomic_load(&pool->tasks_remaining) == 0) { pthread_mutex_lock(&pool->done_lock); pthread_cond_broadcast(&pool->all_done); pthread_mutex_unlock(&pool->done_lock); } + + continue_loop: + continue; } return NULL; } ``` #### 實驗比較不同 STEAL CHUNK size 根據下圖可以發現,當一次竊取數量為 3 時,throughput 最高。 ![image](https://hackmd.io/_uploads/B1hTGnwrlx.png) #### 實驗比較 lock-based vs. lock-free vs. lock-free-rr ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/main_bench 2048 2048 2048 Time: 0.367595 sec Time: 0.377324 sec Time: 0.371927 sec Time: 0.374560 sec Time: 0.374375 sec Time: 0.369202 sec Time: 0.369010 sec Time: 0.367379 sec Time: 0.370429 sec Time: 0.367065 sec Performance counter stats for './build/main_bench 2048 2048 2048' (10 runs): 7,338,196 cache-misses ( +- 4.30% ) 135 cs ( +- 4.18% ) 0.53808 +- 0.00263 seconds time elapsed ( +- 0.49% ) ``` ``` root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048 Time: 0.340515 sec Time: 0.344814 sec Time: 0.343446 sec Time: 0.335358 sec Time: 0.333868 sec Time: 0.337032 sec Time: 0.346161 sec Time: 0.344273 sec Time: 0.335613 sec Time: 0.343400 sec Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs): 8,763,560 cache-misses ( +- 2.51% ) 125 cs ( +- 2.59% ) 0.50839 +- 0.00213 seconds time elapsed ( +- 0.42% ) ``` 由此實驗可以看到,經歷上述的這些調整,使得 `lock-free-rr` 在完成整個矩陣運算過程,可以比 lock-based 還更加有效率。 ![image](https://hackmd.io/_uploads/B1HechDrgl.png) ### 探討效能瓶頸的區域 我用 perf 去觀察整個程式,耗費時間最多的地方在哪裡?可以看到在計算 `mm_tile` 是最花費時間。 ```bash perf record -g ./build/lockfree_rr_bench 2048 2048 2048 perf annotate ``` ```bash │ for (size_t j = 0; j < MICRO_TILE; j++) ▒ 3.66 │1a8: movss (%r12),%xmm1 ◆ │ float a = task->A[(ti + i) * task->stride_a + k]; ▒ 0.01 │ mov %rdi,%rdx ▒ 0.39 │ lea 0x20(%rax),%rcx ▒ 2.55 │ shufps $0x0,%xmm1,%xmm1 ▒ │ sum[i][j] += a * task->B[(tj + j) * task->stride_b + k]; ▒ 3.61 │1b9: movss (%rdx,%rbx,8),%xmm0 ▒ 1.79 │ movss (%rdx,%rbx,4),%xmm2 ▒ 4.42 │ add $0x10,%rax ▒ 4.33 │ movss (%rdx),%xmm3 ▒ 3.87 │ unpcklps %xmm0,%xmm2 ▒ 2.32 │ movss (%rdx,%r13,1),%xmm0 ▒ 4.11 │ add %r10,%rdx ▒ 4.62 │ unpcklps %xmm3,%xmm0 ▒ 3.75 │ movlhps %xmm2,%xmm0 ▒ 10.60 │ mulps %xmm1,%xmm0 ▒ 22.86 │ addps -0x10(%rax),%xmm0 ▒ 7.79 │ movaps %xmm0,-0x10(%rax) ``` 回顧 lockfree_rr.c 的 `mm_tile` 可以發現,目前的 `mm_tile` 並不支援 SIMD 架構,還是取單一個值進行計算。 ```c for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) { for (size_t tj = 0; tj < TILE_SIZE; tj += MICRO_TILE) { float sum[MICRO_TILE][MICRO_TILE] = {0}; for (size_t k = 0; k < task->n_k; k++) { for (size_t i = 0; i < MICRO_TILE; i++) { float a = task->A[(ti + i) * task->stride_a + k]; for (size_t 
j = 0; j < MICRO_TILE; j++) sum[i][j] += a * task->B[(tj + j) * task->stride_b + k]; } } for (size_t i = 0; i < MICRO_TILE; i++) { for (size_t j = 0; j < MICRO_TILE; j++) task->C[(ti + i) * task->stride_c + (tj + j)] = sum[i][j]; } } } ``` 參考 [0xff07 HackMD 筆記](https://hackmd.io/@0xff07/sw-pipeline),我們可以將計算核心改寫為支援 SIMD 的架構。由於 `MICRO_TILE = 8`,意味著我們的 micro-tile 為 8×8。這使我們可以利用 AVX 的 _m256 指令集,一次處理 8 個 float。結合手動 loop unrolling(展開 8 個 row),即可實作出一次更新 64 個浮點數的方式。 我們做的是: $$ C = A \times B \quad \text{(但 B 是轉置後的,記憶體中存的是 } B^T \text{)} $$ 計算公式為: $$ C[i][j] = \sum_{k=0}^{k-1} A[i][k] \cdot B[j][k] $$ 每一次 8*8 micro-tile 矩陣乘法的視覺化如下: ``` A = B^T = C = [ a00 a01 a02 a03 a04 a05 a06 a07 ... a0k ] [ b00 b01 b02 b03 b04 b05 b06 b07 ... b0k ] [ c00 c01 c02 c03 c04 c05 c06 c07 ] [ a10 a11 a12 a13 a14 a15 a16 a17 ... a1k ] [ b10 b11 b12 b13 b14 b15 b16 b17 ... b1k ] [ c10 c11 c12 c13 c14 c15 c16 c17 ] [ a20 a21 a22 a23 a24 a25 a26 a27 ... a2k ] [ b20 b21 b22 b23 b24 b25 b26 b27 ... b2k ] [ c20 c21 c22 c23 c24 c25 c26 c27 ] [ a30 a31 a32 a33 a34 a35 a36 a37 ... a3k ] [ b30 b31 b32 b33 b34 b35 b36 b37 ... b3k ] [ c30 c31 c32 c33 c34 c35 c36 c37 ] [ a40 a41 a42 a43 a44 a45 a46 a47 ... a4k ] [ b40 b41 b42 b43 b44 b45 b46 b47 ... b4k ] [ c40 c41 c42 c43 c44 c45 c46 c47 ] [ a50 a51 a52 a53 a54 a55 a56 a57 ... a5k ] [ b50 b51 b52 b53 b54 b55 b56 b57 ... b6k ] [ c50 c51 c52 c53 c54 c55 c56 c57 ] [ a60 a61 a62 a63 a64 a65 a66 a67 ... a6k ] [ b60 b61 b62 b63 b64 b65 b66 b67 ... b6k ] [ c60 c61 c62 c63 c64 c65 c66 c67 ] [ a70 a71 a72 a73 a74 a75 a76 a77 ... a7k ] [ b70 b71 b72 b73 b74 b75 b76 b77 ... b7k ] [ c70 c71 c72 c73 c74 c75 c76 c77 ] ``` 下面的計算方式可以觀察到,若要計算出 C 矩陣的第一列 (c00 ~ c07), a00 ~ a0k 會被重複使用 8 次,所以我們可以用 `__m256 a0 = _mm256_set1_ps()` 存 8 個一樣的 a 值,然後分別與 b00 ~ b70 去計算 k = 0 下 c00 ~ c07 的值,然後透過 for loop 將所有 k 計算的結果相加就是 c00 ~ c07 的值。 那其他列計算的方式也跟第一列一模一樣,所以我們可以透過手動 loop unrooling 一次計算 8 列,這樣的方式可以從過去計算單 1 個數值優化成一次計算 64 個數值,大幅增加效能。 ``` c00 = a00·b00 + a01·b01 + a02·b02 + a03·b03 + a04·b04 + a05·b05 + a06·b06 + a07·b07 + ... + a0k·b0k c01 = a00·b10 + a01·b11 + a02·b12 + a03·b13 + a04·b14 + a05·b15 + a06·b16 + a07·b17 + ... + a0k·b1k c02 = a00·b20 + a01·b21 + a02·b22 + a03·b23 + a04·b24 + a05·b25 + a06·b26 + a07·b27 + ... + a0k·b2k c03 = a00·b30 + a01·b31 + a02·b32 + a03·b33 + a04·b34 + a05·b35 + a06·b36 + a07·b37 + ... + a0k·b3k c04 = a00·b40 + a01·b41 + a02·b42 + a03·b43 + a04·b44 + a05·b45 + a06·b46 + a07·b47 + ... + a0k·b4k c05 = a00·b50 + a01·b51 + a02·b52 + a03·b53 + a04·b54 + a05·b55 + a06·b56 + a07·b57 + ... + a0k·b5k c06 = a00·b60 + a01·b61 + a02·b62 + a03·b63 + a04·b64 + a05·b65 + a06·b66 + a07·b67 + ... + a0k·b6k c07 = a00·b70 + a01·b71 + a02·b72 + a03·b73 + a04·b74 + a05·b75 + a06·b76 + a07·b77 + ... + a0k·b7k c10 = a10·b00 + a11·b01 + a12·b02 + a13·b03 + a14·b04 + a15·b05 + a16·b06 + a17·b07 + ... + a1k·b0k c11 = a10·b10 + a11·b11 + a12·b12 + a13·b13 + a14·b14 + a15·b15 + a16·b16 + a17·b17 + ... + a1k·b1k c12 = a10·b20 + a11·b21 + a12·b22 + a13·b23 + a14·b24 + a15·b25 + a16·b26 + a17·b27 + ... + a1k·b2k c13 = a10·b30 + a11·b31 + a12·b32 + a13·b33 + a14·b34 + a15·b35 + a16·b36 + a17·b37 + ... + a1k·b3k c14 = a10·b40 + a11·b41 + a12·b42 + a13·b43 + a14·b44 + a15·b45 + a16·b46 + a17·b47 + ... + a1k·b4k c15 = a10·b50 + a11·b51 + a12·b52 + a13·b53 + a14·b54 + a15·b55 + a16·b56 + a17·b57 + ... + a1k·b5k c16 = a10·b60 + a11·b61 + a12·b62 + a13·b63 + a14·b64 + a15·b65 + a16·b66 + a17·b67 + ... + a1k·b6k c17 = a10·b70 + a11·b71 + a12·b72 + a13·b73 + a14·b74 + a15·b75 + a16·b76 + a17·b77 + ... 
+ a1k·b7k ``` ```diff static inline void mm_tile(const task_t *task) { - printf("task->n_k:%ld\n ",task->n_k); for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) { for (size_t tj = 0; tj < TILE_SIZE; tj += MICRO_TILE) { - float sum[MICRO_TILE][MICRO_TILE] = {0}; - for (size_t k = 0; k < task->n_k; k++) { - for (size_t i = 0; i < MICRO_TILE; i++) { - float a = task->A[(ti + i) * task->stride_a + k]; - for (size_t j = 0; j < MICRO_TILE; j++){ + + __m256 c[MICRO_TILE]; + for (int v = 0; v < MICRO_TILE; ++v) c[v] = _mm256_setzero_ps(); + + for (size_t k = 0; k < task->n_k; ++k) { + /* 1. broadcast A[ti~ti+7]][k] */ + __m256 a0 = _mm256_set1_ps(task->A[(ti + 0) * task->stride_a + k]); + __m256 a1 = _mm256_set1_ps(task->A[(ti + 1) * task->stride_a + k]); + __m256 a2 = _mm256_set1_ps(task->A[(ti + 2) * task->stride_a + k]); + __m256 a3 = _mm256_set1_ps(task->A[(ti + 3) * task->stride_a + k]); + __m256 a4 = _mm256_set1_ps(task->A[(ti + 4) * task->stride_a + k]); + __m256 a5 = _mm256_set1_ps(task->A[(ti + 5) * task->stride_a + k]); + __m256 a6 = _mm256_set1_ps(task->A[(ti + 6) * task->stride_a + k]); + __m256 a7 = _mm256_set1_ps(task->A[(ti + 7) * task->stride_a + k]); + + /* 2. 對固定 k,把 B 的 8 個 column 分別取出 */ + const float *baseB = task->B + k; + const size_t sb = task->stride_b; - sum[i][j] += a * task->B[(tj + j) * task->stride_b + k]; - } - - } - } - for (size_t i = 0; i < MICRO_TILE; i++) { - for (size_t j = 0; j < MICRO_TILE; j++) - task->C[(ti + i) * task->stride_c + (tj + j)] = sum[i][j]; + //倒過來擺放的原因是 _mm256_set_ps 的設計是高位在前,低位在後,所以它的結構是 b[7]~b[0],不是 b[0]~b[7] + __m256 b = _mm256_set_ps( + baseB[(tj + 7) * sb], + baseB[(tj + 6) * sb], + baseB[(tj + 5) * sb], + baseB[(tj + 4) * sb], + baseB[(tj + 3) * sb], + baseB[(tj + 2) * sb], + baseB[(tj + 1) * sb], + baseB[(tj + 0) * sb]); + + /* 3. FMA accumulate */ + c[0] = _mm256_fmadd_ps(a0, b, c[0]); + c[1] = _mm256_fmadd_ps(a1, b, c[1]); + c[2] = _mm256_fmadd_ps(a2, b, c[2]); + c[3] = _mm256_fmadd_ps(a3, b, c[3]); + c[4] = _mm256_fmadd_ps(a4, b, c[4]); + c[5] = _mm256_fmadd_ps(a5, b, c[5]); + c[6] = _mm256_fmadd_ps(a6, b, c[6]); + c[7] = _mm256_fmadd_ps(a7, b, c[7]); } + + /* 4. store back 8×8 */ + for (int v = 0; v < 8; ++v) + _mm256_storeu_ps(&task->C[(ti + v) * task->stride_c + tj], c[v]); } } } + ``` 看實驗成果,可以發現改成支援 SIMD 架構後, throughput 大副度地提昇。 ![image](https://hackmd.io/_uploads/rJVTmU_Beg.png) ### 記錄問題 * main.c `pthread_cond_broadcast(&pool->all_done)` 的部分應該在呼叫之前要先取得對應 mutex lock,避免 lost wake-up 的問題。 [chage.diff](https://gist.github.com/jserv/278cf8b489a7ba5cd510b02b9f6dac19) >Calling pthread_cond_signal() or pthread_cond_broadcast() when the thread does not hold the mutex lock associated with the condition can lead to lost wake-up bugs. [Oracle Solaris Multithreaded Programming Guide. “Lost Wake-Up Problem”](https://docs.oracle.com/cd/E19455-01/806-5257/sync-30/index.html?utm_source=chatgpt.com) > The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal(). 
— POSIX.1-2017, [The Open Group Base Specifications, Issue 7 (POSIX.1-2017)](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_broadcast.html)

## TODO: 重作[第 11 週測驗題](https://hackmd.io/@sysprog/linux2025-quiz11)
> 包含延伸問題