owned this note
owned this note
Published
Linked with GitHub
# 期末專題筆記
* Todo : 閱讀 https://hackmd.io/@sysprog/concurrency 並紀錄問題
* Todo : quiz9/quiz10/quiz12 (or newer)
## 閱讀並行和多執行緒設計
### 概念
Concurrency(並行) : 將各個程式拆成多個可獨立運作的 process/thread 來執行,並分配在不同時間點做不同的事
Parallelism(平行) : 在多核或多處理器的架構中,將任務分配給不同的硬體來同時處理,注重在規劃。
上述兩種方式都會遇到同個問題,資料的 **Sychronization(同步處理)**。其作用是確保各個處理單元在運作時,不會因為**處理的先後順序**不同而造成錯誤。
#### mutex 和 semaphore 的差別:
process 使用 **mutex** 時,此 process 便獲得**存取/修改**資源的權限,執行其 Critical Section,結束後再釋放 mutex。
process 使用 **semaphore** 時,各個處理器會發出 signal 和 wait 來決定執行的先後順序,同個 process 不會先後進行 signal 和 wait。來保證 process 的同步處理正確。
**Thread** 在硬體中,代表著實際的處理器中能夠處理的執行續;在軟體中則代表一個 process 能夠被拆成多個 thread。
在 intel 開發了 Hyperthread ,讓單個 CPU 可以同時執行多個 Thread (物理上),藉由共用 ALU 或 FPU 來完成。
#### Task and Thread
在 Linux 中,並沒有特別強調 task/process/thread 的差異,在多個 CPU 的環境中,會將一個程式分割成最小單元(Thread),並分配到各個 CPU 中來進行。若要完成某個目標,會需要多個工作,而各個工作又分割成許多 Thread ,即為**多工多執行續系統**。
而多工系統中,每個工作會因為不同的排程,而各自運行到不同的階段,當被暫停切換至另一個工作時,需要使用 **工作切換 (context switch)** 來回復到上次的進度,也就是回到上次執行完的**下一個地址**。因為工作切換也會造成一定程度的**負擔(overhead)**,因此和系統的反應速率有**高度相關**。
#### 排程器
CPU 會為各個工作進行排程,來決定下一個工作,可以是 static 排程,也可以是 dynamic 的排程。
排程有多種演算法,依照時間分割(round-robin scheduling 或 time slicing) 用於處理相同重要性的工作,而任務優先順序的 (priority scheduling)則處理那些有不同時效性的任務,也就是 hard real-time 的工作。
:::info
排程器是在哪裡運行的 是會有一個CPU來完成這些事情嗎 排程這件事算一個task嗎
:::
#### 搶佔式與非強取式核心
兩者的核心差別為,工作交出CPU的使用權是強制性的或是非強制的。
非搶佔式(non-preemptive)不會依照工作的優先順序交出CPU的使用權,而是不定期的交出使用權。為了達到並行,因此頻率要夠快,否則讓使用者感受到等待時間,有下列好處
1. 實作單純
2. 工作中可使用**非再進入程式碼(non-reentrant code)**,換言之,每個工作不需擔心在程式未執行完畢時又重新進入。因此該工作本身所用的記憶區不會有被污染 (corruption) 的可能;
3. 對系統共用記憶區的保護動作可減至最少,因為每一工作在未使用完記憶區時不會放棄 CPU ,無須擔心會被其他工作在半途中修改;
:::info
2、3項不太懂
:::
缺點為反應能力 (responsiveness) 較差,當優先權較高的順序已準備好,卻必須等待較低的工作放棄CPU。
![visualization (1)](https://hackmd.io/_uploads/ByqahbPNR.png)
![visualization (2)](https://hackmd.io/_uploads/rJM03ZD40.png)
搶佔式核心則是讓優先權較高的工作可以打斷正在執行且較低優先權的工作。優點為系統反應速度快,但其核心設計較複查,也需考量較多因素,且要注意程式碼的再進入性與保護共用資料區。
#### 程式之可再進入性
可再進入的函式可以同時被多個工作呼叫,而不會造成資料不一致的問題。一個可再進入 [(reentry) ](https://en.wikipedia.org/wiki/Reentrancy_(computing))會避免使用共享記憶體(global memory),而變數和資料存在於呼叫者的資料區或堆疊區。
三種不同的函式
* Neither reentrant nor thread-safe
`tmp` 為全域不符合 reentrant,且沒有進行同步,非 thread-safe。
```cpp
int tmp;
void swap(int* x, int* y)
{
tmp = *x;
*x = *y;
/* Hardware interrupt might invoke isr() here. */
*y = tmp;
}
void isr()
{
int x = 1, y = 2;
swap(&x, &y);
}
```
* Thread-safe but not reentrant
`_Thread_local`
```cpp
_Thread_local int tmp;
void swap(int* x, int* y)
{
tmp = *x;
*x = *y;
/* Hardware interrupt might invoke isr() here. */
*y = tmp;
}
void isr()
{
int x = 1, y = 2;
swap(&x, &y);
}
```
* Reentrant and thread-safe
```cpp
void swap(int* x, int* y)
{
int tmp;
tmp = *x;
*x = *y;
*y = tmp; /* Hardware interrupt might invoke isr() here. */
}
void isr()
{
int x = 1, y = 2;
swap(&x, &y);
}
```
### Concurrency (並行) vs. Parallelism (平行)
**Concurrency** 是指程式架構,將程式拆開成多個可獨立運作的工作。案例: 裝置驅動程式,可獨立運作,但不需要平行化。
**Parallelism** 是指程式執行,同時執行多個程式。Concurrency 可能會用到 parallelism,但不一定要用 parallelism 才能實現 concurrency。案例: 向量內積計算
### Atomics 操作
在並行程式設計中,會使用 lock 來避免發生 race condition,
### POSIX THREADs
#### mutex lock
`pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER` :宣告一個mutex lock
`int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr)` : 初始化 lock
`int pthread_mutex_destroy(pthread_mutex_t *mutex)`: 刪除 lock
* `int pthread_mutex_lock(pthread_mutex_t *mutex)` : 獲得 lock or 等待得到 lock
* `int pthread_mutex_unlock(pthread_mutex_t *mutex)` : 釋放 lock 給其他 thread
* `int pthread_mutex_trylock(pthread_mutex_t *mutex)`: 獲得 lock or 回傳無法得到 lock
#### condition variables
* `int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)`: 初始化條件變數
* `int pthread_cond_destroy(pthread_cond_t *cond)`:刪除條件變數
* `int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)`:釋放目前的 lock ,並等待條件成立,再獲得 lock
* `int pthread_cond_signal(pthread_cond_t *cond)` : 發送條件內容。
* `int pthread_cond_broadcast(pthread_cond_t *cond)`: 喚醒所有執行續。
```cpp
// 宣告 mutex lock 和 condition variable
pthread_mutex_t *lock;
pthread_cond_t *notFull, *notEmpty;
//
void consumer(char* buf) {
for(;;) {
//獲得 mutex
pthread_mutex_lock(lock);
// 若沒有空間時
while(count == 0)
// 發送一 notEmpty 的訊號,呼叫 producer 生產物件,並釋放 mutex,等待直到 not Empty
pthread_cond_wait(notEmpty, lock);
// buff內容減少
useChar(buf[count-1]);
count--;
// 傳送 norFull 的訊號
pthread_cond_signal(notFull);
// 釋放 mutex
pthread_mutex_unlock(lock);
}
}
```
#### semaphores
* `sem_t semaphore;` : 宣告 semaphore
* `int sem_init(sem_t *sem, int pshared, unsigned int value)` : 初始化 semaphore, `pshared` 表示此 semaphore 是 thread 間共享還是 process 間共享。為 `0` 代表示 thread 間共享,存放地址應該要能夠讓所有 thread 都讀取; 不為 `0` 則代表為 process 間共享,這時就會使用到 `mmap`。
* `int sem_wait(sem_t *sem);` : 若目前 sem 值大於 0, 則可以 decrement 並 return ; 相反則會等待直到 sem 值大於 0
* `int sem_post(sem_t *sem);` : 將 sem 值加一。
`sem_wait` 和 `sem_post` 成功時會回傳 0 ,失敗回傳 -1 。
Mutex 雖然可以確保 CS ,但卻無法確保 thread 的先後順序,則 condition variable 可以完成上述任務
### 使用 實作輕量級的 Mutex Lock
futex 為一種系統呼叫,當在確認 task 是否會競爭資源時,會以系統呼叫來確認,便會造成不必要的浪費。 Futex 則可以在userspace 上完成 mutex 、 condvar 等操作。
通常在condvar 和 mutex 合用的實作中,會需要 condvar 的情況成立和獲得 mutex 才會進行動作,而頻繁的確認 mutex 也會造成一定的資源浪費,而 futex 將兩種動作合為一次呼叫即可完成。
* `futex_wait` :
```cpp
static inline void futex_wait(atomic int *futex, int value)
{
syscall(SYS_futex, futex, FUTEX_WAIT_PRIVATE, value, NULL);
}
```
### 案例 Ring buffer
## 第十二周測驗題
### 測驗 `1`
此題希望修改以下程式碼,使用 futex 的方式,達到等待 3 秒的效果
```cpp
#include <linux/futex.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>
int futex_sleep(time_t sec, long ns)
{
uint32_t futex_word = 0;
struct timespec timeout = {sec, ns};
return syscall(SYS_futex, AAAA, BBBB, futex_word, &timeout,
NULL, 0);
}
int main()
{
time_t secs = 3;
printf("Before futex_sleep for %ld seconds\n", secs);
futex_sleep(secs, 0);
printf("After futex_sleep\n");
return 0;
}
```
首先,我們先看 [`timespec` ](https://man7.org/linux/man-pages/man3/timespec.3type.html) ,它表示了一個時間,並且精度可以達到奈秒。
接著查看 `futex` 的格式和參數說明
```cpp
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
```
Futex (Fast userspace mutex) 藉由使用一個 32 位元變數`futex word`,來和 Kernal 中的 wait queue 來互動,而需要進行同步的執行緒則會共享此變數。
* `uaddr` 指向著 `futexword`
* `futex_op` 則表示著不同的 futex 操作
* `val` 對應著 `futex_op` 中的數值
* `timeout` 、 `uaddr2` 和 `val3` 只有特定的 `futex_op` 會使用到,其餘則可以忽略
FUTEX_WAIT 會查看 `uaddr` 所指向的值,是否和 `val` 相同,此作法是為了避免被其他執行緒改變 `futex_word` 的值,而造成無法進入 sleep。
由於預期是等待 3 秒,因此 `futex_op` 應為 `FUTEX_WAIT` ,而 `*uaddr` 的部份為 `&futex_word` 。
### 測驗 `2`
此題使用 C11 Atomics 和 Linux 提供的 futex 系統呼叫,來模擬 Go 程式語言的 [goroutine](https://go.dev/tour/concurrency/1) 和 [channel](https://go.dev/tour/concurrency/2) 機制
goroutine 可以想像成一個較輕量的 Thread ,而goroutine共享著同樣的地址,因此同步地訪問共同記憶體也非常重要
```go
func main() {
go say("world")
say("hello")
}
```
![image](https://hackmd.io/_uploads/rySrlptEA.png)
原先的 goroutine 會呼叫另一個 goroutine ,當原先也就是 main 的 goroutine 結束,其他的 goroutine也會跟著結束。
而 channel 為各個 goroutine 間的通訊,我們可以以此來完成執行緒間的 wait 和同步。
```go
func say(s string, c chan string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
c <- "FINISH"
}
func main() {
ch := make(chan string)
go say("world", ch)
go say("hello", ch)
<-ch
<-ch
}
```
![image](https://hackmd.io/_uploads/HJUS-vj40.png)
在 `main` 中,創建了 channel ,表示用來傳送字串。
```go
ch := make(chan string)
```
接著創建了兩個 goroutine ,其結束後會往 channel 傳送 "Finish"
```go
func say(s string, c chan string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
c <- "FINISH"
}
```
直到接收到兩個 "Finish" 後,才會結束程式
```go
func main() {
ch := make(chan string)
go say("world", ch)
go say("hello", ch)
<-ch
<-ch
}
```
接著是使用 C11 Atomics 和 futex 實作 GO channel
首先是 `mutex_unlock` ,使用 atomic 操作對 mutex 減一並解鎖,確保不被其他執行緒影響,並查看目前是否有執行緒在等待,有的話就喚醒。
```cpp
void mutex_unlock(struct mutex *mu)
{
uint32_t orig =
atomic_fetch_sub_explicit(&mu->val, 1, memory_order_relaxed);
if (orig != LOCKED_NO_WAITER) {
mu->val = UNLOCKED;
futex_wake(&mu->val, 1);
}
}
```
使用 [`CAS` ](https://en.wikipedia.org/wiki/Compare-and-swap)操作,會比較 `ptr` 和 `expect` 的值。
**相同**,則將 `new` 放入 `ptr`; **不同的話**,則將 `ptr` 的值放入 `expect` ,最後回傳 expect 。
```cpp
static uint32_t cas(_Atomic uint32_t *ptr, uint32_t expect, uint32_t new)
{
atomic_compare_exchange_strong_explicit(
ptr, &expect, new, memory_order_acq_rel, memory_order_acquire);
return expect;
}
```
`mutex_lock` 用於將 lock 上鎖,先使用 `CAS` 看目前的 val 值是否為 `UNLOCKED` ,是的話則替換成 `LOCKED_NO_WAITER` ;否的話代表目前鎖有人在使用,因此會進入 futex_wait 直到解鎖。
```cpp
void mutex_lock(struct mutex *mu)
{
uint32_t val = cas(&mu->val, UNLOCKED, LOCKED_NO_WAITER);
if (val != UNLOCKED) {
do {
if (val == LOCKED ||
cas(&mu->val, LOCKED_NO_WAITER, LOCKED) != UNLOCKED)
futex_wait(&mu->val, LOCKED);
} while ((val = cas(&mu->val, UNLOCKED, LOCKED)) != UNLOCKED);
}
}
```
接著是 `channel` 的宣告,在此有兩種 channel ,分別是 `unbuffer` 和 `ring buffer` 。
break
```cpp
struct chan {
_Atomic bool closed;
/* Unbuffered channels only: the pointer used for data exchange. */
_Atomic(void **) datap;
/* Unbuffered channels only: guarantees that at most one writer and one
* reader have the right to access.
*/
struct mutex send_mtx, recv_mtx;
/* For unbuffered channels, these futexes start from 1 (CHAN_NOT_READY).
* They are incremented to indicate that a thread is waiting.
* They are decremented to indicate that data exchange is done.
*
* For buffered channels, these futexes represent credits for a reader or
* write to retry receiving or sending.
*/
_Atomic uint32_t send_ftx, recv_ftx;
/* Buffered channels only: number of waiting threads on the futexes. */
_Atomic size_t send_waiters, recv_waiters;
/* Ring buffer */
size_t cap;
_Atomic uint64_t head, tail;
struct chan_item ring[0];
};
```
* `closed` 表示通道的開啟與否
```cpp
_Atomic bool closed;
```
* `datap` 用以指向需要交換的資料 (unbuffered)
```cpp
_Atomic(void **) datap;
```
* `send_mtx` 、` recv_mtx`: 確保只有一位 writer 和 一位 reader (unbuffered)
```cpp
struct mutex send_mtx, recv_mtx;
```
* `send_ftx` 、` recv_ftx`
* unbuffered :會從 1 開始,表示 channel 尚未完成。增加代表有執行緒在等待,完成傳輸則減少。
* ring buffer :表示可供給傳送和接收的數量
:::info
有點看不懂解釋
For buffered channels, these futexes represent credits for a reader or write to retry receiving or sending.
:::
```cpp
_Atomic uint32_t send_ftx, recv_ftx;
```
* `send_waiters` 、` recv_waiters` 目前正在等待傳送和接收的 Thread (buffered)
```cpp
_Atomic size_t send_waiters, recv_waiters;
```
* 最後是定義了 ring buffer 的長度 `cap`,並使用 `lap` 來計數。
```cpp
size_t cap;
_Atomic uint64_t head, tail;
struct chan_item ring[0];
```
```cpp
struct chan_item {
_Atomic uint32_t lap;
void *data;
};
```
接著是 channel 的相關操作
```cpp
enum {
CHAN_READY = 0,
CHAN_NOT_READY = 1,
CHAN_WAITING = 2,
CHAN_CLOSED = 3,
};
```
* `chan_init` 初始化一個 channel , 由 `cap` 決定為 unbuffered 還是 buffered channel。 若為 buffered channel ,則使用 `memset` 將所設定的 buffer 以 0 填滿。
```cpp
static void chan_init(struct chan *ch, size_t cap)
{
ch->closed = false;
ch->datap = NULL;
mutex_init(&ch->send_mtx), mutex_init(&ch->recv_mtx);
if (!cap)
ch->send_ftx = ch->recv_ftx = CHAN_NOT_READY;
else
ch->send_ftx = ch->recv_ftx = 0;
ch->send_waiters = ch->recv_waiters = 0;
ch->cap = cap;
ch->head = (uint64_t) 1 << 32;
ch->tail = 0;
if (ch->cap > 0) memset(ch->ring, 0, cap * sizeof(struct chan_item));
}
```
:::info
目前還不清楚為何 head = 1 << 32
:::
* `chan_make` 創建一個 channel ,並以 `alloc` 分配空間給 channel 。
```cpp
struct chan *chan_make(size_t cap, chan_alloc_func_t alloc)
{
struct chan *ch;
if (!alloc || !(ch = alloc(sizeof(*ch) + cap * sizeof(struct chan_item))))
return NULL;
chan_init(ch, cap);
return ch;
}
```
:::info
可以使用 malloc 嗎
:::
以下是 buffered 的通道
* `chan_trysend_buf` 執行以下事項
1. 先檢查了 channel 是否開啟
```cpp
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
```
2. 存取 tail 的位置,並存取目前即將寫入的位置。檢查 `item` 的 `lap` 和 `tail` 的 `lap` 是否一致,表示可寫入狀態 ,並檢查這次寫入後 tail 是否達到 buffer 的最後位置,達到的話就回到第一個位置。
```cpp
uint64_t tail, new_tail;
struct chan_item *item;
// check if the tail is the same
do {
tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
uint32_t pos = tail, lap = tail >> 32;
item = ch->ring + pos;
if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
errno = EAGAIN;
return -1;
}
if (pos + 1 == ch->cap)
new_tail = (uint64_t)(lap + 2) << 32;
else
new_tail = tail + 1;
} while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail,
```
6. `!atomic_compare_exchange_weak_explicit` 檢查 `tail` 和 `ch->tail` 是否相同,若失敗則代表有被打斷,重新執行 1 ~ 4 步驟
7. 成功後,將資料放入buffer,
```cpp
item->data = data;
atomic_fetch_add_explicit(&item->lap, 1, memory_order_release);
```
* `chan_send_buf`
使用 `chan_trysend_buf` 檢查是否可以傳送,若為 `-1` 則代表目前無法傳送。此時由於交換失敗,因此將 `&ch->send_ftx` 存入 `v` ,因此若 `v` 為零,代表可傳送餘額為 0 進入等待,直到被喚醒。
```cpp
static int chan_send_buf(struct chan *ch, void *data)
{
//ready to send
while (chan_trysend_buf(ch, data) == -1) {
if (errno != EAGAIN) return -1;
//
uint32_t v = 1;
while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1,
memory_order_acq_rel,
memory_order_acquire)) {
// v is zero only when &ch->send_ftx is not equal to &v
if (v == 0) {
atomic_fetch_add_explicit(&ch->send_waiters, 1,
memory_order_acq_rel);
futex_wait(&ch->send_ftx, 0);
atomic_fetch_sub_explicit(&ch->send_waiters, 1,
memory_order_acq_rel);
v = 1;
}
}
}
```
接著, `&ch->recv_ftx` 加一,增加可接收的餘額,並叫醒正在等待接收的 Thread。
```cpp
// recv_ftx + 1
atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel);
// if there are recv_waiters, wake up one thread
if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0)
futex_wake(&ch->recv_ftx, 1);
return 0;
}
```
* `chan_tryrecv_buf` 、`chan_recv_buf` 行為和 `chan_trysendbuf` 、`chan_send_buf` 原理相同。
接下來是 unbuffered 的通道。
* `int chan_send_unbuf(struct chan *ch, void *data)`
先檢查通道是否開啟
```cpp
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
```
使用 mutex 上鎖 `&ch->send_mtx`
```cpp
mutex_lock(&ch->send_mtx);
```
接著嘗試將資料放入 channel 中,成功的話,代表`ch->data` 和 `ptr` 一樣,此時 `ch->data` 就會是 `data` ; 失敗的話,則 `ptr` 會被替換成 `ch->datap`。
失敗時,由於目前 `ptr` 為 `ch->datap` ,為 channel 的 data 交換區的指標的指標,使用 `*ptr` 將指標內容置換成 `data` ,接著再將 `ch->datap` 指向 NULL。
使用`atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_WAITING` 查看是否接收端在等待接收,有的話則喚醒。
```cpp
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
memory_order_acq_rel,
memory_order_acquire)) {
*ptr = data;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->recv_ftx, CCCC = 1);
}
```
替換成功,則代表目前 `ch->data` 指向 `data`,使用 atomic 指令將 `&ch->send_ftx` +1,並回傳 `&ch->send_ftx` 初始值是否為 `CHAN_NOT_READY = 1`,是的話就持續等到接收端接收。
`futex_wait(&ch->send_ftx, CHAN_WAITING)` 會持續等待,直到 `&ch->send_ftx` 不等於 `CHAN_WAITING`。
最後再歸還鎖。
```cpp
else {
if (atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_NOT_READY) {
do {
futex_wait(&ch->send_ftx, CHAN_WAITING);
} while (atomic_load_explicit(
&ch->send_ftx, memory_order_acquire) == CHAN_WAITING);
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
}
}
mutex_unlock(&ch->send_mtx);
return 0;
}
```
* `chan_recv_unbuf` 和`chan_send_unbuf` 概念類似。
* `static int chan_recv_unbuf(struct chan *ch, void **data)`
先檢查要接收的位置 data 是否存在,和檢查 channel 是否開啟
```cpp
if (!data) {
errno = EINVAL;
return -1;
}
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
```
將上鎖 `ch->recv_mtx` 上鎖,避免同時執行緒進行接收。
```cpp
mutex_lock(&ch->recv_mtx);
```
檢查 `ch->datap` 是否為 NULL,是的話代表還**沒有傳送端傳送資料**;否的話代表已有**接收端傳送資料**。
已有傳送資料的情況下,會將要資料接收的位置 `data` 替換成 `ptr` ,而目前 `ptr` = `ch->datap` ,為傳送端傳送的資料。再將 `ch->datap` 指向 NULL,代表資料接收已完成,將傳送端的 futex 減 1 並喚醒先前等待的傳送端執行緒。
```cpp
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
memory_order_acq_rel,
memory_order_acquire)) {
*data = *ptr;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->send_ftx, 1);
}
```
因為目前傳送端尚未傳送資料,接下來會將 `ch->recv_ftx` 加一表示等待,直到傳送端喚醒此執行緒。
```cpp
else {
if (atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_NOT_READY) {
do {
futex_wait(&ch->recv_ftx, CHAN_WAITING);
} while (atomic_load_explicit(
&ch->recv_ftx, memory_order_acquire) == CHAN_WAITING);
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
}
}
```
最後歸還鎖
```cpp
mutex_unlock(&ch->recv_mtx);
```
我們可以歸納出,如果傳送端先傳送資料, `ch->datap` 不為 NULL ,而接收端會進入 CAS 失敗的情況,喚醒傳送端的執行緒;反之,接收端先等待資料, `ch->datap` 不為 NULL,傳送端會進入 CAS 失敗,喚醒等待的接收端執行緒。
* `chan_close` 根據 `ch->cap` 決定要關閉的 channel 種類
```cpp
void chan_close(struct chan *ch)
{
ch->closed = true;
if (!ch->cap) {
atomic_store(&ch->recv_ftx, CHAN_CLOSED);
atomic_store(&ch->send_ftx, CHAN_CLOSED);
}
futex_wake(&ch->recv_ftx, INT_MAX);
futex_wake(&ch->send_ftx, INT_MAX);
}
```
### 排除 ThreadSanitizer 的錯誤訊息並提出改進方案
ThreadSanitizer 是一個可以檢查各個執行續間有沒有發生 Data race,並且會在編譯時進行偵測程式碼,並在程式運行時紀錄每個執行緒的行為,如造訪的記憶體位置、讀寫行為等,最後再進行報告。
我們可以寫一個會造成 Data race 的程式 `data_race.c`
```cpp
#include <stdio.h>
#include <pthread.h>
pthread_mutex_t lock;
int shared_var = 0;
int times = 1000000;
void* increment(void* arg) {
for (int i = 0; i < times; ++i) {
++shared_var;
}
return NULL;
}
int main() {
pthread_t t1, t2;
pthread_create(&t1, NULL, increment, NULL);
pthread_create(&t2, NULL, increment, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("Final value: %d\n", shared_var);
return 0;
}
```
並使用 ThreadSanitizer 進行編譯
```
gcc -g -fsanitize=thread -o data_race data_race.c
```
執行 `data_race.c`
```
FATAL: ThreadSanitizer: unexpected memory mapping 0x63faef1d9000-0x63faef1da000
```
反而出現的不是 Data race 的問題,而是 memory mapping 的問題,在 [Thread Sanitizer FATAL error on kernel version](https://github.com/google/sanitizers/issues/1716) 說明到可能是 ASLR(Address space layout randomization) 的問題。
ASLR 通過隨機地分配程序的記憶體位置,防止惡意程式或攻擊者利用已知的記憶體位置來進行攻擊,預設是 32 bit,我們將其設置為 28 bit。
```shell
$ sudo sysctl -w vm.mmap_rnd_bits=28
vm.mmap_rnd_bits = 28
```
再執行一次 `data_race.c`
```cpp
==================
WARNING: ThreadSanitizer: data race (pid=43074)
Read of size 4 at 0x55939c8a7018 by thread T2:
#0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x129d)
Previous write of size 4 at 0x55939c8a7018 by thread T1:
#0 increment /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 (data_race+0x12b5)
Location is global '<null>' at 0x000000000000 (data_race+0x000000004018)
Thread T2 (tid=43077, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:26 (data_race+0x134e)
Thread T1 (tid=43076, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main /home/kkkkk1109/2024q1week12/exam2/data_race.c:25 (data_race+0x1331)
SUMMARY: ThreadSanitizer: data race /home/kkkkk1109/2024q1week12/exam2/data_race.c:13 in increment
==================
Final value: 2000000
ThreadSanitizer: reported 1 warnings
```
說明發生了 data race。
接著在程式中加入 mutex 進行保護
::: spoiler `data_race_protect.c`
```diff
#include <stdio.h>
#include <pthread.h>
++pthread_mutex_t lock;
int shared_var = 0;
int times = 1000000;
void* increment(void* arg) {
for (int i = 0; i < times; ++i) {
++ pthread_mutex_lock(&lock);
shared_var;
++ pthread_mutex_unlock(&lock);
}
return NULL;
}
int main() {
pthread_t t1, t2;
++ pthread_mutex_init(&lock, NULL);
pthread_create(&t1, NULL, increment, NULL);
pthread_create(&t2, NULL, increment, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
++ pthread_mutex_destroy(&lock);
printf("Final value: %d\n", shared_var);
return 0;
}
```
:::
ThreadSanitizer 就沒有報錯了
```
$ ./data_race_protect
Final value: 2000000
```
接著在測驗 `2` 加入 ThreadSanitizer 進行編譯
```cpp
CFLAGS = -std=c11 -Wall -Wextra -pthread -fsanitize=thread
```
也出現了 data race 的問題,發現是 unbuffered 發生了問題
```cpp
WARNING: ThreadSanitizer: data race (pid=77546)
Read of size 8 at 0x7f96901be1c8 by thread T1:
#0 reader <null> (exam2+0x16a0)
Previous write of size 8 at 0x7f96901be1c8 by thread T81:
#0 chan_send_unbuf <null> (exam2+0x2e58)
#1 chan_send <null> (exam2+0x3477)
#2 writer <null> (exam2+0x15b2)
Location is stack of thread T1.
Thread T1 (tid=77548, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 create_threads <null> (exam2+0x18b0)
#2 test_chan <null> (exam2+0x1b81)
#3 main <null> (exam2+0x1d3b)
Thread T81 (tid=77628, finished) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 create_threads <null> (exam2+0x18b0)
#2 test_chan <null> (exam2+0x1baf)
#3 main <null> (exam2+0x1d3b)
```
執行緒 `T1` 在 `reader` 和執行緒 `T81` 在 `chan_send_unbuf` 同時存取同個記憶體位置 `0x7f96901be1c8` ,不過只靠此訊息無法得知實際是哪個變數被同時存取,因此我使用了 [Helgrind](https://valgrind.org/docs/manual/hg-manual.html) 來輔助進行 data race 的除錯。
Helgind 是 Valgrind 的其中一個工具,可以檢測使用以下事項
* POSIX thread API 的使用錯誤
* deadlock
* data race
### 測驗 `3`
此題為實作一個 lock-free 的 single-producer/single-consumer,使用 ring buffer 且避免造成 false sharing。
定義了 `counter_t` 用為計數。
```cpp
typedef union {
volatile uint32_t w;
volatile const uint32_t r;
} counter_t;
```
接著看 `spsc_queue`
```cpp
typedef struct spsc_queue {
counter_t head; /* Mostly accessed by producer */
volatile uint32_t batch_head;
counter_t tail __ALIGN; /* Mostly accessed by consumer */
volatile uint32_t batch_tail;
unsigned long batch_history;
/* For testing purpose */
uint64_t start_c __ALIGN;
uint64_t stop_c;
element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */
} __ALIGN spsc_queue_t;
```
可以看到使用了 `head` 、 `tail` 作為 ring buffer 的開頭和結尾。而這邊使用了 ` __ALIGN`,其定義為
```cpp
#define __ALIGN __attribute__((aligned(64)))
```
由於題目有說明要避免 false sharing,因此將常使用到的物件以 64byte 為一個單位,如此一來就不會被存在同個 cacheline,而不會造成頻繁的 cache coherence。
`batch_head` 、`batch_tail` 和 `batch_history` 目前還不知道是什麼
`element_t data[SPSC_QUEUE_SIZE] __ALIGN` 宣告了此 ring buffer 的大小。
接著是關於 queue 的操作
`queue_init` 使用 [`memset`](https://man7.org/linux/man-pages/man3/memset.3.html) 將指向的 self 以 0 填滿。
```cpp
static void queue_init(spsc_queue_t *self)
{
memset(self, 0, sizeof(spsc_queue_t));
self->batch_history = SPSC_BATCH_SIZE;
}
```
`SPSC_BATCH_SIZE` 為 queue_size 除以 16
```cpp
#define SPSC_BATCH_SIZE (SPSC_QUEUE_SIZE / 16)
```
接著看 `dequeue` ,將資料取出