# [2022q1](http://wiki.csie.ncku.edu.tw/sysprog/schedule) 第 5 週測驗題
###### tags: `linux2022`
:::info
目的: 檢驗學員對 **[CS:APP 第 2 章](https://hackmd.io/@sysprog/CSAPP-ch2)** 和 **[並行和多執行緒程式設計](https://hackmd.io/@sysprog/concurrency)** 的認知
:::
作答表單:
* ==[測驗 `1`](https://docs.google.com/forms/d/e/1FAIpQLSf2uxh_DtSYRfIvgEzuQtsBeM4FDT_cZVf-yV5Y4PGngdr3Ag/viewform)== (Linux 核心設計)
* ==[測驗 `2`](https://docs.google.com/forms/d/e/1FAIpQLSciIx7K4kDr2a77UipQAqEuLT9sREgi7LWfu3ZRwOJ6p9ib5w/viewform)== (Linux 核心實作)
### 測驗 `1`
LeetCode [866. Prime Palindrome](https://leetcode.com/problems/prime-palindrome/) 要求判斷某數是否為 [Prime Palindrome](https://en.wikipedia.org/wiki/Palindromic_prime) (回文質數,簡稱 Palprime),也就是既是質數又是迴文數的整數。在十進位中,最小的幾個回文質數如下: ([A002385](https://oeis.org/A002385))
> 2, 3, 5, 7, 11, 101, 131, 151, 181, 191, 313, 353, 373, 383, 727, 757, 787, 797, 919, 929, 10301, 10501, 10601, 11311, ...
`17000000000000071` 則是個 17 位數的回文質數。回文質數的特性:
* 所有個位質數皆是回文質數,即 2, 3, 5, 7
* 兩位質數中只有 $11$ 一個回文質數,因其餘的回文數如 $22$ 等均是 11 的倍數
* 三位數以上,回文質數的起始一定是 1, 3, 7, 9 其中一個,因質數的個位只可是上述四數字之一
* 三位數回文質數有 101, 131, 151, 181, 191, 313, 353, 373, 383, 727, 757, 787, 797, 919 和 929,共 15 個
* 沒有四位數回文質數,因所有四位回文數圴是 $11$ 的倍數。其實在所有偶位數的回文數也都是 $11$ 的倍數,即除 $11$ 以外,也沒有其他偶數位 (Even Digits) 的回文質數
* 五位數回文質數有 84 個,最小的是 $11311$,而最大的是 $98689$
改寫[第 3 週測驗題](https://hackmd.io/@sysprog/linux2022-quiz3)裡頭的整數開平方根實作,藉由 [lookup table](https://en.wikipedia.org/wiki/Lookup_table) (LUT) 的手法降低分支的數量: (但這不必然較快)
```cpp
static ull isqrt(ull x)
{
if (x == 0) return 0;
int lz = __builtin_clzll(x) & 62;
x <<= lz;
uint32_t y = isqrt64_tab[(x >> 56) - 64];
y = (y << 7) + (x >> 41) / y;
y = (y << 15) + (x >> 17) / y;
y -= x < (uint64_t) y * y;
return y >> (lz >> 1);
}
```
已知在十進位,一個整數的每個數字的總和若能被 3 整除,這個數就是 3 的倍數,我們可搭配[第 2 週測驗題](https://hackmd.io/@sysprog/linux2022-quiz2)提到的 `is_divisible` 來檢驗用字串表示的數值是否可被 3 整除:
```cpp
static bool is_divisible_by3(char *buf, int len)
{
int sum = 0;
while (--len >= 0)
sum += buf[len] - '0';
static uint32_t M3 = UINT32_C(0xFFFFFFFF) / 3 + 1;
return (sum * M3) <= (M3 - 1);
}
```
以下實作利用 [Sieve of Eratosthenes](https://en.wikipedia.org/wiki/Sieve_of_Eratosthenes),嘗試找出第 1500 個回文質數,有效位數是 15:
```cpp
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define N_DIGITS 15
#define LIMIT 1500
typedef unsigned long long ull;
static ull pow10[] = {
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000,
1000000000,
10000000000,
100000000000,
1000000000000,
10000000000000,
100000000000000,
1000000000000000,
10000000000000000,
};
static int midindex;
static uint8_t *psieve;
static ull half_max;
#define __ALIGN_MASK(x, mask) (((x) + (mask)) & ~(mask))
#define ALIGN(x, a) __ALIGN_MASK(x, (typeof(x))(a) -1)
enum {
PRIME = 0,
COMPOSITE = 1,
};
static inline ull fastpow10(const int n)
{
if (n < 0 || n > 16) {
free(psieve);
printf("n = %d\n", n);
exit(1);
}
return pow10[n];
}
/* isqrt64_tab[k] = isqrt(256 * (k + 65) - 1) for 0 <= k < 192 */
static const uint8_t isqrt64_tab[192] = {
128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142,
143, 143, 144, 145, 146, 147, 148, 149, 150, 150, 151, 152, 153, 154, 155,
155, 156, 157, 158, 159, 159, 160, 161, 162, 163, 163, 164, 165, 166, 167,
167, 168, 169, 170, 170, 171, 172, 173, 173, 174, 175, 175, 176, 177, 178,
178, 179, 180, 181, 181, 182, 183, 183, 184, 185, 185, 186, 187, 187, 188,
189, 189, 190, 191, 191, 192, 193, 193, 194, 195, 195, 196, 197, 197, 198,
199, 199, 200, 201, 201, 202, 203, 203, 204, 204, 205, 206, 206, 207, 207,
208, 209, 209, 210, 211, 211, 212, 212, 213, 214, 214, 215, 215, 216, 217,
217, 218, 218, 219, 219, 220, 221, 221, 222, 222, 223, 223, 224, 225, 225,
226, 226, 227, 227, 228, 229, 229, 230, 230, 231, 231, 232, 232, 233, 234,
234, 235, 235, 236, 236, 237, 237, 238, 238, 239, 239, 240, 241, 241, 242,
242, 243, 243, 244, 244, 245, 245, 246, 246, 247, 247, 248, 248, 249, 249,
250, 250, 251, 251, 252, 252, 253, 253, 254, 254, 255, 255,
};
/* integer square root of a 64-bit unsigned integer */
static ull isqrt(ull x)
{
if (x == 0)
return 0;
int lz = __builtin_clzll(x) & 62;
x <<= lz;
uint32_t y = isqrt64_tab[(x >> 56) - 64];
y = (y << 7) + (x >> 41) / y;
y = (y << 15) + (x >> 17) / y;
y -= x < (uint64_t) y * y;
return y >> (lz >> 1);
}
/* Generates a table of prime numbers up to the maximum having digits.
* All even numbers (including 2) are ignored. Bit n in the bitmap
* represents the odd number (2n + 1). This enables the table to be
* half the size of a naive Sieve of Eratosthenes implementation.
*/
static void generate_sieve(int digits)
{
ull max = 0;
for (int count = 0; count < digits; ++count)
max = max * 10 + 9;
max = isqrt(max);
half_max = max >> 1;
/* We need half the space as multiples of 2 can be omitted */
int bytes = (max >> 1) + (max & 0x1);
/* Calculate the actual number of bytes required */
bytes = (bytes >> 3) + (bytes & 0x1);
bytes = ALIGN(bytes, 16); /* Align-up to 16-byte */
psieve = realloc(psieve, bytes);
if (!psieve) {
printf("realloc() failed!\n");
exit(1);
}
memset(psieve, 0, bytes);
/* In psieve bit 0 -> 1, 1 -> 3, 2 -> 5, 3 -> 7 and so on... */
/* Set the 0th bit representing 1 to COMPOSITE
*/
psieve[0] |= COMPOSITE << (1 >> 1);
unsigned char mask = 0x7;
for (ull n = 3; n <= max; n += 2) {
if (((psieve[n >> 4] >> ((n >> 1) & mask)) & 0x1) == PRIME) {
for (ull mul = (n << 1); mul < max; mul += n) {
/* Skip the evens: there is no representation in psieve */
if (!(mul & 0x1))
continue;
/* Set offset of mul in psieve */
psieve[mul >> 4] |= COMPOSITE
<< ((mul >> 1) & mask); /* bit offset */
}
}
}
}
/* Convert long to ASCII */
static char *ltoa(ull val, int *len)
{
static char ascbuf[32] = {0};
int i = 30;
for (; val && i; --i, val /= 10)
ascbuf[i] = "0123456789"[val % 10];
*len = 30 - i;
return &ascbuf[++i];
}
/* Check if a number (converted to string) is divisible by 3 */
static bool is_divisible_by3(char *buf, int len)
{
int sum = 0;
while (--len >= 0)
sum += buf[len] - '0';
static uint32_t M3 = UINT32_C(0xFFFFFFFF) / 3 + 1;
return (sum * M3) <= (M3 - 1);
}
/* Check if a number is prime. */
static bool isprime(const ull val)
{
if (!(val & 0x1)) /* Test for divisibility by 2 */
return false;
ull *pquadbits = (ull *) psieve;
ull next = 3; /* start at 7 (i.e. 3 * 2 + 1) */
for (ull quad = ~*pquadbits & ~0b111, prev = 0; prev <= half_max;
quad = ~*++pquadbits) {
if (!quad) {
prev += 64;
continue;
}
while (quad) {
ull i = EXP1;
next = prev + i;
if (!(val % ((next << 1) + 1)))
return false;
quad &= EXP2;
}
prev += 64;
}
return true;
}
/* Generate the next palindrome after a palindrome
* 123454321 -> 123464321
* 123494321 -> 123505321
* 123999321 -> 124000421
*/
static ull getnextpalin(char *buf, const int *len)
{
midindex = (*len >> 1) - 1;
/* Handle the case of odd number of digits.
* If the central digit is 9, reset it to 0 and process adjacent digit,
* otherwise increment it and return.
*/
if ((*len & 0x1) == 0x1) {
if (buf[midindex + 1] - '0' != 9) {
buf[midindex + 1] += 1;
return atol(buf);
}
buf[midindex + 1] = '0';
}
/* Adjust other digits */
for (; midindex >= 0; --midindex) {
if (buf[midindex] - '0' != 9) {
buf[midindex] += 1;
buf[*len - midindex - 1] += 1;
return atol(buf);
}
buf[midindex] = '0';
buf[*len - midindex - 1] = '0';
}
/* If we have exhausted numbers in *len digits, increase the number of
* digits and return the first palindrome of the form 10..0..01
*/
/* The last palin will always be of the form 99...99 */
return atol(buf) + 2;
}
int main()
{
int count = LIMIT;
ull i = 100000000000001;
int len = 0;
char *buf = ltoa(i, &len);
if (len < N_DIGITS) {
printf("len: %d\n", len);
exit(1);
}
generate_sieve(N_DIGITS);
/* We started at 1000000000001. */
while (1) {
/* If number of digits are even, all palindromes are divisible by 11.
* Get first palindrome of next odd number of digits.
*/
if (!(len & 0x1)) {
i = 1ULL | fastpow10(len);
buf = ltoa(i, &len);
continue;
}
/* Check if number is divisible by 5 or 3 */
if ((buf[len - 1] != '5') && !is_divisible_by3(buf, len)) {
if (isprime(i) && (--count == 0)) {
printf("%s\n", buf);
return 0;
}
}
int oldlen = len;
i = getnextpalin(buf, &len);
/* Refresh buffer if number of digits has increased */
if (oldlen != len)
buf = ltoa(i, &len);
}
free(psieve);
return 0;
}
```
預期執行結果為 `100191191191001`
請補完程式碼,得以符合預期。書寫規範:
1. `EXP1` 和 `EXP2` 均為不包含 `,` 和 `;` 的表達式,請用符合 C99 規範和程式碼排版原則的最精簡形式作答
2. `EXP1` 必須使用到 `__builtin_clz`, `__builtin_clzll`, `__builtin_ctz`, 和 `__builtin_ctzll` 其一 GCC 內建函式
3. `EXP2` 只能出現一組小括號 (即 `(` 和 `)`),搭配使用 bitwise 運算子
延伸閱讀:
* [A Fast Algorithm for the Integer Square Root](https://www.nuprl.org/MathLibrary/integer_sqrt/)
* [Computing a logarithm or sqrt](https://xcore.github.io/doc_tips_and_tricks/sqrt-log.html)
* [Computing the number of digits of an integer even faster](https://lemire.me/blog/2021/06/03/computing-the-number-of-digits-of-an-integer-even-faster/)
* [Optimized itoa function](https://itecnote.com/tecnote/optimized-itoa-function/)
* 典型的 [Sieve of Eratosthenes](https://en.wikipedia.org/wiki/Sieve_of_Eratosthenes) 演算法的時間複雜度為 $O(N \log (\log N))$,〈[An improved sieve of Eratosthenes](https://arxiv.org/abs/1712.09130)〉提出改進的演算法,使其時間複雜度為 $O((log N)^c)$,其中 $c$ 接近 `1`
* [你知道你的生日可以寫成 3 個迴文數相加嗎?](https://sites.google.com/a/g2.nctu.edu.tw/unimath/2020-01/02022020)
:::success
延伸問題:
1. 解釋上述程式碼運作原理,指出可改進之處並實作
> 是否有必要先將數值轉成字串?用十進位的角度處理運算是否產生額外的計算負擔?
2. Linux 核心原始程式碼 [lib/math/prime_numbers.c](https://github.com/torvalds/linux/blob/master/lib/math/prime_numbers.c) 有類似的程式碼,請解釋其作用、裡頭 [RCU](https://hackmd.io/@sysprog/linux-rcu) 的使用情境,及針對執行時間的改進
:::
---
### 測驗 `2`
在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 [dangling pointer](https://www.wikiwand.com/en/Dangling_pointer)。
使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。
對於 C 這樣缺乏內建 concurrent GC 機制的程式語言來說,若要實作 lock-free 演算法,就要自行處理記憶體釋放的議題。[Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 是其中一種解決方案,其原理是讀取端執行緒對指標進行識別,指標 (特別是指向的記憶體區塊) 若要釋放時,會事先保存,延遲到確認沒有讀取端執行緒,才進行真正的釋放。Linux 核心的 [RCU 同步機制](https://hackmd.io/@sysprog/linux-rcu)是另一種 lock-free 程式設計演算法和記憶體回收機制。
:::info
"[hazard](https://dictionary.cambridge.org/zht/%E8%A9%9E%E5%85%B8/%E8%8B%B1%E8%AA%9E-%E6%BC%A2%E8%AA%9E-%E7%B9%81%E9%AB%94/hazard)" 一詞多用來指「危險物、危害物」,與 "danger" 的主要區別:
1. hazard 是可數名詞
2. "hazard" 不是立刻會發生的危險,而是可能會帶來危險的「隱患」,例如 a health hazard (一個健康隱患) 和 a safety hazard (一個[行為或功能的]安全隱患)
:::
〈[Lock-Free Data Structures with Hazard Pointers](https://erdani.org/publications/cuj-2004-12.pdf)〉寫道:
> Each reader thread owns a single-writer/multi-reader shared pointer called [Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer). When a reader thread assigns the address of a map to its hazard pointer, it is basically announcing to other threads (writers), "I am reading this map. You can replace it if you want, but don’t change its contents and certainly keep your deleteing hands off it."
[Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 可簡稱為 "HP",其關鍵的結構有:
* Hazard pointer
* retire list (也稱為 thread free list)
![](https://i.imgur.com/XtihewZ.png)
[hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 是一種可以解決 lock-free data 在動態記憶體配置的問題之方法。其基本的概念是允許 hazard pointer 可以被一個 thread 寫而多個 thread 讀,當存在 reader thread 正在操作該 hazard pointer 時,hazard pointer 會被標註,於是 writer thread 可得知此訊息並延遲對其釋放。
在 hazard pointer 架構中,每個 thread 首先需要各自維護的兩個關鍵集合是:
* hazard pointer: 儲存此 thread 正在存取的指標,因此該指標不能直接被釋放
* retire list: 被這個 thread 要求釋放的指標,但實際尚未釋放
因此要安全的釋放記憶體,其基本想法就是:
* 每個執行緒放在 hazard pointer 中的指標尚不能被釋放
* 每個執行緒要求釋放的指標先放在 retire list 中
* 掃描 retire list 可以得知所有執行緒皆不使用的指標,則可將其真正的釋放給作業系統
〈[Lock-Free Data Structures with Hazard Pointers](https://erdani.org/publications/cuj-2004-12.pdf)〉提到,lock-free programing 為了保證持續進展的特性,每個執行緒都有機會在任意時間去操作一個 object。當 thread A 在釋放一個物件時,需確保沒有另一個 thread B 仍取得該物件的 reference 且正要存取。如果先行釋放此 object,另一個 thread 的存取就會出錯。以下是經典的解決方式:
1. [Reference counting](https://en.wikipedia.org/wiki/Reference_counting): 基本想法是每當一個 thread 取得 reference 就將一個計數(其地址不同於 pointer)加一,解除 reference 時則減一。但是實際上其正確性要確保取得 pointer 和 reference count 加一是一個 atomic operation,需要使用特殊的 [DCAS (Double Compare And-Swap)](https://en.wikipedia.org/wiki/Double_compare-and-swap) 指令,但其效率不彰
2. Wait and delete: 就是 「等一段時間」 再刪除,但等待時間未知
3. Keep a reference count next to the pointer: 使用比起 DCAS 較合理且效率比較好的 CAS2,可以 atomic 的交換 memory 中的兩個相鄰的 words,不過 64 位元的處理器上一般不存在這種指令(但可以透過一些操作指標的技巧達到同等效果)
第三個方案看似可行,但實際上仍有缺陷,因為 CAS 語意要求 object 和 reference count 要同時一致才能替換上新的數值,write 操作需要等待到沒有 read 操作 (reference count) 才能進行。換句話說,在一個 read 操作結束前如果又有下一個 read 進來,write 就只能空等待。
為克服上述方法的問題,hazard pointer 是一個更佳方案 (尤其針對 Write-Rarely-Read-Many 的情境)。內文中以 lock-free 的 map 為案例:
```cpp
template <class K, class V>
class WRRMMap {
Map<K, V> * pMap_;
...
};
```
當 `WRRMMap` 需要被更新時,writer thread 會去取得一個 `*pMap_` 指向 map 的副本並更改,然後再將 `*pMap_` 指向新的副本,並且回收舊的。麻煩的問題是這個舊的 map 可能有其他 thread 正嘗試讀取。
而 hazard pointer 的概念就是讓 reader 將正在存取的 map 加入到自己的單讀多寫(single-writer-multiple-reader)的 hazard pointer 中,其目的等同向其他 writer 宣告禁止回收該 map 。當 writer 把舊的 map 替換下來,map 先被放在一個 thread 獨立的 list 而不先釋放 ,直到 list 中的舊 map 達到一個上限,再去掃描每個 reader 的 hazard pointer 中是否有相匹配。如果某個舊 map 不與任何 reader 的 hazard pointer 匹配,那麼釋放該 map 就是安全的,否則就繼續將其留在 list 中,直到下次掃描。
```cpp
// Hazard pointer record
class HPRecType {
HPRecType *pNext_;
int active_;
static HPRecType *pHead_;
static int listLen_;
public:
void *pHazard_;
static HPRecType *Head() {
return pHead_;
}
```
hazard pointer 的基本結構由 linked list 構成:
* `*pNext` 指向下個節點
* `active_` 表示該節點被使用與否
* `pHead_` 是 linked list 的首個節點
* `listLen_` 則是 list 中的節點數量
* `pHazard_` 是物件的指標本體(例如 map)
```cpp
// Acquires one hazard pointer
static HPRecType * Acquire() {
// Try to reuse a retired HP record
HPRecType * p = pHead_;
for (; p; p = p->pNext_) {
if (p->active_ ||
!CAS(&p->active_, 0, 1))
continue;
return p;
}
// Increment the list length
int oldLen;
do {
oldLen = listLen_;
} while (!CAS(&listLen_, oldLen, oldLen + 1));
// Allocate a new one
HPRecType * p = new HPRecType;
p->active_ = 1;
p->pHazard_ = 0;
// Push it to the front
do {
old = pHead_;
p->pNext_ = old;
} while (!CAS(&pHead_, old, p));
return p;
}
...
```
要將一個指標加入 hazard pointer 需要 acquire 一個可用的節點,首先先看看有沒有已經被建立的空節點(沒有對應要保護的 `pHazard` 實體)可以直接使用。如果沒有,則需要建立新的 node 插入到 hazard pointer linked list 的開頭,`listLen_` 也要相應的做加一
```cpp
// Releases a hazard pointer
static void Release(HPRecType* p) {
p->pHazard_ = 0;
p->active_ = 0;
}
};
```
要解除對指標的保護則透過 release,單純的將該節點的 `pHazard_` 重置且 `active_` 允許重用即可。
```cpp
// Per-thread private variable
__per_thread__ vector<Map<K, V> *> rlist;
```
每個 thread 還有一個獨立存取的 retire list,存放該 thread 不再需要,若其它thread 也不再需要就可以被釋放 pointer。 retire list 因為只有每個 thread 自己可以存取的,因此不需要被同步。
```cpp
template <class K, class V>
class WRRMMap {
Map<K, V> * pMap_;
...
private:
static void Retire(Map<K, V> * pOld) {
// put it in the retired list
rlist.push_back(pOld);
if (rlist.size() >= R) {
Scan(HPRecType::Head())
}
}
};
```
`Retire` 操作將要原本應該要釋放的 pointer 加入 vector 中,先儲存起來延遲釋放。當 vector 所放的 pointer 超過一個大小就呼叫 `Scan` 去掃描是否有可以真正釋放的 pointer。
```cpp
void Scan(HPRecType * head) {
// Stage 1: Scan hazard pointers list
// collecting all non-null ptrs
vector<void*> hp;
while (head) {
void * p = head->pHazard_;
if (p)
hp.push_back(p);
head = head->pNext_;
}
// Stage 2: sort the hazard pointers
sort(hp.begin(), hp.end(), less<void*>());
// Stage 3: Search for'em!
vector<Map<K, V>*>::iterator i = rlist.begin();
while (i != rlist.end()) {
if (!binary_search(hp.begin(), hp.end(), *i) {
// Aha!
delete *i;
if (&*i != &rlist.back()) {
*i = rlist.back();
}
rlist.pop_back();
} else {
++i;
}
}
}
```
`Scan` 的任務是找出 retire list 集合和所有 thread 的 hazard pointer 集合的差集,那些 pointer 就是可以被釋放而且不會導致錯誤的。
以下是 Hazrd pointer 的簡化 C11 實作,搭配 GNU extension:
```cpp
/* shortcuts */
#define atomic_load(src) __atomic_load_n(src, __ATOMIC_SEQ_CST)
#define atomic_store(dst, val) __atomic_store(dst, val, __ATOMIC_SEQ_CST)
#define atomic_exchange(ptr, val) \
__atomic_exchange_n(ptr, val, __ATOMIC_SEQ_CST)
#define atomic_cas(dst, expected, desired) \
__atomic_compare_exchange(dst, expected, desired, 0, __ATOMIC_SEQ_CST, \
__ATOMIC_SEQ_CST)
#include <stdint.h>
#define LIST_ITER(head, node) \
for (node = atomic_load(head); node; node = atomic_load(&node->next))
typedef struct __hp {
uintptr_t ptr;
struct __hp *next;
} hp_t;
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
/* Allocate a new node with specified value and append to list */
static hp_t *list_append(hp_t **head, uintptr_t ptr)
{
hp_t *new = calloc(1, sizeof(hp_t));
if (!new)
return NULL;
new->ptr = ptr;
hp_t *old = atomic_load(head);
do {
new->next = old;
} while (!atomic_cas(head, &old, &new));
return new;
}
/* Attempt to find an empty node to store value, otherwise append a new node.
* Returns the node containing the newly added value.
*/
hp_t *list_insert_or_append(hp_t **head, uintptr_t ptr)
{
hp_t *node;
bool need_alloc = true;
LIST_ITER (head, node) {
uintptr_t expected = atomic_load(&node->ptr);
if (expected == 0 && atomic_cas(&node->ptr, &expected, &ptr)) {
need_alloc = false;
break;
}
}
if (need_alloc)
node = list_append(head, ptr);
return node;
}
/* Remove a node from the list with the specified value */
bool list_remove(hp_t **head, uintptr_t ptr)
{
hp_t *node;
const uintptr_t nullptr = 0;
LIST_ITER (head, node) {
uintptr_t expected = atomic_load(&node->ptr);
if (expected == ptr && atomic_cas(&node->ptr, &expected, &nullptr))
return true;
}
return false;
}
/* Returns 1 if the list currently contains an node with the specified value */
bool list_contains(hp_t **head, uintptr_t ptr)
{
hp_t *node;
LIST_ITER (head, node) {
if (atomic_load(&node->ptr) == ptr)
return true;
}
return false;
}
/* Frees all the nodes in a list - NOT THREAD SAFE */
void list_free(hp_t **head)
{
hp_t *cur = *head;
while (cur) {
hp_t *old = cur;
cur = cur->next;
free(old);
}
}
#define DEFER_DEALLOC 1
typedef struct {
hp_t *pointers;
hp_t *retired;
void (*deallocator)(void *);
} domain_t;
/* Create a new domain on the heap */
domain_t *domain_new(void (*deallocator)(void *))
{
domain_t *dom = calloc(1, sizeof(domain_t));
if (!dom)
return NULL;
dom->deallocator = deallocator;
return dom;
}
/* Free a previously allocated domain */
void domain_free(domain_t *dom)
{
if (!dom)
return;
if (dom->pointers)
list_free(&dom->pointers);
if (dom->retired)
list_free(&dom->retired);
free(dom);
}
/*
* Load a safe pointer to a shared object. This pointer must be passed to
* `drop` once it is no longer needed. Returns 0 (NULL) on error.
*/
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
/* Hazard pointer inserted successfully */
if (atomic_load(prot_ptr) == val)
return val;
/*
* This pointer is being retired by another thread - remove this hazard
* pointer and try again. We first try to remove the hazard pointer we
* just used. If someone else used it to drop the same pointer, we walk
* the list.
*/
uintptr_t tmp = val;
if (!atomic_cas(&node->ptr, &tmp, &nullptr))
list_remove(&dom->pointers, val);
}
}
/*
* Drop a safe pointer to a shared object. This pointer (`safe_val`) must have
* come from `load`
*/
void drop(domain_t *dom, uintptr_t safe_val)
{
if (!list_remove(&dom->pointers, safe_val))
__builtin_unreachable();
}
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
dom->deallocator((void *) ptr);
} else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
list_insert_or_append(&dom->retired, ptr);
} else { /* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
dom->deallocator((void *) ptr);
}
}
/* Swaps the contents of a shared pointer with a new pointer. The old value will
* be deallocated by calling the `deallocator` function for the domain, provided
* when `domain_new` was called. If `flags` is 0, this function will wait
* until no more references to the old object are held in order to deallocate
* it. If flags is `DEFER_DEALLOC`, the old object will only be deallocated
* if there are already no references to it; otherwise the cleanup will be done
* the next time `cleanup` is called.
*/
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
cleanup_ptr(dom, old_obj, flags);
}
/* Forces the cleanup of old objects that have not been deallocated yet. Just
* like `swap`, if `flags` is 0, this function will wait until there are no
* more references to each object. If `flags` is `DEFER_DEALLOC`, only
* objects that already have no living references will be deallocated.
*/
void cleanup(domain_t *dom, int flags)
{
hp_t *node;
LIST_ITER (&dom->retired, node) {
uintptr_t ptr = node->ptr;
if (!ptr)
continue;
if (!list_contains(&dom->pointers, ptr)) {
/* We can deallocate straight away */
if (EXP3)
dom->deallocator((void *) ptr);
} else if (!(flags & DEFER_DEALLOC)) {
/* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
if (EXP4)
dom->deallocator((void *) ptr);
}
}
}
#include <assert.h>
#include <err.h>
#include <pthread.h>
#include <stdio.h>
#define N_READERS 1
#define N_WRITERS 1
#define N_ITERS 20
#define ARRAY_SIZE(x) sizeof(x) / sizeof(*x)
typedef struct {
unsigned int v1, v2, v3;
} config_t;
static config_t *shared_config;
static domain_t *config_dom;
config_t *create_config()
{
config_t *out = calloc(1, sizeof(config_t));
if (!out)
err(EXIT_FAILURE, "calloc");
return out;
}
void delete_config(config_t *conf)
{
assert(conf);
free(conf);
}
static void print_config(const char *name, const config_t *conf)
{
printf("%s : { 0x%08x, 0x%08x, 0x%08x }\n", name, conf->v1, conf->v2,
conf->v3);
}
void init()
{
shared_config = create_config();
config_dom = domain_new(delete_config);
if (!config_dom)
err(EXIT_FAILURE, "domain_new");
}
void deinit()
{
delete_config(shared_config);
domain_free(config_dom);
}
static void *reader_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS; ++i) {
config_t *safe_config =
(config_t *) load(config_dom, (uintptr_t *) &shared_config);
if (!safe_config)
err(EXIT_FAILURE, "load");
print_config("read config ", safe_config);
drop(config_dom, (uintptr_t) safe_config);
}
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
0);
print_config("updated config ", new_config);
}
return NULL;
}
int main()
{
pthread_t readers[N_READERS], writers[N_WRITERS];
init();
/* Start threads */
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_create(readers + i, NULL, reader_thread, NULL))
warn("pthread_create");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_create(writers + i, NULL, writer_thread, NULL))
warn("pthread_create");
}
/* Wait for threads to finish */
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_join(readers[i], NULL))
warn("pthread_join");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_join(writers[i], NULL))
warn("pthread_join");
}
deinit();
return EXIT_SUCCESS;
}
```
假設記憶體配置都會成功,且已知上述程式碼在執行時期不會遇到 assertion 或者 ThreadSanitizer 的錯誤。請補完程式碼,使得執行結果符合預期。作答規範:
* `EXP3` 和 `EXP4` 應以指定的程式碼風格書寫,保持最精簡的形式
* `EXP3` 和 `EXP4` 皆為表示式,且呼叫以 `list_` 開頭的函式
:::success
延伸問題:
1. 解釋上述程式碼運作原理,指出可改進之處並實作
2. 上述程式碼開啟多個 reader 時,無法通過 [ThreadSanitizer](https://clang.llvm.org/docs/ThreadSanitizer.html),請指出具體問題並克服
3. 比較 [RCU](https://hackmd.io/@sysprog/linux-rcu) 和 hazard pointer (HP),探討為何 Linux 不採用 HP 而是另外發展 RCU
:::