作答表單:
測驗 1
LeetCode 866. Prime Palindrome 要求判斷某數是否為 Prime Palindrome (回文質數,簡稱 Palprime),也就是既是質數又是迴文數的整數。在十進位中,最小的幾個回文質數如下: (A002385)
2, 3, 5, 7, 11, 101, 131, 151, 181, 191, 313, 353, 373, 383, 727, 757, 787, 797, 919, 929, 10301, 10501, 10601, 11311, …
17000000000000071
則是個 17 位數的回文質數。回文質數的特性:
- 所有個位質數皆是回文質數,即 2, 3, 5, 7
- 兩位質數中只有 一個回文質數,因其餘的回文數如 等均是 11 的倍數
- 三位數以上,回文質數的起始一定是 1, 3, 7, 9 其中一個,因質數的個位只可是上述四數字之一
- 三位數回文質數有 101, 131, 151, 181, 191, 313, 353, 373, 383, 727, 757, 787, 797, 919 和 929,共 15 個
- 沒有四位數回文質數,因所有四位回文數圴是 的倍數。其實在所有偶位數的回文數也都是 的倍數,即除 以外,也沒有其他偶數位 (Even Digits) 的回文質數
- 五位數回文質數有 84 個,最小的是 ,而最大的是
改寫第 3 週測驗題裡頭的整數開平方根實作,藉由 lookup table (LUT) 的手法降低分支的數量: (但這不必然較快)
已知在十進位,一個整數的每個數字的總和若能被 3 整除,這個數就是 3 的倍數,我們可搭配第 2 週測驗題提到的 is_divisible
來檢驗用字串表示的數值是否可被 3 整除:
以下實作利用 Sieve of Eratosthenes,嘗試找出第 1500 個回文質數,有效位數是 15:
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define N_DIGITS 15
#define LIMIT 1500
typedef unsigned long long ull;
static ull pow10[] = {
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000,
1000000000,
10000000000,
100000000000,
1000000000000,
10000000000000,
100000000000000,
1000000000000000,
10000000000000000,
};
static int midindex;
static uint8_t *psieve;
static ull half_max;
#define __ALIGN_MASK(x, mask) (((x) + (mask)) & ~(mask))
#define ALIGN(x, a) __ALIGN_MASK(x, (typeof(x))(a) -1)
enum {
PRIME = 0,
COMPOSITE = 1,
};
static inline ull fastpow10(const int n)
{
if (n < 0 || n > 16) {
free(psieve);
printf("n = %d\n", n);
exit(1);
}
return pow10[n];
}
static const uint8_t isqrt64_tab[192] = {
128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142,
143, 143, 144, 145, 146, 147, 148, 149, 150, 150, 151, 152, 153, 154, 155,
155, 156, 157, 158, 159, 159, 160, 161, 162, 163, 163, 164, 165, 166, 167,
167, 168, 169, 170, 170, 171, 172, 173, 173, 174, 175, 175, 176, 177, 178,
178, 179, 180, 181, 181, 182, 183, 183, 184, 185, 185, 186, 187, 187, 188,
189, 189, 190, 191, 191, 192, 193, 193, 194, 195, 195, 196, 197, 197, 198,
199, 199, 200, 201, 201, 202, 203, 203, 204, 204, 205, 206, 206, 207, 207,
208, 209, 209, 210, 211, 211, 212, 212, 213, 214, 214, 215, 215, 216, 217,
217, 218, 218, 219, 219, 220, 221, 221, 222, 222, 223, 223, 224, 225, 225,
226, 226, 227, 227, 228, 229, 229, 230, 230, 231, 231, 232, 232, 233, 234,
234, 235, 235, 236, 236, 237, 237, 238, 238, 239, 239, 240, 241, 241, 242,
242, 243, 243, 244, 244, 245, 245, 246, 246, 247, 247, 248, 248, 249, 249,
250, 250, 251, 251, 252, 252, 253, 253, 254, 254, 255, 255,
};
static ull isqrt(ull x)
{
if (x == 0)
return 0;
int lz = __builtin_clzll(x) & 62;
x <<= lz;
uint32_t y = isqrt64_tab[(x >> 56) - 64];
y = (y << 7) + (x >> 41) / y;
y = (y << 15) + (x >> 17) / y;
y -= x < (uint64_t) y * y;
return y >> (lz >> 1);
}
static void generate_sieve(int digits)
{
ull max = 0;
for (int count = 0; count < digits; ++count)
max = max * 10 + 9;
max = isqrt(max);
half_max = max >> 1;
int bytes = (max >> 1) + (max & 0x1);
bytes = (bytes >> 3) + (bytes & 0x1);
bytes = ALIGN(bytes, 16);
psieve = realloc(psieve, bytes);
if (!psieve) {
printf("realloc() failed!\n");
exit(1);
}
memset(psieve, 0, bytes);
psieve[0] |= COMPOSITE << (1 >> 1);
unsigned char mask = 0x7;
for (ull n = 3; n <= max; n += 2) {
if (((psieve[n >> 4] >> ((n >> 1) & mask)) & 0x1) == PRIME) {
for (ull mul = (n << 1); mul < max; mul += n) {
if (!(mul & 0x1))
continue;
psieve[mul >> 4] |= COMPOSITE
<< ((mul >> 1) & mask);
}
}
}
}
static char *ltoa(ull val, int *len)
{
static char ascbuf[32] = {0};
int i = 30;
for (; val && i; --i, val /= 10)
ascbuf[i] = "0123456789"[val % 10];
*len = 30 - i;
return &ascbuf[++i];
}
static bool is_divisible_by3(char *buf, int len)
{
int sum = 0;
while (--len >= 0)
sum += buf[len] - '0';
static uint32_t M3 = UINT32_C(0xFFFFFFFF) / 3 + 1;
return (sum * M3) <= (M3 - 1);
}
static bool isprime(const ull val)
{
if (!(val & 0x1))
return false;
ull *pquadbits = (ull *) psieve;
ull next = 3;
for (ull quad = ~*pquadbits & ~0b111, prev = 0; prev <= half_max;
quad = ~*++pquadbits) {
if (!quad) {
prev += 64;
continue;
}
while (quad) {
ull i = EXP1;
next = prev + i;
if (!(val % ((next << 1) + 1)))
return false;
quad &= EXP2;
}
prev += 64;
}
return true;
}
static ull getnextpalin(char *buf, const int *len)
{
midindex = (*len >> 1) - 1;
if ((*len & 0x1) == 0x1) {
if (buf[midindex + 1] - '0' != 9) {
buf[midindex + 1] += 1;
return atol(buf);
}
buf[midindex + 1] = '0';
}
for (; midindex >= 0; --midindex) {
if (buf[midindex] - '0' != 9) {
buf[midindex] += 1;
buf[*len - midindex - 1] += 1;
return atol(buf);
}
buf[midindex] = '0';
buf[*len - midindex - 1] = '0';
}
return atol(buf) + 2;
}
int main()
{
int count = LIMIT;
ull i = 100000000000001;
int len = 0;
char *buf = ltoa(i, &len);
if (len < N_DIGITS) {
printf("len: %d\n", len);
exit(1);
}
generate_sieve(N_DIGITS);
while (1) {
if (!(len & 0x1)) {
i = 1ULL | fastpow10(len);
buf = ltoa(i, &len);
continue;
}
if ((buf[len - 1] != '5') && !is_divisible_by3(buf, len)) {
if (isprime(i) && (--count == 0)) {
printf("%s\n", buf);
return 0;
}
}
int oldlen = len;
i = getnextpalin(buf, &len);
if (oldlen != len)
buf = ltoa(i, &len);
}
free(psieve);
return 0;
}
預期執行結果為 100191191191001
請補完程式碼,得以符合預期。書寫規範:
EXP1
和 EXP2
均為不包含 ,
和 ;
的表達式,請用符合 C99 規範和程式碼排版原則的最精簡形式作答
EXP1
必須使用到 __builtin_clz
, __builtin_clzll
, __builtin_ctz
, 和 __builtin_ctzll
其一 GCC 內建函式
EXP2
只能出現一組小括號 (即 (
和 )
),搭配使用 bitwise 運算子
延伸閱讀:
延伸問題:
- 解釋上述程式碼運作原理,指出可改進之處並實作
是否有必要先將數值轉成字串?用十進位的角度處理運算是否產生額外的計算負擔?
- Linux 核心原始程式碼 lib/math/prime_numbers.c 有類似的程式碼,請解釋其作用、裡頭 RCU 的使用情境,及針對執行時間的改進
測驗 2
在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。
使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。
對於 C 這樣缺乏內建 concurrent GC 機制的程式語言來說,若要實作 lock-free 演算法,就要自行處理記憶體釋放的議題。Hazard pointer 是其中一種解決方案,其原理是讀取端執行緒對指標進行識別,指標 (特別是指向的記憶體區塊) 若要釋放時,會事先保存,延遲到確認沒有讀取端執行緒,才進行真正的釋放。Linux 核心的 RCU 同步機制是另一種 lock-free 程式設計演算法和記憶體回收機制。
"hazard" 一詞多用來指「危險物、危害物」,與 "danger" 的主要區別:
- hazard 是可數名詞
- "hazard" 不是立刻會發生的危險,而是可能會帶來危險的「隱患」,例如 a health hazard (一個健康隱患) 和 a safety hazard (一個[行為或功能的]安全隱患)
〈Lock-Free Data Structures with Hazard Pointers〉寫道:
Each reader thread owns a single-writer/multi-reader shared pointer called Hazard pointer. When a reader thread assigns the address of a map to its hazard pointer, it is basically announcing to other threads (writers), "I am reading this map. You can replace it if you want, but don’t change its contents and certainly keep your deleteing hands off it."
Hazard pointer 可簡稱為 "HP",其關鍵的結構有:
- Hazard pointer
- retire list (也稱為 thread free list)

hazard pointer 是一種可以解決 lock-free data 在動態記憶體配置的問題之方法。其基本的概念是允許 hazard pointer 可以被一個 thread 寫而多個 thread 讀,當存在 reader thread 正在操作該 hazard pointer 時,hazard pointer 會被標註,於是 writer thread 可得知此訊息並延遲對其釋放。
在 hazard pointer 架構中,每個 thread 首先需要各自維護的兩個關鍵集合是:
- hazard pointer: 儲存此 thread 正在存取的指標,因此該指標不能直接被釋放
- retire list: 被這個 thread 要求釋放的指標,但實際尚未釋放
因此要安全的釋放記憶體,其基本想法就是:
- 每個執行緒放在 hazard pointer 中的指標尚不能被釋放
- 每個執行緒要求釋放的指標先放在 retire list 中
- 掃描 retire list 可以得知所有執行緒皆不使用的指標,則可將其真正的釋放給作業系統
〈Lock-Free Data Structures with Hazard Pointers〉提到,lock-free programing 為了保證持續進展的特性,每個執行緒都有機會在任意時間去操作一個 object。當 thread A 在釋放一個物件時,需確保沒有另一個 thread B 仍取得該物件的 reference 且正要存取。如果先行釋放此 object,另一個 thread 的存取就會出錯。以下是經典的解決方式:
- Reference counting: 基本想法是每當一個 thread 取得 reference 就將一個計數(其地址不同於 pointer)加一,解除 reference 時則減一。但是實際上其正確性要確保取得 pointer 和 reference count 加一是一個 atomic operation,需要使用特殊的 DCAS (Double Compare And-Swap) 指令,但其效率不彰
- Wait and delete: 就是 「等一段時間」 再刪除,但等待時間未知
- Keep a reference count next to the pointer: 使用比起 DCAS 較合理且效率比較好的 CAS2,可以 atomic 的交換 memory 中的兩個相鄰的 words,不過 64 位元的處理器上一般不存在這種指令(但可以透過一些操作指標的技巧達到同等效果)
第三個方案看似可行,但實際上仍有缺陷,因為 CAS 語意要求 object 和 reference count 要同時一致才能替換上新的數值,write 操作需要等待到沒有 read 操作 (reference count) 才能進行。換句話說,在一個 read 操作結束前如果又有下一個 read 進來,write 就只能空等待。
為克服上述方法的問題,hazard pointer 是一個更佳方案 (尤其針對 Write-Rarely-Read-Many 的情境)。內文中以 lock-free 的 map 為案例:
當 WRRMMap
需要被更新時,writer thread 會去取得一個 *pMap_
指向 map 的副本並更改,然後再將 *pMap_
指向新的副本,並且回收舊的。麻煩的問題是這個舊的 map 可能有其他 thread 正嘗試讀取。
而 hazard pointer 的概念就是讓 reader 將正在存取的 map 加入到自己的(single-writer-multiple-reader)的 hazard pointer 中,其目的等同向其他 writer 宣告禁止回收該 map 。當 writer 把舊的 map 替換下來,map 先被放在一個 thread 獨立的 list 而不先釋放 ,直到 list 中的舊 map 達到一個上限,再去掃描每個 reader 的 hazard pointer 中是否有相匹配。如果某個舊 map 不與任何 reader 的 hazard pointer 匹配,那麼釋放該 map 就是安全的,否則就繼續將其留在 list 中,直到下次掃描。
hazard pointer 的基本結構由 linked list 構成:
*pNext
指向下個節點
active_
表示該節點被使用與否
pHead_
是 linked list 的首個節點
listLen_
則是 list 中的節點數量
pHazard_
是物件的指標本體(例如 map)
要將一個指標加入 hazard pointer 需要 acquire 一個可用的節點,首先先看看有沒有已經被建立的空節點(沒有對應要保護的 pHazard
實體)可以直接使用。如果沒有,則需要建立新的 node 插入到 hazard pointer linked list 的開頭,listLen_
也要相應的做加一
要解除對指標的保護則透過 release,單純的將該節點的 pHazard_
重置且 active_
允許重用即可。
每個 thread 還有一個獨立存取的 retire list,存放該 thread 不再需要,若其它thread 也不再需要就可以被釋放 pointer。 retire list 因為只有每個 thread 自己可以存取的,因此不需要被同步。
Retire
操作將要原本應該要釋放的 pointer 加入 vector 中,先儲存起來延遲釋放。當 vector 所放的 pointer 超過一個大小就呼叫 Scan
去掃描是否有可以真正釋放的 pointer。
Scan
的任務是找出 retire list 集合和所有 thread 的 hazard pointer 集合的差集,那些 pointer 就是可以被釋放而且不會導致錯誤的。
以下是 Hazrd pointer 的簡化 C11 實作,搭配 GNU extension:
#define atomic_load(src) __atomic_load_n(src, __ATOMIC_SEQ_CST)
#define atomic_store(dst, val) __atomic_store(dst, val, __ATOMIC_SEQ_CST)
#define atomic_exchange(ptr, val) \
__atomic_exchange_n(ptr, val, __ATOMIC_SEQ_CST)
#define atomic_cas(dst, expected, desired) \
__atomic_compare_exchange(dst, expected, desired, 0, __ATOMIC_SEQ_CST, \
__ATOMIC_SEQ_CST)
#include <stdint.h>
#define LIST_ITER(head, node) \
for (node = atomic_load(head); node; node = atomic_load(&node->next))
typedef struct __hp {
uintptr_t ptr;
struct __hp *next;
} hp_t;
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
static hp_t *list_append(hp_t **head, uintptr_t ptr)
{
hp_t *new = calloc(1, sizeof(hp_t));
if (!new)
return NULL;
new->ptr = ptr;
hp_t *old = atomic_load(head);
do {
new->next = old;
} while (!atomic_cas(head, &old, &new));
return new;
}
hp_t *list_insert_or_append(hp_t **head, uintptr_t ptr)
{
hp_t *node;
bool need_alloc = true;
LIST_ITER (head, node) {
uintptr_t expected = atomic_load(&node->ptr);
if (expected == 0 && atomic_cas(&node->ptr, &expected, &ptr)) {
need_alloc = false;
break;
}
}
if (need_alloc)
node = list_append(head, ptr);
return node;
}
bool list_remove(hp_t **head, uintptr_t ptr)
{
hp_t *node;
const uintptr_t nullptr = 0;
LIST_ITER (head, node) {
uintptr_t expected = atomic_load(&node->ptr);
if (expected == ptr && atomic_cas(&node->ptr, &expected, &nullptr))
return true;
}
return false;
}
bool list_contains(hp_t **head, uintptr_t ptr)
{
hp_t *node;
LIST_ITER (head, node) {
if (atomic_load(&node->ptr) == ptr)
return true;
}
return false;
}
void list_free(hp_t **head)
{
hp_t *cur = *head;
while (cur) {
hp_t *old = cur;
cur = cur->next;
free(old);
}
}
#define DEFER_DEALLOC 1
typedef struct {
hp_t *pointers;
hp_t *retired;
void (*deallocator)(void *);
} domain_t;
domain_t *domain_new(void (*deallocator)(void *))
{
domain_t *dom = calloc(1, sizeof(domain_t));
if (!dom)
return NULL;
dom->deallocator = deallocator;
return dom;
}
void domain_free(domain_t *dom)
{
if (!dom)
return;
if (dom->pointers)
list_free(&dom->pointers);
if (dom->retired)
list_free(&dom->retired);
free(dom);
}
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
if (atomic_load(prot_ptr) == val)
return val;
uintptr_t tmp = val;
if (!atomic_cas(&node->ptr, &tmp, &nullptr))
list_remove(&dom->pointers, val);
}
}
void drop(domain_t *dom, uintptr_t safe_val)
{
if (!list_remove(&dom->pointers, safe_val))
__builtin_unreachable();
}
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
if (!list_contains(&dom->pointers, ptr)) {
dom->deallocator((void *) ptr);
} else if (flags & DEFER_DEALLOC) {
list_insert_or_append(&dom->retired, ptr);
} else {
while (list_contains(&dom->pointers, ptr))
;
dom->deallocator((void *) ptr);
}
}
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
cleanup_ptr(dom, old_obj, flags);
}
void cleanup(domain_t *dom, int flags)
{
hp_t *node;
LIST_ITER (&dom->retired, node) {
uintptr_t ptr = node->ptr;
if (!ptr)
continue;
if (!list_contains(&dom->pointers, ptr)) {
if (EXP3)
dom->deallocator((void *) ptr);
} else if (!(flags & DEFER_DEALLOC)) {
while (list_contains(&dom->pointers, ptr))
;
if (EXP4)
dom->deallocator((void *) ptr);
}
}
}
#include <assert.h>
#include <err.h>
#include <pthread.h>
#include <stdio.h>
#define N_READERS 1
#define N_WRITERS 1
#define N_ITERS 20
#define ARRAY_SIZE(x) sizeof(x) / sizeof(*x)
typedef struct {
unsigned int v1, v2, v3;
} config_t;
static config_t *shared_config;
static domain_t *config_dom;
config_t *create_config()
{
config_t *out = calloc(1, sizeof(config_t));
if (!out)
err(EXIT_FAILURE, "calloc");
return out;
}
void delete_config(config_t *conf)
{
assert(conf);
free(conf);
}
static void print_config(const char *name, const config_t *conf)
{
printf("%s : { 0x%08x, 0x%08x, 0x%08x }\n", name, conf->v1, conf->v2,
conf->v3);
}
void init()
{
shared_config = create_config();
config_dom = domain_new(delete_config);
if (!config_dom)
err(EXIT_FAILURE, "domain_new");
}
void deinit()
{
delete_config(shared_config);
domain_free(config_dom);
}
static void *reader_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS; ++i) {
config_t *safe_config =
(config_t *) load(config_dom, (uintptr_t *) &shared_config);
if (!safe_config)
err(EXIT_FAILURE, "load");
print_config("read config ", safe_config);
drop(config_dom, (uintptr_t) safe_config);
}
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
0);
print_config("updated config ", new_config);
}
return NULL;
}
int main()
{
pthread_t readers[N_READERS], writers[N_WRITERS];
init();
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_create(readers + i, NULL, reader_thread, NULL))
warn("pthread_create");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_create(writers + i, NULL, writer_thread, NULL))
warn("pthread_create");
}
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_join(readers[i], NULL))
warn("pthread_join");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_join(writers[i], NULL))
warn("pthread_join");
}
deinit();
return EXIT_SUCCESS;
}
假設記憶體配置都會成功,且已知上述程式碼在執行時期不會遇到 assertion 或者 ThreadSanitizer 的錯誤。請補完程式碼,使得執行結果符合預期。作答規範:
EXP3
和 EXP4
應以指定的程式碼風格書寫,保持最精簡的形式
EXP3
和 EXP4
皆為表示式,且呼叫以 list_
開頭的函式
延伸問題:
- 解釋上述程式碼運作原理,指出可改進之處並實作
- 上述程式碼開啟多個 reader 時,無法通過 ThreadSanitizer,請指出具體問題並克服
- 比較 RCU 和 hazard pointer (HP),探討為何 Linux 不採用 HP 而是另外發展 RCU