owned this note
owned this note
Published
Linked with GitHub
# Linux 核心專題: 並行程式設計
> 執行人: kkkkk1109
> [簡報內容](https://docs.google.com/presentation/d/1MTa8o6FMPKAB01YYOXVmzj_e0gA1vfJe/edit?usp=drive_link&ouid=104070822328263146319&rtpof=true&sd=true)
> [講解影片](https://www.youtube.com/watch?v=JDrb_v_Im5g)
### Reviewed by `chloe0919`
> EMPTY 代表搶工作失敗,試著在同個佇列中再搶一次
應為 ABORT 才是搶工作失敗,需要在同佇列中再嘗試偷竊一次
> 收到!感謝提醒
### Reviewed by `fennecJ`
* 透過 Work-stealing 從其他 theads 把工作偷過來執行的手法雖然可以提昇總體的 throughput ,卻會因為 task migration 的關係造成額外的成本,若 task migration 頻繁發生,甚至可能導致執行上花費大量時間處理因 task migration 產生的成本反而使總體執行效率降低。為了降低 task migration 造成的衝擊,當今天有 task 滿足可說明影片中被竊取的條件時「 Top <= bottom 」,我們還有哪些可以考慮的因素決定是否要竊取該 task ?
> 我認為還可以從剩餘工作較多的佇列偷取,並限制可偷取工作執行緒數量來減少 task migration 的大量成本
* Bloom filter 透過 hash 達成高效查詢的能力,但它卻有無法刪除存在於 table 內的 element 的顯著缺點,針對這個問題,資料科學家提出了改善的資料結構: [Counting Bloom Filter](https://en.wikipedia.org/wiki/Counting_Bloom_filter) 以及 [Quotient filter](https://en.wikipedia.org/wiki/Quotient_filter)
可否請你就上述不同資料結構進行比較,針對實做成本、支援操作、時間複雜度進行探討,並歸納出各自適合的應用場景。
> 可以! 感謝補充,我會實作看看
### Reviewed by `Wufangni`
> 判斷目前佇列的剩餘工作,由 top 和 bottom 的關係判斷
若能利用 top 和 bottom 的差當作佇列目前的任務多寡來判斷該優先偷取哪個佇列的任務,是否能降低空佇列(佇列內部)發生的情況?
> 收到,我認為此想法是可行的,可以實驗看看。
### Reviewed by `stevendd543`
想請問後面有提到將 channel 資料透過互斥鎖保護
```c
mutex_lock(&ch->data_mtx);
*data = *ptr;
mutex_unlock(&ch->data_mtx);
```
> 之後改成使用 atomic 操作,沒有使用互斥鎖是因為在 `chan_sen_unbuf` 和 `chan_recv_unbuf` 不會同時進入到 `*data = *ptr` 和 `*ptr = data` 操作中,因此在前後加上互斥鎖無法解決 data race 的問題。實際上是外部 `reader` 存取 `msg` 時造成 data race,且已經使用了 `send_mtx` 和 `recv_mtx`,想要避免使用多個鎖而造成 dead lock 的情況。
就我了解 msg 也是指向 chnnel 為什麼前面將其保護,後面 main 在存取的時候還需要再使用 atomic 操作? ` atomic_fetch_add_explicit(&msg_count[count], 1, memory_order_relaxed);`
> 雖然已經使用了 atomic 操作在進行 `atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed)` ,但這個 `msg` 需要再進行讀取一次,並且是沒有進行保護的,很容易就被其他執行緒同時更改這個值,而導致讀取失敗和寫入失敗,因此才使用 `count` 進行 atomic 操作來避免上述事件發生。
### Reviewed by `Shiang1212`
* 建議一
> 成功後,以 new_head 存取 old_head->next,若 new_head 為 0 ,則代表為最後一個節點,返回 NULL。
這段話看起來是用來描述這行程式碼:
```c
new_head = atomic_load(&old_head->next);
```
這裡使用 "存取" 這個詞不太恰當,函式名稱都有 "load" 字眼出現了,應該使用 "載入" 或 "讀取" 之類的詞,建議可以改成:使用 `atomic_load` 載入 `old_head->next` 的值,並將其賦值給 `new_head`。
> 收到!謝謝指正
* 建議二
> `fph` 和 `fpt` 對應到 `free_pool` 的頭和尾
>`insert_pool` 將 `node` 放入 `free_pool` 的節尾
>`free_pool` 為釋放 `free_pool` 中的節點
`free_pool` 應是 [lfq.c](https://gist.github.com/jserv/128b735e8e73846d38bf1e98ba553607) 中的一個函式,如上面的例子,若你想表達是 retire list,你應該使用其他詞來表示,否則容易造成誤會。
> 收到!謝謝指正
## TODO: 紀錄閱讀 [並行程式設計](https://hackmd.io/@sysprog/concurrency) 教材中遇到的問題
> 重現實驗並嘗試對內文做出貢獻
> [閱讀筆記](https://hackmd.io/@kkkkk1109/ry9tV2hzA/edit)
#### 排程器
CPU 會為各個工作進行排程,來決定下一個工作,可以是 static 排程,也可以是 dynamic 的排程。
排程有多種演算法,依照時間分割(round-robin scheduling 或 time slicing) 用於處理相同重要性的工作,而任務優先順序的 (priority scheduling)則處理那些有不同時效性的任務,也就是 hard real-time 的工作。
:::info
排程器是在哪裡運行的 是會有一個CPU來完成這些事情嗎 排程這件事算一個task嗎
:::
#### 搶佔式與非強取式核心
兩者的核心差別為,工作交出CPU的使用權是強制性的或是非強制的。
非搶佔式(non-preemptive)不會依照工作的優先順序交出CPU的使用權,而是不定期的交出使用權。為了達到並行,因此頻率要夠快,否則讓使用者感受到等待時間,有下列好處
1. 實作單純
2. 工作中可使用**非再進入程式碼(non-reentrant code)**,換言之,每個工作不需擔心在程式未執行完畢時又重新進入。因此該工作本身所用的記憶區不會有被污染 (corruption) 的可能;
3. 對系統共用記憶區的保護動作可減至最少,因為每一工作在未使用完記憶區時不會放棄 CPU ,無須擔心會被其他工作在半途中修改;
:::info
2、3項<s>不太懂</s>
> 不懂就說不懂,不要說「不太懂」。教材有講解錄影,搭配課程相關教材。對照閱讀後,紀錄詳盡的問題。
:::
## TODO: [第 9 週測驗題之 3](https://hackmd.io/@sysprog/linux2024-quiz9)
> 解釋上述程式碼運作原理,包含延伸問題
此題是嘗試以 C11 Atomics 撰寫一 [work stealing](https://en.wikipedia.org/wiki/Work_stealing) 的程式碼
![image](https://hackmd.io/_uploads/ByWOf_uUA.png)
可以把 work stealing 想成,兩執行緒各自有分配的任務,但若其中一方較早完成,便可以**偷走**另一方的任務,來避免空等的情況。
為了避免發生搶到另一方正在執行的任務,通常這種任務佇列會使用雙向佇列(double-ended queue),不同執行緒分別從佇列的頭和尾去完成任務。
接著介紹定義
* `work_t`
```c
typedef struct work_internal {
task_t code;
atomic_int join_count;
void *args[];
} work_t;
```
`task_t` 會回傳一 `work_t` 的指標,其定義如下,主要是回傳此工作的指標
```c
typedef struct work_internal *(*task_t)(struct work_internal *);
```
而 `join_count` 為計算此任務還需要多少參數,`args` 即為此任務所需要的參數
```c
typedef struct {
atomic_size_t size;
_Atomic work_t *buffer[];
} array_t;
```
```c
typedef struct {
/* Assume that they never overflow */
atomic_size_t top, bottom;
_Atomic(array_t *) array;
} deque_t;
```
接著是工作佇列的定義,使用 `top` 和 `bottom` 指向佇列的頭和尾
接著來解釋各個操作
* `init`
使用 `atomic_init` 來初始 `deque_t` 的各個參數
* 將 `top` 、 `bottom` 設為 0
* `size_hint` 為 可放入幾項任務,並使用 malloc 配置記憶體空間
* 將 `size` 以 `size_hint` 初始化
* 將 `array` 指向 `a` 的指標
```c
void init(deque_t *q, int size_hint)
{
atomic_init(&q->top, 0);
atomic_init(&q->bottom, 0);
array_t *a = malloc(sizeof(array_t) + sizeof(work_t *) * size_hint);
atomic_init(&a->size, size_hint);
atomic_init(&q->array, a);
}
```
而主要操作有 `push` 、 `take` 和 `steal`,下列是一個 deque 的範例
```graphviz
digraph G {
rankdir=LR;
node[shape=record];
map [label="A[0]|<a1>A[1]|A[2]|A[3]|... |<an>A[n-2]|A[n-1] "];
node[shape=plaintext]
top [fontcolor="red"]
bottom [fontcolor="red"]
top->map:a1
bottom->map:an
}
```
* `push` 的操作為將新的任務放入 `bottom` 的位置,並使 `bottom += 1 `
```c
void push(deque_t *q, work_t *w)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
if (b - t > a->size - 1) { /* Full queue */
resize(q);
a = atomic_load_explicit(&q->array, memory_order_relaxed);
}
atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
atomic_thread_fence(memory_order_release);
atomic_store_explicit(&q->bottom, DDDD, memory_order_relaxed);
}
```
這邊要注意,由於 `push` 可能會使佇列的空間改變,因此當佇列滿時會調用到 `resize` 來改變佇列大小
`atomic_thread_fence(memory_order_release)` 用來確保 fence 後的 `store` 必定排在 fence 前的操作
:::success
`DDDD` 應為 `b+1`
:::
* `take` 的操作為從自己的佇列中,將 `bottom - 1` 的工作取出執行,並將 `bottom -= 1` ,但要考慮到佇列內剩餘工作的數量,考量是否會發生和 `steal` 同時發生競爭的情況,主要分為兩個部分
1. 預設取得任務會成功,將 `bottom -1` 並存回 `bottom`,取得 `top` 和 `bottom` 的數值
```c
work_t *take(deque_t *q)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
work_t *x;
...
}
```
2. 判斷目前佇列的剩餘工作,由 `top` 和 `bottom` 的關係判斷
* `top` < `bottom` 代表還有工作,可直接取出
* `top` = `bottom` 代表只剩一個工作,可能造成和 `steal` 競爭最後一個工作,以 `atomic_compare_exchange_strong_explicit` 判斷是否失敗,失敗代表被 `steal` 搶走工作,復原 `bottom` ;反之則代表取得工作成功,將 `top += 1`,
* `top` > `bottom` 代表沒有工作,需復原 `bottom`
```c
...
if (t <= b) {
/* Non-empty queue */
x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
if (t == b) {
/* Single last element in queue */
if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
memory_order_seq_cst,
memory_order_relaxed))
/* Failed race */
x = EMPTY;
atomic_store_explicit(&q->bottom,b + 1, memory_order_relaxed);
}
} else { /* Empty queue */
x = EMPTY;
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
return x;
```
<s> `AAAA` 應為 `t+1` ,`BBBB` 應為 `b+1` ,`CCCC` 應為 `b+1` </s>
:::danger
專注在程式碼本身,不用抄題目,更正上方程式碼的內容。
:::
* `steal` 從 `top` 的方向取得工作,取得 `top` 和 `bottom` 後,一樣使用 `atomic_compare_exchange_strong_explicit` 判斷是否會和 `take` 競爭
```c
work_t *steal(deque_t *q)
{
size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
atomic_thread_fence(memory_order_seq_cst);
size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
work_t *x = EMPTY;
if (t < b) {
/* Non-empty queue */
array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed);
if (!atomic_compare_exchange_strong_explicit(
&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
/* Failed race */
return ABORT;
}
return x;
}
```
<s> `EEEE` 應為 `t+1` </s>
:::danger
專注在程式碼本身,不用抄寫題目。
不要急著解說程式碼,應揣摩程式開發者的本意,說明「如果是我設計這段程式碼,我會怎麼做?」
:::
* `resize` 的動作為將佇列的空間加大,會直接開兩倍的空間給新的佇列。
```c
void resize(deque_t *q)
{
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
size_t old_size = a->size;
size_t new_size = old_size * 2;
array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size);
atomic_init(&new->size, new_size);
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
for (size_t i = t; i < b; i++)
new->buffer[i % new_size] = a->buffer[i % old_size];
atomic_store_explicit(&q->array, new, memory_order_relaxed);
```
接著是實際操作
`thread` 為每個 thread 各自處理自己的工作佇列,使用 `take` 取得任務
```c
void *thread(void *payload)
{
int id = *(int *) payload;
deque_t *my_queue = &thread_queues[id];
while (true) {
work_t *work = take(my_queue);
if (work != EMPTY) {
do_work(id, work);
}else {
...
```
當沒有工作後,則可以開始 `steal` 別個 thread 的工作
`ABORT` 代表搶工作失敗,試著在同個佇列中再搶一次;`EMPTY` 代表此佇列已空。只要偷到工作便會跳離迴圈
```c
for (int i = 0; i < N_THREADS; ++i) {
if (i == id)
continue;
stolen = steal(&thread_queues[i]);
if (stolen == ABORT) {
i--;
continue; /* Try again at the same i */
} else if (stolen == EMPTY)
continue;
/* Found some work to do */
break;
}
```
如果全部跑完後,都沒有偷到任務的話,則檢查 `done` ,代表所有任務都已做完,沒有的話再重新跑一次迴圈。
```c
if (stolen == EMPTY) {
if (atomic_load(&done))
break;
continue;
} else {
do_work(id, stolen);
}
printf("work item %d finished\n", id);
return NULL;
```
---
## TODO: [第 10 週測驗題之 1, 2, 3](https://hackmd.io/@sysprog/linux2024-quiz10)
> 解釋上述程式碼運作原理,包含延伸問題
### 測驗 `1`
此題延伸 [第 6 周測驗題](https://hackmd.io/@sysprog/linux2024-quiz6)中的 [bloom filter](https://en.wikipedia.org/wiki/Bloom_filter),發展在並行環境中的程式碼,觀察理論與實際效能。
Bloom Filter 利用雜湊函數,在不用走訪全部元素的前提,預測特定字串是否在資料結構中。
其架構如下
* n 個位元構成的 table
* k 個雜湊函數:$h_1$、$h_2$、...、$h_k$
* 當有新的字串 s ,將 s 透過 k 個雜湊函數,會得到 k 個 `index` ,並將 `table[index]` 設為 `1`
當要檢查字串 s 是否存在,只需要透過這 k 個雜湊函數觀察 table 上的 index 是否為 1 即可。
![image](https://hackmd.io/_uploads/rJ3g6OYUR.png)
不過此作法存在著錯誤,有可能字串 s1 和 s2 經過 k 個雜湊函數後的 `table` 一模一樣,實際上我們只有放入字串 s2 。因此我們只能確保說,此字串一定不在資料結構中,但不能確保一定存在。
此外,無法將字串刪除,因為刪除可能會影響到其他字串的 index ,因此 bloom filter **只能新增,不能移除**。
此題的 table 為 $2^{28}$ 個位元,並使用 2 個雜湊函數
* `bloom_s` 為 bloom filter 的 table,而 table size 為 `2^28` 個位元
```c
#define BLOOMFILTER_SIZE 268435456 (2^28)
#define BLOOMFILTER_SIZE_BYTE BLOOMFILTER_SIZE / sizeof(volatile char)
struct bloom_s {
volatile char data[BLOOMFILTER_SIZE_BYTE];
};
```
並將 `bloom_s` 定義成 `bloom_t`
```c
typedef struct bloom_s bloom_t;
```
* `get` 為獲得 table 中的 key 的 bit
```c
static inline int get(bloom_t *filter, size_t key)
{
uint64_t index = key / sizeof(char);
uint8_t bit = 1u << (key % sizeof(char));
return (filter->data[index] & bit) != 0;
}
```
* `set` 將 table 中的 key 的 bit 設為 1
```c
static inline int set(bloom_t *filter, size_t key)
{
uint64_t index = key / sizeof(char);
uint64_t bit = 1u << (key % sizeof(char));
return (atomic_fetch_or(&filter->data[index], bit) & bit) == 0;
}
```
<s> `AAAA` 應為 `atomic_fetch_or` ,和 `bit` 做 `or` 運算 </s>
:::danger
不用抄寫題目,專注在程式碼本身。
:::
* `bloom_new` 為開啟一個新的 bloom filter,使用 `memset` 將所有 bit 設為 0
```c
bloom_t *bloom_new(bloom_allocator allocator)
{
bloom_t *filter = allocator(sizeof(bloom_t));
memset(filter, 0, sizeof(bloom_t));
return filter;
}
```
* `bloom_add` 將 key 透過 hash 產生 `hbase`,再將 `hbase` 分成 `h1` 和 `h2` 放入 table 中。
```c
void bloom_add(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
uint32_t h2 = hbase % BLOOMFILTER_SIZE;
set(filter, h1);
set(filter, h2);
}
```
* `bloom_test` 測試 `key` 是否存在於資料結構中
```c
int bloom_test(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
uint32_t h2 = hbase % BLOOMFILTER_SIZE;
return get(filter,h1) & get(filter,h2);
}
```
* `bloom_destroy` 會釋放 bloom filter 中的記憶體
* `bloom_clear` 將 bloom filter 所有 bit 設為 0
接著看測試檔案
* `globals_t`
```c
typedef struct {
int parent_fd;
bloom_t *filter;
uint32_t op_counter;
} globals_t;
globals_t globals;
```
:::info
`parent_fd` 可以讓 child process 和 parent process 透過 pipe 傳輸資料
`op_counter` 做了幾筆操作
:::
* `worker_loop`
`key` 為放入的字串,`key_len` 為字串長度
```c
void worker_loop()
{
const u_int8_t key[] = "wiki.csie.ncku.edu.tw";
u_int64_t key_len = strlen((const char *) key);
...
}
```
[`getpid`](https://man7.org/linux/man-pages/man2/getpid.2.html) 可以得到目前使用的 process id,[`srand`](https://man7.org/linux/man-pages/man3/rand.3p.html)可以產生亂數種子,使用 `rand` 基於產生的亂數種子產生亂數
```c
...
int *k = (int *) key;
srand(getpid());
*k = rand();
...
```
根據 `k` 的數值,決定是要將 key 放入資料結構中或是測試,並將 `*k++` 重複迴圈
```c
while (1) {
int n = (*k) % 100;
if (n < CONTAINS_P) {
bloom_test(globals.filter, key, key_len);
} else {
bloom_add(globals.filter, key, key_len);
}
(*k)++;
globals.op_counter++;
}
```
:::info
使用 85% 的機率進行測試,15%的機率放入 bloom filter
:::
* `create_worker`
首先,使用 [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) 創造可以在 process 間傳送訊息的通道,而可以透過 `fd` 讀寫。使用 [`fork()`](https://man7.org/linux/man-pages/man2/fork.2.html) 創造 child process , 並運行 `worker_loop`。
`fork()` 會回傳 pid, `pid = 0` 代表為 child process, `pid = -1` 則為 fork 失敗。
```c
int create_worker(worker *wrk)
{
int fd[2];
int ret = pipe(fd);
if (ret == -1)
return -1;
pid_t pid = fork();
if (!pid) {
/* Worker */
close(fd[0]);
globals.parent_fd = fd[1];
worker_loop();
exit(0);
}
if (pid < 0) {
printf("ERROR[%d]:%s", errno, strerror(errno));
close(fd[0]);
close(fd[1]);
return -1;
}
close(fd[1]);
wrk->pid = pid;
wrk->fd = fd[0];
return 0;
}
```
在 `main` 中,會創建 `N_WORKERS` 個 `create_worker` ,最後再透過 [kill](https://man7.org/linux/man-pages/man2/kill.2.html)去中止 process
```c
for (int i = 0; i < N_WORKERS; i++) {
uint32_t worker_out = 0;
if (kill(workers[i].pid, SIGKILL)) {
bloom_destroy(&globals.filter, bloom_free);
printf("ERROR[%d]:%s", errno, strerror(errno));
exit(1);
}
(void) read(workers[i].fd, &worker_out, sizeof(uint32_t));
globals.op_counter += worker_out;
}
```
### 使用 [Counting Bloom filter](https://en.wikipedia.org/wiki/Counting_Bloom_filter) 來刪除字串
原先的 Bloom Filter 中,若隨意刪除字串,可能會影響其他字串的在 table 中的 index,Counting Bloom filter 使用了 counter 來計算每個 index 被使用了幾次,當要進行刪除時,只需要將其對應的 counter 減一即可。
在 filter 結構中,加入 counter
```c
struct bloom_s {
volatile char data[BLOOMFILTER_SIZE_BYTE];
uint8_t counter[BLOOMFILTER_SIZE];
};
```
在 `set` 中,將其對應的 counter 加一
```c
static inline int set(bloom_t *filter, size_t key)
{
uint64_t index = key / sizeof(char);
uint64_t bit = 1u << (key % sizeof(char));
filter->counter[key]++;
return (atomic_fetch_or(&filter->data[index], bit) & bit) == 0;
}
```
增加了 `bloom_delete`,首先獲得字串對應到的 index `h1`、`h2`,並確保此字串在 filter 中,再將其刪除。
```c
int bloom_delete(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
uint32_t h2 = hbase % BLOOMFILTER_SIZE;
// make sure the delete string is in the table
if(get(filter,h1) & get(filter,h2))
{
filter->counter[h1]--;
filter->counter[h2]--;
if(filter->counter[h1] == 0 ) set_zero(filter,h1);
if(filter->counter[h2] == 0 ) set_zero(filter,h2);
return 1;
}
return 0;
}
```
使用 `set_zero` 刪除 table 中的 index
```c
static inline int set_zero(bloom_t *filter, size_t key)
{
uint64_t index = key / sizeof(char);
uint64_t bit = 1u << (key % sizeof(char));
uint64_t mask = 0xffffffffffffffff ^ bit;
filter->data[index] &= mask;
return 0;
}
```
### 測驗 `2`
`lfq` 嘗試實作精簡的並行佇列 (concurrent queue),運用 [hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 來釋放並行處理過程中的記憶體。
首先,先介紹 Hazard pointer,於〈[並行程式設計: Hazard pointer](https://hackmd.io/@sysprog/concurrency-hazard-pointer#%E4%B8%A6%E8%A1%8C%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88-Hazard-pointer)〉中提到
> 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。
> 使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。
因此使用了 Hazard pointer 的設計,其架構如下
![image](https://hackmd.io/_uploads/HkjJTvcL0.png)
每個 thread 都有自己的 hazard pointer 和 retire list。
設想以下情境,若同個物件, thread 1 正在讀取,但 thread 2 要釋放此記憶體,若是 thread 2 先進行釋放的動作,那麼 thread 1 便會出錯。因此要進行釋放前,要先確保無人讀取或已經讀取完畢,才能進行釋放。
* Hazard pointer : 此 thread 正在讀取的物件指標
* Retire list : 此 thread 即將釋放的物件指標
因此在要釋放前,會加入 retire list,並查看每個 thread 的 hazard pointer 是否指向要釋放的物件,若有,則等到其讀取完成,最後再進行釋放。
而 hazard 涉及為單一 thread 寫入,多個 thread 讀取,才能符合以上情境。
接著來看 `lfq` 中的各種定義
* `lfq_node`
`data` 指向物件,`next ` 指向下個節點, `can_free` 表示這個物件是否可以被釋放
:::success
`free_next` 指向 `free pool` 中的下個節點,應該就是 `retire list`
:::
```c
struct lfq_node {
void *data;
union {
struct lfq_node *next;
struct lfq_node *free_next;
};
bool can_free;
};
```
* `lfq_ctx`
* `head` 和 `tail` 分別指向佇列的頭和尾,使用 `alignas(64)` 避免造成 false sharing
* `HP` 即為 hazard pointer,`MAX_HP_SIZE` 為最多可以有幾個 `HP` ?
* `fph` 和 `fpt` 對應到 `free pool` 的頭和尾
* `bool is_freeing` 確認是否正在釋放
```c
struct lfq_ctx {
alignas(64) struct lfq_node *head;
int count;
struct lfq_node **HP; /* hazard pointers */
int *tid_map;
bool is_freeing;
struct lfq_node *fph, *fpt; /* free pool head/tail */
/* FIXME: get rid of struct. Make it configurable */
int MAX_HP_SIZE;
/* avoid cacheline contention */
alignas(64) struct lfq_node *tail;
};
```
:::success
`tid_map` 目前不清楚
thread id,
:::
接下來是 `lfq` 的各項操作
* `lfq_init` 會初始化整個 `lfq_ctx`
* `lfq_release` 會釋放整個 `lfq_ctx`
* `insert_pool` 將 `node` 放入 `free_pool` 的節尾
* 函式 `free_pool` 為釋放 `free pool` 中的節點
:::success
`AAAA` 應為檢查目前是否有人正在`free` ,所以應為 ``,操作成功即可進入迴圈,失敗代表已經有人在操作。
:::
獲得 `free_pool` 的 `head` 後,逐一檢查其中的節點是否可以釋放,若為以下條件
1. `(!atomic_load(&p->can_free)` 代表 `can_free = 0` 無法釋放
2. `(!atomic_load(&p->free_next))` 代表此節點已經是佇列末端,無法釋放
3. `in_hp(ctx, (struct lfq_node *) p)` 此節點被 `HP` 所指向,有 thread 正在讀取
則會跳出迴圈,並說明目前並無釋放記憶體,或者是已經釋放完成。
```c
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
bool old = 0;
if (!atomic_compare_exchange_strong(&ctx->freeing,0,1))
return;
for (int i = 0; i < MAX_FREE || freeall; i++) {
struct lfq_node *p = ctx->fph;
if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) ||
in_hp(ctx, (struct lfq_node *) p))
break;
ctx->fph = p->free_next;
free(p);
}
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
}
```
* `safe_free` 為釋放特定節點
首先,檢查此節點是可以釋放且不在 `HP` 中,接著判斷是否有人在釋放節點中
成功替換,則將此節點釋放,並將 `is_freeing` 改回 `false` 代表釋放完成。
若無法釋放,則加入 `free pool` 即可。
```c
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
/* free is not thread-safe */
bool old = 0;
if (atomic_compare_exchange_strong(&ctx->freeing,0,1)) {
/* poison the pointer to detect use-after-free */
node->next = (void *) -1;
free(node); /* we got the lock; actually free */
atomic_store(&ctx->is_freeing, false);
atomic_thread_fence(memory_order_seq_cst);
} else /* we did not get the lock; only add to a freelist */
insert_pool(ctx, node);
} else
insert_pool(ctx, node);
free_pool(ctx, false);
}
```
* `lfq_enqueue`
在 `lfq_ctx` 中新增節點,使用 `atomic_exchange` 和 `atomic_store` 來替換 `ctx->tail` 和 `old_tail->next`,並確保 `tail->next` 必為 NULL
```c
int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
if (!insert_node)
return -errno;
insert_node->data = data;
struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node);
assert(!old_tail->next && "old tail was not NULL");
atomic_store(&old_tail->next, insert_node);
return 0;
}
```
* `lfq_dequeue` 會移除節點
```c
void *lfq_dequeue(struct lfq_ctx *ctx)
{
int tid = alloc_tid(ctx);
/* many thread race */
if (tid == -1)
return (void *) -1;
void *ret = lfq_dequeue_tid(ctx, tid);
free_tid(ctx, tid);
return ret;
}
```
首先呼叫 `alloc_tid`,會檢查目前有哪些 `tid` 正在運行,回傳 `-1` 代表所有的 thread 都正在運行;非 `-1` 回傳剛開始運行的 thread
```c
static int alloc_tid(struct lfq_ctx *ctx)
{
for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
if (ctx->tid_map[i] == 0) {
int old = 0;
if (atomic_compare_exchange_strong(&ctx->tid_map[i], &old, 1))
return i;
}
}
return -1;
}
```
接著呼叫 `lfq_dequeue_tid`,移除其中一個節點,並記錄是 `tid` 正在操作
存取 `old_head` 、 `new_head`
```c
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
struct lfq_node *old_head, *new_head;
...
}
```
`old_head` 為目前的 `head` ,並將 `HP[tid]` 指向此,表示正在讀取此節點。接著再 `atomic_load(&ctx->head)` 檢查是否有被其他 thread 移除,被移除再到 `retry` 重新執行。
```c
do {
retry:
old_head = atomic_load(&ctx->head);
atomic_store(&ctx->HP[tid], old_head);
atomic_thread_fence(memory_order_seq_cst);
if (old_head != atomic_load(&ctx->head))
goto retry;
...
} while (!atomic_compare_exchange_strong(CCCC));
```
成功後,以 `new_head` 載入 `old_head->next`,若 `new_head` 為 0 ,則代表為最後一個節點,返回 NULL
```c
do {
...
new_head = atomic_load(&old_head->next);
if (new_head == 0) {
atomic_store(&ctx->HP[tid], 0);
return NULL;
} while (!atomic_compare_exchange_strong(&ctx->head,&old_head,new_head));
```
最後要再將 `ctx->head` 換成 `new_head`
* `lfg_count_free_list` 計算 `free pool` 中有幾個節點
* `lfq_release` 為釋放整個 `lfq`,分成三個部分釋放
1. 釋放佇列 `ctx` 中的節點
檢查佇列內部是否有資料
```c
if (ctx->tail && ctx->head) { /* if we have data in queue */
while ((struct lfq_node *) ctx->head) { /* while still have node */
struct lfq_node *tmp = (struct lfq_node *) ctx->head->next;
safe_free(ctx, (struct lfq_node *) ctx->head);
ctx->head = tmp;
}
ctx->tail = 0;
}
```
2. 釋放佇列 `free pool` 中的節點
```c
if (ctx->fph && ctx->fpt) {
free_pool(ctx, true);
if (ctx->fph != ctx->fpt)
return -1;
free(ctx->fpt); /* free the empty node */
ctx->fph = ctx->fpt = 0;
}
```
3. 檢查函式 `free_pool` 是否成功,最後再釋放 `HP` 和 `tid_map`。
```c
if (ctx->fph || ctx->fpt)
return -1;
free(ctx->HP);
free(ctx->tid_map);
memset(ctx, 0, sizeof(struct lfq_ctx));
return 0;
```
## TODO: [第 12 週測驗題之 2, 3](https://hackmd.io/@sysprog/linux2024-quiz12)
> 解釋上述程式碼運作原理,包含延伸問題
### 測驗 `1`
此題希望修改以下程式碼,使用 futex 的方式,達到等待 3 秒的效果
```c
#include <linux/futex.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>
int futex_sleep(time_t sec, long ns)
{
uint32_t futex_word = 0;
struct timespec timeout = {sec, ns};
return syscall(SYS_futex, AAAA, BBBB, futex_word, &timeout,
NULL, 0);
}
int main()
{
time_t secs = 3;
printf("Before futex_sleep for %ld seconds\n", secs);
futex_sleep(secs, 0);
printf("After futex_sleep\n");
return 0;
}
```
首先,我們先看 [`timespec` ](https://man7.org/linux/man-pages/man3/timespec.3type.html) ,它表示了一個時間,並且精度可以達到奈秒。
接著查看 `futex` 的格式和參數說明
```c
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
```
Futex (Fast userspace mutex) 藉由使用一個 32 位元變數`futex word`,來和 Kernal 中的 wait queue 來互動,而需要進行同步的執行緒則會共享此變數。
* `uaddr` 指向著 `futexword`
* `futex_op` 則表示著不同的 futex 操作
* `val` 對應著 `futex_op` 中的數值
* `timeout` 、 `uaddr2` 和 `val3` 只有特定的 `futex_op` 會使用到,其餘則可以忽略
FUTEX_WAIT 會查看 `uaddr` 所指向的值,是否和 `val` 相同,此作法是為了避免被其他執行緒改變 `futex_word` 的值,而造成無法進入 sleep。
由於預期是等待 3 秒,因此 `futex_op` 應為 `FUTEX_WAIT` ,而 `*uaddr` 的部份為 `&futex_word` 。
### 測驗 `2`
此題使用 C11 Atomics 和 Linux 提供的 futex 系統呼叫,來模擬 Go 程式語言的 [goroutine](https://go.dev/tour/concurrency/1) 和 [channel](https://go.dev/tour/concurrency/2) 機制
goroutine 可以想像成一個較輕量的 Thread ,而goroutine共享著同樣的地址,因此同步地訪問共同記憶體也非常重要
```go
func main() {
go say("world")
say("hello")
}
```
![image](https://hackmd.io/_uploads/rySrlptEA.png)
原先的 goroutine 會呼叫另一個 goroutine ,當原先也就是 main 的 goroutine 結束,其他的 goroutine也會跟著結束。
而 channel 為各個 goroutine 間的通訊,我們可以以此來完成執行緒間的 wait 和同步。
```go
func say(s string, c chan string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
c <- "FINISH"
}
func main() {
ch := make(chan string)
go say("world", ch)
go say("hello", ch)
<-ch
<-ch
}
```
![image](https://hackmd.io/_uploads/HJUS-vj40.png)
在 `main` 中,創建了 channel ,表示用來傳送字串。
```go
ch := make(chan string)
```
接著創建了兩個 goroutine ,其結束後會往 channel 傳送 "Finish"
```go
func say(s string, c chan string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
c <- "FINISH"
}
```
直到接收到兩個 "Finish" 後,才會結束程式
```go
func main() {
ch := make(chan string)
go say("world", ch)
go say("hello", ch)
<-ch
<-ch
}
```
接著是使用 C11 Atomics 和 futex 實作 GO channel
首先是 `mutex_unlock` ,使用 atomic 操作對 mutex 減一並解鎖,確保不被其他執行緒影響,並查看目前是否有執行緒在等待,有的話就喚醒。
```c
void mutex_unlock(struct mutex *mu)
{
uint32_t orig =
atomic_fetch_sub_explicit(&mu->val, 1, memory_order_relaxed);
if (orig != LOCKED_NO_WAITER) {
mu->val = UNLOCKED;
futex_wake(&mu->val, 1);
}
}
```
使用 [`CAS` ](https://en.wikipedia.org/wiki/Compare-and-swap)操作,會比較 `ptr` 和 `expect` 的值。
**相同**,則將 `new` 放入 `ptr`; **不同的話**,則將 `ptr` 的值放入 `expect` ,最後回傳 expect 。
```c
static uint32_t cas(_Atomic uint32_t *ptr, uint32_t expect, uint32_t new)
{
atomic_compare_exchange_strong_explicit(
ptr, &expect, new, memory_order_acq_rel, memory_order_acquire);
return expect;
}
```
`mutex_lock` 用於將 lock 上鎖,先使用 `CAS` 看目前的 val 值是否為 `UNLOCKED` ,是的話則替換成 `LOCKED_NO_WAITER` ;否的話代表目前鎖有人在使用,因此會進入 futex_wait 直到解鎖。
```c
void mutex_lock(struct mutex *mu)
{
uint32_t val = cas(&mu->val, UNLOCKED, LOCKED_NO_WAITER);
if (val != UNLOCKED) {
do {
if (val == LOCKED ||
cas(&mu->val, LOCKED_NO_WAITER, LOCKED) != UNLOCKED)
futex_wait(&mu->val, LOCKED);
} while ((val = cas(&mu->val, UNLOCKED, LOCKED)) != UNLOCKED);
}
}
```
接著是 `channel` 的宣告,在此有兩種 channel ,分別是 `unbuffer` 和 `ring buffer` 。
```c
struct chan {
_Atomic bool closed;
/* Unbuffered channels only: the pointer used for data exchange. */
_Atomic(void **) datap;
/* Unbuffered channels only: guarantees that at most one writer and one
* reader have the right to access.
*/
struct mutex send_mtx, recv_mtx;
/* For unbuffered channels, these futexes start from 1 (CHAN_NOT_READY).
* They are incremented to indicate that a thread is waiting.
* They are decremented to indicate that data exchange is done.
*
* For buffered channels, these futexes represent credits for a reader or
* write to retry receiving or sending.
*/
_Atomic uint32_t send_ftx, recv_ftx;
/* Buffered channels only: number of waiting threads on the futexes. */
_Atomic size_t send_waiters, recv_waiters;
/* Ring buffer */
size_t cap;
_Atomic uint64_t head, tail;
struct chan_item ring[0];
};
```
* `closed` 表示通道的開啟與否
```c
_Atomic bool closed;
```
* `datap` 用以指向需要交換的資料 (unbuffered)
```c
_Atomic(void **) datap;
```
* `send_mtx` 、` recv_mtx`: 確保只有一位 writer 和 一位 reader (unbuffered)
```c
struct mutex send_mtx, recv_mtx;
```
* `send_ftx` 、` recv_ftx`
* unbuffered :會從 1 開始,表示 channel 尚未完成。增加代表有執行緒在等待,完成傳輸則減少。
* ring buffer :表示可供給傳送和接收的數量
```c
_Atomic uint32_t send_ftx, recv_ftx;
```
* `send_waiters` 、` recv_waiters` 目前正在等待傳送和接收的 Thread (buffered)
```c
_Atomic size_t send_waiters, recv_waiters;
```
* 最後是定義了 ring buffer 的長度 `cap`,並使用 `lap` 來計數。
```c
size_t cap;
_Atomic uint64_t head, tail;
struct chan_item ring[0];
```
```c
struct chan_item {
_Atomic uint32_t lap;
void *data;
};
```
接著是 channel 的相關操作
```c
enum {
CHAN_READY = 0,
CHAN_NOT_READY = 1,
CHAN_WAITING = 2,
CHAN_CLOSED = 3,
};
```
* `chan_init` 初始化一個 channel , 由 `cap` 決定為 unbuffered 還是 buffered channel。 若為 buffered channel ,則使用 `memset` 將所設定的 buffer 以 0 填滿。
```c
static void chan_init(struct chan *ch, size_t cap)
{
ch->closed = false;
ch->datap = NULL;
mutex_init(&ch->send_mtx), mutex_init(&ch->recv_mtx);
if (!cap)
ch->send_ftx = ch->recv_ftx = CHAN_NOT_READY;
else
ch->send_ftx = ch->recv_ftx = 0;
ch->send_waiters = ch->recv_waiters = 0;
ch->cap = cap;
ch->head = (uint64_t) 1 << 32;
ch->tail = 0;
if (ch->cap > 0) memset(ch->ring, 0, cap * sizeof(struct chan_item));
}
```
:::info
目前還不清楚為何 head = 1 << 32
:::
* `chan_make` 創建一個 channel ,並以 `alloc` 分配空間給 channel 。
```c
struct chan *chan_make(size_t cap, chan_alloc_func_t alloc)
{
struct chan *ch;
if (!alloc || !(ch = alloc(sizeof(*ch) + cap * sizeof(struct chan_item))))
return NULL;
chan_init(ch, cap);
return ch;
}
```
:::info
可以使用 malloc 嗎
:::
以下是 buffered 的通道
* `chan_trysend_buf` 執行以下事項
1. 先檢查了 channel 是否開啟
```c
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
```
2. 存取 tail 的位置,並存取目前即將寫入的位置。檢查 `item` 的 `lap` 和 `tail` 的 `lap` 是否一致,表示可寫入狀態 ,並檢查這次寫入後 tail 是否達到 buffer 的最後位置,達到的話就回到第一個位置。
```c
uint64_t tail, new_tail;
struct chan_item *item;
// check if the tail is the same
do {
tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
uint32_t pos = tail, lap = tail >> 32;
item = ch->ring + pos;
if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
errno = EAGAIN;
return -1;
}
if (pos + 1 == ch->cap)
new_tail = (uint64_t)(lap + 2) << 32;
else
new_tail = tail + 1;
} while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail,
```
6. `!atomic_compare_exchange_weak_explicit` 檢查 `tail` 和 `ch->tail` 是否相同,若失敗則代表有被打斷,重新執行 1 ~ 4 步驟
7. 成功後,將資料放入buffer,
```c
item->data = data;
atomic_fetch_add_explicit(&item->lap, 1, memory_order_release);
```
* `chan_send_buf`
使用 `chan_trysend_buf` 檢查是否可以傳送,若為 `-1` 則代表目前無法傳送。此時由於交換失敗,因此將 `&ch->send_ftx` 存入 `v` ,因此若 `v` 為零,代表可傳送餘額為 0 進入等待,直到被喚醒。
```c
static int chan_send_buf(struct chan *ch, void *data)
{
//ready to send
while (chan_trysend_buf(ch, data) == -1) {
if (errno != EAGAIN) return -1;
//
uint32_t v = 1;
while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1,
memory_order_acq_rel,
memory_order_acquire)) {
// v is zero only when &ch->send_ftx is not equal to &v
if (v == 0) {
atomic_fetch_add_explicit(&ch->send_waiters, 1,
memory_order_acq_rel);
futex_wait(&ch->send_ftx, 0);
atomic_fetch_sub_explicit(&ch->send_waiters, 1,
memory_order_acq_rel);
v = 1;
}
}
}
```
接著, `&ch->recv_ftx` 加一,增加可接收的餘額,並叫醒正在等待接收的 Thread。
```c
// recv_ftx + 1
atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel);
// if there are recv_waiters, wake up one thread
if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0)
futex_wake(&ch->recv_ftx, 1);
return 0;
}
```
* `chan_tryrecv_buf` 、`chan_recv_buf` 行為和 `chan_trysendbuf` 、`chan_send_buf` 原理相同。
接下來是 unbuffered 的通道。
* `int chan_send_unbuf(struct chan *ch, void *data)`
先檢查通道是否開啟
```c
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
```
使用 mutex 上鎖 `&ch->send_mtx`
```c
mutex_lock(&ch->send_mtx);
```
接著嘗試將資料放入 channel 中,成功的話,代表`ch->data` 和 `ptr` 一樣,此時 `ch->data` 就會是 `data` ; 失敗的話,則 `ptr` 會被替換成 `ch->datap`。
失敗時,由於目前 `ptr` 為 `ch->datap` ,為 channel 的 data 交換區的指標的指標,使用 `*ptr` 將指標內容置換成 `data` ,接著再將 `ch->datap` 指向 NULL。
使用`atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_WAITING` 查看是否接收端在等待接收,有的話則喚醒。
```c
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
memory_order_acq_rel,
memory_order_acquire)) {
*ptr = data;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->recv_ftx, CCCC = 1);
}
```
替換成功,則代表目前 `ch->data` 指向 `data`,使用 atomic 指令將 `&ch->send_ftx` +1,並回傳 `&ch->send_ftx` 初始值是否為 `CHAN_NOT_READY = 1`,是的話就持續等到接收端接收。
`futex_wait(&ch->send_ftx, CHAN_WAITING)` 會持續等待,直到 `&ch->send_ftx` 不等於 `CHAN_WAITING`。
最後再歸還鎖。
```c
else {
if (atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_NOT_READY) {
do {
futex_wait(&ch->send_ftx, CHAN_WAITING);
} while (atomic_load_explicit(
&ch->send_ftx, memory_order_acquire) == CHAN_WAITING);
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
}
}
mutex_unlock(&ch->send_mtx);
return 0;
}
```
* `chan_recv_unbuf` 和`chan_send_unbuf` 概念類似。
* `static int chan_recv_unbuf(struct chan *ch, void **data)`
先檢查要接收的位置 data 是否存在,和檢查 channel 是否開啟
```c
if (!data) {
errno = EINVAL;
return -1;
}
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
```
將上鎖 `ch->recv_mtx` 上鎖,避免同時執行緒進行接收。
```c
mutex_lock(&ch->recv_mtx);
```
檢查 `ch->datap` 是否為 NULL,是的話代表還**沒有傳送端傳送資料**;否的話代表已有**接收端傳送資料**。
已有傳送資料的情況下,會將要資料接收的位置 `data` 替換成 `ptr` ,而目前 `ptr` = `ch->datap` ,為傳送端傳送的資料。再將 `ch->datap` 指向 NULL,代表資料接收已完成,將傳送端的 futex 減 1 並喚醒先前等待的傳送端執行緒。
```c
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
memory_order_acq_rel,
memory_order_acquire)) {
*data = *ptr;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->send_ftx, 1);
}
```
因為目前傳送端尚未傳送資料,接下來會將 `ch->recv_ftx` 加一表示等待,直到傳送端喚醒此執行緒。
```c
else {
if (atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_NOT_READY) {
do {
futex_wait(&ch->recv_ftx, CHAN_WAITING);
} while (atomic_load_explicit(
&ch->recv_ftx, memory_order_acquire) == CHAN_WAITING);
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
}
}
```
最後歸還鎖
```c
mutex_unlock(&ch->recv_mtx);
```
我們可以歸納出,如果傳送端先傳送資料, `ch->datap` 不為 NULL ,而接收端會進入 CAS 失敗的情況,喚醒傳送端的執行緒;反之,接收端先等待資料, `ch->datap` 不為 NULL,傳送端會進入 CAS 失敗,喚醒等待的接收端執行緒。
* `chan_close` 根據 `ch->cap` 決定要關閉的 channel 種類
```c
void chan_close(struct chan *ch)
{
ch->closed = true;
if (!ch->cap) {
atomic_store(&ch->recv_ftx, CHAN_CLOSED);
atomic_store(&ch->send_ftx, CHAN_CLOSED);
}
futex_wake(&ch->recv_ftx, INT_MAX);
futex_wake(&ch->send_ftx, INT_MAX);
}
```
### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案
ThreadSanitizer 是一個可以檢查各個執行續間有沒有發生 Data race,並且會在編譯時進行偵測程式碼,並在程式運行時紀錄每個執行緒的行為,如造訪的記憶體位置、讀寫行為等,最後再進行報告。
我們可以寫一個會造成 Data race 的程式 `data_race.c`
```c
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t lock;
int shared_var = 0;
int times = 1000000;
void* increment(void* arg) {
for (int i = 0; i < times; ++i) {
++shared_var;
}
return NULL;
}
int main() {
pthread_t t1, t2;
pthread_create(&t1, NULL, increment, NULL);
pthread_create(&t2, NULL, increment, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("Final value: %d\n", shared_var);
return 0;
}
```
並使用 ThreadSanitizer 進行編譯
```
gcc -g -fsanitize=thread -o data_race data_race.c
```
執行 `data_race.c`
```
FATAL: ThreadSanitizer: unexpected memory mapping 0x63faef1d9000-0x63faef1da000
```
反而出現的不是 Data race 的問題,而是 memory mapping 的問題,在 [Thread Sanitizer FATAL error on kernel version](https://github.com/google/sanitizers/issues/1716) 說明到可能是 ASLR(Address space layout randomization) 的問題。
ASLR 通過隨機地分配程序的記憶體位置,防止惡意程式或攻擊者利用已知的記憶體位置來進行攻擊,預設是 32 bit,我們將其設置為 28 bit。
```shell
$ sudo sysctl -w vm.mmap_rnd_bits=28
vm.mmap_rnd_bits = 28
```
再執行一次 `data_race.c`
```c
==================
WARNING: ThreadSanitizer: data race (pid=43074)
Read of size 4 at 0x55939c8a7018 by thread T2:
#0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x129d)
Previous write of size 4 at 0x55939c8a7018 by thread T1:
#0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x12b5)
Location is global '<null>' at 0x000000000000 (data_race+0x000000004018)
Thread T2 (tid=43077, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:26 (data_race+0x134e)
Thread T1 (tid=43076, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:25 (data_race+0x1331)
SUMMARY: ThreadSanitizer: data race /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 in increment
==================
Final value: 2000000
ThreadSanitizer: reported 1 warnings
```
說明發生了 data race。
接著在程式中加入 mutex 進行保護
::: spoiler `data_race_protect.c`
```diff
#include <stdio.h>
#include <pthread.h>
++pthread_mutex_t lock;
int shared_var = 0;
int times = 1000000;
void* increment(void* arg) {
for (int i = 0; i < times; ++i) {
++ pthread_mutex_lock(&lock);
shared_var;
++ pthread_mutex_unlock(&lock);
}
return NULL;
}
int main() {
pthread_t t1, t2;
++ pthread_mutex_init(&lock, NULL);
pthread_create(&t1, NULL, increment, NULL);
pthread_create(&t2, NULL, increment, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
++ pthread_mutex_destroy(&lock);
printf("Final value: %d\n", shared_var);
return 0;
}
```
:::
ThreadSanitizer 就沒有報錯了
```
$ ./data_race_protect
Final value: 2000000
```
接著在測驗 `2` 加入 ThreadSanitizer 進行編譯
```c
CFLAGS = -std=c11 -Wall -Wextra -pthread -fsanitize=thread
```
也出現了 data race 的問題,發現是 unbuffered 發生了問題
```c
WARNING: ThreadSanitizer: data race (pid=77546)
Read of size 8 at 0x7f96901be1c8 by thread T1:
#0 reader <null> (exam2+0x16a0)
Previous write of size 8 at 0x7f96901be1c8 by thread T81:
#0 chan_send_unbuf <null> (exam2+0x2e58)
#1 chan_send <null> (exam2+0x3477)
#2 writer <null> (exam2+0x15b2)
Location is stack of thread T1.
Thread T1 (tid=77548, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 create_threads <null> (exam2+0x18b0)
#2 test_chan <null> (exam2+0x1b81)
#3 main <null> (exam2+0x1d3b)
Thread T81 (tid=77628, finished) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 create_threads <null> (exam2+0x18b0)
#2 test_chan <null> (exam2+0x1baf)
#3 main <null> (exam2+0x1d3b)
```
執行緒 `T1` 在 `reader` 和執行緒 `T81` 在 `chan_send_unbuf` 同時存取同個記憶體位置 `0x7f96901be1c8` ,不過只靠此訊息無法得知實際是哪個變數被同時存取,因此我使用了 [Helgrind](https://valgrind.org/docs/manual/hg-manual.html) 來輔助進行 data race 的除錯。
Helgind 是 Valgrind 的其中一個工具,可以檢測使用以下事項
* POSIX thread API 的使用錯誤
* deadlock
* data race
使用 helgrind 來檢查 data race
```
$ valgrind --tool=helgrind ./exam2
```
:::spoiler
```c
Possible data race during write of size 8 at 0x4AA1048 by thread #12
==7850== Locks held: none
==7850== at 0x10A22E: chan_send_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x10A5F6: chan_send (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x1092A5: writer (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==7850== by 0x4909AC2: start_thread (pthread_create.c:442)
==7850== by 0x499AA03: clone (clone.S:100)
==7850==
==7850== This conflicts with a previous read of size 8 by thread #2
==7850== Locks held: none
==7850== at 0x10A3AF: chan_recv_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x10A641: chan_recv (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x109329: reader (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==7850== by 0x4909AC2: start_thread (pthread_create.c:442)
==7850== by 0x499AA03: clone (clone.S:100)
==7850== Address 0x4aa1048 is 8 bytes inside a block of size 80 alloc'd
==7850== at 0x484A919: malloc (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==7850== by 0x109BCE: chan_make (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x1095B3: test_chan (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== by 0x1097DA: main (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==7850== Block was alloc'd by thread #1
```
:::
說明是 `chan_send_unbuf` 和 `chan_recv_unbuf` 中同時寫入和讀取了一個 `size = 8` 的變數,而此變數是在 72byte 的結構中宣告的,推測是在 channel 中的變數,而透過觀察`chan_send_unbuf` 和 `chan_recv_unbuf`
```c
static int chan_send_unbuf(struct chan *ch, void *data)
{
...
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
memory_order_acq_rel,
memory_order_acquire)) {
*ptr = data;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->recv_ftx, CCCC);
}
...
}
```
```c
static int chan_recv_unbuf(struct chan *ch, void **data)
{
...
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
memory_order_acq_rel,
memory_order_acquire)) {
*data = *ptr;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->send_ftx, 1);
}
...
}
```
推測可能在 `*ptr = data` 和 `*data = *ptr` 沒有保護,可能造成同時寫入和讀取的問題
嘗試加入一個新的 mutex `data_mtx` ,在要修改前先使用鎖保護
```diff
static int chan_recv_unbuf(struct chan *ch, void **data)
{
...
++ mutex_lock(&ch->data_mtx)
*data = *ptr;
++ mutex_unlock(&ch->data_mtx);
...
}
static int chan_send_unbuf(struct chan *ch, void *data)
{
...
++ mutex_lock(&ch->data_mtx)
*ptr = data;
++ mutex_unlock(&ch->data_mtx);
...
}
```
仍然沒有解決 data race 的問題。
::: info
但後來思考,不應該兩者同時進入此區塊,因為在 `chan_send_unbuf` 和 `chan_recv_unbuf` 中,比較式為 atomic 操作,若一方替換成功,則另一方必定去到替換失敗區,可能問題不出在這?
:::
透過讓程式 print 出 目前所在的區塊,發現確實不會有兩者同時在 cas 成功或 cas 失敗的區塊
```
...
recv in cas success
send in cas failed
send in cas success
recv in cas failed
...
```
改成使用 `atomic` 指令進行 `*ptr = data` 和 `*data = *ptr`。
```diff
static int chan_send_unbuf(struct chan *ch, void *data)
{
...
- //*ptr = data;
+ atomic_store_explicit(ptr, data, memory_order_release);
...
}
static int chan_recv_unbuf(struct chan *ch, void *data)
{
...
- //*data= *ptr;
+ atomic_store_explicit(data, *ptr, memory_order_release);
...
}
```
還有其他可能發生 data race 的兩個地方,在 `reader` 和 `chan_send_unbuf`
:::spoiler
```c
==12438== Possible data race during read of size 8 at 0x56A0DB8 by thread #2
==12438== Locks held: none
==12438== at 0x10932F: reader (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==12438== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==12438== by 0x4909AC2: start_thread (pthread_create.c:442)
==12438== by 0x499AA03: clone (clone.S:100)
==12438==
==12438== This conflicts with a previous write of size 8 by thread #12
==12438== Locks held: none
==12438== at 0x10A1FC: chan_send_unbuf (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==12438== by 0x10A5F6: chan_send (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==12438== by 0x1092A5: writer (in /home/kkkkk1109/2024q1week12/exam2/exam2)
==12438== by 0x485396A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==12438== by 0x4909AC2: start_thread (pthread_create.c:442)
==12438== by 0x499AA03: clone (clone.S:100)
==12438== Address 0x56a0db8 is on thread #2's stack
==12438== in frame #0, created by reader (???:)
```
:::
以下是 `reader` 程式碼
```c
void *reader(void *arg)
{
struct thread_arg *a = arg;
size_t msg = 0, received = 0, expect = a->to - a->from;
printf("address of msg is %lx\n",&msg);
while (received < expect) {
if (chan_recv(a->ch, (void **) &msg) == -1) break;
atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed);
++received;
}
return 0;
}
```
推測是 `reader` 創造的 `msg` 發生了 data race,將 `msg` 的記憶體位置 `print` 出
```c
address of msg is 7fecce6be1c8
==================
WARNING: ThreadSanitizer: data race (pid=14787)
Read of size 8 at 0x7fecce6be1c8 by thread T1:
#0 reader <null> (exam2+0x16cf)
Previous atomic write of size 8 at 0x7fecce6be1c8 by thread T11:
```
確實是 msg 的問題
:::info
懷疑是 `reader` 中的 `msg_count[msg]` 的 `msg`,需要先經過 `atomic` 的處理
:::
將 `msg` 的讀取改成以變數 `count` 進行 atomic 操作
```diff
static void *reader(void *arg)
{
...
+ count = atomic_load(&msg);
+ atomic_fetch_add_explicit(&msg_count[count], 1, memory_order_relaxed);
- atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed);
++received;
}
return 0;
}
```
便成功沒有報錯!
> [commit](https://github.com/kkkkk1109/2024q1week12/commit/188319d5598c10b58f3c2c0280d1e0aa88e402e8)
### 測驗 `3`
此題為實作一個 lock-free 的 single-producer/single-consumer,使用 ring buffer 且避免造成 false sharing。
定義了 `counter_t` 用為計數,注意到這邊使用 `union` ,並用 `w` 和 `r` 區分寫入和讀取。
```c
typedef union {
volatile uint32_t w;
volatile const uint32_t r;
} counter_t;
```
接著看 `spsc_queue`
```c
typedef struct spsc_queue {
counter_t head; /* Mostly accessed by producer */
volatile uint32_t batch_head;
counter_t tail __ALIGN; /* Mostly accessed by consumer */
volatile uint32_t batch_tail;
unsigned long batch_history;
/* For testing purpose */
uint64_t start_c __ALIGN;
uint64_t stop_c;
element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */
} __ALIGN spsc_queue_t;
```
可以看到使用了 `head` 、 `tail` 作為 ring buffer 的開頭和結尾。而這邊使用了 ` __ALIGN`,其定義為
```c
#define __ALIGN __attribute__((aligned(64)))
```
由於題目有說明要避免 false sharing,因此將常使用到的物件以 64byte 為一個單位,如此一來就不會被存在同個 cacheline,而不會造成頻繁的 cache coherence。
:::info
`batch_head` 、`batch_tail` 和 `batch_history` 為批次處理,是一次讀取 `batch_size` 的量嗎
:::
`element_t data[SPSC_QUEUE_SIZE] __ALIGN` 宣告了此 ring buffer 的大小。
接著是關於 queue 的操作
`queue_init` 使用 [`memset`](https://man7.org/linux/man-pages/man3/memset.3.html) 將 queue 中的各個數值以 0 填滿。
```c
static void queue_init(spsc_queue_t *self)
{
memset(self, 0, sizeof(spsc_queue_t));
self->batch_history = SPSC_BATCH_SIZE;
}
```
* `SPSC_QUEUE_SIZE = (1024 * 8)` 為 ring buffer 的大小
* `SPSC_BATCH_SIZE` 為 queue_size 除以 16
* `SPSC_BATCH_INCREAMENT = (SPSC_BATCH_SIZE / 2)` 為 batch 一次增加的量
* `SPSC_CONGESTION_PENALTY (1000)` 作為等待的時間
接著看 `dequeue`
首先,先取得上一次放入的 batch 大小, `*val_ptr` 為將取出資料的放入的空間。
:::info
若是第一次執行的話,batch_history = SPSC_BATCH_SIZE
:::
```c
static int dequeue(spsc_queue_t *self, element_t *val_ptr)
{
unsigned long batch_size = self->batch_history;
*val_ptr = SPSC_QUEUE_ELEMENT_ZERO;
...
}
```
:::success
第一次執行的話, tail.r = batch_tail = 0
:::
會判斷目前 `tail` 是否達到 `batch_tail` ,達到的話就要增加 `bathc_tail` 的量,先以 `tmp_tail` 去判斷要增加的量。
檢查 `tmp_tail` 是否達到 ring buffer 結尾,接著若 `batch_history` 小於 `SPSC_BATCH_SIZE` 的話,則要增加 `batch_history` 的大小,有兩個選項,比較 `SPSC_BATCH_SIZE` 和 `batch_history + SPSC_BATCH_INCREAMENT` ,選擇較小的一個做為新的 `batch_history`
```c
/* Try to zero in on the next batch tail */
if (self->tail.r == self->batch_tail) {
uint32_t tmp_tail = self->tail.r + SPSC_BATCH_SIZE;
// check if the batch_tail meet the end of ring buffer
if (tmp_tail >= SPSC_QUEUE_SIZE) {
tmp_tail = 0;
// determine the batch_history
if (self->batch_history < SPSC_BATCH_SIZE) {
self->batch_history =
(SPSC_BATCH_SIZE <
(self->batch_history + SPSC_BATCH_INCREAMENT))
? SPSC_BATCH_SIZE
: (self->batch_history + SPSC_BATCH_INCREAMENT);
}
}
...
}
```
目前已經決定 `tmp_tail` ,不過並非直接指定這樣大小的 batch_size,會先等待後續的空間已經有資料可以讀取,因此以 `while (!(self->data[tmp_tail]))` 判斷是否有資料,沒有的話便等待 producer 產生資料,並將 `batch_size` 縮小,重複步驟直到有 producer 產生的資料。
```c
if (self->tail.r == self->batch_tail) {
...
batch_size = self->batch_history;
while (!(self->data[tmp_tail])) {
wait_ticks(SPSC_CONGESTION_PENALTY);
batch_size >>= 1;
if (batch_size == 0)
return SPSC_Q_EMPTY;
tmp_tail = self->tail.r + batch_size;
if (tmp_tail >= SPSC_QUEUE_SIZE)
tmp_tail = 0;
}
self->batch_history = batch_size;
if (tmp_tail == self->tail.r)
tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1;
self->batch_tail = tmp_tail;
}
```
:::info
最後 `if (tmp_tail == self->tail.r)` 猜測為判斷是否回到 tail.r 因為 ring buffer 為環形結構。
:::
接著是實際取出資料,使用 `val_ptr` 獲得資料 `self->data[self->tail.r]`
```c
*val_ptr = self->data[self->tail.r];
self->data[self->tail.r] = SPSC_QUEUE_ELEMENT_ZERO;
self->tail.w++;
if (self->tail.r >= SPSC_QUEUE_SIZE)
self->tail.w = 0;
return SPSC_OK;
```
接下來看如何使用 `enqueue` 放入資料,若 `head` 達到 `batch_head`,則使用 tmp_head 去決定下次 batch_head 的位置。
這邊和 consumer 不同的是,每次都是以 `SPSC_BATCH_SIZE` 來增加。
若 `tmp_head` 的部分有資料,則代表已經 ring buffer 已經滿了,無法再放入資料。
```c
static int enqueue(spsc_queue_t *self, element_t value)
{
/* Try to zero in on the next batch head. */
if (self->head.r == self->batch_head) {
uint32_t tmp_head = self->head.r + SPSC_BATCH_SIZE;
if (tmp_head >= SPSC_QUEUE_SIZE)
tmp_head = 0;
// wait
if (self->data[tmp_head]) {
/* run spin cycle penality */
wait_ticks(SPSC_CONGESTION_PENALTY);
return SPSC_Q_FULL;
}
self->batch_head = tmp_head;
}
// put value in ring buffer
self->data[self->head.r] = value;
self->head.w++;
if (self->head.r >= SPSC_QUEUE_SIZE)
self->head.w = 0;
return SPSC_OK;
}
```
接著看對應的 consumer 和 producer 操作
定義了 `struct init_info_t`
```c
typedef struct {
uint32_t cpu_id;
pthread_barrier_t *barrier;
} init_info_t;
```
`cpu_id` 對應到不同的 cpu ,`barrier` 作為一個同步的手段,會進入等待其他執行緒完成特定事項。
* consumer
```c
void *consumer(void *arg)
{
element_t value = 0, old_value = 0;
init_info_t *init = (init_info_t *) arg;
uint32_t cpu_id = init->cpu_id;
pthread_barrier_t *barrier = init->barrier;
/* user needs tune this according to their machine configurations. */
cpu_set_t cur_mask;
CPU_ZERO(&cur_mask);
CPU_SET(cpu_id * 2, &cur_mask);
printf("consumer %d: ---%d----\n", cpu_id, 2 * cpu_id);
if (sched_setaffinity(0, sizeof(cur_mask), &cur_mask) < 0) {
printf("Error: sched_setaffinity\n");
return NULL;
}
...
}
```
首先,[`cpu_set_t`](https://man7.org/linux/man-pages/man3/CPU_SET.3.html) 定義為多個 cpu 的集合,而使用 `CPU_ZERO` 將 `cur_mask` 設為沒有 CPU,再使用 `CPU_SET` 加入 `cpu_id * 2` 的 CPU。
接著,使用 [`sched_setaffinity`](https://man7.org/linux/man-pages/man2/sched_setaffinity.2.html) 將 thread 運行在特定的 CPU
```c
int sched_setaffinity(pid_t pid, size_t cpusetsize,
const cpu_set_t *mask);
```
如圖
![image](https://hackmd.io/_uploads/H1QPqSTI0.png)
```
consumer 1: ---2----
consumer 2: ---4----
Consumer created...
consumer 3: ---6----
producer 0: ---1----
Consumer created...
consumer 5: ---10----
Consumer created...
Consumer created...
consumer 4: ---8----
Consumer created...
consumer: 27 cycles/op
consumer: 27 cycles/op
consumer: 27 cycles/op
consumer: 27 cycles/op
consumer: 27 cycles/op
producer 5 cycles/op
Done!
```
若 pid 為零,則會選擇呼叫此函式的 thread
```c
...
printf("Consumer created...\n");
pthread_barrier_wait(barrier);
queues[cpu_id].start_c = read_tsc();
...
```
接著,使用 `barrier` 等到所有 `consumer` 完成 `sched_setaffinity`
`read_tsc()` 對應如下,主要為獲取時間
```c
static inline uint64_t read_tsc()
{
uint64_t time;
uint32_t msw, lsw;
__asm__ __volatile__(
"rdtsc\n\t"
"movl %%edx, %0\n\t"
"movl %%eax, %1\n\t"
: "=r"(msw), "=r"(lsw)
:
: "%edx", "%eax");
time = ((uint64_t) msw << 32) | lsw;
return time;
}
```
```c
for (uint64_t i = 1; i <= TEST_SIZE; i++) {
while (dequeue(&queues[cpu_id], &value) != 0)
;
assert((old_value + 1) == value);
old_value = value;
}
queues[cpu_id].stop_c = read_tsc();
printf(
"consumer: %ld cycles/op\n",
((queues[cpu_id].stop_c - queues[cpu_id].start_c) / (TEST_SIZE + 1)));
pthread_barrier_wait(barrier);
return NULL;
```
接著,執行 `dequeue` ,並記錄完成時間,使用 `barrier` 等到所有 consumer 完成。
* producer
```c
void producer(void *arg, uint32_t num)
{
...
for (uint64_t i = 1; i <= TEST_SIZE + SPSC_BATCH_SIZE; i++) {
for (int32_t j = 1; j < num; j++) {
element_t value = i;
while (enqueue(&queues[j], value) != 0)
;
}
}
...
}
```
內容和 `consumer` 僅有從 `dequeue` 換成 `enqueue`。
### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案
在編譯時加入
```shell
gcc -Wall -O2 -I. -o main main.c -lpthread -fsanitize=thread
```
產生錯誤訊息
```c
==================
WARNING: ThreadSanitizer: thread leak (pid=6908)
Thread T1 (tid=6910, finished) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (exam3+0x1293)
And 1 more similar thread leaks.
SUMMARY: ThreadSanitizer: thread leak (/home/kkkkk1109/2024q1week12/exam3/exam3+0x1293) in main
```
說明發生 thread leak ,可能有 thread 沒有適當的釋放,猜測是 `consumer_thread` 沒有被安全釋放,加上 `pthread_detach` 來安全釋放 thread
```c
for (int i = 1; i < max_threads; i++) {
INIT_CPU_ID(i) = i;
INIT_BARRIER(i) = &barrier;
error = pthread_create(&consumer_thread, &consumer_attr, consumer,
INIT_PTR(i));
pthread_detach(consumer_thread);
}
```
便沒有報錯訊息了
> [commit](https://github.com/kkkkk1109/2024q1week12/commit/4672babc762546b51f7cba571619225979a5db9b)