owned this note
owned this note
Published
Linked with GitHub
# 5/15 一對一討論
## Q1
計算 e^x,不用 FPU,x >=1,x 是單精度浮點數
泰勒展開式:
$e^x = 1+x+\frac{x^2}{2!}+\frac{x^3}{3!}+...+\frac{x^n}{n!}+R_n$
一般將浮點數轉換成定點數會將浮點數乘以 scaling factor,而這使用到 FPU。 因此若要將浮點數轉換成定點數並且不使用 FPU,需要把浮點數拆成三段處理,然後換成定點數表達。
同理轉換回來。
注意位元運算,如果將有號整數位移到了符號位,屬於未定義行為,會造成錯誤。這邊因為運算過程保證答案會大於 0,因此選用 unsigned int。
```c
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
typedef int32_t fix16_t;
#define FIX_ONE 0x00010000
#define FIX_MAX 0xFFFFFFFF
static const int mask[] = { 0, 8, 12, 14 };
static const int magic[] = { 2, 1, 0, 0 };
unsigned clz32(uint32_t x, int c)
{
if (!x && !c)
return 32;
uint32_t upper = x >> (16 >> c);
uint32_t lower = x & (0xFFFFu >> mask[c]);
if (c == 3)
return upper ? magic[upper]
: 2 + magic[lower];
return upper ? clz32(upper, c + 1)
: (16 >> c) + clz32(lower, c + 1);
}
fix16_t int_to_fix(uint32_t x) { return x * FIX_ONE; }
uint32_t fix_to_int(fix16_t x) { return (uint32_t)x >> 16; }
float fix_to_float(uint32_t x) {
if(!x) return 0.0f;
uint32_t bits = 0;
int32_t lz = clz32(x, 0);
int32_t exp = 15 - lz;
uint32_t mant;
if (exp >= 0) {
x >>= exp;
} else {
x <<= -1*exp;
}
mant = x & 0x0000FFFF;
bits = (uint32_t)(exp + 127) << 23 | (mant << 7);
float f;
memcpy(&f, &bits, sizeof(uint32_t));
return f;
}
fix16_t fix_mul(fix16_t x, fix16_t y) {
uint64_t res = (uint64_t)x * y;
return (fix16_t)(res >> 16);
}
fix16_t fix_div(fix16_t x, fix16_t y) {
if (y == 0)
return 0;
fix16_t result = (((uint64_t)x) << 16) / (uint64_t)y;
return result;
}
uint32_t my_expf(float in) {
/* cast float type to 32 bit unsigned integer type */
uint32_t x;
memcpy(&x, &in, sizeof(in));
uint32_t sign = x >> 31;
uint32_t exponent = (x >> 23) & 0x000000FF;
uint32_t mantissa = (x << 9) >> 9;
mantissa = mantissa | (1u << 23);
uint32_t e = exponent - 127;
if(exponent == 0 && mantissa == 0 && sign == 0) return 1 * FIX_ONE;
if(exponent == 127 && mantissa == 0 && sign ==0) return 2 * FIX_ONE;
if(exponent == 130 && mantissa & 0x00400000 || exponent >= 131) return 0;
uint32_t decimal_point = 23;
for (int j = 0; j < 23 - e; j++) {
if ((mantissa & 0x00000001) == 0)
mantissa = mantissa >> 1;
else
break;
decimal_point--;
}
uint32_t i = mantissa;
fix16_t fix_x;
if (decimal_point >= 16)
fix_x = i >> (decimal_point - 16 - e);
else
fix_x = i << (16 - decimal_point + e);
/* tylor expansion of e^x */
fix16_t result, term;
result = FIX_ONE + fix_x;
term = fix_x;
for (int k = 2; k <= 30; k++) {
term = fix_mul(term, fix_div(fix_x, int_to_fix(k)));
if (term < 500)
break;
result += term;
}
return result;
}
int main() {
float x;
scanf("%f", &x); // x >= 1
float result = fix_to_float(my_expf(x));
printf("%f %f\n", result, expf(x));
}
```
輸出結果:
```
x: 2.5
12.178955 12.182494
```
[參考 3/18 問答簡記](https://hackmd.io/EFs_Nfs6TOmIA5ldA_tZsQ?stext=10428%3A12%3A0%3A1749655840%3A2VN_CY&view=)
## Q2
Linux 核心不傾向使用 FPU 的原因?
第一個原因是 FPU 的 context 無法自動保存,需要手動,因此非常不方便。第二個原因:
> Linux kernel not have seamless support for floating point because it can not trap itself.
在 user space,kernel 可以去抓捕 FPE 的發生,但在 Linux kernel 自己本身卻沒辦法,因此一旦發生 floating point exception 就會造成系統 panic
有五種情況會觸發 FPE:
* Invalid operation: involving NaN, sqrt(-1)
* Division by zero. The result is signed Inf.
* Overflow: when rounding and converting
* Underflow
* Inexact
## Q3
浮點數轉換成定點數該選擇何種 Q notation? Q23.8 的選用
### 定點數
定點數是一種數值表達方式,將固定的小數位數儲存在 fractional 部份
本質上定點數就是將整數乘以一個 scaling factor 後的表示:
1. $1.23$ 以 $\frac{1}{1000}$ 作為 scaling factor 的整數表示為 $1230$
2. $1230000$ 以 $1000$ 作為 scaling factor 的整數表示為 $1230$,表示最小有效位數後還隱含 3 位數
scaling factor 可以設定的比位數還多,比如對於只有 8 位元的整數,取 $2^{15}$ 依然合法,只是要從定點數轉換為整數時,相同位數的整數會放不下如此大的數值,必須用更多位數的整數才放的下。
定點數加減法跟整數一樣,而乘除法則需要將結果除/乘一次 scaling factor。因此 scaling factor 通常會選擇 2 的冪次,這樣就能以位移取代 scaling factor 的乘除:
```c
static inline fix16_t fix16_mul(fix16_t x, fix16_t y)
{
int64_t res = (int64_t) x * y;
return (fix16_t) (res >> 16);
}
static inline fix16_t fix16_div(fix16_t a, fix16_t b)
{
if (b == 0) /* Avoid division by zero */
return 0;
fix16_t result = (((int64_t) a) << 16) / ((int64_t) b);
return result;
}
```
仔細看定點數除法的細節,假如直接做 `a/b` 然後再乘一次 scaling factor,如果 a 小於 b 就會造成數值 = 0 而遺失,因此調整運算的前後順序,a 先乘以 scaling factor 後能確保 a 必然大於 b,就避免了數值遺失的問題。
### 選擇合適 Q notation
有號 Q notation 其實就只是把二補數加上小數點而已。而有時會把符號位計入整數位,有時不會。若為有號,則最高位即符號位。
該選擇怎樣的 Q notation 需要考慮幾件事:
1. 計算過程要表示的最大數值
2. 可容忍的誤差
參考[鄭以新的作業筆記](https://hackmd.io/@RinHizakura/rJhEpdyNw),他以 Q23.8 實作 ttt 的蒙地卡羅算法,即 1 位 signed bit,23 位整數位,8 位小數位。因為該蒙地卡羅算法中使用到的浮點數範圍僅介於 0 和 3.552684,不過考量到定點數除法需要先將分子乘上 scaling factor,避免分子小於分母造成相除成為 0,如果選用 Q16.16(Q15.16) 則造成左移 16 位時整數部份被抹除,而如果選用 Q23.8,考量到範圍最大僅 3.552684 也就 2 位整數位,左移 8 位也不至於抹除整數數值。
然而在[第五周測驗](https://hackmd.io/@sysprog/linux2025-quiz5)的定點數運算中,同樣是除法,採取將分子擴展為更多位數的整數型態避免數值被抹除。
我想問將整數轉換為更多位數的型態的成本很高嗎? 會造成什麼潛在問題?
## Q4
如何使用 atomic RMW 實作 concurrent linked list? Assume singly-linked list
### atomic 操作
atomic RMW 是 Read-Modify-Write 的簡稱,代表讀-修改-寫回這三個操作屬同一個 atomic operation,不可拆分。
atomic RMW 分為以下幾種,程式碼以 C/C++ 語言的 `stdatomic.h` 為例:
* Exchange
* Test and Set
* Fetch and ...
* Compare and Swap
#### Exchange
將私有變數與共享變數交換。要確保共享變數的交換為 atomic ,也就是讀取共享變數然後寫進私有變數是 atomic。而私有變數內部的自己運算中的修改不需要是 atomic,隨時要修改不會影響都可以。
#### Test and Set
Test and Set 包含一個布林值,讀取並且設定該布林值作為 atomic 操作,而且回傳設定前的布林值
Test and Set 不局限於單純 RMW 操作,也可以用來實作 Spinlock
```c
atomic_flag af = ATOMIC_FLAG_INIT;
void lock()
{
while (atomic_flag_test_and_set(&af)) { /* wait */ }
}
void unlock() { atomic_flag_clear(&af); }
```
`_Bool atomic_flag_test_and_set( volatile atomic_flag* obj )`:sets an atomic_flag to true and returns the old valu
`void atomic_flag_clear( volatile atomic_flag* obj )`:sets an atomic_flag to false
設想目前 atomic flag 是 false,然後某一個執行緒呼叫 lock(),取得之後經過一次迴圈就結束 lock(),接著進行 lock() 之後的函式,此刻 atomic flag 是 true。而這時另外一個執行緒也呼叫 lock() 但是它只取得 true 的 flag,因此不斷的重複執行迴圈。直到前面那一個執行緒結束 lock() 和 unlock() 之間的函式後,呼叫 unlock() 把 flag 設定為 false。這時另外一個執行緒取得 false 的 flag 就能結束 spin。
#### Fetch and ...
Fetch and ... 對於共享變數的基本運算作為 atomic 操作。
`atomic_fetch_add( volatile A* obj, M arg )`
`atomic_fetch_sub( volatile A* obj, M arg ) `
`atomic_fetch_or( volatile A* obj, M arg )`
`atomic_fetch_and( volatile A* obj, M arg )`
`atomic_fetch_xor( volatile A* obj, M arg )`
#### Compare and Swap
在 C/C++ 語言稱為 compare and exchange
pseudo code 如下:
```c
/* A is an atomic type. C is the non-atomic type corresponding to A */
bool atomic_compare_exchange_strong(A* obj, C* expected, C desired)
{
if (memcmp(obj, expected, sizeof(*object)) == 0) {
memcpy(obj, &desired, sizeof(*object));
return true;
} else {
memcpy(expected, obj, sizeof(*object));
return false;
}
}
```
### 阻塞式同步機制
lock 有分為 mutex lock, spinlock, futex 等等
### concurrent linked list
要實作 concurrent linked list,為的就是避免鏈結斷掉的情況:

參考 [並行程式設計: Lock-Free Programming](/-YnlOxYUQxOHI-CJptK7BQ),談到避免鏈結斷掉的方法就是將要刪除的節點先標示成 MARKED,而不是刪除,至於刪除節點的動作交給 `search_node` 這個函式。


而標記一個節點藉由 malloc 的記憶體對齊性質,回傳的位址會以 4 的倍數 byte 對齊,因此記憶體位址第 0 bit 一定是 0,因此可以將第 0 bit 作為標記而不使用額外的空間儲存標記。
:::warning
如何確保不會遇到 [ABA problem](https://en.wikipedia.org/wiki/ABA_problem)?
:::
除此之外,在找到目標節點後,每一次要更改節點的指標,都必須利用 atomic compare and swap 檢查是否節點有被更動,如果有就用重新找目標節點;如果沒有就直接更改節點的指標
以下程式碼對 10 -> 15 -> 20 -> 25 做連續多次刪除和插入,若未使用上述方法,會造成 10 以後的節點斷鏈而搜尋不到。改善以後便可以在輸出得到正確未斷鍊的結果:
```c
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
#include <stdatomic.h>
#define MARK_MASK ((uintptr_t)1)
#define MARK(p) ((struct list *)(((uintptr_t)(p)) | MARK_MASK))
#define UNMARK(p) ((struct list *)(((uintptr_t)(p)) & ~MARK_MASK))
#define MARKED(p) (((uintptr_t)(p)) & MARK_MASK)
typedef struct list {
int val;
struct list *next;
}list_t;
static struct list *list_search(struct list *head, int key,
struct list **left_node)
{
retry:
struct list *left = head;
struct list *left_next = left->next;
/* 刪除標記的節點 */
while (MARKED(left_next)) {
struct list *unmarked = UNMARK(left_next);
if (atomic_compare_exchange_weak(&left->next, &left_next,
unmarked))
left_next = unmarked;
else
goto retry; /* left->next 發生竄改 */
}
struct list *right = left_next;
while (right && right->val < key) {
left = right;
left_next = __atomic_load_n(&left->next, __ATOMIC_ACQUIRE);
/* 跳過標記的 */
while (MARKED(left_next)) {
struct list *unmarked = UNMARK(left_next);
if (atomic_compare_exchange_weak(&left->next, &left_next,
unmarked))
left_next = unmarked;
else
goto retry;
}
right = left_next;
}
*left_node = left;
return right;
}
bool list_insert_atomic(struct list *head, int key)
{
struct list *left, *right, *node;
for (;;) {
right = list_search(head, key, &left);
if (right && right->val == key)
return false;
node = malloc(sizeof *node);
node->val = key;
node->next = right;
if (atomic_compare_exchange_weak(&left->next, &right,
node))
return true;
free(node);
}
}
bool list_remove_atomic(struct list *head, int key)
{
struct list *left, *right, *right_next;
for (;;) {
right = list_search(head, key, &left);
if (!right || right->val != key)
return false;
right_next = right->next;
if (!MARKED(right_next)) {
if (!atomic_compare_exchange_weak(&right->next, &right_next,
MARK(right_next)))
continue;
}
atomic_compare_exchange_weak(&left->next, &right,
UNMARK(right_next));
return true;
}
}
void *t3(void *l){
printf("t is ready\n");
}
void *t0(void *l){
list_insert_atomic(l, 17);
list_remove_atomic(l, 25);
list_insert_atomic(l, 25);
list_remove_atomic(l, 17);
printf("t0 finish\n");
}
void *t1(void *l){
list_remove_atomic(l, 20);
list_insert_atomic(l, 17);
list_remove_atomic(l, 15);
list_insert_atomic(l, 19);
printf("t1 finish\n");
}
int main(){
list_t *a = malloc(sizeof(list_t));
list_t *b = malloc(sizeof(list_t));
list_t *c = malloc(sizeof(list_t));
list_t *d = malloc(sizeof(list_t));
a->val = 10;
b->val = 15;
c->val = 20;
d->val = 25;
a->next = b;
b->next = c;
c->next = d;
d->next = NULL;
for(list_t *node = a; node != NULL; node = node->next){
printf("%d\n", node->val);
}
pthread_t t[3];
pthread_create(&t[0], NULL, t3, (void *)a);
pthread_create(&t[1], NULL, t0, (void *)a);
pthread_create(&t[2], NULL, t1, (void *)a);
pthread_join(t[0], NULL);
pthread_join(t[1], NULL);
pthread_join(t[2], NULL);
printf("finish\n");
for(list_t *node = a; node != NULL; node = node->next){
printf("%d\n", node->val);
}
return 0;
}
```