Try   HackMD

2020q1 Homework2 (fibdrv)

contributed by < KYG-yaya573142 >

H03: fibdrv

預期目標

  • 撰寫適用於 Linux 核心層級的程式
  • 學習 ktimer, copy_to_user 一類的核心 API
  • 複習 C 語言 數值系統 和 bitwise operation
  • 初探 Linux VFS
  • 自動測試機制
  • 透過工具進行效能分析

排除干擾效能分析的因素

限定 CPU 給特定的程式使用

修改 /etc/default/grub 內的 GRUB_CMDLINE_LINUX_DEFAULT,加入 isolcpus=7 來指定編號 7 的核心不被排程器賦予任務,修改完成後需 sudo update-grub 來更新設定,重開機後即生效 (可從 /sys/devices/system/cpu/isolated 確認是否生效)

GRUB_CMDLINE_LINUX_DEFAULT="quiet splash isolcpus=7"

修改後可觀察到 PID 1 - init 的 affinity list 不包含該編號的 CPU

$ taskset -cp 1
pid 1's current affinity list: 0-6

將程式固定在特定的 CPU 中執行

透過指定處理器親和性 (processor affinity,亦稱 CPU pinning),可以將程式固定在特定的 CPU 中執行

$ sudo taskset -c 7 ./client

抑制 address space layout randomization (ASLR)

$ sudo sh -c "echo 0 > /proc/sys/kernel/randomize_va_space"

設定 scaling_governor 為 performance

準備以下 shell script 來設定 CPU 以最高頻率執行所有 process

$ cat performance.sh

for i in /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
do
    echo performance > /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
done

$ sudo sh performance.sh

關閉 turbo mode

關閉 Intel 處理器的 turbo mode

$ sudo sh -c "echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo"

整合為單一 script

整合成單一 script 以便於重複操作,詳見 do_measurement.sh

SMP IRQ affinity

執行上述步驟後進行量測,發現結果仍有飄動的情況

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

針對 SMP IRQ affinity 進行設置,盡量避免 CPU 7 去處理 IRQ。使用以下 script 進行設置,僅將 CPU 7 從可用清單去除,但不大幅度改變本來系統的設置 (例如 smp_affinity 原本是 0~7,只會被更改為 0~6)
註: smp_affinity 和 smp_affinity_list 擇一設定即可

#!/bin/bash
for file in `find /proc/irq -name "smp_affinity"`
do
    var=0x`cat ${file}`
    var="$(( $var & 0x7f ))"
    var=`printf '%.2x' ${var}`
    sudo bash -c "echo ${var} > ${file}"
done
sudo bash -c "echo 7f > /proc/irq/default_smp_affinity"

設置完畢後可以透過 cat /proc/interrupts 觀察 CPU 7 的 IRQ 數量,同時也可以量測到更穩定的實驗結果

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

量測時間的方式

user space

使用 clock_gettime 來取得時間

#include <time.h>
...
struct timespec t1, t2;
clock_gettime(CLOCK_MONOTONIC, &t1);
// do something here
clock_gettime(CLOCK_MONOTONIC, &t2);
long long ut = (long long)(t2.tv_sec * 1e9 + t2.tv_nsec)
             - (t1.tv_sec * 1e9 + t1.tv_nsec); // ns

kernel space

使用 ktime 來量測執行時間,這裡參照作業說明的手法,挪用 fib_write 來回傳 kernel space 的執行時間,同時借用 size 參數當作切換的參數,以便於量測不同演算法的執行時間

static ssize_t fib_write(struct file *file,
                         const char *buf,
                         size_t size,
                         loff_t *offset)
{
    ktime_t kt;
    switch (size) {
    case 0:
        kt = ktime_get();
        fib_sequence(*offset);
        kt = ktime_sub(ktime_get(), kt);
        break;
    case 1:
        kt = ktime_get();
        fib_sequence_fdouble(*offset);
        kt = ktime_sub(ktime_get(), kt);
        break;
    case 2:
        return ktime_to_ns(ktime_get());
    default:
        return 0;
    }
    return (ssize_t) ktime_to_ns(kt);
}
  • write(fd, buf, 0) - 回傳 iterative 算法的執行時間
  • write(fd, buf, 1) - 回傳 fast doubling 的執行時間
  • write(fd, buf, 2) - 單純回傳在 kernel space 使用 ktime_get 獲得的時間點 (後續分析會用到)

統計量測結果

為了增加量測資料的代表性,對每一項費氏數列的計算時間採樣 1000 次,再根據 95% 的信賴區間來去除離群值,使用程式碼可參閱 client_statistic.c

user space 與 kernel space 的傳遞時間

system call overhead

使用上述量測時間的方式中提到的方式分別量測 user space 及 kernel space 花費的時間,再將兩者相減即可獲得 user space 呼叫 system call 所花費的時間

for (int i = 0; i <= offset; i++) {
    struct timespec t1, t2;
    long long kt;
    lseek(fd, i, SEEK_SET);
    clock_gettime(CLOCK_MONOTONIC, &t1);
    kt = write(fd, write_buf, 0); /* runtime in kernel space */
    clock_gettime(CLOCK_MONOTONIC, &t2);
    long long ut = (long long)(t2.tv_sec * 1e9 + t2.tv_nsec)
                 - (t1.tv_sec * 1e9 + t1.tv_nsec);

    printf("%d %lld %lld %lld\n", i, kt, ut, ut - kt);
}

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

  • 使用預設的演算法 (iterative w/ cache) 測試
  • 雖然計算費氏數列的時間會隨著項數提高而增加,但 system call 的 overhead 不變
  • 就算沒有呼叫 copy_from_user 來複製資料,仍花費約 500 ns 的時間在執行 system call 本身

fib_write 改為僅回傳 ktime_get 獲得的時間點,配合在 user space 記錄呼叫 write 的前後時間點,可以再進一步取得 system call overhead 中 user to kernel 和 kernel to user 所花費的時間

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

  • user to kernel 和 kernel to user 所花費的時間沒有明顯的差異
  • 前兩次的傳遞時間明顯高於其他數據,後續會進行討論

TODO:

  1. 透過 mlock 系列的系統呼叫,確保特定 page 不會被 swap out;
  2. 閱讀 Threaded RT-application with memory locking and stack handling example

:notes: jserv

KYG 已補上使用 mlock 進行的測試結果

接下來使用 mlock 系列的系統呼叫,並參照 Threaded RT-application with memory locking and stack handling example 的範例,確保 user space 的 page 不會被 swap out

struct timespec t1, t2;
long long sz, u2k, k2u;

if (mlockall(MCL_CURRENT | MCL_FUTURE))
    printf("mlockall failed!\n");

/* touch the whole page to make it mapped into RAM */
for (int i = 0; i <= offset; i++) {
    lseek(fd, i, SEEK_SET);
    clock_gettime(CLOCK_MONOTONIC, &t1);
    sz = write(fd, write_buf, 2);
    clock_gettime(CLOCK_MONOTONIC, &t2);
}
/* do actual measurements here */
for (int i = 0; i <= offset; i++) {
    lseek(fd, i, SEEK_SET);
    clock_gettime(CLOCK_MONOTONIC, &t1);
    sz = write(fd, write_buf, 2);
    clock_gettime(CLOCK_MONOTONIC, &t2);
    u2k = sz - (long long)(t1.tv_sec * 1e9 + t1.tv_nsec);
    k2u = (long long)(t2.tv_sec * 1e9 + t2.tv_nsec) - sz;
    printf("%d %lld %lld\n", i, u2k, k2u);
}

  • 前兩次花費的時間明顯降低,代表原本的延遲確實是 page fault 造成的

copy_{from/to}_user 的傳遞時間

修改 fib_readfib_write 來量測傳遞時間

static ssize_t fib_read(struct file *file,
                        char *buf,
                        size_t size,
                        loff_t *offset)
{
    ssize_t retval = 0;
    ktime_t kt;
    if (size > BUF_SIZE)
        size = BUF_SIZE;

    kt = ktime_get();
    retval = copy_to_user(buf, test_buffer, size);
    kt = ktime_sub(ktime_get(), kt);
    if (retval < 0)
        return -EFAULT;
    return (ssize_t) ktime_to_ns(kt);
}
  • 一般來說 readwrite 需負責更新 offset 的位置,為了簡化流程所以沒有實作這點
  • fib_write 一樣改法,只是將 copy_to_user 改為 copy_from_user

接著在 user space 使用對 /def/fibdrv 執行 readwrite 來取得 kernel space 的執行時間

long long wt, rt;

for (int i = 1; i < BUF_SIZE + 1; i++) {
    getrandom(test_data, BUF_SIZE, 0);
    memset(buf, 0, BUF_SIZE);

    wt = write(fd, test_data, i);;
    rt = read(fd, buf, i);
    printf("%d %lld %lld\n", i, wt, rt);
}

  • copy_to_user 根據 CPU 架構有不同的實作,以 x86 為例,在 /arch/x86/lib/copy_user_64.S 可以觀察到會使用不同的方式來加速複製 (視硬體支援),但原則上還是呈線性成長,只是斜率會不同

費氏數列

iterative 算法

fibdrv 一開始就是使用此法進行計算,時間複雜度為

O(n)

static long long fib_sequence(long long k)
{
    long long f[k + 2];
    f[0] = 0;
    f[1] = 1;

    for (int i = 2; i <= k; i++) {
        f[i] = f[i - 1] + f[i - 2];
    }
    return f[k];
}

fast doubling

參閱你所不知道的 C 語言:遞迴呼叫篇對 fast doubling 的說明,使用以下兩式來計算費氏數列

F(2k)=F(k)[2F(k+1)F(k)]F(2k+1)=F(k+1)2+F(k)2

fast doubling 每次疊代會根據 F(k) 與 F(k+1) 來計算 F(2k) 與 F(2k+1),由於 k 只能是整數,因此當所求項為奇數時,代表所求的目標會變成 F(2k+1),也就是乘 2 之後需要再加 1 項才是目標

以計算 F(10) 為例,首先考量 2 進位下的表達

1010=10102

乘 2 即左移一項 k << 1,因此可以理解為從 F(0) 開始,經過以下 4 個步驟可以得到 F(10),這也是為何 fast doubling 實作上會根據 bit 來決定計算的步驟

(0000 << 1) + 1 = 0001
(0001 << 1) + 0 = 0010
(0010 << 1) + 1 = 0101
(0101 << 1) + 0 = 1010

參考作業說明的虛擬碼以及Calculating Fibonacci Numbers by Fast Doubling後,實作程式碼如下

/* Calculate Fibonacci numbers by Fast Doubling */
static long long fib_sequence(long long n)
{
    if (n < 2) { /* F(0) = 0, F(1) = 1 */
        return n;
    }
    long long f[2];
    f[0] = 0; /* F(k) = dest */
    f[1] = 1; /* F(k+1) */

    for (unsigned int i = 1U << 31; i; i >>= 1) {
        /* F(2k) = F(k) * [ 2 * F(k+1) – F(k) ] */
        long long k1 = f[0] * (f[1] * 2 - f[0]);
        /* F(2k+1) = F(k)^2 + F(k+1)^2 */
        long long k2 = f[0] * f[0] + f[1] * f[1]; 
        if (n & i) {
            f[0] = k2;      /* (2k+1) */
            f[1] = k1 + k2; /* F(2k+2) */
        } else {
            f[0] = k1; /* F(2k) */
            f[1] = k2; /* F(2k+1) */
        }
    }
    return f[0];
}

  • fast doubling 計算所需時間有顯著的下降
    • iterative 方法的時間複雜度為
      O(n)
    • fast doubling 的時間複雜度降為
      O(log n)

clz 的影響

fast doubling 需要根據 bit 來決定計算的步驟,因此使用 clz/ctz 可以縮短尋找 MSB (Most Significant Bit) 的時間

static long long fib_sequence(long long n)
{
...
-   for (unsigned int i = 1U << 31; i; i >>= 1) {
+   for (unsigned int i = 1U << (31 - __builtin_clz(n)); i; i >>= 1) {
    ...
    }
}

Other Built-in Functions Provided by GCC

Returns the number of leading 0-bits in x, starting at the most significant bit position. If x is 0, the result is undefined.

  • 特別注意 __builtin_clz 的參數不能是 0

使用以下幾種方式實作 fast doubling 並測試,目的是凸顯尋找 MSB 所耗費的時間差異

  • 使用 __builtin_clz 來尋找 MSB
  • 一律從頭尋找 1U << 31
  • 從中間某個點開始尋找 1U << 161U << 6 (
    9210=10111002
    ,因此至少需要移動 6 bits,否則會計算錯誤)

  • __builtin_clz 所耗時間最少
  • 從其餘 3 筆資料可以證實尋訪的 bits 越多越耗時間

心得 - 注意實驗設置

一開始我使用以下方式來測試執行時間

static ssize_t fib_read(struct file *file,
                        char *buf,
                        size_t size,
                        loff_t *offset)
{
    ktime_t kt;
    kt = ktime_get();
    fib_sequence_fdouble(*offset);
    kt = ktime_sub(ktime_get(), kt);
    return (ssize_t) ktime_to_ns(kt);
}

得到的結果如下,顯示使用 clz 沒有比較快,甚至可能更慢一些

怎麼想都覺得事有蹊俏,決定看一下組合語言 (如果有 offset 可以加上 --adjust-vma=0xoffset 來調整,從 /proc/modules 可以查詢 offset)

$ cat /proc/modules | grep fib
fibdrv_new 16384 0 - Live 0x0000000000000000 (OE)
$ objdump -dS fibdrv_new.ko

結果發現 fib_sequence_fdouble 直接被 opt out 啦,所以我一開始測出來的數據根本就沒有執行費氏數列相關的計算

00000000000000b0 <fib_read>:
  b0:	e8 00 00 00 00       	callq  b5 <fib_read+0x5>
  b5:	55                   	push   %rbp
  b6:	48 89 e5             	mov    %rsp,%rbp
  b9:	53                   	push   %rbx
  ba:	e8 00 00 00 00       	callq  bf <fib_read+0xf>
  bf:	48 89 c3             	mov    %rax,%rbx
  c2:	e8 00 00 00 00       	callq  c7 <fib_read+0x17>
  c7:	48 29 d8             	sub    %rbx,%rax
  ca:	5b                   	pop    %rbx
  cb:	5d                   	pop    %rbp
  cc:	c3                   	retq   
  cd:	0f 1f 00             	nopl   (%rax)

為了避免 fib_sequence_fdouble 因沒有用到回傳值而被優化掉,參照 CppCon 2015: Tuning C++: Benchmarks, and CPUs, and Compilers! Oh My! 提到的方法,使用 escape 來確保回傳值不會被 opt out (inline asm 相關的語法請參閱 GCC Extended Asm)

__attribute__((always_inline))
static inline void escape(void *p) {
  __asm__ volatile ("" : : "g"(p) : "memory");
}

static ssize_t fib_read(struct file *file,
                        char *buf,
                        size_t size,
                        loff_t *offset)
{
    long long result = 0;
    ktime_t kt;
    kt = ktime_get();
    result = fib_sequence_fdouble(*offset);
    kt = ktime_sub(ktime_get(), kt);
    escape(&result);
    return (ssize_t) ktime_to_ns(kt);
}

另外,也可以觀察 __builtin_clz 是如何實作的

00000000000000b0 <fib_sequence_fdouble_clz.part.1>:
  b0:	e8 00 00 00 00       	callq  b5 <fib_sequence_fdouble_clz.part.1+0x5>
  b5:	0f bd c7             	bsr    %edi,%eax  #__builtin_clz
  b8:	55                   	push   %rbp
  b9:	83 f0 e0             	xor    $0xffffffe0,%eax
  bc:	8d 48 20             	lea    0x20(%rax),%ecx
  bf:	b8 01 00 00 00       	mov    $0x1,%eax
  c4:	48 89 e5             	mov    %rsp,%rbp
  c7:	d3 e0                	shl    %cl,%eax  #0x1 << rcx
...
  • 目標是 1U << (31 - __builtin_clz),也就是 MSB 後有幾個 bits,其實會直接等於 instruction - bsr 的結果
  • xor $0xffffffe0,%eaxlea 0x20(%rax),%ecx 這兩個 instruction 看起來就算不做,結果也一樣,不知道是否有我沒想到的作用

F(92) 以後的數值錯誤的原因

初次執行 client 會發現從 F(92) 之後輸出的數值都一樣,這是因為 fibdrv 中預設限制最大項目為 92

/* MAX_LENGTH is set to 92 because
 * ssize_t can't fit the number > 92
 */
#define MAX_LENGTH 92

fib_read 返回的資料型態為 long long,即 64 bits 的有號整數,可涵蓋的整數值介於

26411
264
之間,比對費氏數列的正確值,可確認 F(93) 會超出此範圍,這也是預設限制最大可用項目為 92 的原因

F(0)     = 0
F(1)     = 1
...
F(91)    = 4660046610375530309 
F(92)    = 7540113804746346429

2^63 - 1 = 9223372036854775808

F(93)    = 12200160415121876738

移除限制並重新觀察輸出,會從 F(93) 開始 overflow

F(92)    = 7540113804746346429
F(93)    = -6246583658587674878
F(94)    = 1293530146158671551

雖然結果 overflow,但可根據 two's complement 算出 overflow 後為何是這個數值

if(A+B)>TMax(overflow)result=A+B264=F(91)+F(92)264=6246583658587674878

將使用的 data type 由 long long 更改為 uint64_t,可以多計算出一項正確的數值 F(93),不過從 F(94) 開始仍會 overflow

F(92)    = 7540113804746346429
F(93)    = 12200160415121876738
F(94)    = 1293530146158671551
F(95)    = 13493690561280548289

一樣可以檢驗 overflow 後為何是這個數值

if(A+B)>UMax(overflow)result=A+B(mod2w1)=A+B264=F(92)+F(93)264=1293530146158671551

大數運算

本章節紀錄的程式碼皆為初版,下個章節 如何減少大數運算的成本 中會再對程式碼進行優化

初版實作程式碼可參閱 bn.cbn.h

最新版的實作程式碼可參閱 bn.cbn.h

bn 資料結構

為了計算 92 項以後的費氏數列,須採用長度可變動的數值表示法,動態配置不同大小的記憶體來呈現更大範圍的整數,定義的資料結構如下

/* number[size - 1] = msb, number[0] = lsb */
typedef struct _bn {
    unsigned int *number;
    unsigned int size;
    int sign;
} bn;
  • number - 指向儲存的數值,之後會以 array 的形式來取用
  • size - 配置的記憶體大小,單位為 sizeof(unsigned int)
  • sign - 0 為正數、1 為負數

由於大數沒辦法直接以數值的形式列出,這裡改用字串來呈現,轉換的部分利用 ASCII 的特性並根據 fast doubling 的邏輯來 "組合" 出 10 進位字串

/* 
 * output bn to decimal string
 * Note: the returned string should be freed with kfree()
 */
char* bn_to_string(bn src)
{
    // log10(x) = log2(x) / log2(10) ~= log2(x) / 3.322
    size_t len = (8 * sizeof(int) * src.size) / 3 + 2 + src.sign;
    char *s = kmalloc(len, GFP_KERNEL);
    char *p = s;

    memset(s, '0', len - 1);
    s[len - 1] = '\0';

    for (int i = src.size - 1; i >= 0; i--) {
        for (unsigned int d = 1U << 31; d; d >>= 1) {
            /* binary -> decimal string */
            int carry = ((d & src.number[i]) == 1);
            for (int j = len - 2; j >= 0; j--) {
                s[j] += s[j] - '0' + carry; // double it
                carry = (s[j] > '9');
                if (carry)
                    s[j] -= 10;
            }
        }
    }
    // skip leading zero
    while (p[0] == '0' && p[1] != '\0') { 
        p++;
    }
    if (src.sign)
        *(--p) = '-';
    memmove(s, p, strlen(p) + 1);
    return s;
}

加法與減法

加法與減法由於需要考慮數值的正負號,因此分為兩個步驟,先由 bn_addbn_sub 判斷結果的正負號,再使用輔助函數 bn_do_addbn_do_add 進行無號整數的計算

/* 
 * c = a + b 
 * Note: work for c == a or c == b
 */
void bn_add(const bn *a, const bn *b, bn *c)
{
    if (a->sign == b->sign) { // both positive or negative
        bn_do_add(a, b, c);
        c->sign = a->sign;
    } else { // different sign
        if (a->sign)  // let a > 0, b < 0
            SWAP(a, b);
        int cmp = bn_cmp(a, b);
        if (cmp > 0) {
            /* |a| > |b| and b < 0, hence c = a - |b| */
            bn_do_sub(a, b, c);
            c->sign = 0;
        } else if (cmp < 0) {
            /* |a| < |b| and b < 0, hence c = -(|b| - |a|) */
            bn_do_sub(b, a, c);
            c->sign = 1;
        } else {
            /* |a| == |b| */
            bn_resize(c, 1);
            c->number[0] = 0;
            c->sign = 0;
        }
    }
}

/* 
 * c = a - b 
 * Note: work for c == a or c == b
 */
void bn_sub(const bn *a, const bn *b, bn *c)
{
    /* xor the sign bit of b and let bn_add handle it */
    bn tmp = *b;
    tmp.sign ^= 1; // a - b = a + (-b)
    bn_add(a, &tmp, c);
}
  • 分類的方法參考了 bignum
  • bn_add 負責所有正負號的判斷,所以 bn_sub 只是改變 b 的正負號後,再直接交給 bn_add 判斷
    • 但不能直接改變 b 的數值,所以這裡使用 tmp 來暫時的賦予不同的正負號
  • bn_cmp 負責比對兩個 bn 物件開絕對值後的大小,邏輯類似 strcmp
/* |c| = |a| + |b| */
static void bn_do_add(const bn *a, const bn *b, bn *c)
{
    // max digits = max(sizeof(a) + sizeof(b)) + 1
    int d = MAX(bn_msb(a), bn_msb(b)) + 1;
    d = DIV_ROUNDUP(d, 32) + !d;
    bn_resize(c, d); // round up, min size = 1

    unsigned long long int carry = 0;
    for (int i = 0; i < c->size; i++) {
        unsigned int tmp1 = (i < a->size) ? a->number[i] : 0;
        unsigned int tmp2 = (i < b->size) ? b->number[i] : 0;
        carry += (unsigned long long int) tmp1 + tmp2;
        c->number[i] = carry;
        carry >>= 32;
    }

    if (!c->number[c->size - 1] && c->size > 1)
        bn_resize(c, c->size - 1);
}
  • 加法的部分比較簡單,只須確保 c 的大小足以儲存計算結果
  • DIV_ROUNDUP 的用法參考自 /arch/um/drivers/cow_user.c
  • 使用 8 bytes 大小的 carry 來實行兩個 4 bytes 項目的加法來避免 overflow
    • 等號右方記得要先將其中一方進行 integer promotion,不然會先被 truncated 然後才 implicit integer promotion
  • bn_msbbn_clz 是 bn 版本的 clz,詳見 bn_kernel.c
/* 
 * |c| = |a| - |b| 
 * Note: |a| > |b| must be true
 */
static void bn_do_sub(const bn *a, const bn *b, bn *c)
{
    // max digits = max(sizeof(a) + sizeof(b))
    int d = MAX(a->size, b->size);
    bn_resize(c, d);

    long long int carry = 0;
    for (int i = 0; i < c->size; i++) {
        unsigned int tmp1 = (i < a->size) ? a->number[i] : 0;
        unsigned int tmp2 = (i < b->size) ? b->number[i] : 0;
        carry = (long long int) tmp1 - tmp2 - carry;
        if (carry < 0) {
            c->number[i] = carry + (1LL << 32);
            carry = 1;
        } else {
            c->number[i] = carry;
            carry = 0;
        }
    }
    
    d = bn_clz(c) / 32;
    if (d == c->size)
        --d;
    bn_resize(c, c->size - d);
}
  • 實際上使用無號整數進行計算,因此若絕對值相減會小於 0,需先對調 ab,並於計算完成後再再補上負號
  • 計算的邏輯和 bn_do_add 一樣,不過此時 carry 是作為借位使用

乘法

/* 
 * c = a x b
 * Note: work for c == a or c == b
 * using the simple quadratic-time algorithm (long multiplication)
 */
void bn_mult(const bn *a, const bn *b, bn *c)
{
    // max digits = sizeof(a) + sizeof(b))
    int d = bn_msb(a) + bn_msb(b);
    d = DIV_ROUNDUP(d, 32) + !d; // round up, min size = 1
    bn *tmp;
    /* make it work properly when c == a or c == b */
    if (c == a || c == b) {
        tmp = c; // save c
        c = bn_alloc(d);
    } else {
        tmp = NULL;
        for (int i = 0; i < c->size; i++)
            c->number[i] = 0;
        bn_resize(c, d);
    }
    
    for (int i = 0; i < a->size; i++) {
        for (int j = 0; j < b->size; j++) {
            unsigned long long int carry = 0;
            carry = (unsigned long long int) a->number[i] * b->number[j];
            bn_mult_add(c, i + j, carry);
        }
    }
    c->sign = a->sign ^ b->sign;

    if (tmp) {
        bn_swap(tmp, c); // restore c
        bn_free(c);
    }
}
  • 目前採用最簡單的 long multiplication,就像手算乘法一樣疊加上去
  • 與加減法不同,若 c == a || c == b,就必須配置記憶體來儲存計算結果,避免 ab 在計算途中就被改變
  • 輔助函式 bn_mult_add 負責將每一行的計算結果疊加上去,如下
/* c += x, starting from offset */
static void bn_mult_add(bn*c, int offset, unsigned long long int x)
{
    unsigned long long int carry = 0;
    for (int i = offset; i < c->size; i++) {
        carry += c->number[i] + (x & 0xFFFFFFFF);
        c->number[i] = carry;
        carry >>= 32;
        x >>= 32;
        if (!x && ! carry) //done
            return;
    }
}

bit shift

/* left bit shift on bn (maximun shift 31) */
void bn_lshift(const bn *src, size_t shift, bn *dest)
{
    size_t z = bn_clz(src);
    shift %= 32;  // only handle shift within 32 bits atm
    if (!shift)
        return;

    if (shift > z) {
        bn_resize(dest, src->size + 1);
    } else {
        bn_resize(dest, src->size);
    }
    /* bit shift */
    for (int i = src->size - 1; i > 0; i--)
        dest->number[i] =
            src->number[i] << shift | src->number[i - 1] >> (32 - shift);
    dest->number[0] = src->number[0] << shift;
}
  • 如果要移動超過 32 bits 會比較麻煩,考量目前不會有這種需求,先以較簡單的方式實作
  • 邏輯類似 quiz4 的 bitcpy

swap

void bn_swap(bn *a, bn *b)
{
    bn tmp = *a;
    *a = *b;
    *b = tmp;
}
  • bn 資料結構中 number 紀錄的是指標,因此這麼做可以確實的互換兩個 bn 的數值,但不用更動儲存在 heap 中的數值

正確計算 F(92) 以後的數值

使用實作的大數運算來計算第 92 項以後的費氏數列,首先是 iterative 算法

/* calc n-th Fibonacci number and save into dest */
void bn_fib(bn *dest, unsigned int n)
{
    bn_resize(dest, 1);
    if (n < 2) { //Fib(0) = 0, Fib(1) = 1
        dest->number[0] = n;
        return;
    }

    bn *a = bn_alloc(1);
    bn *b = bn_alloc(1);
    dest->number[0] = 1;

    for (unsigned int i = 1; i < n; i++) {
        bn_swap(b, dest);
        bn_add(a, b, dest);
        bn_swap(a, b);
    }
    bn_free(a);
    bn_free(b);
}

接著是 fast doubling 的實作

void bn_fib_fdoubling(bn *dest, unsigned int n)
{
    bn_resize(dest, 1);
    if (n < 2) { //Fib(0) = 0, Fib(1) = 1
        dest->number[0] = n;
        return;
    }

    bn *f1 = dest; /* F(k) */
    bn *f2 = bn_alloc(1); /* F(k+1) */
    f1->number[0] = 0;
    f2->number[0] = 1;
    bn *k1 = bn_alloc(1);
    bn *k2 = bn_alloc(1);

    for (unsigned int i = 1U << 31; i; i >>= 1) {
        /* F(2k) = F(k) * [ 2 * F(k+1) – F(k) ] */
        bn_cpy(k1, f2);
        bn_lshift(k1, 1);
        bn_sub(k1, f1, k1);
        bn_mult(k1, f1, k1);
        /* F(2k+1) = F(k)^2 + F(k+1)^2 */
        bn_mult(f1, f1, f1);
        bn_mult(f2, f2, f2);
        bn_cpy(k2, f1);
        bn_add(k2, f2, k2);
        if (n & i) {
            bn_cpy(f1, k2);
            bn_cpy(f2, k1);
            bn_add(f2, k2, f2);
        } else {
            bn_cpy(f1, k1);
            bn_cpy(f2, k2);
        }
    }
    bn_free(f2);
    bn_free(k1);
    bn_free(k2);
}

使用以下 python 程式碼進行驗證,至少能正確計算至第 100000 項

def read_file(filename):
    f = open(filename, 'r')
    a = int(f.readline().strip())
    b = int(f.readline().strip())
    for target in f:
        target = int(target.strip())
        a, b = b, a + b  # a <- b, b <- (a + b)
        if b != target:
            print("wrong answer with value %d" % (target))
            return
    print("validation passed!")
    
parser = argparse.ArgumentParser(description='Validate the correctness of fibonacci numbers.')
parser.add_argument('--file', '-f', metavar='file_name', type=str, required=True, help='file for testing')
args = parser.parse_args()
read_file(args.file)

如何減少大數運算的成本

接下來會以 perf 分析函式的熱點,再逐步改善大數運算的效能

所有的實作程式碼可參閱 bn.cbn.h

原本的運算成本

測試環境

$ cat /proc/version
Linux version 5.3.0-53-generic (buildd@lgw01-amd64-016)
(gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04))

$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.4 LTS (Bionic Beaver)"
...
  • 在 user space 進行量測
  • compiler flag: -O2-g-fno-omit-frame-pointer
    • 為了讓 perf record 更準確的量測 call graph,會需要詳細的 frame pointer 紀錄

運算成本

初版實作的效能如下,參考組為老師實作的 bignum

  • 效能明顯低於參考組,就連單純由加法實作的 iterative 版本演算法的速度都比較慢
  • fast doubling 的成本一開始就比較高,顯示我實作的方式有問題,後續會進行改善

perf

本文中主要使用 perf statperf recordperf report 這三種工具,以下分別介紹接下來會用的設置參數

perf stat

$ sudo perf stat -r 10 -e cycles,instructions,cache-misses,cache-references,branch-instructions,branch-misses ./fib
  • perf stat 用來快速的檢視量測的統計資料,詳細的訊息需使用 perf record
  • -r 10 : 量測 10 次,目的是確認每次量測間沒有明顯的波動
  • -e : 指定要量測的項目

perf record

$ sudo perf record
    [-g] [--call graph <fp, dwarf, lbr>]
    <command>
  • perf record 會紀錄樣本,預設輸出檔名為 perf.data
  • -g : 紀錄 call graph,可搭配 --call graph 指定 stack trace 的方式,預設為 fp
    • fp (frame pointer) 會需要搭配編譯器選項 --fomit-frame-pointer 使用,但如果受測函數呼叫 libc 函式,會因為函式庫編譯時沒有使用該 flag 編譯而導致 perf record 無法正確紀錄 stack trace,改用 dwarf 或是 lbr 可以避免這個問題
    • man perf record 中有提到如果有啟用 --fomit-frame-pointer,建議使用 dwarf 來量測,因此接下來都使用 dwarf

perf report

$ sudo perf report
    [--tui | --stdio]
    [--children|--no-children]
    [-g <graph,0.5,caller|callee>]
    [-i <file> | --input=file]
  • perf report 會顯示 perf record 採樣的結果,預設讀取的檔案名稱為 perf.data
  • --stdio : 使用 stdio 作為輸出介面,主要是方便我將結果貼上來,預設是 --tui
  • --children : 將 callee 的樣本加入 caller,這個選項與 -g 有關,預設為 --children
  • -g : 顯示 call graph,使用 perf record 時如果有 -g,perf report 時也要一併使用
    • 使用 --children 時,-g 的預設值是 graph,0.5,caller,會產生 caller-based 的 call graph
    • 使用 --no-children 時,-g 的預設值是 graph,0.5,callee,會產生 callee-based 的 call graph

caller 與 callee 的差別

perf report 可以對同一筆資料分別產生 caller-based 與 callee-based 兩種 call graph,分別提供不同的數據解讀方向。接下來以一個簡單的程式舉例,考量以下程式碼 foobar.c

void foo(void) {
    /* do something */
}

void bar(void) {
    /* do something */
    foo();
}

int main(void) {
    bar()
    return 0;
}

caller-based call graph

$ sudo perf report -g --stdio

Children      Self  Command  Shared Object      Symbol
........  ........  .......  .................  ..........
 100.00%     0.00%  foobar   foobar             [.] main
          |
           -- __libc_start_main
              main
              |
               --100.00%--bar
                          |
                           --60.00%--foo
                            
 100.00%    40.00%  foobar   foobar             [.] bar
          |
          |--60.00%--bar
          |          |
          |           --60.00%--foo
          |   
           --40.00%--__libc_start_main
                     main
                     bar

  60.00%    60.00%  foobar   foobar             [.] foo
          |
           --60.00%--__libc_start_main
                     main
                     bar
                     foo
  • 每個函式 (Symbol) 在第一次分支時會被分為 2 類
    • Children : 結束於其他函式的 stack trace
    • Self : 結束於此函數的 stack trace
    • 只有第一階的分支會將兩者都列出來,第二階以後通常只會列出第二階函式的 Children
    • 注意分支擺放順序的依據是比例大小,並非固定先顯示 Children 再顯示 Self
  • 觀察 main 的 stack trace
    • 所有函式起點都是 main,因此 Children 是 100%
    • main 總是呼叫 bar,因此 Self 是 0%
  • 觀察 bar 的 stack trace
    • 有 40% 的樣本結束於 bar,因此 Self 是 40%,呼叫順序為 __libc_start_main > main > bar
    • 有 60% 的樣本會再呼叫 foo
  • main 和 bar 的 Children 都是 100%,因為此範例假定 bar 返回至 main 後會立刻結束,因此不會採集到任何樣本
  • caller-based 能提供一個函式是如何組成的資訊,適合用來分析一個函式中各個函式呼叫所佔的比例

callee-based

$ sudo perf report -g --no-children --stdio

Overhead  Command  Shared Object      Symbol
........  .......  .................  ..........
  60.00%   foobar  foobar             [.] foo
          |
           --60.00%--__libc_start_main
                     main
                     bar
                     foo

  40.00%   foobar  foobar             [.] bar
          |   
           --40.00%--__libc_start_main
                     main
                     bar
  • Overhead 和 caller-based call graph 中的 Self 同義,代表此函式占整體執行時間的比例
  • 分支會顯示該函式被呼叫的途徑與對應的比例
  • callee-based 能提供函示被呼叫的途徑與比例,適合用來優化函式的呼叫途徑

測試程式碼

perf recordperf stat 皆使用以下程式碼進行測試

#define ITH 1000
#define ITER_TIMES 2000000

int main(int argc, char const *argv[])
{
    bn *test = bn_alloc(1);
    for (int i = 0; i < ITER_TIMES; i++) {
        bn_fib(test, ITH);
        escape(test->number);
    }
    bn_free(test);
    return 0;
}
  • 每次計算花費的時間大約在 10000 ns = 100000 Hz,但 perf record 採樣的最大頻率大約是 10000 Hz,直接量測單個函式會有明顯的觀察者效應,因此需要多次重複執行來確保採集的樣本具有代表性
  • 重複的次數 ITER_TIMES 會根據不同的量測範圍 ITH 與使用的演算法而改變,但後續的討論只會直接比對同條件下的量測結果
  • escape 用來確保每次迴圈都會確實的執行受測函式

優化 bn_fib_fdoubling

先以 perf stat 分析目前實作的效能,作為後續比對的基準

    63,453,850,327      cycles                                                        ( +-  0.03% )  (66.65%)
   182,785,094,108      instructions              #    2.88  insn per cycle           ( +-  0.00% )  (83.33%)
            15,795      cache-misses              #    1.375 % of all cache refs      ( +- 19.12% )  (83.33%)
         1,148,592      cache-references                                              ( +- 11.66% )  (83.34%)
    36,448,212,424      branch-instructions                                           ( +-  0.00% )  (83.34%)
       117,825,450      branch-misses             #    0.32% of all branches          ( +-  0.56% )  (83.33%)

          18.73770 +- 0.00638 seconds time elapsed  ( +-  0.03% )

接下來使用 perf record 量測 call graph (有省略部分內容來提升可讀性)

$ sudo perf record -g --call-graph dwarf ./fib
$ sudo perf report --stdio -g graph,0.5,caller

# Children      Self  Command  Shared Object      Symbol
# ........  ........  .......  .................  .................................
#
    84.92%     1.89%  fib      fib                [.] bn_fib_fdoubling
            |
            |--83.03%--bn_fib_fdoubling
            |          |
            |          |--48.43%--bn_mult
            |          |          |
            |          |          |--20.74%--bn_alloc
            |          |          |          |
            |          |          |          |--14.45%--__libc_calloc
            |          |          |          |
            |          |          |           --4.93%--__GI___libc_malloc (inlined)
            |          |          |
            |          |          |--13.00%--bn_mult_add (inlined)
            |          |          |
            |          |          |--3.43%--bn_msb (inlined)
            |          |          |
            |          |           --1.17%--bn_swap (inlined)
            |          |
            |          |--16.18%--bn_free
            |          |          |
            |          |          |--14.70%--__GI___libc_free (inlined)
            |          |          |
            |          |           --0.81%--free@plt
            |          |
            |          |--6.31%--bn_cpy
            |          |          |
            |          |          |--3.67%--memcpy (inlined)
            |          |          |
            |          |           --1.61%--bn_resize
            |          |                     |
            |          |                      --0.99%--__GI___libc_realloc (inlined)
            |          |--4.69%--bn_add
            |          |          |
            |          |           --4.52%--bn_do_add (inlined)
            |          |                     |
            |          |                      --1.69%--bn_msb (inlined)
            |          |
            |          |--4.31%--bn_sub (inlined)
            |          |          |
            |          |           --4.25%--bn_add
            |          |                     |
            |          |                     |--2.35%--bn_do_sub
            |          |                     |
            |          |                      --0.58%--bn_cmp
            |          |--1.93%--bn_lshift
            |          |          |
            |          |           --0.84%--bn_clz (inlined)
            |          |
            |           --0.55%--bn_alloc
            |
             --1.07%--_start
                       __libc_start_main
                       main
                       bn_fib_fdoubling
  • 有 84.92% 的時間 (準確來說是樣本數) 落在 bn_fib_fdoubling 內,其中有 83.03% 的時間會再呼叫其他函式
  • bn_mult 佔整體 48.43% 的時間,因此優化乘法會帶來明顯的效能增益
  • bn_fib_fdoubling 內有接近一半的時間在管理動態記憶體與複製資料,顯然需要相關的策略來降低這部分的成本
  • bn_addbn_sub 共佔 9% 的時間,需要再單獨使用 iterative 版本的 bn_fib 來進行分析與優化,否則很難在 bn_fib_fdoubling 內觀察到效能增益
  • bn_free 占有高比例的原因不明,目前先猜測可能是因為 bn_mult 過度頻繁的呼叫 bn_allocbn_free

改善方案 1 - 改寫 bn_fib_fdoubling 實作的方式

  • 原本的寫法局限於使用 bn_cpy 來更新暫存變數 k1k2 的數值,其實可以藉由 bn_swap 以及改變各函式儲存結果的位置來達成一樣的目的,將所有的 bn_cpy 去除來降低複製資料造成的成本
  • 當資料來源與目的重疊時 (c == a || c == b),bn_mult 必須先配置暫存的記憶體空間來儲存計算結果,因此可以進一步確保呼叫 bn_mult 時不要發生此狀況,降低使用 mallocmemcpy 的次數
void bn_fib_fdoubling(bn *dest, unsigned int n)
{
    ...
    for (unsigned int i = 1U << (31 - __builtin_clz(n)); i; i >>= 1) {
        /* F(2k) = F(k) * [ 2 * F(k+1) – F(k) ] */
        /* F(2k+1) = F(k)^2 + F(k+1)^2 */
        bn_lshift(f2, 1, k1);// k1 = 2 * F(k+1)
        bn_sub(k1, f1, k1);  // k1 = 2 * F(k+1) – F(k)
        bn_mult(k1, f1, k2); // k2 = k1 * f1 = F(2k)
        bn_mult(f1, f1, k1); // k1 = F(k)^2
        bn_swap(f1, k2);     // f1 <-> k2, f1 = F(2k) now
        bn_mult(f2, f2, k2); // k2 = F(k+1)^2
        bn_add(k1, k2, f2);  // f2 = f1^2 + f2^2 = F(2k+1) now
        if (n & i) {
            bn_swap(f1, f2);    // f1 = F(2k+1)
            bn_add(f1, f2, f2); // f2 = F(2k+2)
        }
    }
    ...
}

結果如下 (v1 綠線)

    24,770,616,442      cycles                                                        ( +-  0.05% )  (66.63%)
    71,462,180,892      instructions              #    2.88  insn per cycle           ( +-  0.00% )  (83.32%)
             8,406      cache-misses              #    1.048 % of all cache refs      ( +-  4.19% )  (83.33%)
           802,258      cache-references                                              ( +-  9.39% )  (83.34%)
    12,105,857,981      branch-instructions                                           ( +-  0.00% )  (83.36%)
        39,389,038      branch-misses             #    0.33% of all branches          ( +-  1.16% )  (83.33%)

           7.31640 +- 0.00362 seconds time elapsed  ( +-  0.05% )
  • 效能大幅度改善,時間從 18.73770s 降到 7.31640s
  • 複製資料的成本真的很大,不難想像為何會有 COW 等策略來降低成本

改善方案 2 - 使用不同的 Q-Matrix 實作 bn_fib_fdoubling

觀察老師的 bignum 中費氏數列的實作函式 fibonacci,會發現雖然一樣基於 fast doubling,但是使用了稍微不一樣的 Q-matrix,推導如下

[F(2n1)F(2n)]=[0111]2n[F(0)F(1)]=[F(n1)F(n)F(n)F(n+1)][F(n1)F(n)F(n)F(n+1)][10]=[F(n)2+F(n1)2F(n)F(n)+F(n)F(n1)]

整理後可得

F(2k1)=F(k)2+F(k1)2F(2k)=F(k)[2F(k1)+F(k)]

使用上述公式改寫 bn_fib_fdoubling,優點是可以少掉一次迴圈的計算及避免使用減法

void bn_fib_fdoubling(bn *dest, unsigned int n)
{
    bn_resize(dest, 1);
    if (n < 2) {  // Fib(0) = 0, Fib(1) = 1
        dest->number[0] = n;
        return;
    }

    bn *f1 = bn_alloc(1); // f1 = F(k-1)
    bn *f2 = dest;        // f2 = F(k) = dest
    f1->number[0] = 0;
    f2->number[0] = 1;
    bn *k1 = bn_alloc(1);
    bn *k2 = bn_alloc(1);

    for (unsigned int i = 1U << (30 - __builtin_clz(n)); i; i >>= 1) {
        /* F(2k-1) = F(k)^2 + F(k-1)^2 */
        /* F(2k) = F(k) * [ 2 * F(k-1) + F(k) ] */
        bn_lshift(f1, 1, k1);// k1 = 2 * F(k-1)
        bn_add(k1, f2, k1);  // k1 = 2 * F(k-1) + F(k)
        bn_mult(k1, f2, k2); // k2 = k1 * f2 = F(2k)
        bn_mult(f2, f2, k1); // k1 = F(k)^2
        bn_swap(f2, k2);     // f2 <-> k2, f2 = F(2k) now
        bn_mult(f1, f1, k2); // k2 = F(k-1)^2
        bn_add(k2, k1, f1);  // f1 = k1 + k2 = F(2k-1) now
        if (n & i) {
            bn_swap(f1, f2);    // f1 = F(2k+1)
            bn_add(f1, f2, f2); // f2 = F(2k+2)
        }
    }
    bn_free(f1);
    bn_free(k1);
    bn_free(k2);
}

結果如下 (v2 紅線)

    23,928,237,220      cycles                                                        ( +-  0.06% )  (66.64%)
    69,570,862,420      instructions              #    2.91  insn per cycle           ( +-  0.00% )  (83.33%)
             8,401      cache-misses              #    1.001 % of all cache refs      ( +-  5.17% )  (83.33%)
           839,163      cache-references                                              ( +-  9.65% )  (83.33%)
    11,641,338,644      branch-instructions                                           ( +-  0.00% )  (83.35%)
        41,101,058      branch-misses             #    0.35% of all branches          ( +-  1.42% )  (83.34%)

           7.06808 +- 0.00453 seconds time elapsed  ( +-  0.06% )
  • 時間從 7.31640 s 降低至 7.06808 s,小幅度減少約 3% 時間

改善 bn 使用動態記憶體的方式

原本實作的大數運算會在計算前先使用 bn_resize (realloc),確保有足夠大的空間來儲存計算結果,再於計算結束後檢查是否有多餘的空間 (msb 所在的 array 數值為 0) 並進行修剪 (trim),避免造成 memory leak 與增加後續計算的成本 (因為要走訪的空間會越來越長),然而頻繁使用 realloc 可能會造成降低效能

改善方案 3 - 引入 memory pool 的概念

參考 quiz 4,以 capacity 的方式管理 bn 實際可用的記憶體大小,降低 bn_resize 實際呼叫 realloc 的次數

typedef struct _bn {
    unsigned int *number;  /* ptr to number */
    unsigned int size;     /* length of number */
+   unsigned int capacity; /* total allocated length, size <= capacity */
    int sign;
} bn;
#define INIT_ALLOC_SIZE 4
#define ALLOC_CHUNK_SIZE 4

bn *bn_alloc(size_t size)
{
    bn *new = (bn *) malloc(sizeof(bn));
    new->size = size;
    new->capacity = size > INIT_ALLOC_SIZE ? size : INIT_ALLOC_SIZE;
    new->number = (unsigned int *) malloc(sizeof(int) * new->capacity);
    for (int i = 0; i <size; i++)
        new->number[i] = 0;
    new->sign = 0;
    return new;
}

static int bn_resize(bn *src, size_t size)
{
    ...
    if (size > src->capacity) { /* need to allocate larger capacity */
        src->capacity = (size + (ALLOC_CHUNK_SIZE - 1)) & ~(ALLOC_CHUNK_SIZE - 1); // ceil to 4*n
        src->number = realloc(src->number, sizeof(int) * src->capacity);
    }
    if (size > src->size) { /* memset(src, 0, size) */
        for (int i = src->size; i < size; i++)
            src->number[i] = 0;
    }
    src->size = size;
}
  • 只有當 size 超過 capacity 時才會 realloc,並以 4 為單位配置更大的空間
  • 所有計算仍以 size 作為計算的範圍,不會因為有多餘的空間而增加運算成本
  • trim 時只需要縮小 size,不需要實際 realloc 來縮小空間

結果如下 (v3 紅線)

    19,765,435,588      cycles                                                        ( +-  0.06% )  (66.64%)
    61,180,908,879      instructions              #    3.10  insn per cycle           ( +-  0.00% )  (83.33%)
             4,849      cache-misses              #    7.935 % of all cache refs      ( +-  5.97% )  (83.34%)
            61,110      cache-references                                              ( +-  5.79% )  (83.35%)
    10,612,740,290      branch-instructions                                           ( +-  0.00% )  (83.36%)
        32,583,167      branch-misses             #    0.31% of all branches          ( +-  1.54% )  (83.32%)

           5.83800 +- 0.00350 seconds time elapsed  ( +-  0.06% )
  • 時間從 7.06808 s 減少至 5.83800 s,減少約 17% 時間
  • cache-references 從 839,163 大幅度降低至 61,110,顯示原本頻繁呼叫 realloc 造成的成本非常可觀

改善方案 4 - 善用 64-bit CPU 的優勢

bn 資料結構中原本每個 array 的資料型態使用 unsigned int,在 64-bit CPU 下改為使用 uint64_t 應該能增加計算效能 (因為 word size 就是 64-bit)

#include <stdint.h>

#if defined(__LP64__) || defined(__x86_64__) || defined(__amd64__) || defined(__aarch64__)
#define BN_WSIZE 8
#else
#define BN_WSIZE 4
#endif

#if BN_WSIZE == 8
typedef uint64_t bn_data;
typedef unsigned __int128 bn_data_tmp; // gcc support __int128
#elif BN_WSIZE == 4
typedef uint32_t bn_data;
typedef uint64_t bn_data_tmp;
#else
#error "BN_WSIZE must be 4 or 8"
#endif

typedef struct _bn {
    bn_data *number;  /* ptr to number */
    bn_data size;     /* length of number */
    bn_data capacity; /* total allocated length, size <= capacity */
    int sign;
} bn;
  • 使用 bignum/apm.h 中的方式來定義 bn 的資料型態,以便於根據不同的 word size 切換定義
  • 乘法運算時會用到 2 倍大小的的暫存變數,直接使用 gcc 提供的 __int128 實作

結果如下 (v4)

    12,669,256,697      cycles                                                        ( +-  0.07% )  (66.64%)
    38,320,121,559      instructions              #    3.02  insn per cycle           ( +-  0.00% )  (83.32%)
             5,867      cache-misses              #   11.048 % of all cache refs      ( +- 14.55% )  (83.32%)
            53,104      cache-references                                              ( +- 12.56% )  (83.33%)
     5,274,117,456      branch-instructions                                           ( +-  0.00% )  (83.35%)
         2,174,668      branch-misses             #    0.04% of all branches          ( +-  0.28% )  (83.36%)

           3.74384 +- 0.00279 seconds time elapsed  ( +-  0.07% )
  • 時間從 5.83800 s 減少至 3.74384 s,減少約 36% 時間
  • instructions 的數量降低約 37%,顯示使用 uint64_t 更能發揮 64 位元 CPU 的優勢

改善 bn_add 的效能

為了凸顯 bn_add 對效能的影響,這個章節改為量測 bn_fib (iterative) 作為判斷依據,並將量測的範圍提高到 F(10000)。由於上述幾個改善策略也會提升 bn_add 的效能,因此先重新量測現有的效能,結果如下 (v1 紅線)

    87,130,307,524      cycles                                                        ( +-  0.01% )  (66.66%)
   262,062,098,878      instructions              #    3.01  insn per cycle           ( +-  0.00% )  (83.33%)
            11,863      cache-misses              #    2.193 % of all cache refs      ( +-  4.13% )  (83.33%)
           540,853      cache-references                                              ( +- 10.00% )  (83.33%)
    33,988,594,050      branch-instructions                                           ( +-  0.00% )  (83.34%)
       243,724,292      branch-misses             #    0.72% of all branches          ( +-  0.00% )  (83.33%)

          25.73128 +- 0.00279 seconds time elapsed  ( +-  0.01% )

改善方案 I - 改寫 bn_add 的實作法

原本的實作會在每次迴圈判斷需要相加的數值,這麼做的優點是只需寫一個迴圈就能完成計算,但缺點是每次迴圈都有兩個 branch 要判斷。為了改善這點,改為使用兩個迴圈進行計算,第一個迴圈先計算兩者皆有資料的範圍,再於第二個迴圈處理 carry 與剩餘的資料範圍。另外,藉由無號整數不會 overflow 的特性 (modulo),可以進一步避免使用 __int128 (bn_data_tmp) 進行計算

/* |c| = |a| + |b| */
static void bn_do_add(const bn *a, const bn *b, bn *c)
{
    ...
-   bn_data_tmp carry = 0;
-   for (int i = 0; i < c->size; i++) {
-       bn_data tmp1 = (i < asize) ? a->number[i] : 0;
-       bn_data tmp2 = (i < bsize) ? b->number[i] : 0;
-       carry += (bn_data_tmp) tmp1 + tmp2;
-       c->number[i] = carry;
-       carry >>= DATA_BITS;
-   }
    
+   bn_data carry = 0;
+   for (int i = 0; i < bsize; i++) {
+       bn_data tmp1 = a->number[i];
+       bn_data tmp2 = b->number[i];
+       carry = (tmp1 += carry) < carry;
+       carry += (c->number[i] = tmp1 + tmp2) < tmp2;
+   }
+   if (asize != bsize) {  // deal with the remaining part if asize > bsize
+       for (int i = bsize; i < asize; i++) {
+           bn_data tmp1 = a->number[i];
+           carry = (tmp1 += carry) < carry;
+           c->number[i] = tmp1;
+       }
+   }

    if (carry) {
        c->number[asize] = carry;
        ++(c->size);
    }
}

 Performance counter stats for './fib' (10 runs):

    42,111,360,506      cycles                                                        ( +-  0.41% )  (66.66%)
   125,087,664,564      instructions              #    2.97  insn per cycle           ( +-  0.00% )  (83.33%)
             9,037      cache-misses              #    5.927 % of all cache refs      ( +-  7.14% )  (83.33%)
           152,468      cache-references                                              ( +-  8.29% )  (83.34%)
    12,833,863,666      branch-instructions                                           ( +-  0.00% )  (83.34%)
       147,335,826      branch-misses             #    1.15% of all branches          ( +-  0.02% )  (83.34%)

           12.4361 +- 0.0512 seconds time elapsed  ( +-  0.41% )
  • branch-instructions 減少約 63%,branch-misses 也減少約 40%
  • cache-references 減少約 72%,顯示我本來的實作法有多餘的執行步驟,使 CPU 不斷重複讀取某些數值
  • 時間從 25.73128 s 減少至 12.4361 s,減少約 52% 時間

改善 bn_mult 的效能

改回量測 bn_fib_fdoubling 作為判斷依據,並接續上述 fast doubling v4 版本,將測試範圍提高至 F(10000),會發現 bn_mult 的效能明顯低於對照組

     7,208,970,350      cycles                                                        ( +-  0.11% )  (66.54%)
    15,804,723,358      instructions              #    2.19  insn per cycle           ( +-  0.00% )  (83.27%)
             3,826      cache-misses              #    9.269 % of all cache refs      ( +-  4.17% )  (83.29%)
            41,280      cache-references                                              ( +-  8.72% )  (83.37%)
     1,667,790,605      branch-instructions                                           ( +-  0.01% )  (83.44%)
        58,185,471      branch-misses             #    3.49% of all branches          ( +-  0.06% )  (83.36%)

           2.13072 +- 0.00229 seconds time elapsed  ( +-  0.11% )

改善方案 5 - 改寫 bn_mult 的實作法

原本實作 bn_mult 的方式為依序將兩格 array 相乘,再將結果直接疊加至輸出的變數,然而這會導致每行乘法被拆分成 2 個步驟 (相乘後先將 carry 疊加至下個 array,下次迴圈又再從該 array 取出數值來進行乘法),降低計算的速度。接下來參考 bignum/mul.c 來改寫 bn_mult,改為一次將乘積與 carry 疊加至輸出的變數來提升效能

/* c[size] += a[size] * k, and return the carry */
static bn_data _mult_partial(const bn_data *a, bn_data asize, const bn_data k, bn_data *c)
{
    if (k == 0)
        return 0;
    
    bn_data carry = 0;
    for (int i = 0; i < asize; i++) {
        bn_data high, low;
        bn_data_tmp prod = (bn_data_tmp) a[i] * k;
        low = prod;
        high = prod >> DATA_BITS;
        carry = high + ((low += carry) < carry);
        carry += ((c[i] += low) < low);
    }
    return carry;
}

void bn_mult(const bn *a, const bn *b, bn *c)
{
    ...
    bn_data *cp = c->number + a->size;
    for (int j = 0; j < b->size; j++) {
        c->number[a->size + j] =
            _mult_partial(a->number, a->size, b->number[j], c->number + j);
    }
    ...
}

     2,288,892,189      cycles                                                        ( +-  0.05% )  (66.40%)
     7,563,269,285      instructions              #    3.30  insn per cycle           ( +-  0.00% )  (83.35%)
             3,584      cache-misses              #   13.507 % of all cache refs      ( +- 29.45% )  (83.48%)
            26,534      cache-references                                              ( +-  9.79% )  (83.48%)
       602,658,857      branch-instructions                                           ( +-  0.02% )  (83.48%)
         3,857,937      branch-misses             #    0.64% of all branches          ( +-  0.15% )  (83.15%)

          0.678312 +- 0.000674 seconds time elapsed  ( +-  0.10% )
  • 時間從 2.13072 s 減少至 0.678312 s,減少約 68% 時間

改善方案 6 - inline asm

bignum 中使用 inline asm 來直接取得乘法運算的高位與低位,直接使用一樣的方式實作乘法,取代原本使用的 __int128 (bn_data_tmp)

static bn_data _mult_partial(const bn_data *a, bn_data asize, const bn_data k, bn_data *c)
{
    if (k == 0)
        return 0;
    
    bn_data carry = 0;
    for (int i = 0; i < asize; i++) {
        bn_data high, low;
-       bn_data_tmp prod = (bn_data_tmp) a[i] * k;
-       low = prod;
-       high = prod >> DATA_BITS;
+       __asm__("mulq %3" : "=a"(low), "=d"(high) : "%0"(a[i]), "rm"(k));
        carry = high + ((low += carry) < carry);
        carry += ((c[i] += low) < low);
    }
    return carry;
}

     1,412,000,613      cycles                                                        ( +-  0.07% )  (65.71%)
     3,782,233,502      instructions              #    2.68  insn per cycle           ( +-  0.02% )  (82.91%)
             1,816      cache-misses              #    9.135 % of all cache refs      ( +- 17.30% )  (83.56%)
            19,878      cache-references                                              ( +-  1.72% )  (83.76%)
       357,455,000      branch-instructions                                           ( +-  0.02% )  (83.76%)
         3,862,706      branch-misses             #    1.08% of all branches          ( +-  0.15% )  (83.21%)

          0.418849 +- 0.000460 seconds time elapsed  ( +-  0.11% )
  • 時間從 0.678312 s 減少至 0.418849 s,減少約 38% 時間
  • 使用 inline asm 的效能比 __int128 好,主因是沒辦法藉由使用 __int128 直接把乘積的高位與低位儲存至指定的空間

引入 bn_sqr

         a   b   c
      x  a   b   c
 -------------------
        ac  bc  cc
    ab  bb  bc
aa  ab  ac

考慮上述

abc2 的計算過程,會發現數值
ab
ac
bc
各會重複一次,因此可先計算對角線其中一邊的數值,將數值的總和直接乘二,最終再加上對角線上的
aa
bb
cc
。藉由這種方式,平方運算的成本可由本來的
n2
次乘法降為
(n2n)/2
次乘法

void do_sqr_base(const bn_data *a, bn_data size, bn_data *c)
{
    bn_data *cp = c + 1;
    const bn_data *ap = a;
    bn_data asize = size - 1;
    for (int i = 0; i < asize; i++) {
        /* calc the (ab bc bc) part */
        cp[asize - i] = _mult_partial(&ap[i + 1], asize - i, ap[i], cp);
        cp += 2;
    }

    /* Double it */
    for (int i = 2 * size - 1; i > 0; i--)
        c[i] = c[i] << 1 | c[i - 1] >> (DATA_BITS - 1);
    c[0] <<= 1;

    /* add the (aa bb cc) part at diagonal line */
    cp = c;
    ap = a;
    asize = size;
    bn_data carry = 0;
    for (int i = 0; i < asize; i++) {
        bn_data high, low;
        __asm__("mulq %3" : "=a"(low), "=d"(high) : "%0"(ap[i]), "rm"(ap[i]));
        high += (low += carry) < carry;
        high += (cp[0] += low) < low;
        carry = (cp[1] += high) < high;
        cp += 2;
    }
}

結果如下 (v7 藍線)

     1,057,685,945      cycles                                                        ( +-  0.14% )  (66.56%)
     2,744,641,149      instructions              #    2.59  insn per cycle           ( +-  0.02% )  (83.39%)
             1,304      cache-misses              #    6.200 % of all cache refs      ( +- 19.30% )  (83.46%)
            21,032      cache-references                                              ( +-  3.28% )  (83.46%)
       292,210,120      branch-instructions                                           ( +-  0.05% )  (83.46%)
         3,400,028      branch-misses             #    1.16% of all branches          ( +-  1.65% )  (83.05%)

          0.314624 +- 0.000825 seconds time elapsed  ( +-  0.26% )
  • 時間從 0.418849 s 減少至 0.314624 s,減少約 25% 時間
  • 資料長度越長,節省的時間越明顯

實作 Karatsuba algorithm

雖然上述 v7 版本所花的時間已略低於參考組,但若將量測範圍逐漸提高,會發現效能仍不及參考組,至 F(100000) 時差距約有 1 倍,觀察 bignum 的原始碼會發現有使用 Karatsuba algorithm 來加速乘法與平方運算,因此接下來一樣實作該演算法來提升效能

Karatsuba algorithm 的核心概念是將 a 與 b 拆分為高位與低位再進行計算,考量計算

a×b,且 a 與 b 的位數皆為
N=2n
位 (2 進位下的位數,不過 10 進位時邏輯相同),可將 a 與 b 表示如下

a=a0+a1×2n
b=b0+b1×2n

因此

a×b 可進一步整理為

(2n+22n)(a1b1)+2n(a1a0)(b0b1)+(2n+1)(a0b0)

由於

2n 可藉由 bit shift 達成,因此實際使用乘法的部分只剩 3 項,遠少於直接使用乘法的
N2
項,可大幅度降低乘法運算的成本

實作的程式碼很長所以不直接貼上,請詳見 do_mult_karatsubado_sqr_karatsuba

將 Karatsuba multiplication 應用至 bn_multbn_sqr 後,效能如下 (v8 藍線)

  • 圖中設置的閾值與參考組一樣,但縮小閾值並不會顯著提升數值長度較小時的效能

mutex 對 Linux 核心模組的影響

mutex 可用來確保 critical section (也就是 mutex 圍住的範圍) 內的程式碼同時間只會有一個 thread 可以執行,避免 thread 取用共用的資源時 (shared resource) 的同時,另一個 thread 對該資源進行修改,造成 race conditon

fibdrv 分別在 fib_openfib_release 中使用 mutex_trylockmutex_unlock 將 /dev/fibonacci 上鎖及解鎖,因此同時間只能有一個 user thread 使用此 char device。這點可以藉由簡單的實驗驗證,測試的程式碼如下

void *worker(void* arg){
    int fd = open("/dev/fibonacci", O_RDWR);
    if (fd < 0) {
        perror("Failed to open character device");
        goto end;
    }

    char buf[100];
    for (int i = 80; i <= 92; i++) {
        lseek(fd, i, SEEK_SET); // result = F(i)
        long long result = read(fd, buf, 0);
        printf("F(%d): %lld\n", i, result);
    }
end:
    close(fd);
}

int main(){
    pthread_t test[2];
    pthread_create(&test[0], NULL, worker, NULL);
    pthread_create(&test[1], NULL, worker, NULL);
    for(int i = 0; i < 2; i++)
        pthread_join(test[i], NULL);
    return 0;
}

另外,fib_read 更改為根據 offset 回傳費氏數列結果

static ssize_t fib_read(struct file *file,
                        char *buf,
                        size_t size,
                        loff_t *offset)
{
    return fib_sequence_fdouble(*offset);
}

可以觀察到第二個 thread 無法成功 open,因為第一個 thread 已經取得 mutex 了

$ ./client
F(80): 23416728348467685
F(81): 37889062373143906
F(82): 61305790721611591
Failed to open character device: Device or resource busy
F(83): 99194853094755497
...

如果將 fibdrv.c 中關於 mutex 的部分移除會造成甚麼後果呢? 以上述的例子而言,反而會使所有 thread 都能正常輸出結果,這是因為本例中的 thread 間沒有共用的資源

F(80): 23416728348467685
F(81): 37889062373143906
F(82): 61305790721611591
F(83): 99194853094755497
F(80): 23416728348467685
F(81): 37889062373143906
F(82): 61305790721611591
...

那麼 fibdrv 中是否完全不需使用 mutex 呢? 其實還是得根據使用的情境而定,例如下面這個例子,更改為讓所有 thread 共用同個 file descriptor,由於 offset 實際上儲存於該 fd 對應的 struct file 中的 f_pos 欄位,因此 offset 會成為共用資源

static int fd;

void *worker(void* arg){
    char buf[100];
    int delay = *((int *) arg);
    for (int i = 80; i <= 92; i++) {
        lseek(fd, i, SEEK_SET);
        sleep(delay);
        long long result = read(fd, buf, 0);
        printf("thread %d F(%d): %lld\n", delay, i, result);
    }
}

int main(){
    fd = open("/dev/fibonacci", O_RDWR);
    if (fd < 0) {
        perror("Failed to open character device");
        exit(1);
    }

    pthread_t test[2];
    int delay_a = 1;
    pthread_create(&test[0], NULL, worker, (void *) &delay_a);
    int delay_b = 2;
    pthread_create(&test[1], NULL, worker, (void *) &delay_b);
    for(int i = 0; i < 2; i++)
        pthread_join(test[i], NULL);

    close(fd);
    return 0;
}

這時就可以觀察到結果會出現錯誤,thread 2 以 lseek 設定好目標 offset 後會休眠 2 秒,但在休眠的期間 thread 1 又更改了 offset,導致 thread 進行 read 時使用的 offset 不是當初設定的數值,產生 race condition

$ ./client
thread 1 F(80): 23416728348467685
thread 2 F(80): 37889062373143906
thread 1 F(81): 37889062373143906
thread 1 F(82): 61305790721611591
...

如何量測模組間的相依性和實際使用次數 (reference counting)

模組間的相依性

Linux Device Drivers 中的 The Kernel Symbol Table 小節提到核心模組可以輸出 symbols 來讓其他核心模組使用 (module stacking),不過這樣會產生模組的相依性 (dependency) 關係,也就是說輸出 symbols 的核心模組需要先被掛載,才能供其他模組使用。

  • 可用 modprobedepmod 來處理核心模組相依性的問題
  • modprobe 會根據 /lib/modules/$(uname -r)/modules.dep 的內容來掛載與卸除核心模組
  • depmod 用來產生 modules.dep

接著寫兩個小型核心模組來觀察相依性,首先是輸出 symbol 的 hello_dep1.ko

#include <linux/init.h>
#include <linux/module.h>

MODULE_LICENSE("Dual MIT/GPL");
MODULE_AUTHOR("KYWeng");
MODULE_DESCRIPTION("kernel module dependency test");
MODULE_VERSION("0.1");

void print_hello(void)
{
    printk(KERN_DEBUG "Hello Worl... what are you expecting?\n");
}
EXPORT_SYMBOL(print_hello);  // export to kernel symbol table

static int __init mod1_init(void)
{
    printk(KERN_DEBUG "hello_dep1 has been loaded.\n");
    return 0;
}

static void __exit mod1_exit(void)
{
}

module_init(mod1_init);
module_exit(mod1_exit);
  • 使用 EXPORT_SYMBOL 將 symbol 輸出至 symbol table
    • 位於 ELF 中的 __ksymtab section
  • 可以使用 cat /proc/kallsyms | grep 'hello_dep.*' 來觀察 EXPORT_SYMBOL 造成的差異,會發現 print_hello 的 symbol type 從小寫的 't' 變為大寫的 'T'
    • man nm - If lowercase, the symbol is usually local; if uppercase, the symbol is global (external).

再來是負責使用 symbol 的 hello_dep2.ko

#include <linux/init.h>
#include <linux/module.h>

MODULE_LICENSE("Dual MIT/GPL");
MODULE_AUTHOR("KYWeng");
MODULE_DESCRIPTION("kernel module dependency test");
MODULE_VERSION("0.1");

extern void print_hello(void);

static int __init mod1_init(void)
{
    print_hello();
    return 0;
}

static void __exit mod1_exit(void)
{
}

module_init(mod1_init);
module_exit(mod1_exit);
  • hello_dep2.ko 直接使用 hello_dep1.ko 輸出的 print_hello
  • 編譯時會出現警告 WARNING: "print_hello" [path/hello_dep2.ko] undefined!

將兩者編譯並掛載後,可從 dmesg 觀察到 hello_dep2.ko 成功呼叫 print_hello 後產生的訊息

...
[235433.055308] hello_dep2 has been loaded.
[235433.055319] Hello Worl... what are you expecting?

核心模組的 reference counting (refcnt)

成功掛載 hello_dep2.ko 後,觀察 /sys/module/MODULENAME/refcnt 會發現 refcnt 從 0 變成 1

$ cat /sys/module/hello_dep1/refcnt
1

根據核心的說明文件 Unreliable Guide To Hacking The Linux Kernel,可以知道 try_module_getmodule_put() 分別會增加及減少 usage count (refcnt),不過掛載模組時是如何用到這兩個函式呢?

掛載模組時 (insmod)

strace 觀察單獨掛載 hello_dep2.ko 時的情況,會發現錯誤來自於 finit_module 這個步驟,錯誤代碼為 ENOENT,進一步追蹤核心程式碼,可以歸納出以下流程:

finit_module > load_module > simplify_symbols (從模組的 ELF 中取出需要尋找的 external symbols) > resolve_symbol_wait > resolve_symbol (會調用 find_symbol 來尋找該 symbol 屬於哪個核心模組) > ref_module > strong_try_module_get > try_module_get (增加 symbol 所屬的核心模組的 refcnt)

註:try_module_get 實際改變的是 struct module 中的 refcnt,在 load_module 的過程中會建立與 sysfs 的關係,然後再將模組的相關資訊列出到 sys/module/modulename,在 Module parameters in sysfs 有相關的討論

open device 的時候

同樣以 strace 觀察 user space 對 /dev/fibdrv 執行 open,可以發現實際會執行的 system call 為 openat,一路追蹤核心程式碼,可以歸納出以下流程:

openat > do_sys_openat2 > do_filp_open (共會嘗試呼叫 3 次 path_openat,第一次會使用 LOOKUP_RCU 這個 flag 來開啟) > path_openat > do_open (以前稱為 do_last,請參閱 LWN linux-kernel archive) > vfs_open > do_dentry_open > fops_get (增加目標核心模組的 refcnt)

fops_get 除了增加 refcnt 之外,主要的目的是從核心模組的 inode 取得 file 的作業方式 inode->i_fop,然後再執行 open 這個動作 f->f_op->open

static int do_dentry_open(struct file *f,
			  struct inode *inode,
			  int (*open)(struct inode *, struct file *))
{
	...
	f->f_op = fops_get(inode->i_fop);
	...
	if (!open)
		open = f->f_op->open;
	...
}

我們知道對 char devices 執行 open (user space),最終會一同呼叫模組內對應的函式 (定義於模組內 struct file_operations.open 欄位,例如 fibdrv 中的 fib_open),不過模組的 inode->i_fop 是在何時連結至我們定義的 struct file_operations 呢? 考量 /dev/fibdrv 這個裝置會在呼叫 device_create 後產生,一路觀察相關的核心程式碼:

device_create > device_create_vargs > device_create_groups_vargs > device_add > devtmpfs_create_node > devtmpfsd (這是一個 kthread,很像 daemon 的概念) > handle_create > vfs_mknod > mknod (dir->i_op->mknod 會根據檔案系統而有不同的實作) > init_special_inode

init_special_inode 會將 inode->i_fop 設為 def_chr_fops,其中 .open 的函式為 chrdev_open,也就是說上述 dentry_open 中的 f->f_op->open 不會直接呼叫模組中定義的函式,而是會先呼叫 chrdev_open,然後 chrdev_open 再將核心模組中定義的 struct file_operations 取代原本 inode 中的 def_chr_fops

/*
 * Called every time a character special file is opened
 */
static int chrdev_open(struct inode *inode, struct file *filp)
{
	...
	if (!p) {
		...
		p = inode->i_cdev;
		...
	}
	...
	fops = fops_get(p->ops);
	...
	replace_fops(filp, fops);
	...
}

lsmod 是如何實作出來的?

直接以 strace 觀察 lsmod,可以發現其實是直接列出 /sys/module/module_name 內的資訊,其中 “each module’s use count and a list of referring modules” 的資訊是來自於 holders 這個子目錄

...
openat(AT_FDCWD, "/sys/module/hello_dep1/refcnt", O_RDONLY|O_CLOEXEC) = 3
read(3, "1\n", 31)                      = 2
read(3, "", 29)                         = 0
close(3)                                = 0
openat(AT_FDCWD, "/sys/module/hello_dep1", O_RDONLY|O_CLOEXEC) = 3
openat(3, "coresize", O_RDONLY|O_CLOEXEC) = 4
read(4, "16384\n", 31)                  = 6
read(4, "", 25)                         = 0
close(4)                                = 0
close(3)                                = 0
openat(AT_FDCWD, "/sys/module/hello_dep1/holders", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 3                            = 0
...

使用 sysfs 介面來提供讀寫操作

簡介

[man 5 sysfs]
The sysfs filesystem is a pseudo-filesystem which provides an interface to kernel data structures. (More precisely, the files and directories in sysfs provide a view of the kobject structures defined internally within the kernel.) The files under sysfs provide information about devices, kernel modules, filesystems, and other kernel components.

sysfs 是 Linux 的一個虛擬檔案系統,提供使用者存取部分核心資料結構的介面 (準確來說是定義於核心內的 struct kobject 物件),可以讓使用者讀取核心資訊或是寫入設定值 (例如 driver 相關設定)。sysfs 通常被掛載於 /sys 目錄下。

sysfs 的目錄結構基本上由 kobjects 、 ktypes 、 attributes 和 ksets 構成,要確實了解這個部分還是建議閱讀 Linux Device Drivers 的第 14 章或是核心說明文件 sysfs - The filesystem for exporting kernel objectsEverything you never wanted to know about kobjects, ksets, and ktypes。以下將列出部分重點,以便於更快速理解 sysfs 的目錄結構

kobjects

  • Linux Device Model 的基本單元,不過這裡的討論先著重於 kobjects 在 sysfs 的呈現方式
  • kobjects 指的是 struct kobject 或是內嵌有 struct kobject 的資料結構,但很少單獨使用,一般都是內嵌在別的資料結構中使用
  • kobject 在 sysfs 內會以目錄 (directory) 的形式呈現
    • 該 kobject 具有的屬性 (attributes) 會以檔案的形式呈現於其目錄下

attributes

  • 每個 kobject 會具有一至多個 attributes,每個 attribute 都代表特定的核心資訊,以檔案的形式呈現於 kobjects 的目錄下
  • attributes 指的是內嵌 struct attribute 的資料結構,例如 struct kobj_attribute
  • 使用者可以對 attributes 進行讀取 (e.g. cat) 或設定 (e.g. echo) 這兩種操作
    • attributes 中 .show.store 兩個欄位的 callback functions 決定了核心內部的對應行為
  • 分為預設屬性 (default) 和非預設屬性 (non-default) 兩種
    • default: ktype 中預設包含的屬性,kobject 會自動具有所屬 ktype 中定義的 default attribute(s)
    • non-default: 使用 sysfs_create_file 增加至 kobjects 上的屬性

ktypes

  • ktypes 指的是資料型態為 struct kobj_type 的物件
  • 任何 kobjects 都會對應到一種 ktypes
  • 定義 ktypes 物件時,需指定以下欄位
    • .release 指向負責釋放所屬 kobjects 的函式
    • .sysfs_ops 指向一個 struct sysfs_ops,其中的 .store.shows 欄位分別定義了當使用者讀取或設置 attributions 時 sysfs 會呼叫的 callback funstions
    • .default_attrs 指向一個 struct attribute_group,其中定義了此 ktype 具有的預設 attribute(s)
  • 讀到這裡會發現 attributes 和 ktypes 各自具有一組不同的 .store.shows 欄位,當使用者讀取或是設置 attributes 時,呼叫的順序如下
    • sysfs 呼叫 ktype 中的 callback,ktype 中的 callback 再去呼叫 attribute 中定義的 callback
    • 也就是說,ktype 提供一個一致的介面 (callback) 給 sysfs,然後每個 attributes 再各自定義不同的存取方式
    • 其面提到的預設 ktype dynamic_kobj_ktype 提供給 sysfs 的 callback 是 kobj_attr_showkobj_attr_store,其實就是直接呼叫 attributes 中定義的 callback functions

ksets

  • kobjects 的群體,可以視為 kobjects 的基本容器,且 ksets 中可以再包含別的 kset(s)
    • 不要和 ktypes 搞混,ksets 是資料的群組,ktypes 是資料的類別,因此一種 ktype 可以重複出現在多個不同的 kset 中
  • 以目錄的形式呈現在 sysfs 中,目錄下會有所含 kobject(s) 的目錄
  • ksets 指的是資料型態為 struct kset 的物件
  • kset 也具有一個自己的 kobject,不過應用上可以無視這點

新增 fibdrv 在 sysfs 中的存取路徑

接下來會在 fibdrv 中實作專屬的 kobject - fib_obj,在 sysfs 中的存取路徑為 /sys/kernel/linux2020/fibdrv,目前只需要一個 attribute - nth,此 attribute 的用途如下

  • 寫入: 設置費氏數列的目標項數
  • 讀取: 回傳設置項數的費氏數列數值

定義 kobject - fib_obj

第一步當然是定義 fibdrv 專用的 kobject

struct fib_obj {
    struct kobject kobj;
    int n;
};
#define to_fib_obj(x) container_of(x, struct fib_obj, kobj)

因為使用自定義的 kobject,需要自定義函數來配置 kobject 所需的記憶體及進行初始化

static struct fib_obj *create_fib_obj(const char *name)
{
    struct fib_obj *fib;
    int retval;

    /* allocate the memory for the whole object */
    fib = kzalloc(sizeof(*fib), GFP_KERNEL);
    if (!fib)
        return NULL;

    /* the kobject will be placed under the kset, no need to set a parent */
    fib->kobj.kset = linux2020_kset;
    retval = kobject_init_and_add(&fib->kobj, &fib_ktype, NULL, "%s", name);
    if (retval) {
        kobject_put(&fib->kobj);
        return NULL;
    }
    kobject_uevent(&fib->kobj, KOBJ_ADD);
    return fib;
}
  • 注意先指定 kobject 的 kset 再使用 kobject_init_and_add,如此一來 kobject 會自動列於該 kset 的目錄下
  • fib_ktype 是自定義的 ktype,後文會說明

準備 ktype 所需欄位 .default_attrs

定義 fibdrv 專用的 attribute

/* a custom attribute that works just for a struct fib_obj */
struct fib_attribute {
    struct attribute attr;
    ssize_t (*show)(struct fib_obj *fib,
                    struct fib_attribute *f_attr,
                    char *buf);
    ssize_t (*store)(struct fib_obj *fib,
                     struct fib_attribute *f_attr,
                     const char *buf,
                     size_t count);
};
#define to_fib_attr(x) container_of(x, struct fib_attribute, attr)

定義 fib_attribute 的 callback functions

static ssize_t fib_show(struct fib_obj *fib_obj,
                        struct fib_attribute *f_attr,
                        char *buf)
{
    int retval;
    bn *fib = bn_alloc(1);
    bn_fib_fdoubling(fib, fib_obj->n);
    char *p = bn_to_string(fib);
    retval = scnprintf(buf, PAGE_SIZE, "%s\n", p);
    bn_free(fib);
    kfree(p);
    return retval;
}

static ssize_t fib_store(struct fib_obj *fib,
                         struct fib_attribute *f_attr,
                         const char *buf,
                         size_t count)
{
    int ret;

    ret = kstrtoint(buf, 10, &(fib->n));
    if (ret < 0)
        return ret;

    return count;
}
  • 注意傳給 call back functions 的 buff 大小一定是 PAGE_SIZE,而且使用的格式為字串
  • 應使用 scnprintf 而不是 snprintf,可參閱 snprintf() confusion

接著創建所需的 attribute,待之後定義 ktype 時使用

static struct fib_attribute nth = __ATTR(nth, 0664, fib_show, fib_store);

/* default attribute for fib_ktype */
static struct attribute *fib_default_attrs[] = {
    &nth.attr, NULL, /* need to NULL terminate the list of attributes */
};
  • 目前只需要一個 attribute,將其命名為 nth
  • fib_default_attrs 是之後定義 ktype 時連結於 .default_attrs 欄位的資料結構
  • 仔細觀察會發現目前的實作不會用到 fib_attribute,這麼做的目的只是為了練習使用自定義 attribute

準備 ktype 所需欄位 .sysfs_ops

定義 sysfs 存取 ktype 的 callback functions

static ssize_t fib_attr_show(struct kobject *kobj,
                             struct attribute *attr,
                             char *buf)
{
    struct fib_obj *fib;
    struct fib_attribute *f_attr;

    fib = to_fib_obj(kobj);
    f_attr = to_fib_attr(attr);
    if (!f_attr->show)
        return -EIO;

    return f_attr->show(fib, f_attr, buf);
}

static ssize_t fib_attr_store(struct kobject *kobj,
                              struct attribute *attr,
                              const char *buf,
                              size_t len)
{
    struct fib_obj *fib;
    struct fib_attribute *f_attr;

    fib = to_fib_obj(kobj);
    f_attr = to_fib_attr(attr);
    if (!f_attr->store)
        return -EIO;

    return f_attr->store(fib, f_attr, buf, len);
}
  • 藉由使用 container_of 來取出我們自定義的 kobject 與 attribute,再呼叫 attribute 中定義的 callback functions
  • 會發現跟預設 ktype 的實作一樣,這麼做的目的一樣是為了練習自定義 ktype

最後準備 ktype 中 .sysfs_ops 欄位所需的部分

/* sysfs_ops for fib_ktype */
static struct sysfs_ops fib_sysfs_ops = {
    .show = fib_attr_show,
    .store = fib_attr_store,
};

準備 ktype 所需欄位 .release

kobject 必須動態記憶體,因此需定義釋放的方式。由於使用自定義的 kobject,一樣須搭配使用 container_of 來取得實際的 kobject 指標

/* release function for fib_ktype */
static void fib_obj_release(struct kobject *kobj)
{
    struct fib_obj *fib;

    fib = to_fib_obj(kobj);
    kfree(fib);
}

定義 ktype - fib_ktype

將上述三個部分組成我們要的 ktype - fib_ktype

/* Define our own ktype */
static struct kobj_type fib_ktype = {
    .sysfs_ops = &fib_sysfs_ops,
    .release = fib_obj_release,
    .default_attrs = fib_default_attrs,
};

將 kobject 整合至 fibdrv 模組

新增 kobject 相關的部分至 fibdrv 掛載與卸載的函數 init_fib_devexit_fib_dev

static struct kset *linux2020_kset;
static struct fib_obj *fib_obj;

static int __init init_fib_dev(void)
{
    ...
    // sysfs registeration
    linux2020_kset = kset_create_and_add("linux2020", NULL, kernel_kobj);
    if (!linux2020_kset)
        return -ENOMEM;
    fib_obj = create_fib_obj("fibdrv");
    if (!fib_obj)
        goto failed_sysfs;
    ...
failed_sysfs:
    kset_unregister(linux2020_kset);
    return rc;
}
  • kset_create_and_add 使用的參數為 kernel_kobj,代表會將名稱為 "linux2020" 的 kset 置於 /sys/kernel/ 下
  • 如果註冊的 kobject 不只一個,當註冊失敗時記得要反向將已經成功的 kobject 使用 kobject_put,否則那些 kobject(s) 就會一直佔據記憶體
static void __exit exit_fib_dev(void)
{
    ...
    kobject_put(&(fib_obj->kobj));
    kset_unregister(linux2020_kset);
}
  • 應使用 kobject_put 來減少 kobject 的 refcount,當 refcount 為 0 時,sysfs 就會呼叫 ktype 中 .release 欄位的 callback 來清理配置的記憶體

實際測試後,fib_obj 會正確掛載於 /sys/kernel/linux2020/fibdrv,且讀寫操作皆符合預期

$ sudo bash -c "echo 300 > /sys/kernel/linux2020/fibdrv/nth"
$ cat /sys/kernel/linux2020/fibdrv/nth
222232244629420445529739893461909967206666939096499764990979600

參考資料

Writing a Linux Kernel Module — Part 1: Introduction
The first 300 Fibonacci numbers, factored
簡介 perf_events 與 Call Graph

tags: linux 2020