--- tags: linux2023 --- # [2023q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 8 週測驗題 :::info 目的: 檢驗學員對並行程式設計、記憶體管理的認知 ::: ==[作答表單: 測驗 1-2](https://docs.google.com/forms/d/e/1FAIpQLScjpNZdD1hS4-b0crW98jCAFCuDvydyBNxLHziNG493VbBJKA/viewform)== (針對 Linux 核心「設計」課程) ==[作答表單: 測驗 3](https://docs.google.com/forms/d/e/1FAIpQLSfXQdqv5N7qsnLfbIOo9grToG1Jq3GEw3dWqsQkQPiSw-264g/viewform)== (針對 Linux 核心「實作」課程) ### 測驗 `1` 〈[並行和多執行緒程式設計](https://hackmd.io/@sysprog/concurrency)〉和〈[Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module)〉提及 [reference counting](https://en.wikipedia.org/wiki/Reference_counting),以下程式碼嘗試以 C11 Atomics 實作,參考執行輸出: ``` hread 1632626432, 0: Hello, world! Thread 1632626432, 1: Hello, world! Thread 1632626432, 2: Hello, world! Thread 1632626432, 3: Hello, world! ... Thread 1103886080, 97: Hello, world! Thread 1103886080, 98: Hello, world! Thread 1103886080, 99: Hello, world! ``` 其中 `1103886080` 是執行緒的編號 (thread ID)。一旦 `REFCNT_TRACE` 事先定義,程式輸出如下: ``` ... main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 97: Hello, world! main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 98: Hello, world! main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 99: Hello, world! ``` 原始程式碼如下 (檔名: `main.c`,部分遮蔽) ```c /* * Generic reference counting in C. * * Use refcnt_malloc/refcnt_unref in instead of malloc/free. Use refcnt_ref to * duplicate a reference as necessary. Use refcnt unref to stop using a pointer. * * The resulting string must be released using refcnt_unref since refcnt_strdup * function utilizes refcnt_malloc. * * This implementation is thread-safe with the exception of refcnt_realloc. If * you need to use refcnt_realloc in a multi-threaded environment, you must * synchronize access to the reference. * * If you define REFCNT_CHECK, references passed into refcnt_ref and and * refcnt_unref will be checked that they were created by refcnt_malloc. This is * useful for debugging, but it will slow down your program somewhat. * * If you define REFCNT_TRACE, refcnt_malloc and refcnt_unref will print * the line number and file name where they were called. This is useful for * debugging memory leaks or use after free errors. */ #include <assert.h> #include <stdatomic.h> #include <stddef.h> #include <stdlib.h> #include <string.h> #define REFCNT_CHECK #define maybe_unused __attribute__((unused)) typedef struct { #ifdef REFCNT_CHECK int magic; #endif atomic_uint refcount; char data[]; } refcnt_t; #ifdef REFCNT_TRACE #define _REFCNT_TRACE(call) \ ({ \ fprintf(stderr, "%s:%d:(%s) %s", __FILE__, __LINE__, __FUNCTION__, \ #call); \ call; \ }) #define refcnt_malloc refcnt_t_malloc #define refcnt_realloc refcnt_t_realloc #define refcnt_ref refcnt_t_ref #define refcnt_unref refcnt_t_unref #define refcnt_strdup refcnt_t_strdup #endif #define REFCNT_MAGIC 0xDEADBEEF static maybe_unused void *refcnt_malloc(size_t len) { refcnt_t *ref = malloc(sizeof(refcnt_t) + len); if (!ref) return NULL; #ifdef REFCNT_CHECK ref->magic = REFCNT_MAGIC; #endif atomic_init(&ref->refcount, 1); return ref->data; } static maybe_unused void *refcnt_realloc(void *ptr, size_t len) { refcnt_t *ref = (void *) (ptr - offsetof(AAAA, BBBB)); #ifdef REFCNT_CHECK assert(ref->magic == REFCNT_MAGIC); #endif ref = realloc(ref, sizeof(refcnt_t) + len); if (!ref) return NULL; return ref->data; } static maybe_unused void *refcnt_ref(void *ptr) { refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data)); #ifdef REFCNT_CHECK assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer"); #endif CCCC(&ref->refcount, 1); return ref->data; } static maybe_unused void refcnt_unref(void *ptr) { refcnt_t *ref = (void *) (ptr - offsetof(DDDD, EEEE)); #ifdef REFCNT_CHECK assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer"); #endif if (FFFF(&ref->refcount, 1) == 1) free(ref); } static maybe_unused char *refcnt_strdup(char *str) { refcnt_t *ref = malloc(sizeof(refcnt_t) + strlen(str) + 1); if (!ref) return NULL; #ifdef REFCNT_CHECK ref->magic = REFCNT_MAGIC; #endif atomic_init(&ref->refcount, 1); strcpy(ref->data, str); return ref->data; } #ifdef REFCNT_TRACE #undef refcnt_malloc #undef refcnt_realloc #undef refcnt_ref #undef refcnt_unref #undef refcnt_strdup #define refcnt_malloc(len) _REFCNT_TRACE(refcnt_t_malloc(len)) #define refcnt_realloc(ptr, len) _REFCNT_TRACE(refcnt_t_realloc(ptr, len)) #define refcnt_ref(ptr) _REFCNT_TRACE(refcnt_t_ref(ptr)) #define refcnt_unref(ptr) _REFCNT_TRACE(refcnt_t_unref(ptr)) #define refcnt_strdup(ptr) _REFCNT_TRACE(refcnt_t_strdup(ptr)) #endif #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define N_ITERATIONS 100 static void *test_thread(void *arg) { char *str = arg; for (int i = 0; i < N_ITERATIONS; i++) { char *str2 = refcnt_ref(str); fprintf(stderr, "Thread %u, %i: %s\n", (unsigned int) GGGG, i, str2); refcnt_unref(str2); } refcnt_unref(str); return NULL; } #define N_THREADS 64 int main(int argc, char **argv) { /* Create threads */ pthread_t threads[N_THREADS]; /* Create a new string that is count referenced */ char *str = refcnt_strdup("Hello, world!"); /* Start the threads, passing a new counted copy of the referece */ for (int i = 0; i < N_THREADS; i++) pthread_create(&threads[i], NULL, test_thread, refcnt_ref(str)); /* We no longer own the reference */ refcnt_unref(str); /* Whichever thread finishes last will free the string */ for (int i = 0; i < N_THREADS; i++) pthread_join(threads[i], NULL); void *ptr = malloc(100); /* This should cause a heap overflow while checking the magic num which the * sanitizer checks. * Leaving commented out for now */ // refcnt_ref(ptr); free(ptr); return 0; } ``` 編譯方式如下: ```shell gcc -o main -Wall -O2 -fsanitize=thread main.c -lpthread ``` 請補完程式碼,使其運作符合預期,且不會觸發 Thread Sanitizer 在執行時期的各項錯誤或警告。作答規範: * 以最精簡的程式碼實作,均不包含空白字元 * `AAAA`, `BBBB`, `DDDD`, `EEEE` 均為 identifier * `CCCC` 和 `FFFF` 是 C11 atomics 規範的函式,均以 `atomic_fetch_` 開頭 * `GGGG` 是 POSIX Thread 規範的函式 :::success 延伸問題: 1. 解釋上述程式碼運作原理,指出其實作缺失並改進 2. 研讀〈[Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module)〉並在 Linux 核心原始程式碼中找出相關 [reference counting](https://en.wikipedia.org/wiki/Reference_counting) 的程式碼,予以解讀和分析 3. 解釋為何 Linux 核心的同步機制不依賴 [reference counting](https://en.wikipedia.org/wiki/Reference_counting) (提示: 參閱 RCU 的設計理念) ::: --- ### 測驗 `2` Linked list (鏈結串列) 允許時間複雜度為 $O(1)$ 的隨機插入 (insert) 及隨機移去 (remove) 節點,但搜尋卻需要 $O(n)$ 時間複雜度。假設一個鏈結串列內容如下: $$ HEAD \to 1 \to 10 \to 18 \to 38 \to 22 \to 39 \to 47 \to 59 \to next $$ 從開頭 (即上方 `HEAD`) 找起,`59` 這個節點需要存取或比對 7 次才能找到,你直覺可能想到[二分法](https://en.wikipedia.org/wiki/Binary_search_algorithm),但對於 linked list 來說,隨機存取節點的時間複雜度仍是 $O(n)$,也就是說,二分法無法帶來效益。 我們可在鏈結串列中,增加額外的 "skip" (作用是提供資料存取的捷徑) 節點,如下: ![](https://hackmd.io/_uploads/B1q3TEKbn.png) > "skip" 一詞在英語的意思很多,這裡取[韋伯字典](https://www.merriam-webster.com/dictionary/skip)提出的解釋: "to bound off one point after another" 如此一來,我們就可依據 "skip" 節點查詢,只需要比對 3 次即可找到上述的 `59`。至於若想搜尋 `47`,我們先根據 "skip" 節點來比對,於是在節點 `22` 上,它的 "skip" 指標會指向 `59`,後者大於 `47`,因此我們可知,`47` 可能會存在於節點 `22` 和節點 `59` 之間,這時候再根據鏈結串列順序搜尋,一共要比對 5 次。 隨著節點的增多,我們的 "skip" 鏈結串列會變成這樣: ![](https://hackmd.io/_uploads/B1Qp6NKZ2.png) "skip" 節點的密度約為普通節點的一半,能否再提高密度呢?我們可在這基礎上再加一層新的 "skip" 節點,這層節點的密度為第一層 "skip" 節點的一半。 ![](https://hackmd.io/_uploads/Sy3ppNYWn.png) 換個視角,呈現如下: ![](https://hackmd.io/_uploads/BJ_gC4t-n.png) 我們再給予新的限制:每層的 "skip" 節點都由它下一層的 "skip" 節點中產生,最終我們可得類似下圖的 "Skip List": ![](https://hackmd.io/_uploads/B1y-CNYW3.png) > Skip List 示意圖 於是,我們不再區分每層是原節點還是 "skip" 節點,而將最底下的那層節點通稱為第一層 (level 1) 節點,第一層節點上面為第二層 (level 2) 節點,再來是第三層,以此類推。假設每層的節點數為下一層的一半,於是搜尋的時間複雜度為 $O(\log n)$。 一般的 linked list 搜尋時必須從第一個開始,按照順序一個一個搜尋,因此不論是搜尋或存取的時間複雜度皆為 $O( N)$,而 [**Skip list**](https://en.wikipedia.org/wiki/Skip_list) 可將存取、搜尋、新增和刪除等操作的平均時間複雜度降到 $O(\log N)$ > [Know Thy Complexities!](https://www.bigocheatsheet.com/) skip list 是種針對 sorted linked list 的資料結構,透過不同 level 做出不同的 linked list 達到加速搜尋。 舉例來說,當我們想在上方 Skip List 示意圖中搜尋 `7`,步驟如下: 1. 從 level 4 (圖中左上方) 比對,因 `1 <= 7` 得知 level 4 不存在 `7`,但其他 level 可能具備。於是跳到 level 3 繼續找 2. 從 level 3 比對,因 `4 <= 7` 且 `6 <= 7` 得知 level 3 一樣不存在 `7`,於是跳到 level 2 繼續找 3. 從 level 2 比對,因 `9 >= 7` 表示此 level 不存在 `7`,跳去 level 1 4. 在 level 1 比對,發現 `7` 總共比對為 5 次,比直接循序查詢(需要 7 次存取) 還快。 如之前所及,skip list 是具有分層結構的鏈結串列,那麼每層的節點該如何產生呢? 我們可在新增元素時,使用隨機方法 (稱作 "coin flip",也就是「擲硬幣看正反面結果」) 決定這個元素有幾層節點。設定該元素僅有 1 層節點機率為 $\frac{1}{2}$,且有 2 層節點機率為 $\frac{1}{4}$,僅有 3 層節點機率為 $\frac{1}{8}$,以此類推。然後觸發隨機事件,當機率為 $\frac{1}{2}$ 的事件發生時,該元素有 1 層節點,機率為 $\frac{1}{2}$ 的事件發生時,該元素有 2 層節點,以此類推。另外,我們限定一個 "skip" 表格應當具有最大的層數限制。 假設一個 "skip" 表格最大層數限制為4,於是我們可設定一個整數區間為$[1, 2^{4-1}]$,即 $[1, 8]$。然後取一個介於 1 到 8 之間的隨機數,當落在 $[5, 8]$ 區間時有 1 層節點,落在 $[3, 4]$ 區間時有 2 層節點,落在 $[2, 2]$ 區間時有 3 層,落在 $[1, 1]$ 上時有 4 層。 總結 Skip List 的特徵: * 第一層包含所有的元素 * 每一層都是個有序的 linked list * skip list 是隨機產生,所以即使元素完全相同的 linked list ,最終的生成的 skip list 有可能不一樣 * 如果節點 x 出現在第 i 層,則所有比 i 小的層都包含 x 建立 skip list 的流程: 1. 最底層 (level 1) 是包含所有元素的 linked list 2. 保留第一個和最後一個元素,從其他元素中隨機選出一些元素形成新的一層 linked list 3. 為剛選出的每個元素新增指標,指向下一層和自己相等的元素 4. 重複上述 (2) 和 (3) 步驟,直到不再能選擇出除了第一個和最後一個元素以外的元素 下方動畫展示在 Skip List 新增節點: ![](https://upload.wikimedia.org/wikipedia/commons/2/2c/Skip_list_add_element-en.gif) Google 的開放原始碼 Key-Value storage (可簡稱 KV) 專案 [LevelDB](https://github.com/google/leveldb) 和 Facebook 延伸 LevelDB 而發展出的 [RocksDB](https://rocksdb.org/),都用到 Skip List 的變形。[Redis](https://redis.io/) 內建一個高效率的 Skip List 實作。 延伸閱讀: * [Skip 視覺化呈現](https://people.ok.ubc.ca/ylucet/DS/SkipList.html) * [Skip List](https://www.jianshu.com/p/9d8296562806) * [LevelDB 研究](https://hackmd.io/@grb72t3yde/sysprog_project_levelDB) 論文〈[Cache-Sensitive Skip List: Efficient Range Queries on modern CPUs](https://www2.informatik.hu-berlin.de/~sprengsz/papers/cssl.pdf)〉 ![](https://hackmd.io/_uploads/H1Ig1BYWn.png) > Figure 1 Skip list 由多個不同層級的鍵值通道 (Lane) 組成,如圖 Figure 1。 在第 Level i 的通道中,平均具有 $n \times p^i$ 個元素,其中 n 為元素總數且 $0<p<1$ ,所以 Skip list 是一個透過機率所決定出來的資料結構,雖然可以很有效地更新資料,但對於確切的結構仍是不可預測的。 論文採用 deterministic variant 的 skip list [perfectly balanced skip lists](https://dl.acm.org/doi/pdf/10.5555/139404.139478) 作為骨架,balanced skip list 的第 Level i+1 通道包含 Level i 通道的每 1/p 個元素,如 Figure 1 所示,當 p = 0.5 時, Level 1 的通道包含了 Level 0 的每 $\frac{1}{0.5} = 2$ 個元素 (1, 3, 5, 7, 9)、Level 2 的通道包含了 Level 1 的每 2 個元素 (1, 5, 9)。 當 p 值較小時,通道可視為稀疏的,可一次跳過許多元素,而在 p 值較大時,通道是密集的,需要在通道中比較多次的元素。 通道的功用主要用來縮少搜尋的範圍,例如在 Figure 1 搜尋元素 6 ,從最高 Leve 2 開始搜尋 6 ,當遇到比 6 還大的值就停止,往節點的下個 Level 繼續搜尋,因此找到元素 9 比 6 大,接著就從 5 往 Level 1 搜尋,遇到 7 停止,從 5 往 Level 0 搜尋,找到 6 。 在 p = 0.5 的 balanced skip list 之最壞情況,搜尋的複雜度需要 $O(\log n)$。 原始論文〈[Skip lists: a probabilistic alternative to balanced trees](https://15721.courses.cs.cmu.edu/spring2018/papers/08-oltpindexes1/pugh-skiplists-cacm1990.pdf)〉藉由 fat keys 來實作,fat keys 包含指向每個通道下個節點的指標以及數組資料,所有的節點皆是一致的,因此方便實作,但透過這樣的實作方式,會使得指向高 Level 下個節點的指標在大多情形都是指向 NULL ,且至少需要 $O(m \times n)$ 的指標空間,其中 m 為通道的總數,故具有較差的空間使用率。 而指標所指向的記憶體空間在大多的情況下為不連續,若採用上述的實作方式實作 skip list,藉由大量的指標操作來進行搜尋,勢必會遇到許多的 cache misses,不利於現代微處理器。 例如,在一個 64-bit architecture 上使用 32-bit 整數鍵值以及具有 5 個通道數的 skip list,每個節點需要佔 4 bytes + (5+1) * 8 bytes = 52 bytes 的空間大小,假設一個 cache line 為 64 bytes,而在遍歷 skip list 的節點時,會幾乎佔用了所有的 cache line 空間 (52 bytes),因此大部分情況下都會發生 cache misses ,但其實僅需 4 bytes + 8 bytes = 12 bytes (數組+指向下個節點的指標) 的空間來存放即可。 **Cache-Sensitive Skip List** ![](https://hackmd.io/_uploads/SJW-kBtb3.png) > Figure 2 為了能充分利用 cache,最直覺的想法就是將**通道改為陣列表示**,如此一來具有 spatial locality 的特性可減少 cache misses ,也可允許 SIMD 指令的使用來加速操作。 Figure 2 包含兩個通道 (Linearized Fast Lane Array) 用來管理 32 筆整數資料 (Data List),其中 p = 0.5,例如搜尋 7 時的走訪路徑如紅色指示所示,而 CSSL 是基於 balanced skip list 實作,故 Level i 的每 1/0.5 = 2 個元素為 Level i+1 的元素,因此**通道的元素數量是已知的**,也只需透過簡單計算得出元素後續的位置。 此篇論文的實作是先假設最大鍵值 t 並根據 t 來預先分配一定的記憶體空間,當遇到插入的鍵值 n 大於 t 時,就得重新建立通道 (詳見更新策略)。 和傳統的 skip list 相比,CSSL 僅需要更少的空間配置,假設鍵值的大小為 k 、指標大小為 r ,傳統 skip list 需要 $n \times (m \times r + r + k)$ 的空間,而 CSSL 只須 $n \times (r + k) + \sum^{m}_{i=1}{p^i \times n \times k}$,其中 n 為資料量、m 為通道數。 **Optimizations** ![](https://hackmd.io/_uploads/HJH7JHYbn.png) > Figure 3 進一步改進如下 1. 將通道大小調整為 cache line 大小的倍數,如 Figure 3。 2. 在最低 level 的通道與 data list 之間引入了一個附加的代理通道 (Proxy Lane Array),如 Figure 2,用來存放指向 data 的指標。 3. 於最高 level 的通道上使用二元搜尋法,這是因為當通道數少的時候,最高 level 的通道所含的元素遠超過 1/p ,使得 cpu 在搜索最高 level 通道的成本往往非常高。 **Updates** 為了能夠支援即時更新,進行以下調整: 1. Inserting keys: 由於通道使用陣列來管理,因此插入新值的時候,需要使用大量的位移操作來使元素保持有序,所以該實作**僅直接將新值插入到 data list (common linked list) 的適當位置,直到 skip list 需要重新建立時再為其分配通道陣列空間**,可透過 [atomic compare-and-swap instruction](https://en.wikipedia.org/wiki/Compare-and-swap) 來實作 latch-free 的插入演算法。 2. Deleting keys: 與插入相反,需先從通道陣列中刪除目標值,但如果將目標值改成 NULL 則需要重新排序通道陣列,因此改用**與目標值相鄰的元素來取代目標值當作刪除**,直到重建時才將重複的值刪除,而在 data list 中將指向目標節點的節點改為指向目標節點的後繼節點。 3. Updating keys 以下是個精簡的 skip list 實作,參考執行輸出如下: ``` 0 search 0 1 search 1 2 search 2 3 search 3 4 search 4 5 search 5 6 search 6 7 search 7 8 search 8 9 search 9 [sl_erase(list, i)]: return 0, index 10000, size 1 ``` 程式碼列表如下: (部分遮蔽) ```c /* total number of node is 2^32 * the level here is log2(n), which is log2(2^32) = 32 */ #define SL_MAXLEVEL 32 struct sl_link { struct sl_link *prev, *next; }; struct sl_list { int size; int level; struct sl_link head[SL_MAXLEVEL]; }; struct sl_list *sl_list_alloc(void); void sl_delete(struct sl_list *list); void *sl_search(struct sl_list *list, int key); int sl_insert(struct sl_list *list, int key, void *val); int sl_erase(struct sl_list *list, int key); #include <errno.h> #include <stddef.h> #include <stdint.h> #include <stdlib.h> #include <time.h> #define compiler_barrier() asm volatile("" : : : "memory") struct sl_node { int key; void *val; struct sl_link link[0]; }; static inline void list_init(struct sl_link *node) { node->next = node; compiler_barrier(); node->prev = node; } static inline void __list_add(struct sl_link *new, struct sl_link *prev, struct sl_link *next) { next->prev = new; new->next = next; new->prev = prev; compiler_barrier(); prev->next = new; } static inline void list_add(struct sl_link *new, struct sl_link *prev) { __list_add(new, prev, prev->next); } static inline void __list_del(struct sl_link *prev, struct sl_link *next) { next->prev = prev; compiler_barrier(); prev->next = next; } static inline void list_del(struct sl_link *node) { __list_del(node->prev, node->next); list_init(node); } #define list_for_each_from(pos, head) for (; pos != head; pos = pos->next) #define list_for_each_safe_from(pos, n, head) \ for (n = pos->next; pos != head; pos = n, n = pos->next) #define container_of(ptr, type, member) \ __extension__({ \ const __typeof__(((type *) 0)->member) *__mptr = (ptr); \ (type *) ((char *) __mptr - offsetof(type, member)); \ }) #define list_entry(ptr, i) container_of(ptr, struct sl_node, link[i]) /* the probability set at 1/2 */ static inline void set_random(void) { time_t current_time; srandom(time(&current_time)); } static inline int random_level(void) { int level = 0; uint32_t random_seed = (uint32_t) random(); while (random_seed && (random_seed & HHHH)) { random_seed >>= 1; level++; } return level >= SL_MAXLEVEL ? SL_MAXLEVEL - 1 : level; } static struct sl_node *sl_node_alloc(int key, void *val, int level) { struct sl_node *node = malloc(sizeof(struct sl_node) + (level + 1) * sizeof(struct sl_link)); if (!node) return NULL; node->key = key, node->val = val; return node; } struct sl_list *sl_list_alloc(void) { struct sl_list *list = malloc(sizeof(struct sl_list)); if (!list) return NULL; list->level = 0; list->size = 0; set_random(); for (int i = 0; i < SL_MAXLEVEL; i++) list_init(&list->head[i]); return list; } void sl_delete(struct sl_list *list) { struct sl_link *n, *pos = list->head[0].next; list_for_each_safe_from (pos, n, &list->head[0]) { struct sl_node *node = list_entry(pos, 0); free(node); } free(list); } void *sl_search(struct sl_list *list, int key) { int i = list->level; struct sl_link *pos = &list->head[i]; struct sl_link *head = &list->head[i]; for (; i >= 0; i--) { pos = pos->next; list_for_each_from (pos, head) { struct sl_node *node = list_entry(pos, i); if (node->key > key) break; else if (node->key == key) return node->val; } pos = pos->prev; pos--; head--; } return NULL; } int sl_insert(struct sl_list *list, int key, void *val) { int i, level = random_level(); struct sl_link *pos = &list->head[level]; struct sl_link *head = &list->head[level]; struct sl_node *new = sl_node_alloc(key, val, level); if (!new) return -ENOMEM; if (level > list->level) list->level = level; for (i = level; i >= 0; i--) { pos = pos->next; list_for_each_from (pos, head) { struct sl_node *tmp = list_entry(pos, i); if (tmp->key > key) { break; } else if (tmp->key == key) goto failed; } pos = pos->prev; IIII; pos--; head--; } list->size++; return 0; failed: free(new); return -EEXIST; } int sl_erase(struct sl_list *list, int key) { int i = list->level; struct sl_link *pos = &list->head[i]; struct sl_link *n, *head = &list->head[i]; for (; i >= 0; i--) { pos = pos->next; JJJJ { struct sl_node *tmp = list_entry(pos, i); if (tmp->key == key) { for (; i >= 0; i--) { list_del(pos--); if (list->head[i].next == &list->head[i]) list->level--; } free(tmp); list->size--; return 0; } else if (tmp->key > key) break; } pos = pos->prev; pos--; head--; } return -EINVAL; } #include <assert.h> #include <stdio.h> #define test(ops, index, size) \ printf("[" #ops \ "]: " \ "return %d, " \ "index %d, " \ "size %d\n", \ (int) ops, index, size) #define times 10000 int main(int argc, char *argv[]) { struct sl_list *list = sl_list_alloc(); int i, arr[10] = {0}; for (i = 0; i < times; i++) assert(sl_insert(list, i, &arr[i]) == 0); test(sl_insert(list, i, NULL), i, list->size); for (i = 0; i < 10; i++) { arr[i] = i; printf("%d search %d\n", i, *(int *) sl_search(list, i)); } for (i = 0; i < times; i++) assert(sl_erase(list, i) == 0); test(sl_erase(list, i), i, list->size); sl_delete(list); return 0; } ``` 請補完程式碼,使其運作符合預期。作答規範: * 以最精簡的形式撰寫 * `HHHH` 為正整數 * `IIII` 和 `JJJJ` 為合法的 C 敘述,均以 `list_` 開頭 * 符合 lab0-c 指定的程式碼排版風格 :::success 延伸問題: 1. 解釋上述程式碼運作原理,指出設計缺失並改進 2. 研讀〈[A kernel skiplist implementation](https://lwn.net/Articles/551896/)〉和〈[Skiplists II: API and benchmarks](https://lwn.net/Articles/553047/)〉,探討 Linux 核心引入的 skip list 的考量,找出現行 Linux 核心的應用案例 3. 分析 [Concurrent-Skip-list](https://github.com/shreyas-gopalakrishna/Concurrent-Skip-list) 並以 C11 Atomics 重新實作,搭配閱讀〈[Concurrent Skiplists](https://spcl.inf.ethz.ch/Teaching/2013-dphpc/final/4.pdf)〉,實作更高效的程式碼。 ::: --- ### 測驗 `3` 〈[並行程式設計: Hazard pointer](https://hackmd.io/@sysprog/concurrency-hazard-pointer)〉提到: > 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。 > > 使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。 `lfq` 嘗試實作精簡的並行佇列 (concurrent queue),運用 hazard pointer 來釋放並行處理過程中的記憶體,程式碼可見: [lfq](https://gist.github.com/jserv/81b0a90506d584c5c9be17122bcf114c) (部分遮蔽),其中 `atomics.h` 如下: ```c #pragma once /* TODO: rewrite in C11 Atomics */ #if !defined(__x86_64__) || defined(__i386) #error "This implementation is specific to x86(-64)" #endif #include <stdbool.h> #include <stdlib.h> #include <string.h> #ifndef asm #define asm __asm #endif #define cmpxchg(ptr, _old, _new) \ { \ volatile uint32_t *__ptr = (volatile uint32_t *) (ptr); \ uint32_t __ret; \ asm volatile("lock; cmpxchgl %2,%1" \ : "=a"(__ret), "+m"(*__ptr) \ : "r"(_new), "0"(_old) \ : "memory"); \ ); \ __ret; \ } #define ATOMIC_SET __sync_lock_test_and_set #define ATOMIC_RELEASE __sync_lock_release #define ATOMIC_SUB __sync_sub_and_fetch #define ATOMIC_SUB64 ATOMIC_SUB #define CAS __sync_bool_compare_and_swap /* The 2nd argument is limited to 1 on machines with TAS but not XCHG. * On x86 it is an arbitrary value. */ #define XCHG __sync_lock_test_and_set #define ATOMIC_ADD __sync_add_and_fetch #define ATOMIC_ADD64 ATOMIC_ADD #define mb __sync_synchronize /* compiler barrier only: runtime reordering is impossible on x86 */ #define lmb() asm volatile("" ::: "memory") #define smb() asm volatile("" ::: "memory") ``` 顯然上述實作針對 x86(-64),以 `__sync` 開頭的內建函式為 GCC [Legacy __sync Built-in Functions for Atomic Memory Access](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fsync-Builtins.html) 編譯方式: ``` gcc -std=gnu99 -O3 -Wall -Wextra -o test_p1c1 -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 lfq.c test.c -lpthread ``` 這個實作應該要能夠支援以下組態: * ` -D MAX_PRODUCER=4 -D MAX_CONSUMER=4` * `-D MAX_PRODUCER=100 -D MAX_CONSUMER=10` * `-D MAX_PRODUCER=10 -D MAX_CONSUMER=100` 預期輸出: ``` Producer thread [140583962404608] exited! Still 0 running... Consumer thread [140583970797312] exited 0 Total push 500000 elements, pop 500000 elements. freelist=1, clean = 0 Test PASS!! ``` 補完程式碼,使其運作符合預期。作答規範: * `AAAA`, `BBBB`, `CCCC` 均包含 `CAS` 巨集,且為合法表示式 延伸閱讀: * [Experiments on Concurrent Queue](https://hackmd.io/@butastur/concurrent-queue) * [concurrent-ll (2022)](https://hackmd.io/@kdnvt/concurrent-ll-2022) :::success 延伸問題: 1. 解釋上述程式碼運作原理 2. 用 C11 Atomics 改寫,使得能夠支援 x86(-64) 以外的處理器架構 3. 遞交 pull request,以上述程式碼的改進版本來取代 [lf-queue](https://github.com/sysprog21/concurrent-programs/tree/master/lf-queue) :::