# Linux 核心專題: 閱讀 ctp
contributed by < `eleanorLYJ` >
> [Linux 核心專題: RCU 實作](https://hackmd.io/@sysprog/HkkWZ20B0)
:::danger
將以下內容彙整到上方超連結指向的筆記中。
:::
## [CTP](https://github.com/nicowilliams/ctp) 介紹
CTP 專案是類似使用者狀態 read-copy-update (RCU) 的 thread-safe variable (TSV)。與 RCU 一樣,此專案的 API 確保 reader 執行緒在無鎖的情況下運行,並且不會被 writer 阻塞。另一方面,writer 會等待最後引用的 reader 執行緒完成讀取後才會寫入。
<!-- 在引用計數的幫助下,記憶體管理是自動的:“當釋放對某個值的最後一個引用時,值會自動銷毀。” -->
### 理解 RCU
閱讀 [Linux 核心設計: RCU 同步機制](https://hackmd.io/@sysprog/linux-rcu)、 [Is Parallel Programming Hard, And, If So, What Can You Do About It?](https://mirrors.edge.kernel.org/pub/linux/kernel/people/paulmck/perfbook/perfbook.html) 的第九章 Deferred Processing
### tsv 的好處
根據 [Using Thread Safe Variables to Protect Data
](https://www.ni.com/docs/en-US/bundle/labwindows-cvi/page/cvi/programmerref/threadsafevariable.htm) 得知關於 tsv 好處:
- 避免常見的多執行緒錯誤,如在存取變數之前忘記取得鎖的常見錯誤。
- 易於資料傳遞,只需傳遞 thread-safe variable handle,即可在函數之間傳遞受保護的數據,而無需同時傳遞 thread-lock handle 和受保護的變數。
:::danger
列出 TSV 的理論依據、學術論文、技術報告
:::
### CTP 中兩種主要實作之間的差異:Slot-Pair 與 Slot-List
#### Slot-Pair
結構:每個 tsv 都有兩個 slot,一個用於當前數據,另一個用於上一個或下一個資料
操作:具有 O(1) 無鎖和 spin-less 讀取以及 O(1) 序列化(serialized )寫入。
reader 行為:
讀取器呼叫 free() 和值析構函數(value destructor),並且可能需要向潛在等待的寫入器發出訊號,這涉及獲取互斥鎖。儘管這在技術上是阻塞的,但它通常是在無競爭的資源上。
writer 行為:
writer 將"上一個" slot 切換到"當前" slot。讀者從目前指定的 slot 讀取。
writer 會等待,直到前一個 slot 沒有活躍的 reader 為止。該值是引用計數的,因此當最後一個引用被刪除時會自動釋放。
#### Slot-List
操作:提供 O(1) 無鎖讀取,並由序列化寫入器在 O(N log(M)) 時間內垃圾收集未引用的值。
結構:維護引用值的鏈結串列,串列的頭部始終是當前值。它還維護一個 subscription slots ,每個 reader 執行緒都有一個 slot。第一次讀取時,reader 分配一個 slot 並將值串列的頭部複製到其 slot 中。
reader 行為:
subscription slots 分配是無鎖的。讀取器中的所有內容都是無鎖的
writer 行為:
writer 對引用值鏈結串列執行垃圾回收(Garbage Collection, GC)。值在最後一個引用被刪除後的第一次寫入時被釋放,因為它們被 writer 回收。
### ctp 目前有六個函式能被使用
- `int thread_safe_var_init(thread_safe_var *vpp, thread_safe_var_dtor_f dtor)`: 初始化 tsv 變數的所有成員
- `thread_safe_var_destroy`
- `int thread_safe_var_get(thread_safe_var vp, void **res, uint64_t *version)`: reader 取得變數的目前值,此外管理參考計數和 slot 選擇。
- `void thread_safe_var_release(thread_safe_var vp)`
- `int thread_safe_var_set(thread_safe_var vp, void *data, uint64_t *new_version)`: data 是要寫入結構為 `value *` 的值(new_value),vp 是 Pointer to thread-safe global variable 與 new_version 則會填入 new_value的版本號。返回0 代表成功。
- `int thread_safe_var_wait(thread_safe_var vp)` : 當還沒有任何資料被寫入時,reader需要等待。
## 從起始點看 ctp API 使用
根據從程式的起始點 main() 的 開始認識 ctp 的 API 使用
main() 主要過程
1. 初始化,包括處理器的數目,預設為 20
2. 根據根據處理器數目設定 reader 和 write r執行緒數量,reader 執行緒的數量等於處理器的數量,而 writer 執行緒 則是 reader 執行緒的數量的五分之一,而最少為一。在建立執行緒後並且用`pthread_detach`將其分離,執行緒結束時不需要用 join 回收執行緒的資源。
3. 三種測量內容: 測(1)多執行緒的讀、寫、(2)多執行緒的讀與(3)單執行緒的多次寫入的效能,並輸出時間
4. 最後銷毀 tsv (變數 var)、釋放 allocated 的記憶體。
以下是測量多執行緒讀寫 或是測多執行緒的讀時出現一樣的的程式碼,其目的是要等待所有執行緒結束的(將錯誤檢查部分移除的版本)
多執行緒的讀寫時, 變數 `nthread` 會等於 reader 加 writer 的總執行緒數量。而在只有讀時,變數 `nthread` 等於 reader 執行緒數量。
```c
while (atomic_cas_32(&nthreads, 0, 0) > 0) {
pthread_cond_wait(&exit_cv, &exit_cv_lock);
if (atomic_cas_32(&nthreads, 0, 0) == nreaders)
thread_safe_var_set(var, magic_exit, &last_version
}
```
#### 多執行緒的讀寫
要看懂以這段,要先去看 reader 與 writer 的函式寫法。
reader 函式,先用 tsv_wait() 等到 writer 第一次將值寫入tsv,之後就開始計時以下動作,用tsv_get() 取得資料、休眠,直到讀到 writer 寫入 `MAGIC_EXIT`,停止計時。離開 reader 函式前,用 `atomic_dec_32_nv` 將 nthreads 減一。最後 signal (exit_cv)。
writer 函式,模擬寫入執行緒以受控的睡眠間隔執行指定次數的寫入。整個過程同樣有計時,並且相同地,在離開 writer 函式前,用 `atomic_dec_32_nv` 將 nthreads 減一。最後 signal (exit_cv)。
因此,每個執行緒完成工作時 nthreads 遞減。主循環等待 nthreads 變成0,在以上擷取至 main function 的第二行,pthread_cond_wait(exit_cv) 在 reader 或 writer 發出要離開的條件變數前先阻塞,接著多判斷,如果 nthreads 都只剩下 reader 執行緒,藉由 tsv_set 寫入 magic_exit 進 tsv(變數 var),就提早指示 tsv 指示離開。
>這段程式會race condition,因為 現在的設計中,nthreads 減少到 nreaders 是假設所有的 writer 執行緒都已經完成並退出。然而,如果一個 writer 執行緒剛好完成並退出,nthreads 減少,但是signal 起一個 reader,這樣會導致 nthreads 減少到 nreaders。
>TODO: 使用另一個變數來明確跟蹤 writer 執行緒的完成狀態。一個變數 nwriters_done,用來記錄已經完成的 writer 執行緒數量,並且在 nwriters_done 等於 nwriters 時,寫入退出訊號至tsv中。
#### 僅測多執行緒的讀
與測量多執行緒的讀、寫的程式碼類似,除了沒有執行 writer 執行緒,reader 函式 (`idle_reader`),也沒有 tsv_wait() 去等待writer的第一次寫入。
<!-- :::success
atomic_dec_32_nv
::: -->
## API 的實作
在 threead_safe_global.h 中表示, `thread_safe_var` 將替代 `thread_safe_var_s *`
```c
typedef struct thread_safe_var_s *thread_safe_var;
```
#### `thread_safe_var_wait(thread_safe_var vp)`
slot-pair 與 slot-list 共有的函式,此函數目的是要等著 writer 第一次將值寫入。
倘若已經 能從 vp 取得到值,那就不用等了,return 0。
其餘,鎖定 waiter_lock
使用 `pthread_cond_wait()` 阻塞,直到 writer第一次寫入值,之後用 pthread_cond_signal() 喚醒等待在指定條件變數上的一個線程,之後同樣再用 pthread_cond_signal() 逐一喚起其他的 waiter。
```c
if (thread_safe_var_get(vp, &junk, NULL) == 0 && junk != NULL)
return 0;
pthread_mutex_lock(&vp->waiter_lock);
while (thread_safe_var_get(vp, &junk, NULL) == 0 && junk == NULL) {
if (pthread_cond_wait(&vp->waiter_cv, &vp->waiter_lock) != 0) {
pthread_mutex_unlock(&vp->waiter_lock);
return err;
}
}
```
等著writer第一次將值寫入,
跟 futex 一樣,為了防止[驚群問題(thundering herd problem)](https://en.wikipedia.org/wiki/Thundering_herd_problem),调用 `pthread_cond_signal(&vp->waiter_cv)` ,而不是用廣播的方式,確保所有等待的 reader 執行緒是逐一喚醒。
```c
pthread_cond_signal(&vp->waiter_cv);
pthread_mutex_unlock(&vp->waiter_lock);
```
### slot-pair
tsv 中有兩個 slot,一個 slot 存 "current" value,另一個 存 "previous"/"next" value。
writer 將 "previous" slot 放入下一個 "current" slot,而讀取器從看起來是"current" slot 的槽中讀取。
#### 資料結構
核心的資料在 vwrapper 中,var 為 vwrapper 的 wrapper, 而在 thread_safe_var_s 中存有 一對 slot 另外還有多個條件變數與鎖。
```c
struct vwrapper {
var_dtor_t dtor; /* value destructor */
void *ptr; /* the actual value */
uint64_t version; /* version of this data */
volatile uint32_t nref; /* release when drops to 0 */
};
/* This is a slot. There are two of these. */
struct var {
struct vwrapper *wrapper; /* wraps real ptr, has nref */
struct var *other; /* always points to the other slot */
uint64_t version; /* version of this slot's data */
volatile uint32_t nreaders; /* no. of readers active in this slot */
};
struct thread_safe_var_s {
pthread_key_t tkey; /* to detect thread exits */
pthread_mutex_t write_lock; /* one writer at a time */
pthread_mutex_t waiter_lock; /* to signal waiters */
pthread_cond_t waiter_cv; /* to signal waiters */
pthread_mutex_t cv_lock; /* to signal waiting writer */
pthread_cond_t cv; /* to signal waiting writer */
struct var vars[2]; /* the two slots */
var_dtor_t dtor; /* both read this */
uint64_t next_version; /* both read; writer writes */
};
```
TODO: 畫個圖
<!-- #### `thread_safe_var_init`
`pthread_key_create(&vp->tkey, var_dtor_wrapper))`
其中
wrapper 是 thread 獨立擁有的
:::info
==解釋==
- pthread_key_create
- `slot = pthread_getspecific(vp->tkey)` ensures that each thread has its own specific slot associated with it !!!!!!!!!
:::
-->
#### `thread_safe_var_get`
<!-- > 讀取器呼叫 free() 和值析構函數(value destructor),並且可能需要向潛在等待的寫入器發出訊號,這涉及獲取互斥鎖。儘管這在技術上是阻塞的,但它通常是在無競爭的資源上。-->
`get` 的快速路徑為,如果這個執行緒擁有的 `wrapper` (`pthread_getspecific(vp->tkey)`) 與 `tsv` 的 `next_version - 1` 一致,就將 `wrapper` 的版本與實際值(`ptr`)存回參數中,並結束函式。
```c
if ((wrapper = pthread_getspecific(vp->tkey)) != NULL &&
wrapper->version == atomic_read_64(&vp->next_version) - 1) {
/* Fast path */
*version = wrapper->version;
*res = wrapper->ptr;
return 0;
}
```
在確定 當前最新的 slot 的迴圈中,會與 writers 競爭。先讀取 version 之後讀取 slot (`v`),之後判斷如果 `atomic_read_64(&vp->next_version) == (*version + 1)` 代表中途是否有其他執行緒在競爭,如果等於,這就確定 slot。反之不等於,並且如果 活躍 reader (nreader) 是 0,就 `signal_writer()`通知 writer 寫入,其方式是使用 `vp->cv` 條件變數告知 writer ,而 reader 則繼續在此迴圈,直到確定好slot。
```c
static int signal_writer(thread_safe_var vp)
{
pthread_mutex_lock(&vp->cv_lock);
err = pthread_cond_signal(&vp->cv);
return pthread_mutex_unlock(&vp->cv_lock);
}
```
選擇好 `slot` 後,就可以將 `slot` 中的 `vwrapper` 引用加一,並將 `vwrapper` 的值寫進 `*version` 與 `*res` 中。
接著表示讀取完畢,將 `slot` 的活躍 reader 減一,然而此時,活躍 reader 等於 0,如果執行緒的最新版不是此當前 `slot` 的版本,那就要 `signal_write` ,因為代表有 writer 在等待。這樣的作法是要確保讀寫操作的同步,避免競爭
```c
if (atomic_dec_32_nv(&v->nreaders) == 0 &&
atomic_read_64(&vp->next_version) != (*version + 1))
err2 = signal_writer(vp);
```
最後 釋放之前的 `wrapper` 並設置新的 `wrapper`。
#### `thread_safe_var_set`
首先,構建一個新的 `wrapper` 變數來保存新的值。
```c
wrapper = calloc(1, sizeof(*wrapper)
wrapper->dtor = vp->dtor;
wrapper->nref = 0;
wrapper->ptr = cfdata;
```
接著,使用 `pthread_mutex_lock(&vp->write_lock)` 來鎖定寫入操作,這也起到 memory barrier 的作用 (==這我不確定,原作者寫的==),以確保之前的寫操作完成。隨後,將新 `wrapper` 的版本號設置為 `vp->next_version`,也將這個版本號返回給 `new_version`。
接著,獲取下一個 slot `v`。
如果是這 tsv 第一次寫入,則將這新 `wrapper` 設置在兩個 slot 上。每個 slot 的 wrapper 引用計數(`nref`) 增加一次,並將這些 slot 的 wrapper 更新為新的 `wrapper`。然後,將 `vp->next_version` 增加一,並藉由 `pthread_cond_signal(&vp->waiter_cv)`通知等待的 reader ,讀取新的值。最後,解鎖寫入操作並結束函式。
如果這不是第一次寫入,則這新 `wrapper` 的引用計數增加一次。然後,等待直到 slot 沒有活躍的 reader。如果有活躍的 reader,則使用 `pthread_cond_wait(&vp->cv, &vp->cv_lock)` 不斷等待,直到所有讀者釋放 `cv`。
```c
while (atomic_read_32(&v->nreaders) > 0) {
pthread_cond_wait(&vp->cv, &vp->cv_lock)) != 0);
```
接著,解鎖 `cv_lock` 後,將 `v->wrapper` 更新為新的 `wrapper`,並將 `v->version` 和 `vp->next_version` 都增加一。最後,釋放舊的 `wrapper`,並解鎖 `write_lock`。
這個過程與 QSBR(Quiescent State Based Reclamation)很相似,只有在最後一個活躍的 reader 不再讀取後,才能寫入新值並回收舊值。
### slot-list
#### 資料結構
```c
struct value {
volatile struct value *next; /* previous (still ref'd) value */
void *value; /* actual value */
volatile uint64_t version; /* version number */
volatile uint32_t referenced; /* for mark and sweep */
};
/*
* Each thread that has read this thread-safe global variable gets one
* of these.
*/
struct slot {
volatile struct value *value; /* reference to last value read */
volatile uint32_t in_use; /* atomic */
thread_safe_var vp; /* for cleanup from thread key dtor */
/* We could add a pthread_t here */
};
/*
* Slots are allocated in arrays that are linked into one larger logical
* array.
*/
struct slots {
volatile struct slots *next; /* atomic */
struct slot *slot_array;/* atomic */
volatile uint32_t slot_count; /* atomic */
uint32_t slot_base; /* logical index of slot_array[0] */
};
struct thread_safe_var_s {
pthread_key_t tkey; /* to detect thread exits */
pthread_mutex_t write_lock; /* one writer at a time */
pthread_mutex_t waiter_lock; /* to signal waiters */
pthread_cond_t waiter_cv; /* to signal waiters */
var_dtor_t dtor; /* value destructor */
volatile struct value *values; /* atomic ref'd value list head */
volatile struct slots *slots; /* atomic reader subscription slots */
volatile uint32_t next_slot_idx; /* atomic index of next new slot */
volatile uint32_t slots_in_use; /* atomic count of live readers */
uint32_t nvalues; /* writer-only; for housekeeping */
};
```
#### `int thread_safe_var_get(thread_safe_var vp, void **res, uint64_t *version)`
- 將 vp->tkey 寫入 slot 變數
- 如果還沒寫入任何值進 vp->tkey 中,用 get_free_slot 尋找空的 slot,倘若都沒有空的 slot 才用 grow_slot 去產出一個新的 slot,之後將 slot 寫入 vp->tkey (其他thread 也就可以看到此slot了(代表是最新slot?)))
接著用while loop 中反覆確認,slot 的value 就是 vp->tkey->values的首位, 也就是當前最新的值,倘若不是,就將slot->value 的值寫進vp->tkey->values的首位。
#### grow_slot
> 補
#### `thread_safe_var_set`
<!-- 順序? -->
建構 new_value (存data 進去)之後,new_value->next 指向 values 的開頭,之後也確定 new_version了
```c
new_value->value = data;
new_value->next = atomic_read_ptr((volatile void **)&vp->values);
if (new_value->next == NULL)
new_value->version = 1;
else
new_value->version = new_value->next->version + 1;
*new_version = new_value->version;
```
接著確定 new_value 為 vp->values 的開頭,並且vp 上 的 values 數加一。
```c
/* Publish the new value */
atomic_write_ptr((volatile void **)&vp->values, new_value);
vp->nvalues++;
```
同樣有記得要去 `pthread_cond_signal(&vp->waiter_cv);` 再等待的read wait thread。
呼叫 `mark_values(vp)` 對舊值執行垃圾回收(Garabage Collection, GC),標記仍在使用的值並識別可以釋放的值。下一段介紹更多。
```c
old_values = mark_values(vp);
```
接著,呼叫`sched_yield()` 放棄 CPU,允許其他執行緒(最好是讀取器)運作,接著解鎖 write_lock、釋放不再使用的舊值的記憶體,如果已設置,則呼叫析構函數 dtor,最終返回解鎖寫鎖的狀態。
#### `mark_value`
`mark_values` 的目的是標記目前被引用的值,並回收那些不再被引用的舊值,基於 mark-and-sweep 的 GC。以下透過如何不把 `thread_safe_var_set` 剛寫入的值給回收掉的角度,去理解 `mark_value`。
mark-and-sweep 主要兩階段:
##### 標記階段 (mark phase):
先走訪 tsv (變數名稱: `vp`)上所有的 `value` 這些值儲存在 `old_values_array` 中,接著走訪所有的 slots 中的 `value`,如果這 value 也有出現在 `old_values_array`,被認為還有被引用,在第二階段時就不會被回收掉。
為了加速,使用 qsort 為 `old_values_array` 做排序,之後用二分搜尋法判斷某 value 是否存在在 `old_values_array`。
```c
for (slots = atomic_read_ptr((volatile void **)&vp->slots); slots != NULL;) {
for (i = 0; i < slots->slot_count; i++) {
slot = &slots->slot_array[i];
v = atomic_read_ptr((volatile void **)&slot->value);
if (v == NULL || v == vp->values) /* vp->values 中的最新值總是被設定為 referenced,確保了它不會被誤刪除 */
continue;
if ((value_binary_search(old_values_array, vp->nvalues, v)) != NULL) {
v->referenced = 1;
}
}
slots = slots->next;
}
```
##### 清除階段 (sweep phase):
將 那些 `tsv` 的 `values` 沒有被標記的值從鍊結串列中移除,並將這些 `values` 加入 `old_values` 鍊結串列中。 而那些有在標記階段的有標記的 values,重設 referenced 為 0,為了下一次做 GC 有機會被回收。
```c
for (p = &vp->values; *p != NULL;) {
v = *p;
if (!v->referenced) {
*p = v->next;
v->next = old_values;
old_values = v; // 將 v 接到 old_values 的頭
vp->nvalues--;
continue;
}
v->referenced = 0;
p = &v->next;
}
```
理解 mark_values 的關鍵在於理解 mark_values 函數如何區分目前有效的值和無效的值。
> 在發佈新值之前,實作不會等待顯式寬限期。相反,它依賴於 `mark_values()` 確定哪些值可以安全回收,也許這是一個TODO。也沒辦法做 批次回收。
## TODO
### 增加 C thread-primitives
目前 thread_safe_var_get 就要 value 就要釋放先前的值, 多寫一個能夠函式控制 reader 是否要釋放。
### 新增 static anaysis
### 重構
1.
重構: init 部分將多個重複的程式碼 改成go 減少重複
2.
`pthread_key_create(&vp->tkey, var_dtor_wrapper` 的 var_dtor_wrapper 只是呼叫 wrapper_free,能將 var_dtor_wrapper 拿掉,更精簡。
```c
static void
var_dtor_wrapper(void *wrapper)
{
wrapper_free(wrapper);
}
```
atomic 改成 gcc in-built function 或是看能不能符合C11的要求
使用 gcc 內建函式 增加跨平台兼容性
```
__sync_val_compare_and_swap(a, b, c)
__sync_fetch_and_add(a, 1)
__sync_add_and_fetch(a, 1)
```
3.
再測一次 data race (原作者測了TSAN 與 helgrind)
4.
改善 batch 回收
(mark_value 那邊)
writer 登記 callback
最後一個離開安靜模式 的 reader 要呼叫 這callback
而累積多個reader 的 callback,再一次做回收 callback
:::info
* userspace RCU 提供多種 flavor,不過要依據應用場景予以調整,才能發揮其作用,例如 [Redis + urcu](https://hackmd.io/@sysprog/B1W2HKTER)
* CTP 除了提供寬鬆的授權條款 (相較於 GNU LGPLv2) 替代選擇,我最初留意到其使用模式可能會比 urcu 單純,於是指派你來分析,不過我仍不確定 CTP 能否替代 urcu,特別是像前一項的 mt-redis
* 依據 CTP 的設計,其 writer 的成本應該比 RCU 的低,不過我沒看到相關的佐證
* CTP 可對應到 urcu 哪個 flavor 的表現?
:::