owned this note
owned this note
Published
Linked with GitHub
# Linux 核心專題: 重作第 10 和第 12 週測驗題
> 執行人: stevendd543
> [專題解說影片](https://youtu.be/dKtJYr0xGSw)
### Reviewed by `LindaTing0106`
下方提到關於 pipe 的簡介,另外還想請問在使用 pipe 時,你如何去確保傳遞的數據不會丟失或失真?另外,使用 pipe 是否會影響程序的運行效率或性能?
> [name=stevendd543][pipe(7)](https://man7.org/linux/man-pages/man7/pipe.7.html) 裡面有提到說兩個 process 在進行數據讀寫的時候 system call 的 `read` 與 `write` 會有錯誤處理機制防止數據讀寫錯誤。也有提到說 pipe 的通信機制是建立在 kernel space 能夠使交換資料不必 context switch 造成效率低下。
### Reviewedd by `eleanorLYJ`
在 第 12 週測驗的第 3 題部分,"讀寫逞罰" 是甚麼意思?
> [name=stevendd543] 這裡是指當 thread 試圖對 queue 進行寫入或者讀取時。若為滿或為空就會將 thread 暫停數秒鐘等待其他 thread 對其有新的操作。
### Reviewed by `Shiang1212`
> 在共享變數中擁有一個用來儲存 hazard pointers **HP,每一個 thread 都透過自己的 id(透過 alloc_tid 取得) 當作 index 去修改 HP[tid],被成功儲存在這的地址都是被保護不被別的 thread 刪除,可以從 lfq_dequeue_tid 得知要 dequeue 之前必須成功將資料加入 hazard pointer 中...
[第 10 週測驗的第 2 題](https://hackmd.io/@sysprog/SJXN0y9UR#TODO-%E7%AC%AC-10-%E9%80%B1%E6%B8%AC%E9%A9%97%E7%9A%84%E7%AC%AC-2-%E9%A1%8C),對於 hazard pointer 實作的描述有點混亂,一個段落裡描述太多東西了。建議你可以列點,把 `HP`、`lfq_dequeue_tid`、`lfq_enqeue` 的作用以及你的發現分開描述,以便讀者理解你想探討的議題。
至於延伸問題,我看到這次期末專題很多同學都使用 [Userspace RCU](https://github.com/urcu/userspace-rcu/blob/master/doc/rcu-api.md),你可以參考其他同學的想法並嘗試實作。
> [name=stevendd543] 了解 ! 我會修改我的描述,以及嘗試去時做看看。
### Reviewed by `kkkkk1109`
> pthread_barrier 是一個計數鎖用來同步同一~~進程~~產生的~~線程~~,因此就算被分配到其他 CPU 執行也是原屬於 pthread_create 的~~進程~~
根據 [資訊科技詞彙翻譯](https://hackmd.io/@sysprog/it-vocabulary) ,建議將 process 翻譯為 **行程** 。
> [name=stevendd543]收到 !!
## 任務簡介
強化對於並行處理的認知。
## TODO: 第 10 週測驗的第 1 題
> 包含延伸問題
#### 程式碼解析筆記
### Signal
這段程式碼主要是在並行環境下執行 bloom filter,首先在程式碼一開始有使用到 **Signal**,訊號是 processes 之間溝通的一種方式,舉例來說對 shell 輸入 `Ctrl + c` 就會對執行中的程式碼傳送訊號 `SIGINT`,因此在程式碼一開始先設定好信號處理程式碼,當收到`SIGHUP` 訊號表示通信中止,因此就能夠自定義通信中止處理函數 `handle_sighup`。
:::danger
注意用語:
* default 是「預設」,不是「默認」,二者語境不同
:::
[signal(7)](https://man7.org/linux/man-pages/man7/signal.7.html) 提到如果沒有註冊 signal 處理機制,它將以預設的方式處理中止。
> Using these system calls, a process can elect one of the following behaviors to occur on delivery of the signal: perform the default action; ignore the signal; or catch the signal with a signal handler, a programmer-defined function that is automatically invoked when the signal is delivered.
### Pipe
:::danger
務必使用本課程教材規範的術語!
:::
[pipe](https://man7.org/linux/man-pages/man2/pipe.2.html) 是 linux support 的 Unix IPC 用於不同 process 之間的資料交換,pipe 只會存在於 memory 且它會具有幾個特性 1、單向通信 2、半雙工,所以 pipe 會回傳兩個 file description, fd[0] 是用於讀取,fd[1] 是用於寫入。
### What child process do?
child process 是從 parent process `create_worker` 中產生,並且會關閉 pipe 讀取通道,只讓 child 擁有寫入的權力。並且將 parent process file description 將其與 pipe 寫入端做對應。
```c
pid_t pid = fork();
if (!pid) {
/* Worker */
close(fd[0]);
globals.parent_fd = fd[1];
worker_loop();
exit(0);
}
```
:::danger
什麼是「機率性地」?
改進你的漢語表達。
:::
`worker_loop` 裡面有以 0.85 的機率呼叫`bloom_test` 0.15 的機率呼叫 `bloom_add`,並且記錄操作次數。
```c
while (1) {
int n = (*k) % 100;
if (n < CONTAINS_P) {
bloom_test(globals.filter, key, key_len);
} else {
bloom_add(globals.filter, key, key_len);
}
(*k)++;
globals.op_counter++;
}
```
以上是 child process 從被創建後會完成的事情,但假設執行過程中程式被終止,將會有自定義的處理函數將紀錄資料透過 pipe 寫回 parent process。重新看回 parent process 一開始有將 SIGHUP 訊號設定對應的處裡函數,並且在程式結束前調用 system call: [kill](https://man7.org/linux/man-pages/man2/kill.2.html) 傳遞 SIGHUP 訊號給 child processe 來安全地終止程式。
<s>
從 `create_worker` 中可以發現 child process 會關閉讀取通道,再進行 `work_loop` 將資料透過 `write` 寫入 `pipe` 供給 parent 日後 child 被停止時可以讀取,此執行函數內有兩個重要的函數第一個是 `bloom_add` 對資料進行 hash 後加入 bloom filter 中,再來是 `bloom_test` 主要是檢查此資料是否再 bloom filter 當中。而且 `worker_loop` 中是以 0.85 機率去執行檢驗、0.15 進行加入 bloom filter 操作,最後可以發現 `kill` 就是對<s>程序</s> 發起 `SIGHUP` 此值是在一開始就<s>掛起</s> 等待的訊號 。
</s>
:::danger
什麼「掛起」,你知道自己在寫什麼嗎?不要看低劣的簡體中文材料,以第一手資訊為主,改進你的漢語表達。
:::
### Splitting hash values into 32-bit segments for separate storage
:::danger
區分「函數」和「函式」,務必採用本課程教材規範的術語。
:::
`get` 函式中會將 key 除以單一資料的 byte 大小來搜尋對應的 index of bytes,並在 1 byte 中設定 bit mask 來表示取得對應的資料。
```c
static inline int get(bloom_t *filter, size_t key)
{
uint64_t index = key / sizeof(char);
uint8_t bit = 1u << (key % sizeof(char));
return (filter->data[index] & bit) != 0;
}
```
`bloom_add` 透過將 64-bit hash value 切成兩分來當作有兩個 hash function 產生的 hash value。
```c
void bloom_add(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
uint32_t h2 = hbase % BLOOMFILTER_SIZE;
set(filter, h1);
set(filter, h2);
}
```
```c
int bloom_test(bloom_t *filter, const void *key, size_t keylen)
{
uint64_t hbase = hash(key, keylen);
uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
uint32_t h2 = hbase % BLOOMFILTER_SIZE;
return get(filter, h1) && get(filter, h2);
}
```
### 延伸問題:
由於此程式是以 `fork()` 創建多的子進程,每個 process 彼此不共享內存空間,因此不需要擔心 race condition 問題。
## TODO: 第 10 週測驗的第 2 題
> 包含延伸問題
#### 程式碼解析筆記
Hazard point 主要是保護指標不會變成 dangling pointer 也就是資料被其他 thread 給刪除後自己無法取得。
:::danger
這就是你的「總結」,難道程式碼這麼多行,你只能用這樣少的話語描述?
重新揣摩程式碼背後的考量、搭配教材,重新闡述。
:::
#### 程式碼架構
```c
struct lfq_ctx {
alignas(64) struct lfq_node *head;
int count;
struct lfq_node **HP; /* hazard pointers */
int *tid_map;
bool is_freeing;
struct lfq_node *fph, *fpt; /* free pool head/tail */
/* FIXME: get rid of struct. Make it configurable */
int MAX_HP_SIZE;
/* avoid cacheline contention */
alignas(64) struct lfq_node *tail;
};
```
在共享變數中擁有一個用來儲存 hazard pointers `**HP`,每一個 thread 都透過自己的 id(透過 `alloc_tid` 取得) 當作 index 去修改 `HP[tid]`,被**成功**儲存在這的地址都是被保護不被別的 thread 刪除,可以從 `lfq_dequeue_tid` 得知要 dequeue 之前必須成功將資料加入 hazard pointer 中。不過在一開始無法理解為何如果成功將 `old_head` 加入 `HP` 被受保護,還需要在離開迴圈時進行 `ctx->head` 與 `old_head` 比較確認是一樣才能更換成新的 head,不是已經將其保護住了?後來思考才發現 Hazard pointers 是為了保護資料**不被刪除**,因此其他 thread 仍然可以對其 enqueue,所以如果 `ctx->head` 被更動的話就必須重新 `dequeue`。`enqueue` 就不需考慮 HP 直接使用 atomic 放入即可不過會先虛假的更改 tail 最後才會作鏈接。
:::danger
閱讀教材對應的論文。
:::
```c
do {
retry:
/* continue jumps to the bottom of the loop, and would attempt a
* atomic_compare_exchange_strong with uninitialized new_head.
*/
old_head = atomic_load(&ctx->head);
atomic_store(&ctx->HP[tid], old_head);
atomic_thread_fence(memory_order_seq_cst);
/* another thread freed it before seeing our HP[tid] store */
if (old_head != atomic_load(&ctx->head))
goto retry;
new_head = atomic_load(&old_head->next);
if (new_head == 0) {
atomic_store(&ctx->HP[tid], 0);
return NULL; /* never remove the last node */
}
} while (!atomic_compare_exchange_strong (&ctx->head, &old_head, new_head));
```
#### `can_free` 與 `HP` 是否衝突
可以從 `safe_free` 中看到判斷 node 是否存在於危險當中與是否節點標記為 can_free,這裡為好奇為何別人沒有在存取卻還要判斷是否 can_free
#### 延伸問題: 藉由 thread-rcu 來改寫
因為目前對於 read-copy-update 應用在 dequeue 與 enqueue 上去作改寫 hazard pointer 的場景有些疑問,這兩個操作對於 RCU 來說的是 updater 那要如何改寫成 RCU 目前無法想通,**希望閱讀者能提供一些想法與思路**。
:::danger
你閱讀本課程的教材了嗎?
:::
## TODO: 第 10 週測驗的第 3 題
> 包含延伸問題
未完成
## TODO: 第 12 週測驗的第 2 題
> 包含延伸問題
在解釋程式碼結構前,先理解 **lap** 在ring buffer 中扮演什麼角色以及它的用途。
```c
struct chan_item {
_Atomic uint32_t lap;
void *data;
};
```
`lap` 是存放在每個緩衝資料格內的一個成員,他主要用來記錄目前此格資料的使用狀況,可以發現`lap`主要出現在傳送與接收資料的時候,首先有兩個特殊的 `head` 與 `tail` 他在高位 32 bits 暗藏了 `lap` 的值,我們知道 `tail` 用來放入資料 `head` 用來取出資料,從`trysend_buf`中可以發現,他會先取得 channel 尾端的 item 位置之後再將其位置紀錄的 `lap` 與存放在 `tail` 高位的 `lap` 做比對,這裡重點有幾個,存放在 `tail` 變數中的是獨立的,與 channel item 內存放的並不相同,為了區分頭尾,此不等式若比較失敗就表示無法加入或者無法寄送資料到 channel 內。
```graphviz
digraph RingBuffer {
node [shape=circle, fixedsize=true, width=0.5];
rankdir=LR;
Head [label="Head", shape=circle];
Tail [label="Tail", shape=circle];
Node0 [label="1"];
Node1 [label="1"];
Node2 [label="1"];
Node3 [label="1"];
Node4 [label="0"];
Head -> Node0 [label=""];
Node0 -> Node1 [label=""];
Node1 -> Node2 [label=""];
Node2 -> Node3 [label=""];
Node3 -> Node4 [label=""];
Node4 -> Node0 [label=""];
Tail -> Node4[label=""];
}
```
這裡可發現 `tail` 放入資料後都會 +1 ,在 +1 前每個 channel item 的 lap 都是 0,而且 `tail` 高位存放的 `lap` 還沒到尾端不會 +2 所以也是 0。
```graphviz
digraph RingBuffer {
node [shape=circle, fixedsize=true, width=0.5];
rankdir=LR;
Head [label="Head", shape=circle];
Tail [label="Tail", shape=circle];
Node0 [label="2"];
Node1 [label="2"];
Node2 [label="1"];
Node3 [label="1"];
Node4 [label="0"];
Head -> Node2 [label=""];
Node0 -> Node1 [label=""];
Node1 -> Node2 [label=""];
Node2 -> Node3 [label=""];
Node3 -> Node4 [label=""];
Node4 -> Node0 [label=""];
Tail -> Node4[label=""];
}
```
在看這張圖假設 `head` 取了兩個元素就會對 channel item lap +1 變成 2,如此一來當 ring buffer tail 走了一圈 +2 後就知道原本上一輪寫入的資料已經被取用,因此可以將其視為空格進行填入資料。換作是 `head` 也是同樣判斷方式,但是要等到資料放入 `lap` 變為 1 時才可讀取,這也是為何當初 `channel_init` 要將其 `head` 設為 1 的原因。
```c
ch->head = (uint64_t) 1 << 32;
ch->tail = 0;
```
```c
tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
uint32_t pos = tail, lap = tail >> 32;
item = ch->ring + pos;
if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
errno = EAGAIN;
return -1;
}
```
channel 的資料結構
```c
struct chan_item {
_Atomic uint32_t lap;
void *data;
};
struct chan {
_Atomic bool closed;
_Atomic(void **) datap;
struct mutex send_mtx, recv_mtx;
_Atomic uint32_t send_ftx, recv_ftx;
_Atomic size_t send_waiters, recv_waiters;
size_t cap;
_Atomic uint64_t head, tail;
struct chan_item ring[0];
};
typedef void *(*chan_alloc_func_t)(size_t);
```
`strcut chan` 主要是用來實作 go channel 的一些機制,它分成緩衝與非緩衝通道。非緩衝通道並非完全沒有空間,只是他只能存放一個元素,直到他被取出否則他 routine 將無法推入新的元素而被暫停,此法更 mutex lock 一樣可以保護共享變數。
1. `bool closed`: 用來表示 channel 是否還能使用
2. `(void **)datap`: 用於無緩衝通道交換數據
3. `send or recv mtx`: 用於保證最多只有一個存取權
4. `send or recv waiters`:用於緩衝通道,表示在 futex wait 的 thead 數量
5. `send_ftx, recv_ftx`: 用於管理等待的 thread
#### Thread safety:
:::danger
注意用語,務必詳閱 https://hackmd.io/@sysprog/it-vocabulary
不要閱讀低劣的簡體中文素材。
:::
1、通過 atomic 操作和 `mutex` 保證了 multi-thread 環境下的安全性。
2、`send_waiters` 和 `recv_waiters` 計數器幫助管理等待的線程,優化了喚醒機制。
#### ThreadSanitizer
```shell=
WARNING: ThreadSanitizer: data race (pid=123819)
Read of size 8 at 0x7ff2c99be2a8 by thread T1:
#0 reader /home/dong/linux2024/gcftx/main.c:42 (channel_test+0x2e94)
Previous write of size 8 at 0x7ff2c99be2a8 by thread T81:
#0 chan_send_unbuf /home/dong/linux2024/gcftx/chan.c:180 (channel_test+0x266d)
#1 chan_send /home/dong/linux2024/gcftx/chan.c:261 (channel_test+0x2c8d)
#2 writer /home/dong/linux2024/gcftx/main.c:31 (channel_test+0x2da5)
Location is stack of thread T1.
Thread T1 (tid=123821, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79)
#1 create_threads /home/dong/linux2024/gcftx/main.c:70 (channel_test+0x309f)
#2 test_chan /home/dong/linux2024/gcftx/main.c:101 (channel_test+0x3356)
#3 main /home/dong/linux2024/gcftx/main.c:115 (channel_test+0x34fb)
Thread T81 (tid=123901, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79)
#1 create_threads /home/dong/linux2024/gcftx/main.c:70 (channel_test+0x309f)
#2 test_chan /home/dong/linux2024/gcftx/main.c:102 (channel_test+0x3381)
#3 main /home/dong/linux2024/gcftx/main.c:115 (channel_test+0x34fb)
SUMMARY: ThreadSanitizer: data race /home/dong/linux2024/gcftx/main.c:42 in reader
```
以上是發現是在 unbuf 時產生 race condition,因此會去探討在所有在無 buffer 情況下的共享變數,不過在檢查的時候發現反而是每個 thread 的區域 `msg` 接收資料的時候出現 data race。
```c
// sender
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
memory_order_acq_rel,
memory_order_acquire))
*ptr = data; // data race
// reader
atomic_fetch_add(&msg_count[a->msg],1); // data race
```
> 為何非 shared memory 也會有 data race 發生?
由此可發現只有在 channel 還有資料的時候才會發生 data race,初步認為是 `*ptr` 的存取造成的 data race,因為前面條件不成立時 `ptr = ch->datap`,因此為了避免在此處操作有衝突將其改成以下原子操作 [82dfeeb](https://github.com/stevendd543/go_channel_with_C/commit/82dfeeb9fc69f9fc9fcf125a8a2ff3af2f7b284d)。
```c
// recv
atomic_store(data, *ptr);
// send
atomic_store(&ptr, &data);
```
`ptr` 是指向 unbuffer 的變數也就是 channel,不管是從 channel 儲存資料至 thread data 或是 thread data 儲存到 channel 都以原子操作完成即可避免 threads data race。我們知道 senders 或 readers 之間都有 futex 管理,不管是在 critical section 或者作為 channel 資料有無的管理都可以達成,**但 sender 與 reader 間是沒有 critical section**,所以會有資料被對方清除的可能導致最後不是每一個 `msg` 都能成功傳遞,主要導因為 unbuffer 不用 ring buffer 一樣在對 data 讀寫之後會儲存一個 lap 在資料結構上,提供給下一個使用者是否可存取資料,以上步驟將會同時發生,但 unbuffer 的防範機制在於以下。
```c
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
memory_order_acq_rel, memory_order_acq_rel,
memory_order_acquire)) { memory_order_acquire)) {
atomic_store_explicit(data, *ptr, memory_order_release);
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
}
```
sender 與 reader 都一樣只要通道還有資料就不對通到直接修改值,而是先取得通道資料或寫入資料後將通道設為 `NULL`,因此假設兩個 thread 同時進入這個條件區域可能在讀取或寫入前就被改成 NULL,造成讀取失敗。
## TODO: 第 12 週測驗的第 3 題
> 包含延伸問題
#### Enqueue and Dequeue
在進行讀取修改時會針對 union 結構修改,union 結構分成 `volatile uint32_t w` 和 ` volatile const uint32_t r` 這可以讓在同一個記憶體位置操作不同的變數,來提高程式碼可讀性,但可以發現不管是讀寫操作都沒有使用到 atomic 操作,我認為是因為 single producer and single consumer 彼此之間並不會有 thread safety 問題,因為會有 batch size 來確認邊界和 `wait_ticks(SPSC_CONGESTION_PENALTY)` 來作讀寫逞罰,來避免過度競爭。
#### CPU affinity
在主函數中針對每一個 CPU 都會初始化一個 queue 配上一個負責生產所有 queues 資料的 CPU,每個 consumer 都會配置以自己 id 兩倍對應的 CPU。**但要如何在不同 CPU 達到同步等待?**
```c=
cpu_set_t cur_mask;
CPU_ZERO(&cur_mask); // initiate cpu mask
CPU_SET(cpu_id * 2, &cur_mask);
sched_setaffinity(0, sizeof(cur_mask), &cur_mask);
```
#### pthread_barrier_wait
[pthread_barrier](https://linux.die.net/man/3/pthread_barrier_init) 是一個計數鎖用來同步**同一行程**產生的線程,因此就算被分配到其他 CPU 執行也是原屬於 pthread_create 的進程
#### false sharing
:::danger
注意用語:
* access 是「存取」,而非「訪問」(visit)
務必使用本課程教材規範的術語。
:::
它發生在多個 CPU 同時存取 同一個 cache line 時性能低下,這都是因為 Cache Coherence Protocol 引發的問題,因此程式碼中有針對共享資料作對齊優化。`tail` 因為頻繁被多的 consumer 修改因此對其作對齊優化,**不過這裡尚不懂的地方是,程式碼當初是建立在多個 consumers 有自己對應的 queue 為何還需要對齊來避免 false sharing?**
:::danger
為何不作實驗來驗證自己的想法呢?避免「舉燭」
:::
```c
typedef struct spsc_queue {
counter_t head; /* Mostly accessed by producer */
volatile uint32_t batch_head;
counter_t tail __ALIGN; /* Mostly accessed by consumer */
volatile uint32_t batch_tail;
unsigned long batch_history;
/* For testing purpose */
uint64_t start_c __ALIGN;
uint64_t stop_c;
element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */
} __ALIGN spsc_queue_t;
```