# Linux Kernel Final Project: Concurrency-Related Quiz Problems
> Author: TonyLinX
### Reviewed by `eleanorLYJ`
> Not sure why switching to a lock-free approach made things worse.
Could this be a false-sharing issue? Have you tried padding or alignment to address it?
### Reviewed by `fcu-D0812998`
> Scheduling algorithm: it matters a great deal which tasks get assigned to which ring buffer.
### Reviewed by `HeLunWu0317`
> TODO: Redo the Week 12 quiz

`enqueue()` does not seem to guard against the queue being full?
## TODO: Redo the [Week 15 quiz](https://hackmd.io/@sysprog/linux2025-quiz15)
> Including the extended problems
## TODO: Redo the [Week 12 quiz](https://hackmd.io/@sysprog/linux2025-quiz12)
> Focus on lock-based vs. lock-free + thread pool
> Including the extended problems
[Final project walkthrough video](https://youtu.be/orq81p7OMIY)
[GitHub](https://github.com/TonyLinX/GEMM)
### How it works (`unoptimized.c`)
The program below implements matrix multiplication (GEMM) in units of 64×64 tiles (unoptimized.c):
$$
C_{i,j} \;=\; \sum_{k=0}^{n-1} A_{i,k}\,B_{k,j}
$$
#### Tile-based GEMM example
Take a $4\times4$ matrix as an example. If the tile (and micro-tile) size is set to $2\times2$, the matrix is partitioned into $2\times2$ tiles, as shown below:
$$
\begin{aligned}
C_{00} = A_{00}B_{00} + A_{01}B_{10} \\
C_{01} = A_{00}B_{01} + A_{01}B_{11} \\
C_{10} = A_{10}B_{00} + A_{11}B_{10} \\
C_{11} = A_{10}B_{01} + A_{11}B_{11}
\end{aligned}
$$
Here $A_{ij}$ denotes the tile in row $i$, column $j$ of matrix $A$; each tile is $2 \times 2$.
#### Conditional dispatch: `mm_tile` vs. `mm_edge`
At run time, the program first checks whether each tile block is a complete tile:
* If it is a complete tile, the fixed-size `mm_tile` kernel is used directly.
* If it is not, the arbitrary-size `mm_edge` kernel is used instead, to avoid out-of-bounds accesses.
For example, in a $3\times3\times3$ matrix multiplication the last row and last column cannot form a complete $2\times2$ tile, so those regions are handled by `mm_edge`.
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define TILE_SIZE 64
static inline void mm_tile(float *A,
float *B,
float *C,
size_t stride_a,
size_t stride_b,
size_t stride_c)
{
for (size_t i = 0; i < TILE_SIZE; i++) {
for (size_t j = 0; j < TILE_SIZE; j++) {
const size_t ic = i * stride_c + j;
for (size_t k = 0; k < TILE_SIZE; k++) {
const size_t ia = i * stride_a + k;
const size_t ib = k * stride_b + j;
C[ic] += A[ia] * B[ib];
}
}
}
}
static inline void mm_edge(float *A,
float *B,
float *C,
size_t stride_a,
size_t stride_b,
size_t stride_c,
size_t tile_m,
size_t tile_n,
size_t tile_p)
{
for (size_t i = 0; i < tile_m; i++) {
for (size_t j = 0; j < tile_p; j++) {
const size_t ic = i * stride_c + j;
for (size_t k = 0; k < tile_n; k++) {
const size_t ia = i * stride_a + k;
const size_t ib = k * stride_b + j;
C[ic] += A[ia] * B[ib];
}
}
}
}
void mm(float *A, float *B, float *C, size_t m, size_t n, size_t p)
{
const size_t inm = m - m % TILE_SIZE;
const size_t inn = n - n % TILE_SIZE;
const size_t inp = p - p % TILE_SIZE;
memset(C, 0, m * p * sizeof(float));
for (size_t i = 0; i < m; i += TILE_SIZE) {
for (size_t j = 0; j < p; j += TILE_SIZE) {
const size_t ic = i * p + j;
for (size_t k = 0; k < n; k += TILE_SIZE) {
const size_t ia = i * n + k;
const size_t ib = k * p + j;
if (i < inm && j < inp && k < inn) {
mm_tile(A + ia, B + ib, C + ic, n, p, p);
} else {
const size_t tile_m =
(i + TILE_SIZE > m) ? (m - i) : TILE_SIZE;
const size_t tile_n =
(k + TILE_SIZE > n) ? (n - k) : TILE_SIZE;
const size_t tile_p =
(j + TILE_SIZE > p) ? (p - j) : TILE_SIZE;
mm_edge(A + ia, B + ib, C + ic, n, p, p, tile_m, tile_n,
tile_p);
}
}
}
}
}
void fill_rand(float *arr, size_t size)
{
for (size_t i = 0; i < size; i++)
arr[i] = (float) (rand() % 10);
}
void print_mat(const float *mat, size_t m, size_t n)
{
for (size_t i = 0; i < m; i++) {
for (size_t j = 0; j < n; j++) {
printf("%.2f", mat[i * n + j]);
if (j < n - 1)
printf(", ");
}
printf("\n");
}
printf("---\n");
}
int parse_int(const char *str)
{
char *end;
long val = strtol(str, &end, 10);
if (*end || str == end) {
fprintf(stderr, "Invalid integer: '%s'\n", str);
exit(1);
}
return (int) val;
}
int main(int argc, char *argv[])
{
if (argc != 4) {
fprintf(stderr, "Usage: %s <m> <n> <p>\n", argv[0]);
return 1;
}
srand(314);
size_t m = parse_int(argv[1]);
size_t n = parse_int(argv[2]);
size_t p = parse_int(argv[3]);
float *A = malloc(m * n * sizeof(float));
float *B = malloc(n * p * sizeof(float));
float *C = malloc(m * p * sizeof(float));
fill_rand(A, m * n);
fill_rand(B, n * p);
#ifdef VALIDATE
print_mat(A, m, n);
print_mat(B, n, p);
#endif
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
mm(A, B, C, m, n, p);
clock_gettime(CLOCK_MONOTONIC, &end);
#ifndef VALIDATE
double elapsed =
(end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
printf("%.9f\n", elapsed);
#endif
#ifdef VALIDATE
print_mat(C, m, p);
#endif
free(A);
free(B);
free(C);
return 0;
}
```
### How it works (main.c)
#### Zero padding
##### `ALIGN_UP`
In main.c the matrix dimensions (`m`, `n`, `p`) are rounded up to multiples of `TILE_SIZE = 64`, so that no incomplete boundary tiles occur, which improves memory alignment and computation efficiency. The program uses the following macro:
```c
#define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))
```
How it works:
* `((x) + TILE_SIZE - 1)`: whenever x is not already a multiple of 64, this pushes the value past the next multiple
* `& ~(TILE_SIZE - 1)`: clears the lowest 6 bits, so the result lands exactly on a multiple of 64
Goals:
* Remove the boundary-check branches that used to sit inside the inner loops
* Turn every loop bound into a compile-time constant, which helps the compiler unroll and vectorize
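As a quick sanity check of this bit trick, here is a tiny stand-alone test (not part of the repository; the values are worked out by hand):
```c
#include <assert.h>
#include <stdio.h>

#define TILE_SIZE 64
#define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))

int main(void)
{
    /* 100 + 63 = 163; clearing the low 6 bits yields 128, the next multiple of 64 */
    assert(ALIGN_UP(100) == 128);
    /* values already on a 64 boundary stay unchanged */
    assert(ALIGN_UP(64) == 64);
    assert(ALIGN_UP(2048) == 2048);
    printf("ALIGN_UP(65) = %d\n", ALIGN_UP(65)); /* prints 128 */
    return 0;
}
```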
##### `pad_mat` and `pad_t_mat`
These functions copy the original matrices into new, zero-padded buffers whose dimensions are multiples of `TILE_SIZE` (64).
`pad_mat()`: padding for matrix A.
It allocates aligned memory (e.g. 64-byte) with `aligned_alloc(MEM_ALIGNMENT, ...)` and zeroes it with `memset`. Because A is row-major, no transpose is needed: `memcpy` copies the `c` floats of each original row `i` to the start of row `i` in the new buffer, and the slots to the right stay 0 thanks to the earlier memset.
```c
float *pad_mat(const float *src, size_t r, size_t c, size_t padr, size_t padc)
{
float *dst = aligned_alloc(MEM_ALIGNMENT, padr * padc * sizeof(float));
memset(dst, 0, padr * padc * sizeof(float));
for (size_t i = 0; i < r; i++)
memcpy(dst + i * padc, src + i * c, c * sizeof(float));
return dst;
}
```
`pad_t_mat()`: transpose plus padding for matrix B.
A double loop transposes while writing, so the original `B[i][j]` ends up at `B_T[j][i]`.
```c
float *pad_t_mat(const float *src, size_t r, size_t c, size_t padr, size_t padc)
{
float *dst = aligned_alloc(MEM_ALIGNMENT, padr * padc * sizeof(float));
memset(dst, 0, padr * padc * sizeof(float));
for (size_t i = 0; i < r; i++)
for (size_t j = 0; j < c; j++)
dst[j * padr + i] = src[i * c + j];
return dst;
}
```
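For context, this is roughly how the two helpers are combined with `ALIGN_UP`; only `pad_mat`, `pad_t_mat`, `ALIGN_UP`, and `MEM_ALIGNMENT` come from main.c, the surrounding names are illustrative:
```c
/* Hypothetical call site (not copied from main.c): pad A row-major and store
 * B transposed, both rounded up to multiples of TILE_SIZE. */
size_t pad_m = ALIGN_UP(m), pad_n = ALIGN_UP(n), pad_p = ALIGN_UP(p);
float *A_pad = pad_mat(A, m, n, pad_m, pad_n);    /* pad_m x pad_n, row-major */
float *B_padT = pad_t_mat(B, n, p, pad_n, pad_p); /* holds B^T with stride pad_n */
float *C_pad = aligned_alloc(MEM_ALIGNMENT, pad_m * pad_p * sizeof(float));
/* ... run the tiled GEMM on the padded buffers, then copy the top-left
 * m x p block of C_pad back into the caller's C ... */
```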
#### Thread pool initialization
##### `init_thread_pool`
Create the requested number of worker threads and set their CPU affinity:
the worker threads are created with `pthread_create`, and each one is pinned to a specific CPU core with `pthread_setaffinity_np`.
```c
for (size_t i = 0; i < num_threads; i++) {
pthread_create(&pool->threads[i], NULL, worker_thread, pool);
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i % N_CORES, &cpuset);
pthread_setaffinity_np(pool->threads[i], sizeof(cpu_set_t), &cpuset);
}
```
#### Matrix computation
##### `mm`
The whole matrix multiplication is split into small per-tile tasks (`task_t`) that are handed to the thread pool. This is a non-blocking dispatch: when a task is handed out, the main thread does not wait for it to finish but immediately moves on to the next one. Once every task has been enqueued, the main thread calls `wait_for_completion()` and waits for all tiles to be computed.
```c
void mm(float *A,
float *B,
float *C,
size_t m,
size_t n,
size_t p,
threadpool_t *pool)
{
for (size_t i = 0; i < m; i += TILE_SIZE) {
for (size_t j = 0; j < p; j += TILE_SIZE) {
task_t task = {
                .A = A + i * n,     /* start of row block i in the flattened A */
                .B = B + j * n,     /* start of column block j in the transposed B */
                .C = C + i * p + j, /* top-left corner of output tile (i, j) */
.stride_a = n,
.stride_b = n,
.stride_c = p,
.n_k = n,
};
enqueue(pool, task);
}
}
wait_for_completion(pool);
}
```
##### `enqueue`
The task queue is a singly linked list of `queue_node_t`. A new task is appended at the tail, and `pthread_cond_signal()` wakes one thread waiting on the `not_empty` condition variable to announce that a task is available.
```c
void enqueue(threadpool_t *pool, task_t task)
{
queue_node_t *node = malloc(sizeof(queue_node_t));
*node = (queue_node_t){.task = task, .next = NULL};
pthread_mutex_lock(&pool->lock);
atomic_fetch_add(&pool->tasks_remaining, 1);
if (pool->tail)
pool->tail->next = node;
else
pool->head = node;
pool->tail = node;
    /* a new task has arrived; wake up one worker thread */
pthread_cond_signal(&pool->not_empty);
pthread_mutex_unlock(&pool->lock);
}
```
##### `worker_thread`
After a thread is created it runs the `worker_thread` function. As long as the thread pool is not shutting down (`shutdown`), the thread first takes the lock, because it may need to read and modify the queue, and those operations must be performed by only one thread at a time. With the lock held it checks whether a task is available or whether the pool is shutting down; if there is no task it releases the lock and waits to be woken (`pthread_cond_wait(&pool->not_empty, &pool->lock)`), and if there is one it takes the task and releases the lock for the other threads. The task itself is a call to `mm_tile` that computes one tile; afterwards the thread decrements `tasks_remaining`. If tasks are still outstanding, a thread that finishes early goes back to the pool and keeps waiting until new work wakes it up. When every task is done, that thread calls `pthread_cond_broadcast(&pool->all_done)` so the main thread can be woken.
```c
void *worker_thread(void *arg)
{
threadpool_t *pool = arg;
while (!atomic_load(&pool->shutdown)) {
pthread_mutex_lock(&pool->lock);
while (!pool->head && !atomic_load(&pool->shutdown))
pthread_cond_wait(&pool->not_empty, &pool->lock);
        /* if we were woken by the shutdown notification, this thread must not do any more work; exit */
if (atomic_load(&pool->shutdown)) {
pthread_mutex_unlock(&pool->lock);
return NULL;
}
queue_node_t *node = pool->head;
pool->head = node->next;
if (!pool->head)
pool->tail = NULL;
pthread_mutex_unlock(&pool->lock);
mm_tile(&node->task);
free(node);
        /* decrement the task count; the atomic keeps this thread-safe */
atomic_fetch_sub(&pool->tasks_remaining, 1);
if (atomic_load(&pool->tasks_remaining) == 0)
pthread_cond_broadcast(&pool->all_done);
}
return NULL;
}
```
#### Completion and wake-up mechanism
##### `wait_for_completion`
After `mm` has enqueued all of the work, the main function calls `wait_for_completion`.
```c
void wait_for_completion(threadpool_t *pool)
{
pthread_mutex_lock(&pool->lock);
while (atomic_load(&pool->tasks_remaining) > 0)
pthread_cond_wait(&pool->all_done, &pool->lock);
pthread_mutex_unlock(&pool->lock);
}
```
#### Releasing resources
##### `destroy_thread_pool`
`atomic_store(&pool->shutdown, true)` sets the shutdown flag. Because this variable is shared by several threads, the atomic store avoids data races and guarantees that every worker observes the "we are shutting down" signal correctly and consistently.
Some workers may still be blocked in `pthread_cond_wait(&not_empty)` because the queue happens to be empty; calling `pthread_cond_broadcast(&not_empty)` wakes them all. Once awake they see `shutdown == true` and leave the work loop.
`pthread_join` then waits for every thread to finish and return, which guarantees no worker is still running when memory is freed afterwards, avoiding use-after-free. Finally the memory behind `pool->threads` is freed, and `pthread_mutex_destroy` / `pthread_cond_destroy` release the mutex and condition variables.
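The listing of `destroy_thread_pool` itself is omitted above; a minimal sketch matching this description (field names taken from the earlier snippets, everything else is an assumption rather than the repository code) would look like:
```c
void destroy_thread_pool(threadpool_t *pool)
{
    /* publish the shutdown flag so every worker can observe it */
    atomic_store(&pool->shutdown, true);

    /* wake any worker blocked in pthread_cond_wait(&pool->not_empty, ...) */
    pthread_mutex_lock(&pool->lock);
    pthread_cond_broadcast(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);

    /* make sure no worker is still running before memory is released */
    for (size_t i = 0; i < pool->num_threads; i++)
        pthread_join(pool->threads[i], NULL);

    free(pool->threads);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->not_empty);
    pthread_cond_destroy(&pool->all_done);
}
```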
### Where performance is lost
#### main.c

The thread pool in the original main.c is lock-based: a `pthread_mutex_t` together with a `pthread_cond_t` protects and synchronizes the task queue (a linked list). The design is functionally correct, but every time a worker takes a task (`worker_thread`) and every time the main thread adds one (`enqueue`), they enter the same critical section. With many threads contending for this one lock, the waiting adds up and performance suffers. The code below shows `enqueue` and `worker_thread` from `main.c`, both protected by the same lock `pool->lock` for every queue insertion and removal:
```c
void enqueue(threadpool_t *pool, task_t task)
{
queue_node_t *node = malloc(sizeof(queue_node_t));
*node = (queue_node_t){.task = task, .next = NULL};
pthread_mutex_lock(&pool->lock);
atomic_fetch_add(&pool->tasks_remaining, 1);
if (pool->tail)
pool->tail->next = node;
else
pool->head = node;
pool->tail = node;
    /* a new task has arrived; wake up one worker thread */
pthread_cond_signal(&pool->not_empty);
pthread_mutex_unlock(&pool->lock);
}
```
```c
void *worker_thread(void *arg)
{
threadpool_t *pool = arg;
while (!atomic_load(&pool->shutdown)) {
pthread_mutex_lock(&pool->lock);
while (!pool->head && !atomic_load(&pool->shutdown))
pthread_cond_wait(&pool->not_empty, &pool->lock);
        /* if we were woken by the shutdown notification, this thread must not do any more work; exit */
if (atomic_load(&pool->shutdown)) {
pthread_mutex_unlock(&pool->lock);
return NULL;
}
queue_node_t *node = pool->head;
pool->head = node->next;
if (!pool->head)
pool->tail = NULL;
pthread_mutex_unlock(&pool->lock);
mm_tile(&node->task);
free(node);
        /* decrement the task count; the atomic keeps this thread-safe */
atomic_fetch_sub(&pool->tasks_remaining, 1);
if (atomic_load(&pool->tasks_remaining) == 0){
pthread_mutex_lock(&pool->lock);
pthread_cond_broadcast(&pool->all_done);
pthread_mutex_unlock(&pool->lock);
}
}
return NULL;
}
```
Following the article [並行程式設計: Lock-Free Programming](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-lockfree#Lock-Free-Programming), a ring buffer can remove the contention between the main thread inserting work and the worker threads taking it, and a semaphore can replace the condition variable.
The code below implements the thread pool with a single ring buffer and a semaphore:
```diff
-typedef struct queue_node_t {
- task_t task;
- struct queue_node_t *next;
-} queue_node_t;
-
typedef struct {
- queue_node_t *head, *tail;
- pthread_mutex_t lock;
- pthread_cond_t not_empty, all_done;
+ task_t *tasks; // ring buffer of tasks
+ size_t capacity; // total slots in ring buffer
+ atomic_size_t head;
+ atomic_size_t tail;
+ sem_t not_empty; // counts available tasks
+ pthread_mutex_t done_lock;
+ pthread_cond_t all_done;
pthread_t *threads;
size_t num_threads;
atomic_int tasks_remaining;
```
```diff
void *worker_thread(void *arg)
{
threadpool_t *pool = arg;
- while (!atomic_load(&pool->shutdown)) {
- pthread_mutex_lock(&pool->lock);
- while (!pool->head && !atomic_load(&pool->shutdown))
- pthread_cond_wait(&pool->not_empty, &pool->lock);
-
-        /* if we were woken by the shutdown notification, this thread must not do any more work; exit */
- if (atomic_load(&pool->shutdown)) {
- pthread_mutex_unlock(&pool->lock);
- return NULL;
- }
-
- queue_node_t *node = pool->head;
- pool->head = node->next;
- if (!pool->head)
- pool->tail = NULL;
- pthread_mutex_unlock(&pool->lock);
+ while (true) {
+ sem_wait(&pool->not_empty);
+ if (atomic_load(&pool->shutdown))
+ break;
- mm_tile(&node->task);
- free(node);
+ size_t idx = atomic_fetch_add(&pool->head, 1) % pool->capacity;
+ task_t task = pool->tasks[idx];
-        /* decrement the task count; the atomic keeps this thread-safe */
+ mm_tile(&task);
atomic_fetch_sub(&pool->tasks_remaining, 1);
-
-        /* If this thread has just finished the last task, notify every
-           thread waiting in wait_for_completion() in the main program.
-           Several threads may be waiting for the last task to finish at
-           the same time, so pthread_cond_broadcast is used rather than
-           pthread_cond_signal, which would wake only one thread. */
- if (atomic_load(&pool->tasks_remaining) == 0)
+ if (atomic_load(&pool->tasks_remaining) == 0) {
+ pthread_mutex_lock(&pool->done_lock);
pthread_cond_broadcast(&pool->all_done);
+ pthread_mutex_unlock(&pool->done_lock);
+ }
}
return NULL;
}
```
```diff
void enqueue(threadpool_t *pool, task_t task)
{
- queue_node_t *node = malloc(sizeof(queue_node_t));
- *node = (queue_node_t){.task = task, .next = NULL};
-
- pthread_mutex_lock(&pool->lock);
+ size_t idx = atomic_fetch_add(&pool->tail, 1) % pool->capacity;
+ pool->tasks[idx] = task;
atomic_fetch_add(&pool->tasks_remaining, 1);
- if (pool->tail)
- pool->tail->next = node;
- else
- pool->head = node;
- pool->tail = node;
-
-    /* a new task has arrived; wake up one worker thread */
- pthread_cond_signal(&pool->not_empty);
- pthread_mutex_unlock(&pool->lock);
+ sem_post(&pool->not_empty);
}
```
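The diff does not show the corresponding pool initialization for this single ring-buffer version; a rough sketch using the field names above (the signature and the `capacity` handling are assumptions, not the repository code) might be:
```c
void init_thread_pool(threadpool_t *pool, size_t num_threads, size_t capacity)
{
    pool->num_threads = num_threads;
    pool->capacity = capacity;
    pool->tasks = calloc(capacity, sizeof(task_t)); /* ring buffer slots */
    pool->threads = malloc(num_threads * sizeof(pthread_t));

    atomic_init(&pool->head, 0);
    atomic_init(&pool->tail, 0);
    atomic_init(&pool->tasks_remaining, 0);
    atomic_init(&pool->shutdown, false);
    sem_init(&pool->not_empty, 0, 0); /* no tasks available yet */
    pthread_mutex_init(&pool->done_lock, NULL);
    pthread_cond_init(&pool->all_done, NULL);

    /* CPU affinity setup is the same as in main.c and omitted here */
    for (size_t i = 0; i < num_threads; i++)
        pthread_create(&pool->threads[i], NULL, worker_thread, pool);
}
```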
#### Performance comparison
The figure below compares the throughput of the lock-based and lock-free versions; with a single ring buffer, the lock-free version actually performs worse than the lock-based one.

The lock-free version is indeed slower. Although the implementation uses a lock-free ring buffer and in theory reduces lock contention, every worker thread still contends atomically on the same `head` variable, which under many threads produces severe cache line ping-pong. Only one worker at a time succeeds in grabbing a task while the rest go back to sleep, so thread utilization drops and the lock-free version ends up behind the lock-based one.
> Cache line ping-pong means several cores keep writing the same cache line at the same time (for example, atomic operations on a shared variable), so the line bounces between cores and performance collapses. The closely related false-sharing effect occurs when logically independent variables merely happen to share one cache line.
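One common mitigation for this kind of contention (and for the false sharing raised in the review at the top) is to pad or align the hot atomic indices so each one sits on its own cache line. A small sketch, not taken from the repository:
```c
#include <stdalign.h>
#include <stdatomic.h>

/* Hypothetical layout: head and tail each occupy their own 64-byte cache
 * line, so producer writes to tail no longer invalidate the line that
 * consumers read for head. Note this does not remove the contention of
 * many consumers on head itself, which is what the per-worker queues
 * below are meant to address. */
typedef struct {
    alignas(64) atomic_size_t head; /* consumer index, alone on its line */
    alignas(64) atomic_size_t tail; /* producer index, alone on its line */
} padded_indices_t;
```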
To fix this, I implemented a one-queue-per-worker redesign: each thread has its own ring buffer and semaphore, and tasks are distributed round-robin, so a worker only ever touches its own queue and workers no longer fight over a single shared variable.
### `lockfree_rr.c`
```diff
typedef struct {
- task_t *tasks; // ring buffer of tasks
- size_t capacity; // total slots in ring buffer
- atomic_size_t head; // consumer index
- atomic_size_t tail; // producer index
- sem_t not_empty; // counts available tasks
+ task_t *tasks; // ring buffer of tasks
+ size_t capacity; // slots per ring buffer
+ atomic_size_t head; // consumer index
+ atomic_size_t tail; // producer index
+ sem_t sem; // counts available tasks
+} ring_buffer_t;
+
+typedef struct {
+ ring_buffer_t *queues; // array of per-thread queues
+ pthread_t *threads; // worker threads
+ size_t num_threads; // number of workers
+ atomic_size_t next_queue; // for round-robin dispatch
pthread_mutex_t done_lock;
pthread_cond_t all_done;
- pthread_t *threads;
- size_t num_threads;
- atomic_int tasks_remaining;
+ atomic_int tasks_remaining; // across all queues
_Atomic bool shutdown;
} threadpool_t;
}
}
+typedef struct {
+ threadpool_t *pool;
+ size_t index;
+} worker_arg_t;
+
void *worker_thread(void *arg)
{
- threadpool_t *pool = arg;
+ worker_arg_t *warg = arg;
+ threadpool_t *pool = warg->pool;
+ size_t idx_q = warg->index;
+ ring_buffer_t *queue = &pool->queues[idx_q];
+ free(warg);
+
while (true) {
- sem_wait(&pool->not_empty);
+ sem_wait(&queue->sem);
if (atomic_load(&pool->shutdown))
break;
- size_t idx = atomic_fetch_add(&pool->head, 1) % pool->capacity;
- task_t task = pool->tasks[idx];
+ size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
+ task_t task = queue->tasks[idx];
mm_tile(&task);
atomic_fetch_sub(&pool->tasks_remaining, 1);
*pool = (threadpool_t){
.num_threads = num_threads,
.threads = malloc(num_threads * sizeof(pthread_t)),
- .capacity = capacity,
- .tasks = malloc(capacity * sizeof(task_t)),
+ .queues = calloc(num_threads, sizeof(ring_buffer_t)),
};
- atomic_init(&pool->head, 0);
- atomic_init(&pool->tail, 0);
- sem_init(&pool->not_empty, 0, 0);
+ atomic_init(&pool->next_queue, 0);
pthread_mutex_init(&pool->done_lock, NULL);
pthread_cond_init(&pool->all_done, NULL);
for (size_t i = 0; i < num_threads; i++) {
- pthread_create(&pool->threads[i], NULL, worker_thread, pool);
+ ring_buffer_t *q = &pool->queues[i];
+ q->capacity = capacity;
+ q->tasks = calloc(capacity, sizeof(task_t));
+ atomic_init(&q->head, 0);
+ atomic_init(&q->tail, 0);
+ sem_init(&q->sem, 0, 0);
+
+ worker_arg_t *warg = malloc(sizeof(worker_arg_t));
+ *warg = (worker_arg_t){.pool = pool, .index = i};
+ pthread_create(&pool->threads[i], NULL, worker_thread, warg);
+
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i % N_CORES, &cpuset);
void enqueue(threadpool_t *pool, task_t task)
{
- size_t idx = atomic_fetch_add(&pool->tail, 1) % pool->capacity;
- pool->tasks[idx] = task;
+ size_t qid = atomic_fetch_add(&pool->next_queue, 1) % pool->num_threads;
+ ring_buffer_t *q = &pool->queues[qid];
+
+ size_t idx = atomic_fetch_add(&q->tail, 1) % q->capacity;
+ q->tasks[idx] = task;
atomic_fetch_add(&pool->tasks_remaining, 1);
- sem_post(&pool->not_empty);
+ sem_post(&q->sem);
}
void wait_for_completion(threadpool_t *pool)
{
atomic_store(&pool->shutdown, true);
for (size_t i = 0; i < pool->num_threads; i++)
- sem_post(&pool->not_empty); // wake workers
+ sem_post(&pool->queues[i].sem); // wake workers
for (size_t i = 0; i < pool->num_threads; i++)
pthread_join(pool->threads[i], NULL);
- free(pool->tasks);
+ for (size_t i = 0; i < pool->num_threads; i++) {
+ ring_buffer_t *q = &pool->queues[i];
+ free(q->tasks);
+ sem_destroy(&q->sem);
+ }
+ free(pool->queues);
free(pool->threads);
- sem_destroy(&pool->not_empty);
pthread_mutex_destroy(&pool->done_lock);
pthread_cond_destroy(&pool->all_done);
}
```
#### Experiment: lockfree vs. lockfree_rr
The experiment below shows that `lockfree_rr` takes longer than `lockfree` to finish the whole matrix multiplication.
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_bench 2048 2048 2048
Time: 0.415489 sec
Time: 0.414018 sec
Time: 0.423264 sec
Time: 0.433853 sec
Time: 0.414566 sec
Time: 0.429556 sec
Time: 0.418634 sec
Time: 0.427258 sec
Time: 0.421059 sec
Time: 0.419739 sec
Performance counter stats for './build/lockfree_bench 2048 2048 2048' (10 runs):
7,599,301 cache-misses ( +- 3.41% )
134 cs ( +- 2.45% )
0.58900 +- 0.00256 seconds time elapsed ( +- 0.43% )
```
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.550715 sec
Time: 0.528759 sec
Time: 0.540224 sec
Time: 0.531338 sec
Time: 0.519645 sec
Time: 0.529487 sec
Time: 0.552652 sec
Time: 0.543132 sec
Time: 0.524136 sec
Time: 0.541093 sec
Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):
9,591,320 cache-misses ( +- 2.58% )
124 cs ( +- 2.88% )
0.70733 +- 0.00501 seconds time elapsed ( +- 0.71% )
```
The problem is that tasks are handed out evenly in round-robin order, so even when a worker has just gone to sleep (because its queue emptied), the next task is very likely to be assigned to it right away, producing frequent sleep → wake cycles.
```c
void *worker_thread(void *arg)
{
worker_arg_t *warg = arg;
threadpool_t *pool = warg->pool;
size_t idx_q = warg->index;
ring_buffer_t *queue = &pool->queues[idx_q];
free(warg);
while (true) {
sem_wait(&queue->sem);
if (atomic_load(&pool->shutdown))
break;
size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
task_t task = queue->tasks[idx];
mm_tile(&task);
atomic_fetch_sub(&pool->tasks_remaining, 1);
if (atomic_load(&pool->tasks_remaining) == 0) {
pthread_mutex_lock(&pool->done_lock);
pthread_cond_broadcast(&pool->all_done);
pthread_mutex_unlock(&pool->done_lock);
}
}
return NULL;
}
```
So I added a spin-before-sleep design: a thread busy-waits (spins) for a while first, so that it does not fall asleep the moment it runs out of work only to be woken again immediately.
```diff
#include <string.h>
#include <unistd.h>
+#define SPIN_LIMIT 1024
#define TILE_SIZE 64
#define MICRO_TILE 8
#define MEM_ALIGNMENT 64
#endif
#define ALIGN_UP(x) (((x) + TILE_SIZE - 1) & ~(TILE_SIZE - 1))
-
+static inline void cpu_relax(void) {
+#if defined(__x86_64__) || defined(__i386__)
+ __asm__ __volatile__("pause");
+#else
+ sched_yield();
+#endif
+}
typedef struct {
float *A, *B, *C;
size_t stride_a, stride_b, stride_c;
free(warg);
while (true) {
- sem_wait(&queue->sem);
- if (atomic_load(&pool->shutdown))
- break;
+ int spun = 0;
+ while (spun < SPIN_LIMIT) {
+            /* try to take the semaphore without blocking; on success, start working */
+ if (sem_trywait(&queue->sem) == 0)
+ break;
+
+            /* if the main thread requested shutdown, bail out immediately */
+ if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+ return NULL;
+ cpu_relax();
+ ++spun;
+ }
+
+ if (spun == SPIN_LIMIT) {
+            /* block until work arrives or a shutdown signal is delivered */
+ sem_wait(&queue->sem);
+ if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+ return NULL;
+ }
```
#### Experiment: with vs. without spin-before-sleep
The experiment below shows that the matrix multiplication time drops from roughly 0.5 s to about 0.4 s, but there is still no clear advantage over `lockfree`, so this is not good enough yet.
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.429622 sec
Time: 0.429666 sec
Time: 0.429272 sec
Time: 0.432098 sec
Time: 0.429748 sec
Time: 0.429853 sec
Time: 0.415347 sec
Time: 0.412105 sec
Time: 0.417842 sec
Time: 0.407290 sec
Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):
8,476,478 cache-misses ( +- 3.14% )
113 cs ( +- 4.32% )
0.59123 +- 0.00465 seconds time elapsed ( +- 0.79% )
```
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_bench 2048 2048 2048
Time: 0.409975 sec
Time: 0.425815 sec
Time: 0.426494 sec
Time: 0.416542 sec
Time: 0.416934 sec
Time: 0.419112 sec
Time: 0.417866 sec
Time: 0.420643 sec
Time: 0.418282 sec
Time: 0.433788 sec
Performance counter stats for './build/lockfree_bench 2048 2048 2048' (10 runs):
7,957,176 cache-misses ( +- 2.26% )
136 cs ( +- 2.59% )
0.58842 +- 0.00255 seconds time elapsed ( +- 0.43% )
```
In the design so far, a worker with no tasks simply goes to sleep, which has a problem: some tasks take much longer than others. If certain workers get those heavy tasks they are swamped, while workers without tasks have long since gone to sleep; the resulting load imbalance drags down overall efficiency. To solve this, a work stealing strategy is introduced. In the design below each worker first serves its own queue; if that queue is empty, it enters a spin + work stealing phase and actively tries to steal tasks from other workers' queues. Only after repeated failed attempts does it go to sleep and wait to be woken, avoiding the bottleneck of frequent sleep/wake cycles.
```diff
+static inline bool try_dequeue_task(ring_buffer_t *q, task_t *out)
+{
+ if (sem_trywait(&q->sem) != 0) /* 目前沒任務 */
+ return false;
+
+ size_t idx = atomic_fetch_add_explicit(&q->head, 1,
+ memory_order_relaxed) % q->capacity;
+ *out = q->tasks[idx];
+ return true;
+}
void *worker_thread(void *arg)
{
- worker_arg_t *warg = arg;
- threadpool_t *pool = warg->pool;
- size_t idx_q = warg->index;
- ring_buffer_t *queue = &pool->queues[idx_q];
+ worker_arg_t *warg = arg;
+ threadpool_t *pool = warg->pool;
+ size_t selfID = warg->index;
+ ring_buffer_t *selfQ = &pool->queues[selfID];
free(warg);
- while (true) {
- int spun = 0;
- while (spun < SPIN_LIMIT) {
- if (sem_trywait(&queue->sem) == 0)
- break;
+ task_t task;
+
+ for (;;) {
+
+ if (try_dequeue_task(selfQ, &task))
+ goto got_job;
+
+ for (int spin = 0; spin < SPIN_LIMIT; ++spin) {
+
+ for (size_t off = 1; off < pool->num_threads; ++off) {
+ size_t victimID = (selfID + off) % pool->num_threads;
+ ring_buffer_t *vQ = &pool->queues[victimID];
+ if (try_dequeue_task(vQ, &task))
+ goto got_job;
+ }
+
if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
return NULL;
cpu_relax();
- ++spun;
}
- if (spun == SPIN_LIMIT) {
- sem_wait(&queue->sem);
- if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
- return NULL;
- }
- size_t idx = atomic_fetch_add(&queue->head, 1) % queue->capacity;
- task_t task = queue->tasks[idx];
-
+        sem_wait(&selfQ->sem); // block until our own queue is signalled
+
+ if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+ return NULL;
+
+ size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1,memory_order_relaxed) % selfQ->capacity;
+ task = selfQ->tasks[idx];
+
+got_job:
mm_tile(&task);
atomic_fetch_sub(&pool->tasks_remaining, 1);
if (atomic_load(&pool->tasks_remaining) == 0) {
pthread_mutex_lock(&pool->done_lock);
```
#### Experiment: with vs. without work stealing
The experiment below shows that work stealing shaves about 0.02 s off the matrix multiplication time. From this design onward lockfree_rr is clearly better than the lockfree version, but it still loses to the lock-based one.
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.393714 sec
Time: 0.389105 sec
Time: 0.384914 sec
Time: 0.398160 sec
Time: 0.391590 sec
Time: 0.385851 sec
Time: 0.400985 sec
Time: 0.386179 sec
Time: 0.380247 sec
Time: 0.396653 sec
Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):
8,959,584 cache-misses ( +- 3.23% )
149 cs ( +- 3.55% )
0.55866 +- 0.00290 seconds time elapsed ( +- 0.52%
```
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/main_bench 2048 2048 2048
Time: 0.365671 sec
Time: 0.372024 sec
Time: 0.365962 sec
Time: 0.370060 sec
Time: 0.366794 sec
Time: 0.371532 sec
Time: 0.368807 sec
Time: 0.374896 sec
Time: 0.371446 sec
Time: 0.375241 sec
Performance counter stats for './build/main_bench 2048 2048 2048' (10 runs):
6,682,440 cache-misses ( +- 4.49% )
137 cs ( +- 1.89% )
0.53518 +- 0.00230 seconds time elapsed ( +- 0.43% )
```
The problem is that in the original work stealing design a worker steals only one task at a time from another queue; when the load is uneven, some threads keep trying to steal, producing very frequent stealing. To cut the number of steals I added a batch stealing mechanism, `steal_batch`, which lets a thread steal `STEAL_CHUNK` tasks at once (set to 3 based on experiments) and stash them in `steal_buf[]`. The worker then works through the tasks in `steal_buf` until they are exhausted before re-checking its own queue or stealing again.
Concretely, `steal_batch` first reads `head` and `tail` to compute how many tasks remain; if fewer than `STEAL_CHUNK` are left it returns false and does not steal. If enough tasks are available, a CAS (`atomic_compare_exchange_strong_explicit`) checks whether another worker is stealing from the same victim at the same time; if the CAS succeeds, that range has been claimed and the tasks can safely be read from `tasks[(head + k) % capacity]`, with `sem_trywait` called during the copy to decrement the victim's available-task count. If the CAS fails, `head` has already been advanced by someone else (or by ourselves); instead of retrying (there is no guarantee enough tasks remain) the worker simply moves on and tries another victim.
```diff
-typedef struct {
+typedef struct{
task_t *tasks; // ring buffer of tasks
size_t capacity; // slots per ring buffer
atomic_size_t head; // consumer index
return false;
size_t idx = atomic_fetch_add_explicit(&q->head, 1,
- memory_order_relaxed) % q->capacity;
+ memory_order_relaxed) % q->capacity;
*out = q->tasks[idx];
return true;
}
+
+static bool steal_batch(ring_buffer_t *q, task_t *buf, size_t *n_stolen)
+{
+ size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
+ size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
+
+ size_t available = tail - head;
+ if (available <= STEAL_CHUNK)
+        return false; /* not enough left; don't steal */
+
+ size_t new_head = head + STEAL_CHUNK;
+
+    /* CAS pushes head forward to claim this chunk */
+ if (atomic_compare_exchange_strong_explicit(
+ &q->head, &head, new_head,
+ memory_order_acquire, memory_order_relaxed))
+ {
+ for (size_t k = 0; k < STEAL_CHUNK; ++k){
+ buf[k] = q->tasks[(head + k) % q->capacity];
+ sem_trywait(&q->sem);
+ }
+
+
+ *n_stolen = STEAL_CHUNK;
+ return true;
+ }
+ return false;
+}
+
static inline void mm_tile(const task_t *task)
{
for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) {
ring_buffer_t *selfQ = &pool->queues[selfID];
free(warg);
- task_t task;
-
+ task_t task;
+    task_t steal_buf[STEAL_CHUNK]; // stolen tasks are buffered here
+ size_t steal_n = 0, steal_pos = 0;
for (;;) {
+        /* first drain any tasks stolen earlier */
+ if (steal_pos < steal_n) {
+ mm_tile(&steal_buf[steal_pos++]);
+ atomic_fetch_sub(&pool->tasks_remaining, 1);
+ if (atomic_load(&pool->tasks_remaining) == 0) {
+ pthread_mutex_lock(&pool->done_lock);
+ pthread_cond_broadcast(&pool->all_done);
+ pthread_mutex_unlock(&pool->done_lock);
+ }
+ continue;
+ }
+        /* try to take a task from our own queue */
if (try_dequeue_task(selfQ, &task))
goto got_job;
+ /* busy-wait + work stealing */
for (int spin = 0; spin < SPIN_LIMIT; ++spin) {
-
-            /* scan the other queues one by one */
for (size_t off = 1; off < pool->num_threads; ++off) {
size_t victimID = (selfID + off) % pool->num_threads;
ring_buffer_t *vQ = &pool->queues[victimID];
- if (try_dequeue_task(vQ, &task))
- goto got_job; /* 成功偷到 → 做事 */
+ if (steal_batch(vQ, steal_buf, &steal_n)) {
+ steal_pos = 0;
+ goto continue_loop;
+ }
}
- if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+ if (atomic_load(&pool->shutdown))
return NULL;
cpu_relax();
}
+        /* sleep until a task arrives in our own queue */
sem_wait(&selfQ->sem);
-
- if (atomic_load_explicit(&pool->shutdown, memory_order_relaxed))
+ if (atomic_load(&pool->shutdown))
return NULL;
- size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1,memory_order_relaxed) % selfQ->capacity;
+ size_t idx = atomic_fetch_add_explicit(&selfQ->head, 1, memory_order_relaxed) % selfQ->capacity;
task = selfQ->tasks[idx];
-got_job:
+ got_job:
mm_tile(&task);
-
atomic_fetch_sub(&pool->tasks_remaining, 1);
if (atomic_load(&pool->tasks_remaining) == 0) {
pthread_mutex_lock(&pool->done_lock);
pthread_cond_broadcast(&pool->all_done);
pthread_mutex_unlock(&pool->done_lock);
}
+
+ continue_loop:
+ continue;
}
return NULL;
}
```
#### Experiment: different STEAL_CHUNK sizes
The figure below shows that throughput peaks when 3 tasks are stolen at a time.

#### Experiment: lock-based vs. lock-free vs. lock-free-rr
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/main_bench 2048 2048 2048
Time: 0.367595 sec
Time: 0.377324 sec
Time: 0.371927 sec
Time: 0.374560 sec
Time: 0.374375 sec
Time: 0.369202 sec
Time: 0.369010 sec
Time: 0.367379 sec
Time: 0.370429 sec
Time: 0.367065 sec
Performance counter stats for './build/main_bench 2048 2048 2048' (10 runs):
7,338,196 cache-misses ( +- 4.30% )
135 cs ( +- 4.18% )
0.53808 +- 0.00263 seconds time elapsed ( +- 0.49% )
```
```
root@tonylinx:/home/tonylinx/linux2025/GEMM# perf stat -r 10 -e cache-misses,cs ./build/lockfree_rr_bench 2048 2048 2048
Time: 0.340515 sec
Time: 0.344814 sec
Time: 0.343446 sec
Time: 0.335358 sec
Time: 0.333868 sec
Time: 0.337032 sec
Time: 0.346161 sec
Time: 0.344273 sec
Time: 0.335613 sec
Time: 0.343400 sec
Performance counter stats for './build/lockfree_rr_bench 2048 2048 2048' (10 runs):
8,763,560 cache-misses ( +- 2.51% )
125 cs ( +- 2.59% )
0.50839 +- 0.00213 seconds time elapsed ( +- 0.42% )
```
This experiment shows that, after all of the adjustments above, `lock-free-rr` now finishes the whole matrix multiplication more efficiently than the lock-based version.

### Locating the performance bottleneck
I used perf to find where the program spends most of its time; the `mm_tile` computation is by far the most expensive part.
```bash
perf record -g ./build/lockfree_rr_bench 2048 2048 2048
perf annotate
```
```bash
│ for (size_t j = 0; j < MICRO_TILE; j++) ▒
3.66 │1a8: movss (%r12),%xmm1 ◆
│ float a = task->A[(ti + i) * task->stride_a + k]; ▒
0.01 │ mov %rdi,%rdx ▒
0.39 │ lea 0x20(%rax),%rcx ▒
2.55 │ shufps $0x0,%xmm1,%xmm1 ▒
│ sum[i][j] += a * task->B[(tj + j) * task->stride_b + k]; ▒
3.61 │1b9: movss (%rdx,%rbx,8),%xmm0 ▒
1.79 │ movss (%rdx,%rbx,4),%xmm2 ▒
4.42 │ add $0x10,%rax ▒
4.33 │ movss (%rdx),%xmm3 ▒
3.87 │ unpcklps %xmm0,%xmm2 ▒
2.32 │ movss (%rdx,%r13,1),%xmm0 ▒
4.11 │ add %r10,%rdx ▒
4.62 │ unpcklps %xmm3,%xmm0 ▒
3.75 │ movlhps %xmm2,%xmm0 ▒
10.60 │ mulps %xmm1,%xmm0 ▒
22.86 │ addps -0x10(%rax),%xmm0 ▒
7.79 │ movaps %xmm0,-0x10(%rax)
```
Looking back at `mm_tile` in lockfree_rr.c, it does not exploit SIMD at all; it still loads and multiplies one scalar at a time.
```c
for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) {
for (size_t tj = 0; tj < TILE_SIZE; tj += MICRO_TILE) {
float sum[MICRO_TILE][MICRO_TILE] = {0};
for (size_t k = 0; k < task->n_k; k++) {
for (size_t i = 0; i < MICRO_TILE; i++) {
float a = task->A[(ti + i) * task->stride_a + k];
for (size_t j = 0; j < MICRO_TILE; j++)
sum[i][j] += a * task->B[(tj + j) * task->stride_b + k];
}
}
for (size_t i = 0; i < MICRO_TILE; i++) {
for (size_t j = 0; j < MICRO_TILE; j++)
task->C[(ti + i) * task->stride_c + (tj + j)] = sum[i][j];
}
}
}
```
Following [0xff07's HackMD notes](https://hackmd.io/@0xff07/sw-pipeline), the compute kernel can be rewritten to use SIMD. Since `MICRO_TILE = 8`, the micro-tile is 8×8, which lets us use AVX `__m256` registers to process 8 floats at a time. Combined with manual loop unrolling (8 rows at once), this updates 64 floats per iteration.
What we compute is:
$$
C = A \times B \quad \text{(with } B \text{ stored transposed, i.e. memory holds } B^T\text{)}
$$
The per-element formula is:
$$
C[i][j] = \sum_{k=0}^{n-1} A[i][k] \cdot B^{T}[j][k]
$$
Each 8×8 micro-tile multiplication can be visualized as follows:
```
A = B^T = C =
[ a00 a01 a02 a03 a04 a05 a06 a07 ... a0k ] [ b00 b01 b02 b03 b04 b05 b06 b07 ... b0k ] [ c00 c01 c02 c03 c04 c05 c06 c07 ]
[ a10 a11 a12 a13 a14 a15 a16 a17 ... a1k ] [ b10 b11 b12 b13 b14 b15 b16 b17 ... b1k ] [ c10 c11 c12 c13 c14 c15 c16 c17 ]
[ a20 a21 a22 a23 a24 a25 a26 a27 ... a2k ] [ b20 b21 b22 b23 b24 b25 b26 b27 ... b2k ] [ c20 c21 c22 c23 c24 c25 c26 c27 ]
[ a30 a31 a32 a33 a34 a35 a36 a37 ... a3k ] [ b30 b31 b32 b33 b34 b35 b36 b37 ... b3k ] [ c30 c31 c32 c33 c34 c35 c36 c37 ]
[ a40 a41 a42 a43 a44 a45 a46 a47 ... a4k ] [ b40 b41 b42 b43 b44 b45 b46 b47 ... b4k ] [ c40 c41 c42 c43 c44 c45 c46 c47 ]
[ a50 a51 a52 a53 a54 a55 a56 a57 ... a5k ] [ b50 b51 b52 b53 b54 b55 b56 b57 ... b5k ] [ c50 c51 c52 c53 c54 c55 c56 c57 ]
[ a60 a61 a62 a63 a64 a65 a66 a67 ... a6k ] [ b60 b61 b62 b63 b64 b65 b66 b67 ... b6k ] [ c60 c61 c62 c63 c64 c65 c66 c67 ]
[ a70 a71 a72 a73 a74 a75 a76 a77 ... a7k ] [ b70 b71 b72 b73 b74 b75 b76 b77 ... b7k ] [ c70 c71 c72 c73 c74 c75 c76 c77 ]
```
From the expansion below we can see that to compute the first row of C (c00 ~ c07), each of a00 ~ a0k is reused 8 times. So we can broadcast one a value into all 8 lanes with `__m256 a0 = _mm256_set1_ps()`, multiply it with b00 ~ b70 to get the k = 0 contribution to c00 ~ c07, and accumulate the contributions over all k in a for loop to obtain c00 ~ c07. Every other row is computed exactly the same way, so with manual loop unrolling we handle 8 rows at once. Instead of updating one scalar at a time we now update 64 values per iteration, which greatly improves performance.
```
c00 = a00·b00 + a01·b01 + a02·b02 + a03·b03 + a04·b04 + a05·b05 + a06·b06 + a07·b07 + ... + a0k·b0k
c01 = a00·b10 + a01·b11 + a02·b12 + a03·b13 + a04·b14 + a05·b15 + a06·b16 + a07·b17 + ... + a0k·b1k
c02 = a00·b20 + a01·b21 + a02·b22 + a03·b23 + a04·b24 + a05·b25 + a06·b26 + a07·b27 + ... + a0k·b2k
c03 = a00·b30 + a01·b31 + a02·b32 + a03·b33 + a04·b34 + a05·b35 + a06·b36 + a07·b37 + ... + a0k·b3k
c04 = a00·b40 + a01·b41 + a02·b42 + a03·b43 + a04·b44 + a05·b45 + a06·b46 + a07·b47 + ... + a0k·b4k
c05 = a00·b50 + a01·b51 + a02·b52 + a03·b53 + a04·b54 + a05·b55 + a06·b56 + a07·b57 + ... + a0k·b5k
c06 = a00·b60 + a01·b61 + a02·b62 + a03·b63 + a04·b64 + a05·b65 + a06·b66 + a07·b67 + ... + a0k·b6k
c07 = a00·b70 + a01·b71 + a02·b72 + a03·b73 + a04·b74 + a05·b75 + a06·b76 + a07·b77 + ... + a0k·b7k
c10 = a10·b00 + a11·b01 + a12·b02 + a13·b03 + a14·b04 + a15·b05 + a16·b06 + a17·b07 + ... + a1k·b0k
c11 = a10·b10 + a11·b11 + a12·b12 + a13·b13 + a14·b14 + a15·b15 + a16·b16 + a17·b17 + ... + a1k·b1k
c12 = a10·b20 + a11·b21 + a12·b22 + a13·b23 + a14·b24 + a15·b25 + a16·b26 + a17·b27 + ... + a1k·b2k
c13 = a10·b30 + a11·b31 + a12·b32 + a13·b33 + a14·b34 + a15·b35 + a16·b36 + a17·b37 + ... + a1k·b3k
c14 = a10·b40 + a11·b41 + a12·b42 + a13·b43 + a14·b44 + a15·b45 + a16·b46 + a17·b47 + ... + a1k·b4k
c15 = a10·b50 + a11·b51 + a12·b52 + a13·b53 + a14·b54 + a15·b55 + a16·b56 + a17·b57 + ... + a1k·b5k
c16 = a10·b60 + a11·b61 + a12·b62 + a13·b63 + a14·b64 + a15·b65 + a16·b66 + a17·b67 + ... + a1k·b6k
c17 = a10·b70 + a11·b71 + a12·b72 + a13·b73 + a14·b74 + a15·b75 + a16·b76 + a17·b77 + ... + a1k·b7k
```
```diff
static inline void mm_tile(const task_t *task)
{
- printf("task->n_k:%ld\n ",task->n_k);
for (size_t ti = 0; ti < TILE_SIZE; ti += MICRO_TILE) {
for (size_t tj = 0; tj < TILE_SIZE; tj += MICRO_TILE) {
- float sum[MICRO_TILE][MICRO_TILE] = {0};
- for (size_t k = 0; k < task->n_k; k++) {
- for (size_t i = 0; i < MICRO_TILE; i++) {
- float a = task->A[(ti + i) * task->stride_a + k];
- for (size_t j = 0; j < MICRO_TILE; j++){
+
+ __m256 c[MICRO_TILE];
+ for (int v = 0; v < MICRO_TILE; ++v) c[v] = _mm256_setzero_ps();
+
+ for (size_t k = 0; k < task->n_k; ++k) {
+            /* 1. broadcast A[ti..ti+7][k] */
+ __m256 a0 = _mm256_set1_ps(task->A[(ti + 0) * task->stride_a + k]);
+ __m256 a1 = _mm256_set1_ps(task->A[(ti + 1) * task->stride_a + k]);
+ __m256 a2 = _mm256_set1_ps(task->A[(ti + 2) * task->stride_a + k]);
+ __m256 a3 = _mm256_set1_ps(task->A[(ti + 3) * task->stride_a + k]);
+ __m256 a4 = _mm256_set1_ps(task->A[(ti + 4) * task->stride_a + k]);
+ __m256 a5 = _mm256_set1_ps(task->A[(ti + 5) * task->stride_a + k]);
+ __m256 a6 = _mm256_set1_ps(task->A[(ti + 6) * task->stride_a + k]);
+ __m256 a7 = _mm256_set1_ps(task->A[(ti + 7) * task->stride_a + k]);
+
+            /* 2. for a fixed k, gather the 8 columns of B */
+ const float *baseB = task->B + k;
+ const size_t sb = task->stride_b;
- sum[i][j] += a * task->B[(tj + j) * task->stride_b + k];
- }
-
- }
- }
- for (size_t i = 0; i < MICRO_TILE; i++) {
- for (size_t j = 0; j < MICRO_TILE; j++)
- task->C[(ti + i) * task->stride_c + (tj + j)] = sum[i][j];
+            // listed in reverse because _mm256_set_ps puts the highest lane first, so the order is b[7]..b[0], not b[0]..b[7]
+ __m256 b = _mm256_set_ps(
+ baseB[(tj + 7) * sb],
+ baseB[(tj + 6) * sb],
+ baseB[(tj + 5) * sb],
+ baseB[(tj + 4) * sb],
+ baseB[(tj + 3) * sb],
+ baseB[(tj + 2) * sb],
+ baseB[(tj + 1) * sb],
+ baseB[(tj + 0) * sb]);
+
+ /* 3. FMA accumulate */
+ c[0] = _mm256_fmadd_ps(a0, b, c[0]);
+ c[1] = _mm256_fmadd_ps(a1, b, c[1]);
+ c[2] = _mm256_fmadd_ps(a2, b, c[2]);
+ c[3] = _mm256_fmadd_ps(a3, b, c[3]);
+ c[4] = _mm256_fmadd_ps(a4, b, c[4]);
+ c[5] = _mm256_fmadd_ps(a5, b, c[5]);
+ c[6] = _mm256_fmadd_ps(a6, b, c[6]);
+ c[7] = _mm256_fmadd_ps(a7, b, c[7]);
}
+
+ /* 4. store back 8×8 */
+ for (int v = 0; v < 8; ++v)
+ _mm256_storeu_ps(&task->C[(ti + v) * task->stride_c + tj], c[v]);
}
}
}
+
```
The experimental results show that throughput rises dramatically once the kernel uses SIMD.

### Issues noted
* In main.c, the `pthread_cond_broadcast(&pool->all_done)` call should acquire the associated mutex first, to avoid lost wake-up problems. [change.diff](https://gist.github.com/jserv/278cf8b489a7ba5cd510b02b9f6dac19)
>Calling pthread_cond_signal() or pthread_cond_broadcast() when the thread does not hold the mutex lock associated with the condition can lead to lost wake-up bugs. [Oracle Solaris Multithreaded Programming Guide. “Lost Wake-Up Problem”](https://docs.oracle.com/cd/E19455-01/806-5257/sync-30/index.html?utm_source=chatgpt.com)
> The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal(). — POSIX.1-2017, [The Open Group Base Specification, Issue 7 (POSIX.1-2017).](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_broadcast.html)
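For completeness, the corrected pattern inside the worker (the same idea as the linked diff, and the form already used in the later listings above) looks roughly like this:
```c
/* Broadcast while holding the mutex that wait_for_completion() waits on,
 * so the waiter cannot miss the wake-up between checking tasks_remaining
 * and calling pthread_cond_wait(). */
atomic_fetch_sub(&pool->tasks_remaining, 1);
if (atomic_load(&pool->tasks_remaining) == 0) {
    pthread_mutex_lock(&pool->lock);
    pthread_cond_broadcast(&pool->all_done);
    pthread_mutex_unlock(&pool->lock);
}
```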
## TODO: Redo the [Week 11 quiz](https://hackmd.io/@sysprog/linux2025-quiz11)
> Including the extended problems