---
tags: linux2023
---
# [2023q1](http://wiki.csie.ncku.edu.tw/linux/schedule) 第 8 週測驗題
:::info
目的: 檢驗學員對並行程式設計、記憶體管理的認知
:::
==[作答表單: 測驗 1-2](https://docs.google.com/forms/d/e/1FAIpQLScjpNZdD1hS4-b0crW98jCAFCuDvydyBNxLHziNG493VbBJKA/viewform)== (針對 Linux 核心「設計」課程)
==[作答表單: 測驗 3](https://docs.google.com/forms/d/e/1FAIpQLSfXQdqv5N7qsnLfbIOo9grToG1Jq3GEw3dWqsQkQPiSw-264g/viewform)== (針對 Linux 核心「實作」課程)
### 測驗 `1`
〈[並行和多執行緒程式設計](https://hackmd.io/@sysprog/concurrency)〉和〈[Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module)〉提及 [reference counting](https://en.wikipedia.org/wiki/Reference_counting),以下程式碼嘗試以 C11 Atomics 實作,參考執行輸出:
```
hread 1632626432, 0: Hello, world!
Thread 1632626432, 1: Hello, world!
Thread 1632626432, 2: Hello, world!
Thread 1632626432, 3: Hello, world!
...
Thread 1103886080, 97: Hello, world!
Thread 1103886080, 98: Hello, world!
Thread 1103886080, 99: Hello, world!
```
其中 `1103886080` 是執行緒的編號 (thread ID)。一旦 `REFCNT_TRACE` 事先定義,程式輸出如下:
```
...
main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 97: Hello, world!
main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 98: Hello, world!
main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 99: Hello, world!
```
原始程式碼如下 (檔名: `main.c`,部分遮蔽)
```c
/*
* Generic reference counting in C.
*
* Use refcnt_malloc/refcnt_unref in instead of malloc/free. Use refcnt_ref to
* duplicate a reference as necessary. Use refcnt unref to stop using a pointer.
*
* The resulting string must be released using refcnt_unref since refcnt_strdup
* function utilizes refcnt_malloc.
*
* This implementation is thread-safe with the exception of refcnt_realloc. If
* you need to use refcnt_realloc in a multi-threaded environment, you must
* synchronize access to the reference.
*
* If you define REFCNT_CHECK, references passed into refcnt_ref and and
* refcnt_unref will be checked that they were created by refcnt_malloc. This is
* useful for debugging, but it will slow down your program somewhat.
*
* If you define REFCNT_TRACE, refcnt_malloc and refcnt_unref will print
* the line number and file name where they were called. This is useful for
* debugging memory leaks or use after free errors.
*/
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#define REFCNT_CHECK
#define maybe_unused __attribute__((unused))
typedef struct {
#ifdef REFCNT_CHECK
int magic;
#endif
atomic_uint refcount;
char data[];
} refcnt_t;
#ifdef REFCNT_TRACE
#define _REFCNT_TRACE(call) \
({ \
fprintf(stderr, "%s:%d:(%s) %s", __FILE__, __LINE__, __FUNCTION__, \
#call); \
call; \
})
#define refcnt_malloc refcnt_t_malloc
#define refcnt_realloc refcnt_t_realloc
#define refcnt_ref refcnt_t_ref
#define refcnt_unref refcnt_t_unref
#define refcnt_strdup refcnt_t_strdup
#endif
#define REFCNT_MAGIC 0xDEADBEEF
static maybe_unused void *refcnt_malloc(size_t len)
{
refcnt_t *ref = malloc(sizeof(refcnt_t) + len);
if (!ref)
return NULL;
#ifdef REFCNT_CHECK
ref->magic = REFCNT_MAGIC;
#endif
atomic_init(&ref->refcount, 1);
return ref->data;
}
static maybe_unused void *refcnt_realloc(void *ptr, size_t len)
{
refcnt_t *ref = (void *) (ptr - offsetof(AAAA, BBBB));
#ifdef REFCNT_CHECK
assert(ref->magic == REFCNT_MAGIC);
#endif
ref = realloc(ref, sizeof(refcnt_t) + len);
if (!ref)
return NULL;
return ref->data;
}
static maybe_unused void *refcnt_ref(void *ptr)
{
refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data));
#ifdef REFCNT_CHECK
assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer");
#endif
CCCC(&ref->refcount, 1);
return ref->data;
}
static maybe_unused void refcnt_unref(void *ptr)
{
refcnt_t *ref = (void *) (ptr - offsetof(DDDD, EEEE));
#ifdef REFCNT_CHECK
assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer");
#endif
if (FFFF(&ref->refcount, 1) == 1)
free(ref);
}
static maybe_unused char *refcnt_strdup(char *str)
{
refcnt_t *ref = malloc(sizeof(refcnt_t) + strlen(str) + 1);
if (!ref)
return NULL;
#ifdef REFCNT_CHECK
ref->magic = REFCNT_MAGIC;
#endif
atomic_init(&ref->refcount, 1);
strcpy(ref->data, str);
return ref->data;
}
#ifdef REFCNT_TRACE
#undef refcnt_malloc
#undef refcnt_realloc
#undef refcnt_ref
#undef refcnt_unref
#undef refcnt_strdup
#define refcnt_malloc(len) _REFCNT_TRACE(refcnt_t_malloc(len))
#define refcnt_realloc(ptr, len) _REFCNT_TRACE(refcnt_t_realloc(ptr, len))
#define refcnt_ref(ptr) _REFCNT_TRACE(refcnt_t_ref(ptr))
#define refcnt_unref(ptr) _REFCNT_TRACE(refcnt_t_unref(ptr))
#define refcnt_strdup(ptr) _REFCNT_TRACE(refcnt_t_strdup(ptr))
#endif
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define N_ITERATIONS 100
static void *test_thread(void *arg)
{
char *str = arg;
for (int i = 0; i < N_ITERATIONS; i++) {
char *str2 = refcnt_ref(str);
fprintf(stderr, "Thread %u, %i: %s\n", (unsigned int) GGGG, i,
str2);
refcnt_unref(str2);
}
refcnt_unref(str);
return NULL;
}
#define N_THREADS 64
int main(int argc, char **argv)
{
/* Create threads */
pthread_t threads[N_THREADS];
/* Create a new string that is count referenced */
char *str = refcnt_strdup("Hello, world!");
/* Start the threads, passing a new counted copy of the referece */
for (int i = 0; i < N_THREADS; i++)
pthread_create(&threads[i], NULL, test_thread, refcnt_ref(str));
/* We no longer own the reference */
refcnt_unref(str);
/* Whichever thread finishes last will free the string */
for (int i = 0; i < N_THREADS; i++)
pthread_join(threads[i], NULL);
void *ptr = malloc(100);
/* This should cause a heap overflow while checking the magic num which the
* sanitizer checks.
* Leaving commented out for now
*/
// refcnt_ref(ptr);
free(ptr);
return 0;
}
```
編譯方式如下:
```shell
gcc -o main -Wall -O2 -fsanitize=thread main.c -lpthread
```
請補完程式碼,使其運作符合預期,且不會觸發 Thread Sanitizer 在執行時期的各項錯誤或警告。作答規範:
* 以最精簡的程式碼實作,均不包含空白字元
* `AAAA`, `BBBB`, `DDDD`, `EEEE` 均為 identifier
* `CCCC` 和 `FFFF` 是 C11 atomics 規範的函式,均以 `atomic_fetch_` 開頭
* `GGGG` 是 POSIX Thread 規範的函式
:::success
延伸問題:
1. 解釋上述程式碼運作原理,指出其實作缺失並改進
2. 研讀〈[Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module)〉並在 Linux 核心原始程式碼中找出相關 [reference counting](https://en.wikipedia.org/wiki/Reference_counting) 的程式碼,予以解讀和分析
3. 解釋為何 Linux 核心的同步機制不依賴 [reference counting](https://en.wikipedia.org/wiki/Reference_counting) (提示: 參閱 RCU 的設計理念)
:::
---
### 測驗 `2`
Linked list (鏈結串列) 允許時間複雜度為 $O(1)$ 的隨機插入 (insert) 及隨機移去 (remove) 節點,但搜尋卻需要 $O(n)$ 時間複雜度。假設一個鏈結串列內容如下:
$$
HEAD \to 1 \to 10 \to 18 \to 38 \to 22 \to 39 \to 47 \to 59 \to next
$$
從開頭 (即上方 `HEAD`) 找起,`59` 這個節點需要存取或比對 7 次才能找到,你直覺可能想到[二分法](https://en.wikipedia.org/wiki/Binary_search_algorithm),但對於 linked list 來說,隨機存取節點的時間複雜度仍是 $O(n)$,也就是說,二分法無法帶來效益。
我們可在鏈結串列中,增加額外的 "skip" (作用是提供資料存取的捷徑) 節點,如下:
![](https://hackmd.io/_uploads/B1q3TEKbn.png)
> "skip" 一詞在英語的意思很多,這裡取[韋伯字典](https://www.merriam-webster.com/dictionary/skip)提出的解釋: "to bound off one point after another"
如此一來,我們就可依據 "skip" 節點查詢,只需要比對 3 次即可找到上述的 `59`。至於若想搜尋 `47`,我們先根據 "skip" 節點來比對,於是在節點 `22` 上,它的 "skip" 指標會指向 `59`,後者大於 `47`,因此我們可知,`47` 可能會存在於節點 `22` 和節點 `59` 之間,這時候再根據鏈結串列順序搜尋,一共要比對 5 次。
隨著節點的增多,我們的 "skip" 鏈結串列會變成這樣:
![](https://hackmd.io/_uploads/B1Qp6NKZ2.png)
"skip" 節點的密度約為普通節點的一半,能否再提高密度呢?我們可在這基礎上再加一層新的 "skip" 節點,這層節點的密度為第一層 "skip" 節點的一半。
![](https://hackmd.io/_uploads/Sy3ppNYWn.png)
換個視角,呈現如下:
![](https://hackmd.io/_uploads/BJ_gC4t-n.png)
我們再給予新的限制:每層的 "skip" 節點都由它下一層的 "skip" 節點中產生,最終我們可得類似下圖的 "Skip List":
![](https://hackmd.io/_uploads/B1y-CNYW3.png)
> Skip List 示意圖
於是,我們不再區分每層是原節點還是 "skip" 節點,而將最底下的那層節點通稱為第一層 (level 1) 節點,第一層節點上面為第二層 (level 2) 節點,再來是第三層,以此類推。假設每層的節點數為下一層的一半,於是搜尋的時間複雜度為 $O(\log n)$。
一般的 linked list 搜尋時必須從第一個開始,按照順序一個一個搜尋,因此不論是搜尋或存取的時間複雜度皆為 $O( N)$,而 [**Skip list**](https://en.wikipedia.org/wiki/Skip_list) 可將存取、搜尋、新增和刪除等操作的平均時間複雜度降到 $O(\log N)$
> [Know Thy Complexities!](https://www.bigocheatsheet.com/)
skip list 是種針對 sorted linked list 的資料結構,透過不同 level 做出不同的 linked list 達到加速搜尋。
舉例來說,當我們想在上方 Skip List 示意圖中搜尋 `7`,步驟如下:
1. 從 level 4 (圖中左上方) 比對,因 `1 <= 7` 得知 level 4 不存在 `7`,但其他 level 可能具備。於是跳到 level 3 繼續找
2. 從 level 3 比對,因 `4 <= 7` 且 `6 <= 7` 得知 level 3 一樣不存在 `7`,於是跳到 level 2 繼續找
3. 從 level 2 比對,因 `9 >= 7` 表示此 level 不存在 `7`,跳去 level 1
4. 在 level 1 比對,發現 `7`
總共比對為 5 次,比直接循序查詢(需要 7 次存取) 還快。
如之前所及,skip list 是具有分層結構的鏈結串列,那麼每層的節點該如何產生呢?
我們可在新增元素時,使用隨機方法 (稱作 "coin flip",也就是「擲硬幣看正反面結果」) 決定這個元素有幾層節點。設定該元素僅有 1 層節點機率為 $\frac{1}{2}$,且有 2 層節點機率為 $\frac{1}{4}$,僅有 3 層節點機率為 $\frac{1}{8}$,以此類推。然後觸發隨機事件,當機率為 $\frac{1}{2}$ 的事件發生時,該元素有 1 層節點,機率為 $\frac{1}{2}$ 的事件發生時,該元素有 2 層節點,以此類推。另外,我們限定一個 "skip" 表格應當具有最大的層數限制。
假設一個 "skip" 表格最大層數限制為4,於是我們可設定一個整數區間為$[1, 2^{4-1}]$,即 $[1, 8]$。然後取一個介於 1 到 8 之間的隨機數,當落在 $[5, 8]$ 區間時有 1 層節點,落在 $[3, 4]$ 區間時有 2 層節點,落在 $[2, 2]$ 區間時有 3 層,落在 $[1, 1]$ 上時有 4 層。
總結 Skip List 的特徵:
* 第一層包含所有的元素
* 每一層都是個有序的 linked list
* skip list 是隨機產生,所以即使元素完全相同的 linked list ,最終的生成的 skip list 有可能不一樣
* 如果節點 x 出現在第 i 層,則所有比 i 小的層都包含 x
建立 skip list 的流程:
1. 最底層 (level 1) 是包含所有元素的 linked list
2. 保留第一個和最後一個元素,從其他元素中隨機選出一些元素形成新的一層 linked list
3. 為剛選出的每個元素新增指標,指向下一層和自己相等的元素
4. 重複上述 (2) 和 (3) 步驟,直到不再能選擇出除了第一個和最後一個元素以外的元素
下方動畫展示在 Skip List 新增節點:
![](https://upload.wikimedia.org/wikipedia/commons/2/2c/Skip_list_add_element-en.gif)
Google 的開放原始碼 Key-Value storage (可簡稱 KV) 專案 [LevelDB](https://github.com/google/leveldb) 和 Facebook 延伸 LevelDB 而發展出的 [RocksDB](https://rocksdb.org/),都用到 Skip List 的變形。[Redis](https://redis.io/) 內建一個高效率的 Skip List 實作。
延伸閱讀:
* [Skip 視覺化呈現](https://people.ok.ubc.ca/ylucet/DS/SkipList.html)
* [Skip List](https://www.jianshu.com/p/9d8296562806)
* [LevelDB 研究](https://hackmd.io/@grb72t3yde/sysprog_project_levelDB)
論文〈[Cache-Sensitive Skip List: Efficient Range Queries on modern CPUs](https://www2.informatik.hu-berlin.de/~sprengsz/papers/cssl.pdf)〉
![](https://hackmd.io/_uploads/H1Ig1BYWn.png)
> Figure 1
Skip list 由多個不同層級的鍵值通道 (Lane) 組成,如圖 Figure 1。
在第 Level i 的通道中,平均具有 $n \times p^i$ 個元素,其中 n 為元素總數且 $0<p<1$ ,所以 Skip list 是一個透過機率所決定出來的資料結構,雖然可以很有效地更新資料,但對於確切的結構仍是不可預測的。
論文採用 deterministic variant 的 skip list [perfectly balanced skip lists](https://dl.acm.org/doi/pdf/10.5555/139404.139478) 作為骨架,balanced skip list 的第 Level i+1 通道包含 Level i 通道的每 1/p 個元素,如 Figure 1 所示,當 p = 0.5 時, Level 1 的通道包含了 Level 0 的每 $\frac{1}{0.5} = 2$ 個元素 (1, 3, 5, 7, 9)、Level 2 的通道包含了 Level 1 的每 2 個元素 (1, 5, 9)。
當 p 值較小時,通道可視為稀疏的,可一次跳過許多元素,而在 p 值較大時,通道是密集的,需要在通道中比較多次的元素。
通道的功用主要用來縮少搜尋的範圍,例如在 Figure 1 搜尋元素 6 ,從最高 Leve 2 開始搜尋 6 ,當遇到比 6 還大的值就停止,往節點的下個 Level 繼續搜尋,因此找到元素 9 比 6 大,接著就從 5 往 Level 1 搜尋,遇到 7 停止,從 5 往 Level 0 搜尋,找到 6 。
在 p = 0.5 的 balanced skip list 之最壞情況,搜尋的複雜度需要 $O(\log n)$。
原始論文〈[Skip lists: a probabilistic alternative to balanced trees](https://15721.courses.cs.cmu.edu/spring2018/papers/08-oltpindexes1/pugh-skiplists-cacm1990.pdf)〉藉由 fat keys 來實作,fat keys 包含指向每個通道下個節點的指標以及數組資料,所有的節點皆是一致的,因此方便實作,但透過這樣的實作方式,會使得指向高 Level 下個節點的指標在大多情形都是指向 NULL ,且至少需要 $O(m \times n)$ 的指標空間,其中 m 為通道的總數,故具有較差的空間使用率。
而指標所指向的記憶體空間在大多的情況下為不連續,若採用上述的實作方式實作 skip list,藉由大量的指標操作來進行搜尋,勢必會遇到許多的 cache misses,不利於現代微處理器。
例如,在一個 64-bit architecture 上使用 32-bit 整數鍵值以及具有 5 個通道數的 skip list,每個節點需要佔 4 bytes + (5+1) * 8 bytes = 52 bytes 的空間大小,假設一個 cache line 為 64 bytes,而在遍歷 skip list 的節點時,會幾乎佔用了所有的 cache line 空間 (52 bytes),因此大部分情況下都會發生 cache misses ,但其實僅需 4 bytes + 8 bytes = 12 bytes (數組+指向下個節點的指標) 的空間來存放即可。
**Cache-Sensitive Skip List**
![](https://hackmd.io/_uploads/SJW-kBtb3.png)
> Figure 2
為了能充分利用 cache,最直覺的想法就是將**通道改為陣列表示**,如此一來具有 spatial locality 的特性可減少 cache misses ,也可允許 SIMD 指令的使用來加速操作。
Figure 2 包含兩個通道 (Linearized Fast Lane Array) 用來管理 32 筆整數資料 (Data List),其中 p = 0.5,例如搜尋 7 時的走訪路徑如紅色指示所示,而 CSSL 是基於 balanced skip list 實作,故 Level i 的每 1/0.5 = 2 個元素為 Level i+1 的元素,因此**通道的元素數量是已知的**,也只需透過簡單計算得出元素後續的位置。
此篇論文的實作是先假設最大鍵值 t 並根據 t 來預先分配一定的記憶體空間,當遇到插入的鍵值 n 大於 t 時,就得重新建立通道 (詳見更新策略)。
和傳統的 skip list 相比,CSSL 僅需要更少的空間配置,假設鍵值的大小為 k 、指標大小為 r ,傳統 skip list 需要 $n \times (m \times r + r + k)$ 的空間,而 CSSL 只須 $n \times (r + k) + \sum^{m}_{i=1}{p^i \times n \times k}$,其中 n 為資料量、m 為通道數。
**Optimizations**
![](https://hackmd.io/_uploads/HJH7JHYbn.png)
> Figure 3
進一步改進如下
1. 將通道大小調整為 cache line 大小的倍數,如 Figure 3。
2. 在最低 level 的通道與 data list 之間引入了一個附加的代理通道 (Proxy Lane Array),如 Figure 2,用來存放指向 data 的指標。
3. 於最高 level 的通道上使用二元搜尋法,這是因為當通道數少的時候,最高 level 的通道所含的元素遠超過 1/p ,使得 cpu 在搜索最高 level 通道的成本往往非常高。
**Updates**
為了能夠支援即時更新,進行以下調整:
1. Inserting keys:
由於通道使用陣列來管理,因此插入新值的時候,需要使用大量的位移操作來使元素保持有序,所以該實作**僅直接將新值插入到 data list (common linked list) 的適當位置,直到 skip list 需要重新建立時再為其分配通道陣列空間**,可透過 [atomic compare-and-swap instruction](https://en.wikipedia.org/wiki/Compare-and-swap) 來實作 latch-free 的插入演算法。
2. Deleting keys:
與插入相反,需先從通道陣列中刪除目標值,但如果將目標值改成 NULL 則需要重新排序通道陣列,因此改用**與目標值相鄰的元素來取代目標值當作刪除**,直到重建時才將重複的值刪除,而在 data list 中將指向目標節點的節點改為指向目標節點的後繼節點。
3. Updating keys
以下是個精簡的 skip list 實作,參考執行輸出如下:
```
0 search 0
1 search 1
2 search 2
3 search 3
4 search 4
5 search 5
6 search 6
7 search 7
8 search 8
9 search 9
[sl_erase(list, i)]: return 0, index 10000, size 1
```
程式碼列表如下: (部分遮蔽)
```c
/* total number of node is 2^32
* the level here is log2(n), which is log2(2^32) = 32
*/
#define SL_MAXLEVEL 32
struct sl_link {
struct sl_link *prev, *next;
};
struct sl_list {
int size;
int level;
struct sl_link head[SL_MAXLEVEL];
};
struct sl_list *sl_list_alloc(void);
void sl_delete(struct sl_list *list);
void *sl_search(struct sl_list *list, int key);
int sl_insert(struct sl_list *list, int key, void *val);
int sl_erase(struct sl_list *list, int key);
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#define compiler_barrier() asm volatile("" : : : "memory")
struct sl_node {
int key;
void *val;
struct sl_link link[0];
};
static inline void list_init(struct sl_link *node)
{
node->next = node;
compiler_barrier();
node->prev = node;
}
static inline void __list_add(struct sl_link *new,
struct sl_link *prev,
struct sl_link *next)
{
next->prev = new;
new->next = next;
new->prev = prev;
compiler_barrier();
prev->next = new;
}
static inline void list_add(struct sl_link *new, struct sl_link *prev)
{
__list_add(new, prev, prev->next);
}
static inline void __list_del(struct sl_link *prev, struct sl_link *next)
{
next->prev = prev;
compiler_barrier();
prev->next = next;
}
static inline void list_del(struct sl_link *node)
{
__list_del(node->prev, node->next);
list_init(node);
}
#define list_for_each_from(pos, head) for (; pos != head; pos = pos->next)
#define list_for_each_safe_from(pos, n, head) \
for (n = pos->next; pos != head; pos = n, n = pos->next)
#define container_of(ptr, type, member) \
__extension__({ \
const __typeof__(((type *) 0)->member) *__mptr = (ptr); \
(type *) ((char *) __mptr - offsetof(type, member)); \
})
#define list_entry(ptr, i) container_of(ptr, struct sl_node, link[i])
/* the probability set at 1/2 */
static inline void set_random(void)
{
time_t current_time;
srandom(time(¤t_time));
}
static inline int random_level(void)
{
int level = 0;
uint32_t random_seed = (uint32_t) random();
while (random_seed && (random_seed & HHHH)) {
random_seed >>= 1;
level++;
}
return level >= SL_MAXLEVEL ? SL_MAXLEVEL - 1 : level;
}
static struct sl_node *sl_node_alloc(int key, void *val, int level)
{
struct sl_node *node =
malloc(sizeof(struct sl_node) + (level + 1) * sizeof(struct sl_link));
if (!node)
return NULL;
node->key = key, node->val = val;
return node;
}
struct sl_list *sl_list_alloc(void)
{
struct sl_list *list = malloc(sizeof(struct sl_list));
if (!list)
return NULL;
list->level = 0;
list->size = 0;
set_random();
for (int i = 0; i < SL_MAXLEVEL; i++)
list_init(&list->head[i]);
return list;
}
void sl_delete(struct sl_list *list)
{
struct sl_link *n, *pos = list->head[0].next;
list_for_each_safe_from (pos, n, &list->head[0]) {
struct sl_node *node = list_entry(pos, 0);
free(node);
}
free(list);
}
void *sl_search(struct sl_list *list, int key)
{
int i = list->level;
struct sl_link *pos = &list->head[i];
struct sl_link *head = &list->head[i];
for (; i >= 0; i--) {
pos = pos->next;
list_for_each_from (pos, head) {
struct sl_node *node = list_entry(pos, i);
if (node->key > key)
break;
else if (node->key == key)
return node->val;
}
pos = pos->prev;
pos--;
head--;
}
return NULL;
}
int sl_insert(struct sl_list *list, int key, void *val)
{
int i, level = random_level();
struct sl_link *pos = &list->head[level];
struct sl_link *head = &list->head[level];
struct sl_node *new = sl_node_alloc(key, val, level);
if (!new)
return -ENOMEM;
if (level > list->level)
list->level = level;
for (i = level; i >= 0; i--) {
pos = pos->next;
list_for_each_from (pos, head) {
struct sl_node *tmp = list_entry(pos, i);
if (tmp->key > key) {
break;
} else if (tmp->key == key)
goto failed;
}
pos = pos->prev;
IIII;
pos--;
head--;
}
list->size++;
return 0;
failed:
free(new);
return -EEXIST;
}
int sl_erase(struct sl_list *list, int key)
{
int i = list->level;
struct sl_link *pos = &list->head[i];
struct sl_link *n, *head = &list->head[i];
for (; i >= 0; i--) {
pos = pos->next;
JJJJ {
struct sl_node *tmp = list_entry(pos, i);
if (tmp->key == key) {
for (; i >= 0; i--) {
list_del(pos--);
if (list->head[i].next == &list->head[i])
list->level--;
}
free(tmp);
list->size--;
return 0;
} else if (tmp->key > key)
break;
}
pos = pos->prev;
pos--;
head--;
}
return -EINVAL;
}
#include <assert.h>
#include <stdio.h>
#define test(ops, index, size) \
printf("[" #ops \
"]: " \
"return %d, " \
"index %d, " \
"size %d\n", \
(int) ops, index, size)
#define times 10000
int main(int argc, char *argv[])
{
struct sl_list *list = sl_list_alloc();
int i, arr[10] = {0};
for (i = 0; i < times; i++)
assert(sl_insert(list, i, &arr[i]) == 0);
test(sl_insert(list, i, NULL), i, list->size);
for (i = 0; i < 10; i++) {
arr[i] = i;
printf("%d search %d\n", i, *(int *) sl_search(list, i));
}
for (i = 0; i < times; i++)
assert(sl_erase(list, i) == 0);
test(sl_erase(list, i), i, list->size);
sl_delete(list);
return 0;
}
```
請補完程式碼,使其運作符合預期。作答規範:
* 以最精簡的形式撰寫
* `HHHH` 為正整數
* `IIII` 和 `JJJJ` 為合法的 C 敘述,均以 `list_` 開頭
* 符合 lab0-c 指定的程式碼排版風格
:::success
延伸問題:
1. 解釋上述程式碼運作原理,指出設計缺失並改進
2. 研讀〈[A kernel skiplist implementation](https://lwn.net/Articles/551896/)〉和〈[Skiplists II: API and benchmarks](https://lwn.net/Articles/553047/)〉,探討 Linux 核心引入的 skip list 的考量,找出現行 Linux 核心的應用案例
3. 分析 [Concurrent-Skip-list](https://github.com/shreyas-gopalakrishna/Concurrent-Skip-list) 並以 C11 Atomics 重新實作,搭配閱讀〈[Concurrent Skiplists](https://spcl.inf.ethz.ch/Teaching/2013-dphpc/final/4.pdf)〉,實作更高效的程式碼。
:::
---
### 測驗 `3`
〈[並行程式設計: Hazard pointer](https://hackmd.io/@sysprog/concurrency-hazard-pointer)〉提到:
> 在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。
>
> 使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。
`lfq` 嘗試實作精簡的並行佇列 (concurrent queue),運用 hazard pointer 來釋放並行處理過程中的記憶體,程式碼可見: [lfq](https://gist.github.com/jserv/81b0a90506d584c5c9be17122bcf114c) (部分遮蔽),其中 `atomics.h` 如下:
```c
#pragma once
/* TODO: rewrite in C11 Atomics */
#if !defined(__x86_64__) || defined(__i386)
#error "This implementation is specific to x86(-64)"
#endif
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#ifndef asm
#define asm __asm
#endif
#define cmpxchg(ptr, _old, _new) \
{ \
volatile uint32_t *__ptr = (volatile uint32_t *) (ptr); \
uint32_t __ret; \
asm volatile("lock; cmpxchgl %2,%1" \
: "=a"(__ret), "+m"(*__ptr) \
: "r"(_new), "0"(_old) \
: "memory"); \
); \
__ret; \
}
#define ATOMIC_SET __sync_lock_test_and_set
#define ATOMIC_RELEASE __sync_lock_release
#define ATOMIC_SUB __sync_sub_and_fetch
#define ATOMIC_SUB64 ATOMIC_SUB
#define CAS __sync_bool_compare_and_swap
/* The 2nd argument is limited to 1 on machines with TAS but not XCHG.
* On x86 it is an arbitrary value.
*/
#define XCHG __sync_lock_test_and_set
#define ATOMIC_ADD __sync_add_and_fetch
#define ATOMIC_ADD64 ATOMIC_ADD
#define mb __sync_synchronize
/* compiler barrier only: runtime reordering is impossible on x86 */
#define lmb() asm volatile("" ::: "memory")
#define smb() asm volatile("" ::: "memory")
```
顯然上述實作針對 x86(-64),以 `__sync` 開頭的內建函式為 GCC [Legacy __sync Built-in Functions for Atomic Memory Access](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fsync-Builtins.html)
編譯方式:
```
gcc -std=gnu99 -O3 -Wall -Wextra -o test_p1c1 -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 lfq.c test.c -lpthread
```
這個實作應該要能夠支援以下組態:
* ` -D MAX_PRODUCER=4 -D MAX_CONSUMER=4`
* `-D MAX_PRODUCER=100 -D MAX_CONSUMER=10`
* `-D MAX_PRODUCER=10 -D MAX_CONSUMER=100`
預期輸出:
```
Producer thread [140583962404608] exited! Still 0 running...
Consumer thread [140583970797312] exited 0
Total push 500000 elements, pop 500000 elements. freelist=1, clean = 0
Test PASS!!
```
補完程式碼,使其運作符合預期。作答規範:
* `AAAA`, `BBBB`, `CCCC` 均包含 `CAS` 巨集,且為合法表示式
延伸閱讀:
* [Experiments on Concurrent Queue](https://hackmd.io/@butastur/concurrent-queue)
* [concurrent-ll (2022)](https://hackmd.io/@kdnvt/concurrent-ll-2022)
:::success
延伸問題:
1. 解釋上述程式碼運作原理
2. 用 C11 Atomics 改寫,使得能夠支援 x86(-64) 以外的處理器架構
3. 遞交 pull request,以上述程式碼的改進版本來取代 [lf-queue](https://github.com/sysprog21/concurrent-programs/tree/master/lf-queue)
:::