# Linux 核心專題: 閱讀 ctp contributed by < `eleanorLYJ` > > [Linux 核心專題: RCU 實作](https://hackmd.io/@sysprog/HkkWZ20B0) :::danger 將以下內容彙整到上方超連結指向的筆記中。 ::: ## [CTP](https://github.com/nicowilliams/ctp) 介紹 CTP 專案是類似使用者狀態 read-copy-update (RCU) 的 thread-safe variable (TSV)。與 RCU 一樣,此專案的 API 確保 reader 執行緒在無鎖的情況下運行,並且不會被 writer 阻塞。另一方面,writer 會等待最後引用的 reader 執行緒完成讀取後才會寫入。 <!-- 在引用計數的幫助下,記憶體管理是自動的:“當釋放對某個值的最後一個引用時,值會自動銷毀。” --> ### 理解 RCU 閱讀 [Linux 核心設計: RCU 同步機制](https://hackmd.io/@sysprog/linux-rcu)、 [Is Parallel Programming Hard, And, If So, What Can You Do About It?](https://mirrors.edge.kernel.org/pub/linux/kernel/people/paulmck/perfbook/perfbook.html) 的第九章 Deferred Processing ### tsv 的好處 根據 [Using Thread Safe Variables to Protect Data ](https://www.ni.com/docs/en-US/bundle/labwindows-cvi/page/cvi/programmerref/threadsafevariable.htm) 得知關於 tsv 好處: - 避免常見的多執行緒錯誤,如在存取變數之前忘記取得鎖的常見錯誤。 - 易於資料傳遞,只需傳遞 thread-safe variable handle,即可在函數之間傳遞受保護的數據,而無需同時傳遞 thread-lock handle 和受保護的變數。 :::danger 列出 TSV 的理論依據、學術論文、技術報告 ::: ### CTP 中兩種主要實作之間的差異:Slot-Pair 與 Slot-List #### Slot-Pair 結構:每個 tsv 都有兩個 slot,一個用於當前數據,另一個用於上一個或下一個資料 操作:具有 O(1) 無鎖和 spin-less 讀取以及 O(1) 序列化(serialized )寫入。 reader 行為: 讀取器呼叫 free() 和值析構函數(value destructor),並且可能需要向潛在等待的寫入器發出訊號,這涉及獲取互斥鎖。儘管這在技術上是阻塞的,但它通常是在無競爭的資源上。 writer 行為: writer 將"上一個" slot 切換到"當前" slot。讀者從目前指定的 slot 讀取。 writer 會等待,直到前一個 slot 沒有活躍的 reader 為止。該值是引用計數的,因此當最後一個引用被刪除時會自動釋放。 #### Slot-List 操作:提供 O(1) 無鎖讀取,並由序列化寫入器在 O(N log(M)) 時間內垃圾收集未引用的值。 結構:維護引用值的鏈結串列,串列的頭部始終是當前值。它還維護一個 subscription slots ,每個 reader 執行緒都有一個 slot。第一次讀取時,reader 分配一個 slot 並將值串列的頭部複製到其 slot 中。 reader 行為: subscription slots 分配是無鎖的。讀取器中的所有內容都是無鎖的 writer 行為: writer 對引用值鏈結串列執行垃圾回收(Garbage Collection, GC)。值在最後一個引用被刪除後的第一次寫入時被釋放,因為它們被 writer 回收。 ### ctp 目前有六個函式能被使用 - `int thread_safe_var_init(thread_safe_var *vpp, thread_safe_var_dtor_f dtor)`: 初始化 tsv 變數的所有成員 - `thread_safe_var_destroy` - `int thread_safe_var_get(thread_safe_var vp, void **res, uint64_t *version)`: reader 取得變數的目前值,此外管理參考計數和 slot 選擇。 - `void thread_safe_var_release(thread_safe_var vp)` - `int thread_safe_var_set(thread_safe_var vp, void *data, uint64_t *new_version)`: data 是要寫入結構為 `value *` 的值(new_value),vp 是 Pointer to thread-safe global variable 與 new_version 則會填入 new_value的版本號。返回0 代表成功。 - `int thread_safe_var_wait(thread_safe_var vp)` : 當還沒有任何資料被寫入時,reader需要等待。 ## 從起始點看 ctp API 使用 根據從程式的起始點 main() 的 開始認識 ctp 的 API 使用 main() 主要過程 1. 初始化,包括處理器的數目,預設為 20 2. 根據根據處理器數目設定 reader 和 write r執行緒數量,reader 執行緒的數量等於處理器的數量,而 writer 執行緒 則是 reader 執行緒的數量的五分之一,而最少為一。在建立執行緒後並且用`pthread_detach`將其分離,執行緒結束時不需要用 join 回收執行緒的資源。 3. 三種測量內容: 測(1)多執行緒的讀、寫、(2)多執行緒的讀與(3)單執行緒的多次寫入的效能,並輸出時間 4. 最後銷毀 tsv (變數 var)、釋放 allocated 的記憶體。 以下是測量多執行緒讀寫 或是測多執行緒的讀時出現一樣的的程式碼,其目的是要等待所有執行緒結束的(將錯誤檢查部分移除的版本) 多執行緒的讀寫時, 變數 `nthread` 會等於 reader 加 writer 的總執行緒數量。而在只有讀時,變數 `nthread` 等於 reader 執行緒數量。 ```c while (atomic_cas_32(&nthreads, 0, 0) > 0) { pthread_cond_wait(&exit_cv, &exit_cv_lock); if (atomic_cas_32(&nthreads, 0, 0) == nreaders) thread_safe_var_set(var, magic_exit, &last_version } ``` #### 多執行緒的讀寫 要看懂以這段,要先去看 reader 與 writer 的函式寫法。 reader 函式,先用 tsv_wait() 等到 writer 第一次將值寫入tsv,之後就開始計時以下動作,用tsv_get() 取得資料、休眠,直到讀到 writer 寫入 `MAGIC_EXIT`,停止計時。離開 reader 函式前,用 `atomic_dec_32_nv` 將 nthreads 減一。最後 signal (exit_cv)。 writer 函式,模擬寫入執行緒以受控的睡眠間隔執行指定次數的寫入。整個過程同樣有計時,並且相同地,在離開 writer 函式前,用 `atomic_dec_32_nv` 將 nthreads 減一。最後 signal (exit_cv)。 因此,每個執行緒完成工作時 nthreads 遞減。主循環等待 nthreads 變成0,在以上擷取至 main function 的第二行,pthread_cond_wait(exit_cv) 在 reader 或 writer 發出要離開的條件變數前先阻塞,接著多判斷,如果 nthreads 都只剩下 reader 執行緒,藉由 tsv_set 寫入 magic_exit 進 tsv(變數 var),就提早指示 tsv 指示離開。 >這段程式會race condition,因為 現在的設計中,nthreads 減少到 nreaders 是假設所有的 writer 執行緒都已經完成並退出。然而,如果一個 writer 執行緒剛好完成並退出,nthreads 減少,但是signal 起一個 reader,這樣會導致 nthreads 減少到 nreaders。 >TODO: 使用另一個變數來明確跟蹤 writer 執行緒的完成狀態。一個變數 nwriters_done,用來記錄已經完成的 writer 執行緒數量,並且在 nwriters_done 等於 nwriters 時,寫入退出訊號至tsv中。 #### 僅測多執行緒的讀 與測量多執行緒的讀、寫的程式碼類似,除了沒有執行 writer 執行緒,reader 函式 (`idle_reader`),也沒有 tsv_wait() 去等待writer的第一次寫入。 <!-- :::success atomic_dec_32_nv ::: --> ## API 的實作 在 threead_safe_global.h 中表示, `thread_safe_var` 將替代 `thread_safe_var_s *` ```c typedef struct thread_safe_var_s *thread_safe_var; ``` #### `thread_safe_var_wait(thread_safe_var vp)` slot-pair 與 slot-list 共有的函式,此函數目的是要等著 writer 第一次將值寫入。 倘若已經 能從 vp 取得到值,那就不用等了,return 0。 其餘,鎖定 waiter_lock 使用 `pthread_cond_wait()` 阻塞,直到 writer第一次寫入值,之後用 pthread_cond_signal() 喚醒等待在指定條件變數上的一個線程,之後同樣再用 pthread_cond_signal() 逐一喚起其他的 waiter。 ```c if (thread_safe_var_get(vp, &junk, NULL) == 0 && junk != NULL) return 0; pthread_mutex_lock(&vp->waiter_lock); while (thread_safe_var_get(vp, &junk, NULL) == 0 && junk == NULL) { if (pthread_cond_wait(&vp->waiter_cv, &vp->waiter_lock) != 0) { pthread_mutex_unlock(&vp->waiter_lock); return err; } } ``` 等著writer第一次將值寫入, 跟 futex 一樣,為了防止[驚群問題(thundering herd problem)](https://en.wikipedia.org/wiki/Thundering_herd_problem),调用 `pthread_cond_signal(&vp->waiter_cv)` ,而不是用廣播的方式,確保所有等待的 reader 執行緒是逐一喚醒。 ```c pthread_cond_signal(&vp->waiter_cv);     pthread_mutex_unlock(&vp->waiter_lock); ``` ### slot-pair tsv 中有兩個 slot,一個 slot 存 "current" value,另一個 存 "previous"/"next" value。 writer 將 "previous" slot 放入下一個 "current" slot,而讀取器從看起來是"current" slot 的槽中讀取。 #### 資料結構 核心的資料在 vwrapper 中,var 為 vwrapper 的 wrapper, 而在 thread_safe_var_s 中存有 一對 slot 另外還有多個條件變數與鎖。 ```c struct vwrapper { var_dtor_t dtor; /* value destructor */ void *ptr; /* the actual value */ uint64_t version; /* version of this data */ volatile uint32_t nref; /* release when drops to 0 */ }; /* This is a slot. There are two of these. */ struct var { struct vwrapper *wrapper; /* wraps real ptr, has nref */ struct var *other; /* always points to the other slot */ uint64_t version; /* version of this slot's data */ volatile uint32_t nreaders; /* no. of readers active in this slot */ }; struct thread_safe_var_s { pthread_key_t tkey; /* to detect thread exits */ pthread_mutex_t write_lock; /* one writer at a time */ pthread_mutex_t waiter_lock; /* to signal waiters */ pthread_cond_t waiter_cv; /* to signal waiters */ pthread_mutex_t cv_lock; /* to signal waiting writer */ pthread_cond_t cv; /* to signal waiting writer */ struct var vars[2]; /* the two slots */ var_dtor_t dtor; /* both read this */ uint64_t next_version; /* both read; writer writes */ }; ``` TODO: 畫個圖 <!-- #### `thread_safe_var_init` `pthread_key_create(&vp->tkey, var_dtor_wrapper))` 其中 wrapper 是 thread 獨立擁有的 :::info ==解釋== - pthread_key_create - `slot = pthread_getspecific(vp->tkey)` ensures that each thread has its own specific slot associated with it !!!!!!!!! ::: --> #### `thread_safe_var_get` <!-- > 讀取器呼叫 free() 和值析構函數(value destructor),並且可能需要向潛在等待的寫入器發出訊號,這涉及獲取互斥鎖。儘管這在技術上是阻塞的,但它通常是在無競爭的資源上。--> `get` 的快速路徑為,如果這個執行緒擁有的 `wrapper` (`pthread_getspecific(vp->tkey)`) 與 `tsv` 的 `next_version - 1` 一致,就將 `wrapper` 的版本與實際值(`ptr`)存回參數中,並結束函式。 ```c if ((wrapper = pthread_getspecific(vp->tkey)) != NULL && wrapper->version == atomic_read_64(&vp->next_version) - 1) { /* Fast path */ *version = wrapper->version; *res = wrapper->ptr; return 0; } ``` 在確定 當前最新的 slot 的迴圈中,會與 writers 競爭。先讀取 version 之後讀取 slot (`v`),之後判斷如果 `atomic_read_64(&vp->next_version) == (*version + 1)` 代表中途是否有其他執行緒在競爭,如果等於,這就確定 slot。反之不等於,並且如果 活躍 reader (nreader) 是 0,就 `signal_writer()`通知 writer 寫入,其方式是使用 `vp->cv` 條件變數告知 writer ,而 reader 則繼續在此迴圈,直到確定好slot。 ```c static int signal_writer(thread_safe_var vp) { pthread_mutex_lock(&vp->cv_lock); err = pthread_cond_signal(&vp->cv); return pthread_mutex_unlock(&vp->cv_lock); } ``` 選擇好 `slot` 後,就可以將 `slot` 中的 `vwrapper` 引用加一,並將 `vwrapper` 的值寫進 `*version` 與 `*res` 中。 接著表示讀取完畢,將 `slot` 的活躍 reader 減一,然而此時,活躍 reader 等於 0,如果執行緒的最新版不是此當前 `slot` 的版本,那就要 `signal_write` ,因為代表有 writer 在等待。這樣的作法是要確保讀寫操作的同步,避免競爭 ```c if (atomic_dec_32_nv(&v->nreaders) == 0 && atomic_read_64(&vp->next_version) != (*version + 1)) err2 = signal_writer(vp); ``` 最後 釋放之前的 `wrapper` 並設置新的 `wrapper`。 #### `thread_safe_var_set` 首先,構建一個新的 `wrapper` 變數來保存新的值。 ```c wrapper = calloc(1, sizeof(*wrapper) wrapper->dtor = vp->dtor; wrapper->nref = 0; wrapper->ptr = cfdata; ``` 接著,使用 `pthread_mutex_lock(&vp->write_lock)` 來鎖定寫入操作,這也起到 memory barrier 的作用 (==這我不確定,原作者寫的==),以確保之前的寫操作完成。隨後,將新 `wrapper` 的版本號設置為 `vp->next_version`,也將這個版本號返回給 `new_version`。 接著,獲取下一個 slot `v`。 如果是這 tsv 第一次寫入,則將這新 `wrapper` 設置在兩個 slot 上。每個 slot 的 wrapper 引用計數(`nref`) 增加一次,並將這些 slot 的 wrapper 更新為新的 `wrapper`。然後,將 `vp->next_version` 增加一,並藉由 `pthread_cond_signal(&vp->waiter_cv)`通知等待的 reader ,讀取新的值。最後,解鎖寫入操作並結束函式。 如果這不是第一次寫入,則這新 `wrapper` 的引用計數增加一次。然後,等待直到 slot 沒有活躍的 reader。如果有活躍的 reader,則使用 `pthread_cond_wait(&vp->cv, &vp->cv_lock)` 不斷等待,直到所有讀者釋放 `cv`。 ```c while (atomic_read_32(&v->nreaders) > 0) { pthread_cond_wait(&vp->cv, &vp->cv_lock)) != 0); ``` 接著,解鎖 `cv_lock` 後,將 `v->wrapper` 更新為新的 `wrapper`,並將 `v->version` 和 `vp->next_version` 都增加一。最後,釋放舊的 `wrapper`,並解鎖 `write_lock`。 這個過程與 QSBR(Quiescent State Based Reclamation)很相似,只有在最後一個活躍的 reader 不再讀取後,才能寫入新值並回收舊值。 ### slot-list #### 資料結構 ```c struct value { volatile struct value *next; /* previous (still ref'd) value */ void *value; /* actual value */ volatile uint64_t version; /* version number */ volatile uint32_t referenced; /* for mark and sweep */ }; /* * Each thread that has read this thread-safe global variable gets one * of these. */ struct slot { volatile struct value *value; /* reference to last value read */ volatile uint32_t in_use; /* atomic */ thread_safe_var vp; /* for cleanup from thread key dtor */ /* We could add a pthread_t here */ }; /* * Slots are allocated in arrays that are linked into one larger logical * array. */ struct slots { volatile struct slots *next; /* atomic */ struct slot *slot_array;/* atomic */ volatile uint32_t slot_count; /* atomic */ uint32_t slot_base; /* logical index of slot_array[0] */ }; struct thread_safe_var_s { pthread_key_t tkey; /* to detect thread exits */ pthread_mutex_t write_lock; /* one writer at a time */ pthread_mutex_t waiter_lock; /* to signal waiters */ pthread_cond_t waiter_cv; /* to signal waiters */ var_dtor_t dtor; /* value destructor */ volatile struct value *values; /* atomic ref'd value list head */ volatile struct slots *slots; /* atomic reader subscription slots */ volatile uint32_t next_slot_idx; /* atomic index of next new slot */ volatile uint32_t slots_in_use; /* atomic count of live readers */ uint32_t nvalues; /* writer-only; for housekeeping */ }; ``` #### `int thread_safe_var_get(thread_safe_var vp, void **res, uint64_t *version)` - 將 vp->tkey 寫入 slot 變數 - 如果還沒寫入任何值進 vp->tkey 中,用 get_free_slot 尋找空的 slot,倘若都沒有空的 slot 才用 grow_slot 去產出一個新的 slot,之後將 slot 寫入 vp->tkey (其他thread 也就可以看到此slot了(代表是最新slot?))) 接著用while loop 中反覆確認,slot 的value 就是 vp->tkey->values的首位, 也就是當前最新的值,倘若不是,就將slot->value 的值寫進vp->tkey->values的首位。 #### grow_slot > 補 #### `thread_safe_var_set` <!-- 順序? --> 建構 new_value (存data 進去)之後,new_value->next 指向 values 的開頭,之後也確定 new_version了 ```c new_value->value = data; new_value->next = atomic_read_ptr((volatile void **)&vp->values); if (new_value->next == NULL) new_value->version = 1; else new_value->version = new_value->next->version + 1; *new_version = new_value->version; ``` 接著確定 new_value 為 vp->values 的開頭,並且vp 上 的 values 數加一。 ```c /* Publish the new value */ atomic_write_ptr((volatile void **)&vp->values, new_value); vp->nvalues++; ``` 同樣有記得要去 `pthread_cond_signal(&vp->waiter_cv);` 再等待的read wait thread。 呼叫 `mark_values(vp)` 對舊值執行垃圾回收(Garabage Collection, GC),標記仍在使用的值並識別可以釋放的值。下一段介紹更多。 ```c old_values = mark_values(vp); ``` 接著,呼叫`sched_yield()` 放棄 CPU,允許其他執行緒(最好是讀取器)運作,接著解鎖 write_lock、釋放不再使用的舊值的記憶體,如果已設置,則呼叫析構函數 dtor,最終返回解鎖寫鎖的狀態。 #### `mark_value` `mark_values` 的目的是標記目前被引用的值,並回收那些不再被引用的舊值,基於 mark-and-sweep 的 GC。以下透過如何不把 `thread_safe_var_set` 剛寫入的值給回收掉的角度,去理解 `mark_value`。 mark-and-sweep 主要兩階段: ##### 標記階段 (mark phase): 先走訪 tsv (變數名稱: `vp`)上所有的 `value` 這些值儲存在 `old_values_array` 中,接著走訪所有的 slots 中的 `value`,如果這 value 也有出現在 `old_values_array`,被認為還有被引用,在第二階段時就不會被回收掉。 為了加速,使用 qsort 為 `old_values_array` 做排序,之後用二分搜尋法判斷某 value 是否存在在 `old_values_array`。 ```c for (slots = atomic_read_ptr((volatile void **)&vp->slots); slots != NULL;) { for (i = 0; i < slots->slot_count; i++) { slot = &slots->slot_array[i]; v = atomic_read_ptr((volatile void **)&slot->value); if (v == NULL || v == vp->values) /* vp->values 中的最新值總是被設定為 referenced,確保了它不會被誤刪除 */ continue; if ((value_binary_search(old_values_array, vp->nvalues, v)) != NULL) { v->referenced = 1; } } slots = slots->next; } ``` ##### 清除階段 (sweep phase): 將 那些 `tsv` 的 `values` 沒有被標記的值從鍊結串列中移除,並將這些 `values` 加入 `old_values` 鍊結串列中。 而那些有在標記階段的有標記的 values,重設 referenced 為 0,為了下一次做 GC 有機會被回收。 ```c for (p = &vp->values; *p != NULL;) { v = *p; if (!v->referenced) { *p = v->next; v->next = old_values; old_values = v; // 將 v 接到 old_values 的頭 vp->nvalues--; continue; } v->referenced = 0; p = &v->next; } ``` 理解 mark_values 的關鍵在於理解 mark_values 函數如何區分目前有效的值和無效的值。 > 在發佈新值之前,實作不會等待顯式寬限期。相反,它依賴於 `mark_values()` 確定哪些值可以安全回收,也許這是一個TODO。也沒辦法做 批次回收。 ## TODO ### 增加 C thread-primitives 目前 thread_safe_var_get 就要 value 就要釋放先前的值, 多寫一個能夠函式控制 reader 是否要釋放。 ### 新增 static anaysis ### 重構 1. 重構: init 部分將多個重複的程式碼 改成go 減少重複 2. `pthread_key_create(&vp->tkey, var_dtor_wrapper` 的 var_dtor_wrapper 只是呼叫 wrapper_free,能將 var_dtor_wrapper 拿掉,更精簡。 ```c static void var_dtor_wrapper(void *wrapper) { wrapper_free(wrapper); } ``` atomic 改成 gcc in-built function 或是看能不能符合C11的要求 使用 gcc 內建函式 增加跨平台兼容性 ``` __sync_val_compare_and_swap(a, b, c) __sync_fetch_and_add(a, 1) __sync_add_and_fetch(a, 1) ``` 3. 再測一次 data race (原作者測了TSAN 與 helgrind) 4. 改善 batch 回收 (mark_value 那邊) writer 登記 callback 最後一個離開安靜模式 的 reader 要呼叫 這callback 而累積多個reader 的 callback,再一次做回收 callback :::info * userspace RCU 提供多種 flavor,不過要依據應用場景予以調整,才能發揮其作用,例如 [Redis + urcu](https://hackmd.io/@sysprog/B1W2HKTER) * CTP 除了提供寬鬆的授權條款 (相較於 GNU LGPLv2) 替代選擇,我最初留意到其使用模式可能會比 urcu 單純,於是指派你來分析,不過我仍不確定 CTP 能否替代 urcu,特別是像前一項的 mt-redis * 依據 CTP 的設計,其 writer 的成本應該比 RCU 的低,不過我沒看到相關的佐證 * CTP 可對應到 urcu 哪個 flavor 的表現? :::