contributed by < hankluo6
>
1
static ull isqrt(ull x)
{
if (x == 0) return 0;
int lz = __builtin_clzll(x) & 62;
x <<= lz;
uint32_t y = isqrt64_tab[(x >> 56) - 64];
y = (y << 7) + (x >> 41) / y;
y = (y << 15) + (x >> 17) / y;
y -= x < (uint64_t) y * y;
return y >> (lz >> 1);
}
程式碼等價於下方 python 程式:
def isqrt(n):
"""
Return the integer part of the square root of the input.
"""
n = operator.index(n)
if n < 0:
raise ValueError("isqrt() argument must be nonnegative")
if n == 0:
return 0
c = (n.bit_length() - 1) // 2
a = 1
d = 0
for s in reversed(range(c.bit_length())):
# Loop invariant: (a-1)**2 < (n >> 2*(c - d)) < (a+1)**2
e = d
d = c >> s
a = (a << d - e - 1) + (n >> 2*c - e - d + 1) // a
return a - (a*a > n)
概念上是利用牛頓法找出 n >> s
的 square root,其中 s
為一個偶數,迴圈內會持續減少 s
的值,直到 s
等於 0 時的答案即為
而表格提供一個近似
static void generate_sieve(int digits)
{
ull max = 0;
for (int count = 0; count < digits; ++count)
max = max * 10 + 9;
max = isqrt(max);
half_max = max >> 1;
/* We need half the space as multiples of 2 can be omitted */
int bytes = (max >> 1) + (max & 0x1);
/* Calculate the actual number of bytes required */
bytes = (bytes >> 3) + (bytes & 0x1);
bytes = ALIGN(bytes, 16); /* Align-up to 16-byte */
psieve = realloc(psieve, bytes);
if (!psieve) {
printf("realloc() failed!\n");
exit(1);
}
memset(psieve, 0, bytes);
/* In psieve bit 0 -> 1, 1 -> 3, 2 -> 5, 3 -> 7 and so on... */
/* Set the 0th bit representing 1 to COMPOSITE
*/
psieve[0] |= COMPOSITE << (1 >> 1);
unsigned char mask = 0x7;
for (ull n = 3; n <= max; n += 2) {
if (((psieve[n >> 4] >> ((n >> 1) & mask)) & 0x1) == PRIME) {
for (ull mul = (n << 1); mul < max; mul += n) {
/* Skip the evens: there is no representation in psieve */
if (!(mul & 0x1))
continue;
/* Set offset of mul in psieve */
psieve[mul >> 4] |= COMPOSITE
<< ((mul >> 1) & mask); /* bit offset */
}
}
}
}
psieve
中的每個 bit 都代表一個數字,且不包含偶數。故 max
為所有可能出現的數字數量,而 bytes
為總共需要的 byte 數量,因為每個 byte 可表示 8 個數字 (bits),所以 bytes
需要位移。
根據 Sieve of Eratosthenes 演算法將每個合數位置的 bit 設置為 COMPOSITE
。psieve
陣列型態為 uint8_t
,所以每個陣列元素可以存 8 個數字,(mul >> 1) & mask
即為該數字在陣列中的位置。
/* Check if a number is prime. */
static bool isprime(const ull val)
{
if (!(val & 0x1)) /* Test for divisibility by 2 */
return false;
ull *pquadbits = (ull *) psieve;
ull next = 3; /* start at 7 (i.e. 3 * 2 + 1) */
for (ull quad = ~*pquadbits & ~0b111, prev = 0; prev <= half_max;
quad = ~*++pquadbits) {
if (!quad) {
prev += 64;
continue;
}
while (quad) {
ull i = __builtin_ctzll(quad);
next = prev + i;
if (!(val % ((next << 1) + 1)))
return false;
quad &= ~(1ULL << i);
}
prev += 64;
}
return true;
}
is_prime
檢查在 val
能否被質數表中的任一數整除。quad
為 psieve
陣列中的某個元素取反,此時內容為 1 的位元對應的數字即為質數,透過 __builtin_ctzll
找出該位元的位置,並透過 (next << 1) + 1
便能得到實際的數字。每檢查完一個位元後,會將該位元設置為 0,便能在檢查所有 sizeof(ull)
位元後,跳出迴圈,並將 base address prev
往後移動,繼續檢查下個元素。
static ull getnextpalin(char *buf, const int *len)
{
midindex = (*len >> 1) - 1;
/* Handle the case of odd number of digits.
* If the central digit is 9, reset it to 0 and process adjacent digit,
* otherwise increment it and return.
*/
if ((*len & 0x1) == 0x1) {
if (buf[midindex + 1] - '0' != 9) {
buf[midindex + 1] += 1;
return atol(buf);
}
buf[midindex + 1] = '0';
}
/* Adjust other digits */
for (; midindex >= 0; --midindex) {
if (buf[midindex] - '0' != 9) {
buf[midindex] += 1;
buf[*len - midindex - 1] += 1;
return atol(buf);
}
buf[midindex] = '0';
buf[*len - midindex - 1] = '0';
}
/* If we have exhausted numbers in *len digits, increase the number of
* digits and return the first palindrome of the form 10..0..01
*/
/* The last palin will always be of the form 99...99 */
return atol(buf) + 2;
}
getnextpalin
將長度分成偶數跟奇數討論,每次逐步增加最靠近中間的位置的數字,當數字為 9 時,則增加其兩側數字。而當該長度所有數字都跑過後,需要將 buf
擴充,並增加 len
,原本程式是將 buf
內的最高值 (99…99) + 2,但此時 buf
已經被重置為 00…00,回傳值會變成 2 而不是正確的數字。
故需作以下更改:
static ull getnextpalin(char *buf, const int *len)
{
...
ull ret = 1;
for (int i = 0; i < *len; ++i) {
ret *= 10;
}
/* If we have exhausted numbers in *len digits, increase the number of
* digits and return the first palindrome of the form 10..0..01
*/
*len += 1;
/* The last palin will always be of the form 99...99 */
return ret + 1;
}
轉成字串的好處在於可以快速的取得下一個迴文數值,但可以不需要將數值轉成字串再來計算,觀察數值有
而當
2
typedef struct {
unsigned int v1, v2, v3;
} config_t;
static config_t *shared_config;
同時被 reader 及 writer 存取的共用資源。
typedef struct {
hp_t *pointers;
hp_t *retired;
void (*deallocator)(void *);
} domain_t;
static domain_t *config_dom;
config_dom
為主要管理 hazard pointer 的結構。
reader thread 在讀取共用資源時,會建立一個 hazard pointer 指向此共用資源,而這些 hazard pointer 以 linked list 的方式存在 domain_t
內的 pointers
中。
writer thread 要釋放共用資源的空間時,如果該空間還有 reader 正在讀取,則將此 reader 對應的 hazard pointer 放到 retired
linked list 當中,等待讀取完後再釋放空間。
deallocator
用來將共用資源的空間釋放,此例中為釋放 config_t
的函式。
typedef struct __hp {
uintptr_t ptr;
struct __hp *next;
} hp_t;
以 struct hp_t
表示 hazard pointer,為 singly linked list,因為 hazard pointer 可能同時被多個 reader 及 writer 存取,所以存取 list 的函式需使用 atomic operator。
/*
* Load a safe pointer to a shared object. This pointer must be passed to
* `drop` once it is no longer needed. Returns 0 (NULL) on error.
*/
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
/* Hazard pointer inserted successfully */
if (atomic_load(prot_ptr) == val)
return val;
/*
* This pointer is being retired by another thread - remove this hazard
* pointer and try again. We first try to remove the hazard pointer we
* just used. If someone else used it to drop the same pointer, we walk
* the list.
*/
uintptr_t tmp = val;
if (!atomic_cas(&node->ptr, &tmp, &nullptr))
list_remove(&dom->pointers, val);
}
}
/*
* Drop a safe pointer to a shared object. This pointer (`safe_val`) must have
* come from `load`
*/
void drop(domain_t *dom, uintptr_t safe_val)
{
if (!list_remove(&dom->pointers, safe_val))
__builtin_unreachable();
}
static void *reader_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS; ++i) {
config_t *safe_config =
(config_t *) load(config_dom, (uintptr_t *) &shared_config);
if (!safe_config)
err(EXIT_FAILURE, "load");
print_config("read config ", safe_config);
drop(config_dom, (uintptr_t) safe_config);
}
return NULL;
}
reader thread 會透過 load
讀取 shared_config
,並在讀取完後呼叫 drop
。
load
嘗試在 dom->pointer
這個 list 上找尋沒被使用的 node 當作 hazard pointer,如果在 list 上找不到則分配一個新的 node 當作 hazard pointer。hazard pointer 指向的值即為 reader 要讀取的記憶體位置。而在建立 hazard pointer 的過程中,該共用記憶體位置的值可能被更改,故 load
內的 atomic_cas
為記憶體被更改時,將剛剛建立的 hazard pointer 內的值清空,並重新分配新的 hazard pointer。
drop
會在 reader 讀取完資料後,將 hazard pointer 的值設定成 NULL
,表示該 pointer 可再被其他 reader 使用。
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
dom->deallocator((void *) ptr);
} else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
list_insert_or_append(&dom->retired, ptr);
} else { /* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
dom->deallocator((void *) ptr);
}
}
/* Swaps the contents of a shared pointer with a new pointer. The old value will
* be deallocated by calling the `deallocator` function for the domain, provided
* when `domain_new` was called. If `flags` is 0, this function will wait
* until no more references to the old object are held in order to deallocate
* it. If flags is `DEFER_DEALLOC`, the old object will only be deallocated
* if there are already no references to it; otherwise the cleanup will be done
* the next time `cleanup` is called.
*/
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
cleanup_ptr(dom, old_obj, flags);
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
0);
print_config("updated config ", new_config);
}
return NULL;
}
writer thread 建立一個新的 config,並將其透過 swap
與先前的 config 交換。
swap
利用 atomic_exchange
將新舊 config 交換,並呼叫 cleanup_ptr
嘗試釋放舊的 config。
cleanup_ptr
會判斷現在是否還有 reader 的 hazard pointer 正在指向此記憶體,如果沒有則可以直接呼叫 deallocator
釋放該空間。而如果還有 reader 在使用,則根據 flag
的值分成兩種方法:
flag & DEFER_DEALLOC
: 將要釋放的空間放到 retire list 中,等待最後被釋放。flag & DEFER_DEALLOC == 0
: busy wating 等待所有 reader 都讀取完該空間,在釋放。在 macOS Monterey 12.2 執行程式時需加上環境變數 MallocNanoZone=0
。
reference
WARNING: ThreadSanitizer: data race (pid=3836)
Read of size 4 at 0x000106d00b90 by thread T12:
#0 print_config hazard.c:264 (hazard:arm64+0x100003bfc)
#1 writer_thread hazard.c:313 (hazard:arm64+0x100003ba8)
Previous write of size 4 at 0x000106d00b90 by thread T13:
#0 writer_thread hazard.c:306 (hazard:arm64+0x100003b48)
Location is heap block of size 12 at 0x000106d00b90 allocated by thread T13:
#0 calloc <null>:75478556 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x57a20)
#1 create_config hazard.c:250 (hazard:arm64+0x10000374c)
#2 writer_thread hazard.c:305 (hazard:arm64+0x100003b34)
Thread T12 (tid=133769, running) created by main thread at:
#0 pthread_create <null>:75478556 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x2cda0)
#1 main hazard.c:331 (hazard:arm64+0x100003954)
Thread T13 (tid=133770, running) created by main thread at:
#0 pthread_create <null>:75478556 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x2cda0)
#1 main hazard.c:331 (hazard:arm64+0x100003954)
SUMMARY: ThreadSanitizer: data race hazard.c:264 in print_config
可以看到問題出在 print_config
,從 ThreadSanitizer 輸出中可看到 thread 13 分配的 new_config
與 thread 12 的 print_config
有 data race。但 print_config
印的是自己執行緒上新分配的 new_config
,與其他執行緒的 new_config
是不會產生 data race 的。
可以推測 thread 12 分配的 new_config
在 thread 12 呼叫第二次 print_config
之前,已經被其他執行緒釋放,而 thread 13 新分配的 new_config
剛好使用到 thread 12 的 new_config
被釋放的空間,產生 data race。
故必須確保在第二次 print_config
之前,new_config
不能被釋放。
static void *writer_thread(void *arg)
{
...
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
config_t *copy = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
memcpy(copy, new_config, sizeof(config_t));
print_config("updating config", copy);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
0);
print_config("updated config ", copy);
free(copy);
}
...
}
一個簡單的方法為建立副本用來讀取,便能防止 data race,但當共用的資料大小很大時,此方法會降低效能。而另一個方法為一樣使用 hazard pointer 保護 new_config
。但不論哪種方法似乎都不太適合,因為 writer thread 本身功能應為寫入資料,而非讀取資料。
後續的實作: hp_list,若你想到改進方案,請提交 pull request
linux2022