owned this note
owned this note
Published
Linked with GitHub
---
tags: linux, kernel
---
# 2022q1 Homework3 (fibdrv)
contributed by < `Destiny0504` >
## 實驗環境
```shell
$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 43 bits physical, 48 bits virtual
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: AuthenticAMD
CPU family: 23
Model: 113
Model name: AMD Ryzen 7 3700X 8-Core Processor
Stepping: 0
Frequency boost: enabled
CPU MHz: 2200.000
CPU max MHz: 4426.1709
CPU min MHz: 2200.0000
BogoMIPS: 7186.24
Virtualization: AMD-V
L1d cache: 256 KiB
L1i cache: 256 KiB
L2 cache: 4 MiB
L3 cache: 32 MiB
NUMA node0 CPU(s): 0-15
```
## Fibonacci sequence (iteration)
### char_swap
交換二個指標所指向的字元空間
- 為了不要使用額外的儲存空間,所以我們採用下列的實作方式,透過簡單的加減法來把兩個指標所指向的字元互換。
``` c
void char_swap(char *a, char *b)
{
*a = *a + *b;
*b = *a - *b;
*a = *a - *b;
}
```
### reverse
將字串順序反轉
- 反轉的用意在於方便做加法,否則在進位的情況下,需要將字串右移 shift 一位
``` c
void reverse(char *a)
{
int counter = 0, len = strlen(a) - 1;
if (len % 2) {
for (; counter <= len / 2; counter++) {
char_swap(a + counter, a + len - counter);
}
} else {
for (; counter < len / 2; counter++) {
char_swap(a + counter, a + len - counter);
}
}
}
```
### string_number_add
將兩個數字字串相加
- 因為我們需要實現大數加法,所以使用字串儲存我們要計算的大數避免 overflow
``` c
char *string_number_add(char *a, char *b, char *out)
{
char *data_a, *data_b, *buf;
size_t size_a, size_b;
int i, carry = 0;
int sum;
/*
* Make sure the string length of 'a' is always greater than
* the one of 'b'.
*/
if (strlen(a) < strlen(b)) {
void *tmp = a;
a = b;
b = (char *) tmp;
}
size_a = strlen(a);
size_b = strlen(b);
data_a = a;
data_b = b;
buf = (char *) kmalloc(sizeof(char) * size_a + 1, GFP_KERNEL);
reverse(data_a);
reverse(data_b);
memset(buf, 0, sizeof(char) * strlen(buf) + 1);
/*
* The next two for-loop are calcuating the sum of a + b
*/
for (i = 0; i < size_b; i++) {
sum = (data_a[i] - '0') + (data_b[i] - '0') + carry;
buf[i] = '0' + sum % 10;
carry = sum / 10;
}
for (i = size_b; i < size_a; i++) {
sum = (data_a[i] - '0') + carry;
buf[i] = '0' + sum % 10;
carry = sum / 10;
}
if (carry) {
// allocate a extra byte for 'carry'
buf = (char *)krealloc(buf, sizeof(char) * size_a + 2, GFP_KERNEL);
out = (char *)krealloc(out, sizeof(char) * size_a + 2, GFP_KERNEL);
buf[i] = '0' + carry;
i++;
}
reverse(buf);
buf[i] = 0;
/* Restore the original string */
reverse(data_a);
reverse(data_b);
if (out) {
memmove(out, buf, strlen(buf) + 1);
}
kfree(buf);
return out;
}
```
### fib_sequence
利用 `string_number_add` 計算 fibonacci sequence 中的數。
- 利用 `copy_to_user` 將我們算出來的結果從 kernel space 複製一份到 user space 。
- 因為我們的 ```fibdrv.c``` 要做為一個 kernel module ,所以必須去 linux document 找出有同樣功能的 function 來使用 ( e.g. `memmove` 、`kmalloc` 、`krealloc` )。
``` c
static long long fib_sequence(long long k, char *buf)
{
ssize_t retval = 0;
char *num1 = (char *) kmalloc(sizeof(char) * 2, GFP_KERNEL);
char *num2 = (char *) kmalloc(sizeof(char) * 2, GFP_KERNEL);
char *ans = (char *) kmalloc(sizeof(char) * 2, GFP_KERNEL);
memset(num1, 0, sizeof(char) * 2);
memset(num2, 0, sizeof(char) * 2);
num1[0] = '0';
num2[0] = '1';
if (k)
memmove(ans, num2, 2);
else
memmove(ans, num1, 2);
for (int i = 2; i <= k; i++) {
printk("Fib : %d\n", i);
memset(ans, 0, sizeof(char) * strlen(ans) + 1);
ans = string_number_add(num1, num2, ans);
num1 = krealloc(num1, sizeof(char) * strlen(num2) + 1, GFP_KERNEL);
if (!num1)
return -1;
memmove(num1, num2, strlen(num2) + 1);
num2 = krealloc(num2, sizeof(char) * strlen(ans) + 1, GFP_KERNEL);
if (!num2)
return -1;
memmove(num2, ans, strlen(ans) + 1);
}
kfree(num1);
kfree(num2);
retval = _copy_to_user(buf, ans, strlen(ans) + 1);
kfree(ans);
if (retval < 0)
return -EFAULT;
return 0
}
```
- 由上面的程式碼能夠計算出至少 `Fib(100000)` 的數
- 此實作方法因為大量的使用 `krealloc` ,會拉長執行的時間,所以在改進的部份有減少使用`krealloc`,嘗試確認 `krealloc` 影響的多寡。
```
Fib(1000) = 43466557686937456435688527675040625802564660517371780402481729089536555417949051890403879840079255169295922593080322634775209689623239873322471161642996440906533187938298969649928516003704476137795166849228875
```
- 此結果與網路上查詢到的結果相同,接下來會討論要怎麼驗證結果正確
### Performance
- 計算 `Fib(0)` ~ `Fib(100)` 每一個數字所需的時間
- 由執行結果可知 iteration 版本的計算的時間複雜度為 $O(n)$
![](https://i.imgur.com/Uf89I5q.png)
:::warning
後續章節改進:`krealloc` 會拖累程式的執行速度,所以應該盡量減少使用
:::
## Fibonacci sequence (fast doubling)
因為 fast doubling 的 Fibonacci sequence 公式有減法與乘法,所以我們需要實作大數乘法與大數減法。
$$\begin{array}{l}
F(2n) = F(n)[2F(n+1) - F(n)]\\
F(2n+1) = (F(n+1))^2 + (F(n))^2
\end{array}
$$
### string_number_add
``` c
char *string_number_add(char *a, char *b)
{
char *data_a, *data_b, *buf;
size_t size_a, size_b;
int i, carry = 0;
int sum;
/*
* Make sure the string length of 'a' is always greater than
* the one of 'b'.
*/
if (strlen(a) < strlen(b)) {
void *tmp = a;
a = b;
b = (char *) tmp;
}
size_a = strlen(a);
size_b = strlen(b);
data_a = a;
data_b = b;
buf = (char *) kmalloc(size_a + 2, GFP_KERNEL);
reverse(data_a);
reverse(data_b);
memset(buf, 0, size_a + 2);
/*
* The next two for-loop are calcuating the sum of a + b
*/
for (i = 0; i < size_b; i++) {
sum = (data_a[i] - '0') + (data_b[i] - '0') + carry;
buf[i] = '0' + sum % 10;
carry = sum / 10;
}
for (i = size_b; i < size_a; i++) {
sum = (data_a[i] - '0') + carry;
buf[i] = '0' + sum % 10;
carry = sum / 10;
}
if (carry)
*(buf + i) = '0' + carry;
// printk("Variable i : %d\n", i);
buf = krealloc(buf, strlen(buf) + 1, GFP_KERNEL);
reverse(buf);
/* Restore the original string */
reverse(data_a);
reverse(data_b);
return buf;
}
```
### string_number_mul
將兩個數字字串相乘
```c
char *string_number_mul(char *a, char *b)
{
size_t size_a = strlen(a), size_b = strlen(b);
int i = 0, j = 0, carry;
unsigned short *buf = (unsigned short *) kmalloc(
sizeof(short) * (size_a + size_b + 4), GFP_KERNEL);
char *result =
(char *) kmalloc(size_a + size_b + 4, GFP_KERNEL);
memset(buf, 0, sizeof(short) * (size_a + size_b + 4));
memset(result, 0, size_a + size_b + 4);
reverse(a);
if (strcmp(a, b) != 0)
{
reverse(b);
}
for (; i < size_a; i++) {
for (j = 0; j < size_b; j++) {
*(buf + i + j) +=
((*(a + i) - '0') * (*(b + j) - '0') + carry) % 10;
carry = ((*(a + i) - '0') * (*(b + j) - '0') + carry) / 10;
}
*(buf + i + j) += carry;
carry = 0;
}
carry = 0;
for (i = 0; i < size_a + size_b; i++) {
*(result + i) = (*(buf + i) + carry) % 10 + '0';
carry = (*(buf + i) + carry) / 10;
}
for (i = size_a + size_b - 1; *(result + i) == '0' && i > 0; i--)
*(result + i) = 0;
reverse(a);
if (strcmp(a, b) != 0)
{
reverse(b);
}
reverse(result);
if (strlen(result) < 1) {
*result = '0';
}
result = (char *) krealloc(result, strlen(result) + 1, GFP_KERNEL);
kfree(buf);
return result;
}
```
### string_number_min
將兩個數字字串相減
```c
char *string_number_min(char* a, char *b)
{
size_t size_a = strlen(a), size_b = strlen(b);
int i = 0, carry = 0;
char *buf = (char *) kmalloc(size_a + 1, GFP_KERNEL);
memset(buf, 0, size_a + 1);
reverse(a);
reverse(b);
for (; i < size_b; i++) {
if (((*(a + i) - '0') - (*(b + i) - '0') + carry) < 0) {
*(buf + i) = *(a + i) - *(b + i) + '0' + carry + 10;
carry = -1;
} else {
*(buf + i) = *(a + i) - *(b + i) + '0' + carry;
carry = 0;
}
}
for (; i < size_a; i++) {
if (((*(a + i) - '0') + carry) < 0) {
*(buf + i) = *(a + i) + carry + 10;
carry = -1;
} else {
*(buf + i) = *(a + i) + carry;
carry = 0;
}
}
// remove the leading zero
if (*(buf + i - 1) == '0' && strlen(buf) > 1)
*(buf + i - 1) = 0;
reverse(a);
reverse(b);
reverse(buf);
buf = (char *) krealloc(buf, strlen(buf) + 1, GFP_KERNEL);
return buf;
}
```
### fib_sequence_fast_doubling (使用 clz 加速)
由 fast doubling 的公式可以得知在遇到第一個 1 之前,`fib_0` 與 `fib_1` 所紀錄的值並不會改變,所以使用 `__builtin_clz` 可以直接得出由右開始計算的第幾個 bit 為 1 ,能夠讓迴圈的執行次數變少,達成加速的效果。
``` c
void fib_fast_doubling(long long k, char *buf)
{
char *a, *b, *fib_0, *fib_1, *tmp_0, *tmp_1, *two = "2\0";
long long i;
ssize_t retval = 0;
fib_0 = (char *) kmalloc(2, GFP_KERNEL);
fib_1 = (char *) kmalloc(2, GFP_KERNEL);
memset(fib_0, 0, 2);
memset(fib_1, 0, 2);
*fib_0 = '0';
*fib_1 = '1';
// datatype long long has 64 bits
for (i = 1u << (64 - __builtin_clz(k)); i > 0; i >>= 1) {
// calcualting a
tmp_0 = string_number_mul(two, fib_1);
tmp_1 = string_number_min(tmp_0, fib_0);
a = string_number_mul(tmp_1, fib_0);
kfree(tmp_0);
kfree(tmp_1);
// calculating b
tmp_0 = string_number_mul(fib_0, fib_0);
tmp_1 = string_number_mul(fib_1, fib_1);
b = string_number_add(tmp_0, tmp_1);
kfree(tmp_0);
kfree(tmp_1);
kfree(fib_0);
kfree(fib_1);
if (i & k) {
fib_0 = (char *) kmalloc(strlen(b) + 1, GFP_KERNEL);
memmove(fib_0, b, strlen(b) + 1);
fib_1 = string_number_add(a, b);
} else {
fib_0 = (char *) kmalloc(strlen(a) + 1, GFP_KERNEL);
fib_1 = (char *) kmalloc(strlen(b) + 1, GFP_KERNEL);
memmove(fib_0, a, strlen(a) + 1);
memmove(fib_1, b, strlen(b) + 1);
}
kfree(a);
kfree(b);
}
kfree(fib_1);
retval = _copy_to_user(buf, fib_0, strlen(fib_0) + 1);
kfree(fib_0);
}
```
### Performance
- 計算 `Fib(0)` ~ `Fib(100)` 每一個數字所需的時間
- 由執行結果可知 iteration 版本的計算的時間複雜度為 $O(\log n)$
![](https://i.imgur.com/KDK7Ayv.png)
## [KYG-yaya573142](https://hackmd.io/@KYWeng/rkGdultSU#2020q1-Homework2-fibdrv) 的實作分析
### 與我的實作比較
- 他使用了 ```unsigned int pointer``` 來儲存大數 (`fibdrv` 的[分支](https://github.com/0xff07/bignum/tree/fibdrv)),減少了記憶體的使用量。
- 相較於我的版本需要使用 `1 byte` 來儲存一個數字,他的實作能夠用 `4 bytes` 來存放 $0 \sim 2^{32} - 1$
- 能夠使用原本的數值加減法,相比於字串儲存的每一位計算一次更加快速,能夠使用的加速手法也更加彈性。
- 我的實作方法雖然直觀,但在執行速度上有待改進。
### 其他實作上的優點
- 使用 memory pool 的概念,減少需要 `realloc` 的次數
- 減少 branch miss 的次數,以及盡可能的利用已經取出來的資料
- [Karatsuba algorithm](https://en.wikipedia.org/wiki/Karatsuba_algorithm)
- Example 1 :
- $76 = 4 \times 2^4 + 12$
- $39 = 2 \times 2^4 + 7$
$$
\begin{array}{l}
76\times 39 = 2964\\
a = 4,\ b = 12,\ c = 2,\ d = 7\\
\text{step 1 : }a\times c= 4\times 2 = 8\ (\text{multiply})\\
\text{step 2 : }b\times d= 12\times 7 = 84 = 5 \times 2^4 + 4\ (\text{multiply})\\
\text{step 3 : }(a+b)\times (c+d)= 16\times 9 = 144 = 9 \times 2^4\ (\text{multiply})\\
\text{step 4 : }\text{step 3}-\text{step 2}-\text{step 1}= 144-84-8=52=3\times 2^4 + 4\\
\text{step 5 : }C_1 = (\text{step 1}) \times 2^8=8\times 2^8\ (\text{shift})\\
\text{step 6 : }C_2 = (\text{step 4}) \times 2^4=52\times 2^4\ (\text{shift})\\
\text{step 7 : total = }C_1+C_2+(\text{step 2}) = 8\times 2^8 + 52\times 2^4 + 5 \times 2^4 +4= 11 \times 2^8+9\times 2^4+4
\end{array}
$$
- 如果直接進行相乘的話則會變成以下的結果
$$
\begin{array}{l}
76\times 39 = 2964\\
a = 4,\ b = 12,\ c = 2,\ d = 7\\
\text{step 1 : }b\times d= 12\times 7 = 84 = 5 \times 2^4 + 4\ (\text{multiply})\\
\text{step 2 : }a\times d= 4\times 7 = 28 = 28 \times 2^4\ (\text{multiply})\\
\text{step 3 : }b\times c= 12\times 2 = 24 = 24 \times 2^4\ (\text{multiply})\\
\text{step 4 : }a\times c= 4\times 2 = 8 = 8 \times 2^8\ (\text{multiply})\\
\text{step 5 : total = (step 1) + (step 2) + (step 3) + (step 4)}= 8\times 2^8 + 52\times 2^4 + 5 \times 2^4 +4= 11 \times 2^8+9\times 2^4+4
\end{array}
$$
- 由這兩者的結果比較可以得知,將整個過程分成 $C_1+C_2+(\text{step 2})$ 的方式去計算,才是整個演算法的關鍵。
- Example 2 :
- $421 = 1 \times 2^8 + 10 \times 2^4 + 5$
- $678 = 2 \times 2^8 + 10 \times 2^4 + 6$
$$
\begin{array}{l}
421\times 678 = 285438\\
a = 1,\ b = 10,\ c = 5,\ d = 2,\ e = 10,\ f = 6\\
\text{step 1 : }a\times d= 1\times 2 = 2\ (\text{multiply})\\
\text{step 2 : }c\times f= 5\times 6 = 30 = 1 \times 2^4 + 14\ (\text{multiply})\\
\text{step 3 : }b\times e= 10\times 10 = 100 = 6 \times 2^4 + 4\ (\text{multiply})\\
\text{step 4 : }(a+b)\times (d+e)= 11\times 12 = 132 = 8 \times 2^4 + 4\ (\text{multiply})\\
\text{step 5 : }(b+c)\times (e+f)= 15\times 16 = 240 = 15 \times 2^4\ (\text{multiply})\\
\text{step 6 : }\text{step 4}-\text{step 3}-\text{step 1}= 132-100-2=30=1 \times 2^4 + 14\\
\text{step 7 : }\text{step 5}-\text{step 3}-\text{step 2}= 240-100-30=110=6\times 2^4 + 14\\
\text{step 8 : }(a+b+c)\times (d+e+f)= 16\times 18 = 288 = 18 \times 2^4\ (\text{multiply})\\
\text{step 9 : }\text{step 8}-\text{step 1}-\text{step 2}-\text{step 6}-\text{step 7}= 288-2-30-30-110=116=7\times 2^4 + 4\\
\text{step 10 : }C_1 = (\text{step 1}) \times 2^{16}=2\times 2^{16}\ (\text{shift})\\
\text{step 11 : }C_2 = (\text{step 6}) \times 2^{12}=1 \times 2^{16} + 14\times 2^{12}\ (\text{shift})\\
\text{step 12 : }C_3 = (\text{step 9}) \times 2^{8}=7\times 2^{12} + 4\times 2^8\ (\text{shift})\\
\text{step 13 : }C_4 = (\text{step 7}) \times 2^{4}=6\times 2^8 + 14 \times 2^4\ (\text{shift})\\
\text{step 14 : total = }C_1+C_2+C_3+C_4+(\text{step 2}) = (2+1)\times 2^{16} + (14+7)\times 2^{12} + (4+6) \times 2^8 + (14 + 1) \times 2^4+14\\= 4\times 2^{16} + 5\times 2^{12} + 11 \times 2^8 + 15 \times 2^4+14=285438
\end{array}
$$
- $4\times 2^{16} + 5\times 2^{12} + 11 \times 2^8 + 15 \times 2^4+14=285438\Rightarrow$ 總共做了 6 次乘法,相較於不使用這個演算法的情況少了 3 次
- 由上面的執行流程可以看出數字越大,這個演算法能帶來的效益越大。
:::warning
提醒:Karatsuba algorithm 一定要配合 $C_1+C_2+(\text{step 2})$ 的方式去計算,否則需要花費的乘法計算次數不會比較少,而且上面的分析沒有考慮到加法的成本,所以當數字很小,有可能沒有省到計算時間。
:::
## 執行速度改進
### 減少 `krealloc` 的使用
- 改進 fib_sequence function
```c
for (int i = 2; i <= k; i++) {
printk("Fib : %d\n", i);
kfree(ans);
ans = string_number_add(num1, num2);
kfree(num1);
num1 = num2;
num2 = (char *) kmalloc(strlen(ans) + 1, GFP_KERNEL);
memmove(num2, ans, strlen(ans) + 1);
}
```
- 改進 string_number_add
```diff
static long long fib_sequence(long long n)
{
...
- if (carry) {
- buf = (char *)krealloc(buf, sizeof(char) * size_a + 2, GFP_KERNEL);
- out = (char *)krealloc(out, sizeof(char) * size_a + 2, GFP_KERNEL);
- buf[i] = '0' + carry;
- i++;
- }
- reverse(buf);
- buf[i] = 0;
- reverse(data_a);
- reverse(data_b);
- if (out) {
- memmove(out, buf, strlen(buf) + 1);
- }
- kfree(buf);
+ buf = krealloc(buf, strlen(buf) + 1, GFP_KERNEL);
...
}
```
相較於原本的版本,新的版本減少了一次 `memmove` 一次 `krealloc`,所以執行時間的可以看出不小的差距。而且也沒有出現執行時間浮動的問題。
![](https://i.imgur.com/lCCxzYd.png)
## CPU affinity
1. 修改 `/etc/default/grub` 內的 `GRUB_CMDLINE_LINUX_DEFAULT`,加入 `isolcpus=15` 來指定編號 15 的核心不被排程器賦予任務,**修改完成後**需 `sudo update-grub` 來更新設定,重開機後即生效,可以用 `taskset -cp 1` 命令得知特定任務的 CPU affinity。
```shell
$ taskset -cp 1
pid 1's current affinity list: 0-14
```
2. **將程式固定在指定的 CPU 中執行**
```shell
$ sudo taskset -c 15 ./client
```
3. **抑制 [address space layout randomization](https://en.wikipedia.org/wiki/Address_space_layout_randomization) (ASLR)**
- ASLR 是一種保護 kernel 被攻擊的工具,能夠讓 memory 的位址較難被預測。
```shell
$ sudo sh -c "echo 0 > /proc/sys/kernel/randomize_va_space"
```
4. **設定 scaling_governor 為 performance**
```shell
$ echo performance > /sys/devices/system/cpu/cpu15/cpufreq/scaling_governor
```
5. **將 AMD cpu 的 boost 關閉**
```shell
$ sudo sh -c "echo 0 > /sys/devices/system/cpu/cpufreq/boost"
```
### Fibonacci (iteration)
相比於去除干擾因素前,可以看出更明顯的線性關係
![](https://i.imgur.com/agBpupi.png)
### Fibonacci (fast doubling)
相比於去除干擾因素前**並無**太大的區別
![](https://i.imgur.com/XWBxBl0.png)
### 兩種實作版本的 performance 比較
![](https://i.imgur.com/L4EFflY.png)
## kernel space 與 user space 資料轉移
### 測試 copy_from_user
- fib_write function 中加入以下程式碼來從 user 複製字串到 kernel
```c
kt = ktime_get();
retval = _copy_from_user(test_copy, buf, *offset);
kt = ktime_sub(ktime_get(), kt);
```
- 加入 `access_ok` 保護 kernel space
- 加入 `access_ok` 的用意在於確認 buf 這個指標所指向的位置確實在 user space 以免出現系統漏洞,讓惡意使用者可以更改 kernel space 的內容
```diff
kt = ktime_get();
+ if (access_ok(buf, size))
retval = _copy_from_user(test_copy, buf, size);
kt = ktime_sub(ktime_get(), kt);
```
- client.c 中加入以下程式碼來增長要複製的字串長度
```c
for (int i = 2; i <= offset; i++) {
// resize the string
test_write = realloc(test_write, i);
memset(test_write, '1', i);
*(test_write + i - 1) = 0;
sz = write(fd, test_write, i);
printf("%lld\n", sz);
}
```
### 測試 copy_to_user
- fib_read function 改成以下程式碼來從 kernel 複製字串到 user
```c
char *test_copy = (char *) kmalloc(*offset, GFP_KERNEL);
memset(test_copy, '1', *offset);
*(test_copy + *offset - 1) = 0;
kt = ktime_get();
retval = _copy_to_user(buf, test_copy, *offset);
kt = ktime_sub(ktime_get(), kt);
```
### Performance 比較
綠色的線所代表的是 copy from user 所需要的時間
紫色的線所代表的是 copy to user 所需要的時間
- 我們能觀察到因為 page fault 所以在複製第一次複製字串時所花的時間特別多
![](https://i.imgur.com/415oL2e.png)
- 將測試用的字串的長度加長到 3000 之後,可以看出兩者的執行時間仍舊相當接近
![](https://i.imgur.com/ogVwv1G.png)
- 加入 `access_ok` 的執行時間比較,這就說明了記憶體區塊檢查所花的時間並不多
![](https://i.imgur.com/gOywPhm.png)
## 如何驗證結果的正確性
- 在確保整數的[等價性 (equivalent class)](https://en.wikipedia.org/wiki/Equivalence_class) 後,由等價的整數所成的集合[基數 (cardinality)](https://en.wikipedia.org/wiki/Cardinality) 必定為 1 (**即每一個數都是獨一無二的**),因此整數加法必為 [well-defined](https://en.wikipedia.org/wiki/Well-defined_expression),所以我們只需要確認 `Fib(0)` 與 `Fib(1)` 永遠是相同數字,就可以確保結果的正確性。
- 因為我們在算 `Fibonacci number` 時的位數只會不斷的上升,所以我們可以嘗試使用位數估計的方式,去估計我們的結果的正確性。
### 使用 python 程式進行驗證
在 C 語言 `int` 只佔有 4 bytes 所以任何整數加上 0 與加上 $2^{32}$ 會得出一樣的結果,這不符合前面提及的 [well-defined](https://en.wikipedia.org/wiki/Well-defined_expression) ,但因為 python 具有大數處理的能力,不會有任何整數加上兩個不同整數卻可以得到相同的答案,所以我們可以先用 python 的程式碼計算出結果,再與我們用 C 語言實作的結果進行比對驗證我們計算的數值正確性。
```python
f = open("./out",'r')
fib = f.readlines()
num1 = 1
num2 = 1
next_fib = 2
for i in range(1,1001):
if (num1 != int(fib[i - 1])):
print('wrong answer')
num1, num2, next_fib = num2, next_fib, num2 + next_fib
f.close()
```
驗證前 1000 個 Fibonacci 數的 python 程式碼
### 使用位數估計的方式來驗證
可以透過 Fibonacci sequence 的遞迴關係式得出 Fibonacci 數的一般式如下 :
$$
F_n=\frac{1}{\sqrt{5}}[(\frac{1+\sqrt{5}}{2})^n-(\frac{1-\sqrt{5}}{2})^n]\approx 0.4472135955\ \cdot 1.61803398875^n
$$
我們要估計的是位數,所以將上式的 $F_n$ 取 $\log$ 來得到估計位數值
$$
\log(F_n)=\log0.4472135955+n\log1.61803398875 = - 0.34948500216 + n \times 0.20898764025
$$
- 檢驗估計的 Fibonacci 數的位數是否正確
- 如果精確度不取這麼多位,會造成計算結果出錯。
```python
from collections import Counter
with open('./out', 'r') as f:
fib_len = f.readlines()
fib_len = list(map(lambda x : int(x), fib_len))
# log(1.61803398875) = 0.20898764025
# log(0.4472135955) = -0.34948500216
approx = [int(0.20898764025 * a - 0.34948500216) + 1 for a in range(1, len(fib_len) + 1)]
tmp = [fib_len[i] == approx[i] for i in range(0, len(fib_len))]
tmp = Counter(tmp)
print(tmp)
```
- 檢驗前 42957 個 Fibonacci 數的執行結果
```
Counter({True: 45297})
```
- 整合進 `fib_fast_doubling` 和 `fib_sequence`
- kernel 中限制不能進行浮點數運算,因為浮點數有[系統漏洞](https://hackmd.io/@sysprog/linux2020-kcalc#-Linux-核心的浮點數運算),但這裡需要使用浮點數才能進行估計,所以只好下方的算法
```diff
+ if (strlen(fib_0) | ((int)((20898764025 * k - 34948500216) / 100000000000) + 1))
retval = _copy_to_user(buf, fib_0, strlen(fib_0) + 1);
```
:::warning
Robert Love 在 "Linux Kernel Development" 提到浮點數時是這樣說的
> No (Easy) Use of Floating Point
> When using floating-point instructions kernel normally catches a trap and then initiates the transition from integer to floating point mode. Unlike user-space, the kernel does not have the luxury of seamless support for floating point because it cannot easily trap itself. Using a floating point inside the kernel requires manually saving and restoring the floating point registers. Except in the rare cases, no floating-point operations are in the kernel.
:::
## mutex 的功用
目前觀察到的用處,但還需要進一步的檢驗
- `fib_open` 中有 `mutex_trylock(&fib_mutex)` 檢查有沒有其他的 thread、process 打開這個 module,但其實並不需要這樣做,因為取得 `Fib(offset)` 的過程中,只有 `offset` 是共享的。
- 在我們原本的實作中,`fib_write` 並沒有任何的功能,所以不需要任何的保護,但如果我們要測試 `copy_from_user` 的話就需要用`mutex_trylock`保護共享的變數。
- 我們的 module 需要 `mutex` 保護的只有 `fib_read` 的 `offset`
### 避免使用 mutex
- 為了有效的使用 cpu ,所以我們可以使用 workqueue 來用不同的 cpu 或是不同的 worker 來執行 `fibdrv` ,只要將要執行的 work 掛載於 workqueue 上,等待 work 被執行,進而不再限制打開 fibdrv 的 process 數量。
:::info
在 `fib_read` 之中,有一個參數並沒有被用到,其實也可以用這個方式來決定要讀取的數值,並不需要利用 `offset`
:::
## [Concurrency Managed Workqueue](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
### 為什麼我們需要 cmwq
- 在 cmwq 推出之前只有兩種 workqueue ,Multithread workqueue 和 Singlethread workqueue,前者一個 workqueue 只有一個 execution context ,後者則是整個 system 共同持有一個 execution context,這樣的作法容易造成 deadlock。因為兩者的方便性與功能沒辦法滿足 concurrency 和 resource usage 的考量,所以為了解決這些問題有些 module (e.g. `fache`)甚至實作了自己的 thread pool 。
- workqueue 的 user 其實並不關心 worker pool 如何運作的,所以將 workqueue 與 worker 拆分開來更方便使用。
### cmwq 想要達成的目標
- 相容以前的 workqueue API
- 每個 CPU 有一個統一的 worker pool 被所有的 workqueue 共用,藉此滿足一定程度的 concurrency 需求,同時避免過多的資源浪費。
- 規範好 worker pool 的行為以及 concurrency 的程度,減輕使用者的負擔
### 內部設計
- work item
- 用一個 function pointer 指向要非同步執行的 function ,並包裝成一個 work item 放入 workqueue 之中
- worker threads
- 負責執行 workqueue 中的 work ,如果沒有 work 則會 idle,如果 idle 的時間過久才會被刪除
- 將 workqueue 的後端運作機制抽離,只提供指令給 user 或是 driver 將 work item 放入 workqueue 。
- 每一個可供系統調用的 CPU 都有兩種 bounded worker-pool ,一個給一般的 work item 的 worker-pool 和一個給高優先度的 work item 的 worker-pool。除了 bounded worker-pool 還會有額外的 unbounded worker-pool,但不屬於單一個 CPU。
- 當一個 work item 放入 workqueue 之後,會由 workqueue 的屬性選擇應該被放入的 worker-pool,同時被登記在 worker-pool 之間共用的 worklist。
- concurrency 的程度都是對於任何一種 worker-pool 都很重要,最佳的情況是可以維持足夠程度的並行程度,但又不會太超過以至於佔用過多的硬體資源。
- worker-pool 通過 scheduler 來實現並行管理,而 worker-pool 只有在所有的 worker 全部都 idle 的情況才會喚醒一個 worker 去取得新的 work item 來執行,確保 CPU 不會 idle。
- **bounded worker-pool** => 限制綁定在一特 CPU 上,而且每個沒有被 cpu mask 限制不能使用的 CPU 都有自己的 normal thread pool 和 high priority thread pool
- **unbounded worker-pool** => 不限制綁定在哪個 CPU 上,而且只有在一個 work item 要被加入 unbounded worker-pool 但沒有相同屬性的 worker pool 的時候才會建立新的。加入 unbounded worker-pool 中的 work 被安排給哪個 CPU 執行由 scheduler 來調度,可以避免喚醒 idle 的 CPU 達成節省能源的效果,但相對的 cache 的利用率會比較低。
- 因為 idle 的 worker 是一個 kthread 所以佔用的資源極少,所以不會在他剛變成 idle 就刪除他。
- 控制 unbounded workqueues 的 concurrency 程度是 user 的責任,`cmwq` 提供了 [`apply_workqueue_attrs()`](https://hackmd.io/Q66IYnGKSwy8ckseib-IpA?view#alloc_workqueue-的-flags-與-argument) 來讓 user 自己進行調整。
- 除了前面提到的 worker,worker-pool 還有 rescue-worker 來幫忙釋放原本 worker 所佔用的記憶體,避免 worker 全部被使用光而沒辦法收回被這些 worker 所佔用的記憶體,行程整個 worker-pool 的 deadlock
- workqueue 是設計給 kernel space 用的
### cpu affinity 與 cmwq 的 cpu mask 之間的關係
因為前面有設定 cpu affinity 的緣故,所以 `/sys/devices/virtual/workqueue` 的 `mask` 被改為 `ffff7fff` => 第 15 個 bit 被改為 0 了
### 將 cmwq 引入專案中 ( 參考 [kecho](https://github.com/sysprog21/kecho) 專案的實作 )
#### 建立新的 work_struct
```c
struct kfib {
int offset;
/* This pointer points to the space we need to store the anser */
char *buffer;
/* use for queue work on workqueue */
struct work_struct fib_work;
};
```
- 建立這個 struct 的理由如下
- 因為被 workqueue 所執行的 work 只能接受 `struct work_struct *` 作為傳入的變數,所以當我們需要傳入額外的變數 (e.g. offset) ,就可以用指向 `kfib` 的指標來取得 `offset` 的值。我們只需要利用 `container_of` 就可以在只有 `struct work_struct *` 得到指向包含這個 `work_struct` 的 `kfib` 的指標
![](https://i.imgur.com/mM6C7IZ.png =40%x)
```c
struct fib_worker {
struct list_head list;
/* Store the pid of the user process or thread for lock-free */
int pid;
int offset;
}
```
- 建立這個 struct 的理由如下
- 因為 `offset` 是整個 file descriptor 的共用變數,所以當我們要同時讓兩個以上的 process 使用這個 character device 時,要讓這個變數不再被共用,而這裡的作法便是將 offset 與 pid 同時儲存起來,未來要執行放入的 work 的時候,只需要比對 pid 即可找出對應的 work 並執行
- 此次破除 mutex 的方式採取紀錄 thread id 的方式,因為 thread id 在 kernel 的角度都是不同的,所以不會有重複的問題。
- `list` 作用在於串接於整個 `fibdrv` 所維護的額外的 linked list, 利用 thread ID 不會重複的特性,當執行 fib_read 時,只要在 list 上找出 pid 與現在執行 fib_read 相同者,就可以知道當初放入 workqueue 時的 `offset` 是多少
#### 額外建立新的 workqueue -> 利用 queue_work 將 work 放入 workqueue 之中
- 為了避免在與系統共用同一個 workqueue 會導致我們在打開 mutex 時會造成的 race condition ,所以我們建立屬於自己的 workqueue 。
```c
#include <linux/workqueue.h>
static struct workqueue_struct *fibdrv_wq;
/* the list head */
static struct list_head supervisor;
```
- 在建立 workqueue 的時候,如果失敗則須要將 workqueue 捨棄,並同時初始化 supervisor 作為我們的串列的開頭
```c
static int __init init_fib_dev(void)
{
INIT_LIST_HEAD(&supervisor);
if (!test_wq) {
printk("Failed to create workqueue");
rc = -ENOMEM;
goto failed_cworkqueue;
}
return rc;
failed_cworkqueue:
destroy_workqueue(fibdrv_wq);
return rc;
}
```
#### work_fn
用這個 function 來呼叫已經實作好的 `fib_fast_doubling`,並在執行完之後釋放 worker 所佔用的記憶體。
```c
void work_fn(struct work_struct *work)
{
struct kfib *worker = container_of(work, struct kfib, fib_work);
fib_fast_doubling(worker->offset, worker->buffer);
kfree(worker);
}
```
#### create_work
我們為了讓只能傳入`work_struct` 的 `work_fn` 可以得到別的參數所以需要用這個 function 初始化一個 `kfib` 並回傳其中的 `fib_work` 變數,來將其掛載於我們建立的 `workqueue`
```c
static struct work_struct *create_work(long long data, char *buf)
{
struct kfib *work;
if (!(work = kmalloc(sizeof(struct kfib), GFP_KERNEL)))
return NULL;
work->offset = data;
work->buffer = buf;
INIT_WORK(&work->fib_work, work_fn);
return &work->fib_work;
}
```
#### 修改 fib_read
- 因為不再有 mutex 保護,所以我們讀取用於儲存過往移動過 offset 紀錄的串列來得知需要計算第幾個 fibonacci number 。
```c
static ssize_t fib_read(struct file *file,
char *buf,
size_t size,
loff_t *offset)
{
struct work_struct *work;
struct fib_worker *l, *tar;
int pid = task_pid_nr(current);
long long tmp = -1;
list_for_each_entry_safe (tar, l, &supervisor, list) {
if (unlikely(tar->pid == pid)) {
tmp = tar->offset;
__list_del_entry(&tar->list);
kfree(tar);
break;
}
}
if (unlikely(tmp < 0)) {
printk(KERN_ALERT
"Didn't assign the value for reading.");
return -1;
}
while (1) {
if (unlikely(!(work = create_work(tmp, buf)))) {
printk(KERN_ERR DEV_FIBONACCI_NAME
": can't create work\nstart a next try");
continue;
}
flush_work(work);
return 0;
}
}
```
- 如果在移除節點的過程之中被打斷,由另一個 thread 執行 fib_read,有可能導致整個 list 斷裂,所以要確保只有一個 thread 可以移除節點。
- 因為 critical section 之中只有移除節點一個簡單的任務,確保它不會被目前佔用的 cpu 打斷,可以避免在使用 bounded workqueue 時,同個 cpu 上的其他 thread 被卡住,導致整個 workqueue 上所有的 work 都被卡住,如果使用的是 unbounded workqueue 則可以盡量不卡到其他的 cpu 上的 work ,所以這邊使用 `spin_lock_irq()`。
```diff
+ spin_lock_irq(&fib_lock);
__list_del_entry(&tar->list);
+ spin_unlock_irq(&fib_lock);
```
:::warning
應該增加新的機制 (e.g. [timer](https://hackmd.io/@sysprog/linux-timer?fbclid=IwAR0Di1SqKQZbiziMCH-hmUs5S6t12octgjikrf10mkXfAZuaA5AGrStJ-ms#Timer)),避免 process 一直不執行 `fib_read` 所導致的串列過長問題
:::
### fib_device_lseek
由於移動 offset 就代表未來 fib_read 會需要讀取相對應的 fiboncci number ,所以需要將 offset 、 pid 儲存起來並掛載於串列中。同時為了避免一個 process 或是 thread 更改 offset 多於一次,所以在這樣的情況發生時,將採取不將新的 offset 放入串列之中的作法。
```c
struct fib_worker *work;
struct fib_worker *l, *tar;
work = kzalloc(sizeof(struct fib_worker), GFP_KERNEL);
work->pid = task_pid_nr(current);
work->offset = new_pos;
list_for_each_entry_safe (tar, l, &supervisor, list) {
printk("Entry in supervisor %d, its pid = %d", tar->offset, tar->pid);
if (work->pid == tar->pid) {
printk(KERN_ALERT
"There is a value assigned before. Please read it first");
kfree(work);
return -1;
}
}
spin_lock_irq(&fib_lock);
list_add_tail(&work->list, &supervisor);
spin_unlock_irq(&fib_lock);
```
:::info
需要採取更新 offset 或是無視此次移動 offset 兩種方案的哪一種仍有待商榷,兩者的差別在於想要以過去的想要讀取的值為主,還是以新的值為主。
:::
### 使用系統的 default workqueue -> schedule_work
除了建立自己的 workqueue 之外,也可以使用系統的 default workqueue ,default workqueue 是由系統維護,也是一個常用的 workqueue 使用方法,所以此專案中介紹。
- 下方為實際的使用方法
```diff
- printk("%s", queue_work(fibdrv_wq, work) ? "true" : "false");
+ printk("%s", schedule_work(work) ? "true" : "false");
```
### 以 user space 的 pthread 檢驗是否不需要 mutex lock 保障正確性
不再由 mutex lock 避免 `fibdrv` 被同時開啟
```diff
static int fib_open(struct inode *inode, struct file *file)
{
- if (!mutex_trylock(&fib_mutex)) {
- printk(KERN_ALERT "fibdrv is in use");
- return -EBUSY;
- }
return 0;
}
```
利用以下程式碼可以檢驗 workqueue 的實作是否有效
```c
#define FIB_DEV "/dev/fibonacci"
static int fd;
void *worker(void* arg){
char buf[100] = {0};
int delay = *((int *) arg);
for (int i = 80; i <= 85; i++) {
lseek(fd, i, SEEK_SET);
sleep(delay);
read(fd, buf, 0);
printf("thread %d F(%d): %s\n", delay, i, buf);
memset(buf, 0, sizeof(buf));
}
}
int main()
{
fd = open(FIB_DEV, O_RDWR);
if (fd < 0) {
perror("Failed to open character device");
exit(1);
}
pthread_t test[2];
int delay_a = 1;
pthread_create(&test[0], NULL, worker, (void *) &delay_a);
int delay_b = 2;
pthread_create(&test[1], NULL, worker, (void *) &delay_b);
for(int i = 0; i < 2; i++)
pthread_join(test[i], NULL);
close(fd);
return 0;
}
```
原本的執行結果可以很明顯的看到,沒有 mutex 的保護會導致回傳值出錯
```
thread 1 F(80): 23416728348467685
thread 2 F(80): 37889062373143906
thread 1 F(81): 37889062373143906
thread 1 F(82): 61305790721611591
thread 2 F(81): 99194853094755497
thread 1 F(83): 61305790721611591
thread 1 F(84): 160500643816367088
thread 2 F(82): 259695496911122585
thread 1 F(85): 99194853094755497
thread 2 F(83): 99194853094755497
thread 2 F(84): 160500643816367088
thread 2 F(85): 259695496911122585
```
利用 workqueue 機制增加吞吐量的同時,確保正確性
```
thread 1 F(80): 23416728348467685
thread 2 F(80): 23416728348467685
thread 1 F(81): 37889062373143906
thread 1 F(82): 61305790721611591
thread 2 F(81): 37889062373143906
thread 1 F(83): 99194853094755497
thread 1 F(84): 160500643816367088
thread 2 F(82): 61305790721611591
thread 1 F(85): 259695496911122585
thread 2 F(83): 99194853094755497
thread 2 F(84): 160500643816367088
thread 2 F(85): 259695496911122585
```
:::warning
若 userspace 欲同時存取 fibdrv 的執行緒數量大於 CMWQ 的 worker 數量,會發生什麼事?
:notes: jserv
:::
:::warning
- 如果想要存取 fibdrv 的 thread 數量,大於目前 CPU 所擁有的 worker 數量,CMWQ 會建立新的 worker 來執行需要執行的 work 來提升整體的產出量。根據官方文件寫說,worker 只是一個 kernel thread ,佔用的資源並不多,所以這樣的處理方式也是合理的。
- 上面的敘述有一個前提,workqueue 所佔用的 worker 不會超過原本建立 workqueue 時所建立的 max_active 所設定的值。
- 對於 bounded workqueue ,官方設定的 default 值為 256 ,而可以設定的上限為 512 。
- 對於 unbounded workqueue ,官方設定的 default 值為 256 ,而可以設定的上限為 max(512, 4 * num_possible_cpus()) 。
- 如果超過可以佔用的 worker 數量,就只能由先要求被執行的 work 開始執行。( 如下方的範例 )
- 假設 w0, w1, w2 全部被 queue 在 wq q0 上,max_active 被設定為 2
- w0 需要先佔用 cpu 5ms ,sleep 10 秒之後,再佔用 cpu 5ms
- w1, w2 只需要佔用 cpu 5ms ,再 sleep 10 秒之後,即可完成工作
```
TIME IN MSECS EVENT
0 w0 starts and burns CPU
5 w0 sleeps
5 w1 starts and burns CPU
10 w1 sleeps
15 w0 wakes up and burns CPU
20 w0 finishes
20 w1 wakes up and finishes
20 w2 starts and burns CPU
25 w2 sleeps
35 w2 wakes up and finishes
```
:::
## 參考資料
[Linux 官方程式碼 :linux/workqueue.h](https://github.com/torvalds/linux/blob/master/include/linux/workqueue.h)
[Linux 官方程式碼 :kernel/workqueue.c](https://github.com/torvalds/linux/blob/master/kernel/workqueue.c)