Try   HackMD

2025q1 Homework3 (kxo)

contributed by < Max042004 >

作業書寫規範:

  • 無論標題和內文中,中文和英文字元之間要有空白字元 (對排版和文字搜尋有利)
  • 文字訊息 (尤其是程式執行結果) 請避免用圖片來表示,否則不好搜尋和分類
  • 共筆書寫請考慮到日後協作,避免過多的個人色彩,用詞儘量中性
  • 不要在筆記內加入 [TOC] : 筆記左上方已有 Table of Contents (TOC) 功能,不需要畫蛇添足
  • 不要變更預設的 CSS 也不要加入任何佈景主題: 這是「開發紀錄」,用於評分和接受同儕的檢閱
  • 在筆記中貼入程式碼時,避免非必要的行號,也就是該手動將 c=cpp= 變更為 ccpp。行號只在後續討論明確需要行號時,才要出現,否則維持精簡的展現。可留意「你所不知道的 C 語言: linked list 和非連續記憶體」裡頭程式碼展現的方式
  • HackMD 不是讓你張貼完整程式碼的地方,GitHub 才是!因此你在開發紀錄只該列出關鍵程式碼 (善用 diff 標示),可附上對應 GitHub commit 的超連結,列出程式碼是為了「檢討」和「便於他人參與討論」
  • 留意科技詞彙的使用,請參見「資訊科技詞彙翻譯」及「詞彙對照表
  • 不要濫用 :::info, :::success, :::warning 等標示,儘量用清晰的文字書寫。:::danger 則僅限授課教師作為批注使用
  • 避免過多的中英文混用,已有明確翻譯詞彙者,例如「鏈結串列」(linked list) 和「佇列」(queue),就使用該中文詞彙,英文則留給變數名稱、人名,或者缺乏通用翻譯詞彙的場景
  • 在中文敘述中,使用全形標點符號,例如該用「,」,而非 ","。注意書名號的使用,即 ,非「小於」和「大於」符號
  • 避免使用不必要的 emoji 字元

開發環境

$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
Copyright (C) 2023 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

$ lscpu
Architecture:             x86_64
  CPU op-mode(s):         32-bit, 64-bit
  Address sizes:          39 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   4
  On-line CPU(s) list:    0-3
Vendor ID:                GenuineIntel
  Model name:             Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
    CPU family:           6
    Model:                142
    Thread(s) per core:   2
    Core(s) per socket:   2
    Socket(s):            1
    Stepping:             9
    CPU(s) scaling MHz:   27%
    CPU max MHz:          3600.0000
    CPU min MHz:          400.0000
    BogoMIPS:             4599.93
    Flags:                fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush
                           dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_
                          tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmp
                          erf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtp
                          r pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx 
                          f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb pti ssbd ibrs ibpb stibp 
                          tpr_shadow flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2
                           erms invpcid mpx rdseed adx smap clflushopt intel_pt xsaveopt xsavec xgetbv1 x
                          saves dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp vnmi md_cle
                          ar flush_l1d arch_capabilities
Virtualization features:  
  Virtualization:         VT-x
Caches (sum of all):      
  L1d:                    64 KiB (2 instances)
  L1i:                    64 KiB (2 instances)
  L2:                     512 KiB (2 instances)
  L3:                     4 MiB (1 instance)
NUMA:                     
  NUMA node(s):           1
  NUMA node0 CPU(s):      0-3
Vulnerabilities:          
  Gather data sampling:   Mitigation; Microcode
  Itlb multihit:          KVM: Mitigation: VMX disabled
  L1tf:                   Mitigation; PTE Inversion; VMX conditional cache flushes, SMT vulnerable
  Mds:                    Mitigation; Clear CPU buffers; SMT vulnerable
  Meltdown:               Mitigation; PTI
  Mmio stale data:        Mitigation; Clear CPU buffers; SMT vulnerable
  Reg file data sampling: Not affected
  Retbleed:               Mitigation; IBRS
  Spec rstack overflow:   Not affected
  Spec store bypass:      Mitigation; Speculative Store Bypass disabled via prctl
  Spectre v1:             Mitigation; usercopy/swapgs barriers and __user pointer sanitization
  Spectre v2:             Mitigation; IBRS; IBPB conditional; STIBP conditional; RSB filling; PBRSB-eIBRS
                           Not affected; BHI Not affected
  Srbds:                  Mitigation; Microcode
  Tsx async abort:        Mitigation; TSX disabled

Linux kernel module

linux kernel module 是你隨意添加進 linux kernel 的程式碼,執行 module 不需要重新編譯整個 kernel
kernel module 運行在 kerenl space ,所以一旦程式 crash 可能使得作業系統停擺。kernel 程式碼分成 load time 和 boot time
可卸載的 kernel module 的入口是 module_init,通常是 insmod 載入後執行;結束是 module_exitrmmod 後執行

由於 kernel module 與核心連結,為避免變數名稱衝突,所以:

  1. 所有變數宣告為 static,限制其作用域
  2. 為符號(symbols)加上明確的前綴字。按照慣例,Linux 核心的前綴均採用小寫字母。

lsmod 查看已載入的核心模組
從 command line 傳入參數到核心模組,無法依靠 argc/argv ,要靠 module_param 的巨集。

為什麼要用 write, read 等系統呼叫

在 user-mode 配置一個記憶體區塊,核心裝置驅動無法用該記憶體位址的指標直接將資料寫入(該指標值在 kernel address space 可能不同),所以需要透過 copy_to_user,將想傳給使用者模式 (即運作中的 client) 的字串複製到到 sort_read 的 buf 參數後,client 端方可接收到此字串。而 copy_to_user 的時間成本隨著數據 byte 變多會影響量測效能。

Linux kernel 計時有關的功能主要用在二種情境:
timer: 安排「在某個時間點做某件事情」。可想像成火車班次表,這個重要的地方是:若某班火車誤點,會連鎖地影響後面所有班次。因此需要較精準的計時機制;
timeout: 用來作為逾時的通知,提醒「有東西遲到」。最簡單的例子是 qtest 裡面的 SIGALRM 的使用。這對於時間的精準度要求不高;
最早 linux kernel 使用 jiffies 計時,用作業系統從開機以來的計時器中斷發生的次數作為計時的依據。但受限於計時器中斷觸發和實質能處理的頻率的極限
於是後來出現的 hrtimer 建立在 ktime_t 這個新的資料結構

以下程式碼如何測量執行時間
static ktime_t kt;

static long long sort_time_proxy(long long k)
{
    kt = ktime_get();
    long long result = sort_sequence(k);
    kt = ktime_sub(ktime_get(), kt);

    return result;
}

static ssize_t sort_read(struct file *file,
                        char *buf,
                        size_t size,
                        loff_t *offset)
{
    return (ssize_t) sort_time_proxy(*offset);
}

static ssize_t sort_write(struct file *file,
                         const char *buf,
                         size_t size,
                         loff_t *offset)
{
    return ktime_to_ns(kt);
}

CPU 的親和性

$ taskset -p <pid>
pid 1's current affinity mask: f

affinity mask 是十六進位,轉換為二進位後每一位代表是否該行程可在對應的 CPU 核上運行,輸出顯示 Linux 說我的電腦有四個 CPU 核(即便配備的是雙核心 Intel Core i5)

若想保留 CPU 核,輸入以下指令會指定 CPU 核在開機後不會在上面排程

isolcpus=0,1

可用 sudo dmesg 查看核心環境訊息 (kernel ring buffer)。只顯示最新 50 筆用 sudo dmesg | tail -n 50

執行 kxo 時,發現使得我電腦上 youtube 播放的音樂卡頓,猜測在 kernel 執行的 kxo 會中斷其他行程的執行。(後續幾次執行 kxo 卻又沒有這現象,不過可以在使用 hackmd 感受到明顯卡頓)
在一開始 clone 下來的版本,使用 linux kernel 既有的 workqueue, timer, tasklet 實現非同步執行

Linux kernel 並行處理

何為 workqueue, tasklet, irq

處理器接收到軟體與硬體的中斷請求(interupt request 簡稱 irq),會執行對應的中斷處理程序,此程序無法被排程。
實作後續的處理(Bottom half) 有以下三種機制:

  • softirqs
  • tasklets
  • workqueues

irq

不能讓 CPU 等慢的裝置,所以叫慢的裝置去通知 CPU
要用通知與發送可以交錯執行,並行處理

top half/bottom half
top half 會關中斷,不會交錯。bottom half 會交錯。
因為中斷處理會中斷程式的進行,如果處理時間太長便會影響系統的運作效率,因此才分成 top half, bottom half,具有中斷系統進行的 top half 盡可能迅速結束,再由可排程的 bottom half 完成中斷處理後續工作。

因為從網路處理路徑很長(OSI 7 層, TCP/IP),所以要用 bottom half 去執行。

softirq

可用以下命令觀察有哪幾種 softirq:

$ sudo cat /proc/softirqs
                    CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7       
          HI:         63          0          1          0          0         11          0          5
       TIMER:      91569      81926      83934     103131      74011     127987     113794     125569
      NET_TX:          4          2          0          0          0          3          0         33
      NET_RX:       4400       2811       4047       3683       2501       5472       2422     152954
       BLOCK:        351        282        679        225        358      15623      39399        486
    IRQ_POLL:          0          0          0          0          0          0          0          0
     TASKLET:     173719          0         57        524         20         42          0       1908
       SCHED:     240706     199132     152716     184508     189760     380257     337690     202108
     HRTIMER:          0          0          0          0          0          0          0          1
         RCU:     142200     142248     117051     133957     138481     245684     225400     148630

tasklet

建立在 softirq 上,但與 softirq 不一樣的是可以動態配置且可以被用在驅動裝置上。
game_tasklet_handler 將 AI 函式包在 workqueue 內的理由是盡可能減少 tasklet 的執行時間,因為 tasklet 無法被中斷和排程。所以執行時間長會造成延遲。

同一個 tasklet 會在同一個 CPU 執行

workqueue

可排程,可中斷,可睡眠

Asynchronous process execution context

中文翻譯成非同步上下文
其實指的就是非同步操作。因為有些工作比如等待網路回應,如果都用同步操作,也就是呼叫 A → 執行 A → 結果出來 → A 返回。控制權、呼叫堆疊與結果交付都在同一條線上完成,這樣便會阻塞程式的進行。
因此碰到這樣的情況要使用非同步。

使用 kernel space 中的非同步有三種機制可以選:softirq, tasklet, workqueue

User space 則有:select(), epoll(), setjmp/longjmp 還有 C++ Coroutine

Workqueue

在核心程式常有非同步的需求,當需要非同步上下文時,work item 是將要被執行的函式,被放在 workqueue。負責執行非同步上下文的 thread 稱為 worker。
可睡眠、可排程,不佔用中斷鎖或 BOTTOM‑HALF 時間
workqueue 的運作機制是:

  1. 先建立一個 work item ,只紀錄要執行哪個函式。
  2. 把這個 workitem 放在 workqueue
  3. 核心裡獨立的 worker thread 會依序取出 work item 並執行。

使用 workqueue 的方式是:

  1. 建立 work item (INIT_WORK() / INIT_DELAYED_WORK()
  2. queue_work() / schedule_delayed_work() 排入對應 workqueue

定義在 work item 的函式稍後才會由核心的 worker theard 在 process context 執行,達成非同步

tasklet

實作前自我提問

降低核心與使用者層級的通訊成本

  1. 核心與使用者層級如何通訊
  2. 通訊的時候交換什麼狀態
  3. 哪裡是瓶頸
  4. 為什麼分別要用 /dev 和 sysfs
  5. inode 是?
  6. udev 是?
  7. ktime 運作機制?

使用者層級運行 AI

將兩個 kxo 在同一個核心上執行,利用 Coroutine 的機制,彼此交互執行。所以我們要實作一個極簡排程器,負責處理兩個 kxo 的運行,中斷處理,以及同時可以接受命令列的輸入。
排程器必須處理行程 context switch ,以及制定策略,比如 FIFO ,或 RR ,還要考量優先權。

  1. kxo 的運作機制是?
  2. 實作是單執行緒還是多執行緒?
  3. coroutine 實作兩個 kxo 彼此如何溝通?
  4. 要制定怎樣的排程策略?
  5. 如何設計搶佔與協同?
  6. 如何設計任務結構體?
  7. 如何將多事件整合才單一事件迴圈?好處是?
  8. 程式運行硬體端看不見的成本有哪些?
  9. coroutine 與作業系統內建排程器所進行的並行操作有何不同?
  10. 自己實作的極簡排程器為什麼不會與作業系統內建的排程器衝突?
  11. workqueue 是什麼?
  12. 何時會有中斷請求?
  13. 為什麼要中斷請求?
  14. 中斷處理的實作是?
  15. 不同的 AI 的效能與效果的差別?
  16. 如何做效能分析?
  17. 如何從方便效能分析的角度設計整個實作?

中斷處理

  1. pr_info 的作用是? Pretend to simulate access to per-CPU data, disabling preemption during the pr_info() 是什麼意思?
  2. kfifo 是?
  3. smp_wmb() 是?

kxo 運作流程

  1. 核心空間:
    一個字元裝置+sysfs 介面,外加計時器、tasklet、workqueue。主要檔案是 main.c
  2. 使用者空間
    棋盤與勝負判定在 game.c;AI 分別用 MCTS (mcts.c) 與 Negamax (negamax.c) 完成
  3. 遊戲邏輯
    從 /dev/kxo 讀 ASCII 棋盤;透過 /sys/class/kxo/kxo/kxo_state 送控制字元(暫停顯示或結束)
  • timer_handler() 是最重要的「起點」,
  • game_tasklet_func() 是 softirq 的「分配者」,
  • ai_*_work_func()drawboard_work_func() 是真正改資料與把資料丟向使用者空間的「工人」。

掛載 module $ sudo insmod kxo.ko
執行 user space 程式 xo-user.c,然後 xo-user.c 執行 open /dev/kxo,呼叫 kxo 定義的 file_operations open,其內部設定 timer ,時間到呼叫 AI 計算下一步並寫入 /dev/kxo,select() 監聽到變化後執行 printf() 棋盤

程式碼分析

使用者層級互動

透過將使用者層級的輸入,寫入代表核心模組當中變數的檔案和核心模組進行溝通

struct kxo_attr {
    char display;
    char resume;
    char end;
    rwlock_t lock;
}

運用 sysfs 的機制,這個結構體的資料會掛載到 /sys/class/kxo/kxo/kxo_state 檔案中,執行後 cat 可以看到裡面是:

1 1 0^@

按 ctrl-q 後:

1 1 1^@

運作機制藉由 select() 同時監聽鍵盤輸入與 /dev/kxo
當 kxo 運行時會根據接收到鍵盤輸入,觸發修改 kxo_state 的內容。
當 /dev/kxo 接收到棋盤的資料輸入時,觸發棋盤繪製

select() 監聽原理:
監聽檔案是否處於 ready 的狀態,是的話便回傳。所以 select() 監聽的檔案只對 socket, pipe, 裝置有效,因為它們不是一直處在 ready。

select() is used to see if any of the specified file descriptors is, or becomes, ready for some class of I/O operation.

open 檔案的兩種寫法:open() 和 fopen()
要選擇使用哪個?

int open(const char *pathname, int flags, ...
                  /* mode_t mode */ );
FILE *fopen(const char *restrict pathname, const char *restrict mode);

cdev

Error handling

WARN_ON_ONCE() 輸出錯誤訊息(僅會輸出一次)

Time

ktime_get() 得到自 boot 以來的時間

同步操作

READ_ONCE() 功能類似 atomic_read ,避免讀取過程數值被其他 thread 修改
smp_rmb() 記憶體屏障
https://www.kernel.org/doc/Documentation/memory-barriers.txt

非同步操作

queue_work()

system call

strace hello_world.c 可以追溯程式使用到的 system call

降低核心與使用者層級之間的通訊成本

遊戲進行的過程,會把棋盤狀態儲存於 /dev/kxo ,每當 device_fd 觸發 select()

/dev 底下的每一個檔案代表一個檔案(device file)都代表一個硬體,而使用者與硬體的溝通橋樑就是這些檔案
每一個 device file 有 major 與 minor ,major 是 device driver 的編號,minor 是裝置的編號,意義為在第 major 號 device driver 的中對應第 minor 號裝置。
所有 major 相同的 device file 均被同一個 device driver 控制。

device 分為 character device 和 block device,區別是前者傳輸資料以 byte 為單位,後者以許多 byte 組成的 block 為單位。
character device 沒有 buffer 緩衝區,適合鍵盤、滑鼠這類即時反應的裝置。
block device 支援 buffer 緩衝,可以累積資料然後一次進行傳遞,並且編排傳遞順序使得更有效率,支援 random access,適合硬碟這類裝置。

crw-------   1 root root    510,   0 May  8 20:22 kxo
// major: 510, minor : 0

用命令搜尋後發現僅有 kxo 使用 major 510,啟動 kxo,/kxo 便掛載到 /kxo 底下。

device file 的用處在於提供溝通界面,應用程式不需要知道電腦使用什麼硬體,就能用規範好的方式,透過 device file 讓 device driver 與硬體溝通。
device 被分為兩類:character device 與 block device

image

driver 要定義對硬體做 read, write, open, release 等 file operation。
註冊一個 device major 和 minor 的方法是 alloc_chrdev_region()register_chrdev() ,後者屬於舊式。
使用 alloc_chrdev_region() 註冊後,初始化 device:

struct cdev my_cdev;
cdev_init(&my_cdev, &my_fops);
cdev_add(&my_cdev, dev, 1);

//or using dynamic alloc
struct cdev *my_cdev = cdev_alloc();
my_cdev->ops = &my_fops;

註冊 device 的方法是 device_create()

最後是釋放資源,取消註冊 device_destroy(), cdev_del(), unregister_chrdev_region()

在 kxo 用到 device driver 的意義是透過 device file 將資料傳遞 user space。
user space 和 kernel space 的記憶體不互通,所以不能直接傳遞。如果用一般寫檔的方式,也會有其他麻煩。因此用 device file 傳遞是經典,安全,簡單的作法

Q: device_fd 怎麼觸發 select() 的

sysfs

sysfs 讓使用者層級與正在運行中的 kernel 溝通,藉由讀取或設定變數達成。用 kobject 在核心程式碼中創造實體,這樣在核心程式碼就能讀取這些變數,並且用 device_create, device_create_file 在 sysfs 底下建立出溝通用的資料夾和檔案。其檔案可以一般檔案形式存在

實作

降低核心與使用者層級的通訊成本

先定義什麼算是通訊成本,核心與使用者層級之間要傳遞的狀態包含:

  1. 棋盤狀態
  2. 暫停,結束

棋盤狀態

每一次觸發 AI 計算完下一步後,會把結果寫入 table static 陣列,其存在於核心空間記憶體。將資料傳遞到使用者層級透過註冊的 file_operation read()
先前我們註冊了一個 device 可理解是個能夠循序存取檔案,透過定義相關的函式,可利用存取檔案的系統呼叫以存取。
當使用者層級使用該 kxo device 的 file_operation,就會呼叫我們定義的 kxo_open, kxo_read
這裡我們要讀取棋盤資料,於是用到 kxo_read,其藉由 kfifo_to_user()rx_fifo 的資料複製到使用者層級的 display_buf。所以核心與使用者層級之間的資料溝通,實踐就是 kfifo_to_user()

draw_board()
   │              (把 table[] 畫成 ASCII)
   ▼
produce_board()
   │              (kfifo_in 將資料塞進 rx_fifo)
   ▼
wake_up_interruptible(&rx_wait)
   │              (喚醒阻塞在 read/select 的行程)
   ▼
kxo_read()  ←─── 使用者程式 read("/dev/kxo")
   │              (kfifo_to_user 把 rx_fifo → buf)
   ▼
user buffer      (xo-user 印出棋盤)

workqueue 在 kernel thread 問題

work_func 一系列三個函式皆在 kernel thread 執行,此效果是? kernel 對 softirqs 與 hard-irqs 的互動是?

鍵盤輸入

xo-user 呼叫 listen_keyboard_handler ,使用 sysfs 的 write 和 read,分別會對應呼叫 kxo_state_storekxo_state_show

到這邊,總算理清核心與使用者層級的溝通,接著定義一下要改善的範圍。

  1. 對於輸出棋盤,可以把 drawboard_work_func()xo-user 輸出棋盤前經過的步驟都定義成要改善的部份。
  2. 對於鍵盤輸入,可以把 listen_keyboard_handler()kxo_state_ 操作到讀取 attr_obj ,期間經過的鎖也算在內

效能分析

接下來,需要效能分析才能量化改善的成果,採用的方法可以是:

  1. ftrace 或 ktime 追蹤發出訊息到印出資料之間的延遲
  2. perf 追蹤 context-switch 次數
  3. strace 追蹤系統呼叫

以上方法均要以統計方法證明,並且設定環境排除干擾因子

於是我首先利用老師教材使用到 ktime api,排除干擾因子
我照著教材排除四項干擾因子(我的電腦是 intel 晶片),並且以 sudo taskset 0x1 ./xo-user 執行程式,結果圖表不符合預期,如果只剔除右部 95%,發現有大量極大數據:

lat_databoard

如果剔除右部 50%:

lat_databoard

最後發現數據只有在剔除右部的 80% 才能看見類似常態分佈。

lat_databoard

接著我假設這一小部份數據呈現的常態分佈就是溝通時間成本的分佈,為了驗證假設,我用 udelay 增加延遲時間,以下第一、二張圖分別用了 udelay(8)udelay(15)。實驗後發現數據大致符合預期,平均值分別增加大概 8 微秒和 15 微秒 。
因此能證明這個區間的數據為實際溝通成本的分佈

lat_databoard_8
lat_databoard

得到改善前溝通時間成本的分佈後,就可以著手改善溝通成本了。
首先改善繪製棋盤,目前棋盤在每一次更新一步就全部重新繪製,包括分隔線。所以可以更改為 module 載入後就先繪製好,然後每下一步就更新那一步,每下完一盤棋就保留分隔線然後把棋格清空。
改善完以後,發現時間非但沒有減少,還增加了 1 微秒左右。原因未知,推測是以上改善的成效並不可見,反而被電腦其他的干擾因素影響

lat_databoard
我現在沒有將 CPU 隔離,也沒有考慮溫度降頻的問題,必須也納入考慮,避免統計分佈被影響。

於是我發現應該改善的不只是減少棋盤繪製次數,應該將整個棋盤繪製搬到使用者層級。因為目前通訊最大開銷是 kfifo 整份棋盤複製到使用者層級,memcpy 的開銷無法減少,所以應該改成 kfifo 只裝載每一次更動的棋子位置,這樣應該就能很大程度減少通訊成本。老師給的教材提到一篇論文研究 copy_to_user 的開銷為 1 位元組 22 ns ,現在 kfifo 每一次都傳遞 60 位元組左右,所以如果減少成僅傳遞 1 個位元組,預計應可減少 1.2 微秒左右的延遲

接著我發現效能統計結果會有大量極大數據的原因,因為在每一次棋局結束後也會繪製一次棋盤,而我在每一次棋盤繪製時會擷取當下的時間作為效能統計時間計算的 end time,因此會造成這棋局結束的每一次計算時間都是 end time - 0,造成錯誤。修正以後,把 start time 是 0 的剔除,就能發現僅剩少數極端值,而且明顯極端值的數值小很多:

lat_databoard
接著統計模型的判斷,或許可以為超過平均值一定的標準差的數據捨去,或者其他更可靠的統計方法,避免隨意的決定捨去數目。
這邊應該參考長尾資料分佈的處理,也要採用一些檢定方法

分析方法要參考
依據〈Hierarchical performance measurement and modeling of the Linux file system〉研究指出,從核心模式複製資料到使用者層級的時間成本是,每個位元組達 22ns,因此進行效能分析時,要考慮到 copy_to_user 函式的時間開銷,特別留意到資料大小成長後造成的量測誤差。

Coroutine 並行(使用者空間)

目的:在同一顆 CPU 上實作並行,策略是 FIFO ,每一個 AI 執行完換下一個 AI 執行。

要處理資料同步,以及核心空間到使用者空間的資訊傳遞。

setjmp/longjmp

使用 setjmp/longjmp,是使用者空間的函式庫,可以用來實作並行
setjmp 功能是 snapshot 當前函式的 stack ,並紀錄位置。longjmp 會跳到 setjmp 的位置。
setjmp 第一次呼叫回傳 0 ,從 longjmp 跳來時回傳設定值(預設為 1 )

詳細原理
/* Implementing coroutines with setjmp/longjmp */

#include <setjmp.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "list.h"

struct task {
    jmp_buf env;
    struct list_head list;
    char task_name[10];
    int n;
    int i;
};

struct arg {
    int n;
    int i;
    char *task_name;
};

static LIST_HEAD(tasklist);
static void (**tasks)(void *);
static struct arg *args;
static int ntasks;
static jmp_buf sched;
static struct task *cur_task;

static void task_add(struct task *task)
{
    list_add_tail(&task->list, &tasklist);
}

static void task_switch()
{
    if (!list_empty(&tasklist)) {
        struct task *t = list_first_entry(&tasklist, struct task, list);
        list_del(&t->list);
        cur_task = t;
        longjmp(t->env, 1);
    }
}

void schedule(void)
{
    static int i;

    setjmp(sched);

    while (ntasks-- > 0) {
        struct arg arg = args[i];
        tasks[i++](&arg);
        printf("Never reached\n");
    }

    task_switch();
}

static long long fib_sequence(long long k)
{
    /* FIXME: use clz/ctz and fast algorithms to speed up */
    long long f[k + 2];

    f[0] = 0;
    f[1] = 1;

    for (int i = 2; i <= k; i++) {
        f[i] = f[i - 1] + f[i - 2];
    }

    return f[k];
}

/* A task yields control n times */

void task0(void *arg)
{
    struct task *task = malloc(sizeof(struct task));
    strcpy(task->task_name, ((struct arg *) arg)->task_name);
    task->n = ((struct arg *) arg)->n;
    task->i = ((struct arg *) arg)->i;
    INIT_LIST_HEAD(&task->list);

    printf("%s: n = %d\n", task->task_name, task->n);

    if (setjmp(task->env) == 0) {
        task_add(task);
        longjmp(sched, 1);
    }

    task = cur_task;

    for (; task->i < task->n; task->i += 2) {
        if (setjmp(task->env) == 0) {
            long long res = fib_sequence(task->i);
            printf("%s fib(%d) = %lld\n", task->task_name, task->i, res);
            task_add(task);
            task_switch();
        }
        task = cur_task;
        printf("%s: resume\n", task->task_name);
    }

    printf("%s: complete\n", task->task_name);
    free(task);
    longjmp(sched, 1);
}

void task1(void *arg)
{
    struct task *task = malloc(sizeof(struct task));
    strcpy(task->task_name, ((struct arg *) arg)->task_name);
    task->n = ((struct arg *) arg)->n;
    task->i = ((struct arg *) arg)->i;
    INIT_LIST_HEAD(&task->list);

    printf("%s: n = %d\n", task->task_name, task->n);

    if (setjmp(task->env) == 0) {
        task_add(task);
        longjmp(sched, 1);
    }

    task = cur_task;

    for (; task->i < task->n; task->i++) {
        if (setjmp(task->env) == 0) {
            printf("%s %d\n", task->task_name, task->i);
            task_add(task);
            task_switch();
        }
        task = cur_task;
        printf("%s: resume\n", task->task_name);
    }

    printf("%s: complete\n", task->task_name);
    free(task);
}

#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
int main(void)
{
    void (*registered_task[])(void *) = {task0, task0, task1};
    struct arg arg0 = {.n = 70, .i = 0, .task_name = "Task 0"};
    struct arg arg1 = {.n = 70, .i = 1, .task_name = "Task 1"};
    struct arg arg2 = {.n = 70, .i = 0, .task_name = "Task 2"};
    struct arg registered_arg[] = {arg0, arg1, arg2};
    tasks = registered_task;
    args = registered_arg;
    ntasks = ARRAY_SIZE(registered_task);

    schedule();

    return 0;
}

解釋每一個 setjmp, longjmp 的發生。
程式進入點為 main 函式,設定好基本變數後,呼叫 schedule(),schedule 內部的 setjmp 設定紀錄點 sched,然後進入迴圈(初始 ntasks 為 3),迴圈中用陣列中的函式指標呼叫 task 函式,於是首先呼叫 task0 ,task0 初始化分配自己需要的記憶體空間,setjmp task0->env 然後因為第一次回傳 0 ,所以執行 if 內的程式將 task0 加入 tasklist ,接著 longjmp 到 sched 這個屬於 schedule 函式的紀錄點,接著繼續執行迴圈(此時 ntasks 為 2)呼叫第二個 task 的初始化,同理第三個 task。當所有 task 初始化完了以後。這時 tasklist 內有三個 task ,順序是 task0, task1, task2 。然後回到 schedule 函式,執行 task_switch() 。
task_switch() 取得 tasklist 的第一個節點,也就是 task0 並將它從 tasklist 移除,然後 longjmp 到 task0 所設定的紀錄點,進入 task0 後,由於是從 longjmp 跳來的,所以 setjmp 回傳 1 ,跳過以下程式:

if (setjmp(task->env) == 0) {
        task_add(task);
        longjmp(sched, 1);
    }

接著進入 for 迴圈,進行第二個 setjmp(等於設定新的紀錄點),因為是初次設定,所以 setjmp 回傳 0 執行以下程式碼:

if (setjmp(task->env) == 0) {
            long long res = fib_sequence(task->i);
            printf("%s fib(%d) = %lld\n", task->task_name, task->i, res);
            task_add(task);
            task_switch();
        }

執行於預定操作 fib-sequence 後,將 task0 重新新增回 tasklist 的尾巴後,做 task_switch(),再來就會換到 task1。
持續這樣的 round robin 排程操作直到一個 task 內迴圈結束,task 自己釋放資源,然後 longjmp 回 schedule 函式。當所有 task 資源都被釋放,schedule 函式結束,整段程式結束

schedule                task0               task1
   |                      |                  |
   | setjmp=0             |                  |
   |--> call task0() -----|                  |
   |                      | setjmp=0         |
   |                      | add→queue        |
   |                      | longjmp(sched,1) |
   | setjmp=1             |                  |
   |--> call task1() ------------------------|
   |                      |                  | setjmp=0
   |                      |                  | add→queue
   |                      |                  | longjmp(sched,1)
   | setjmp=1             |                  |
   | task_switch() -> pop task0, longjmp     |
   |---------------------->| setjmp=1        |
   |                      | work … add       |
   |                      | task_switch() -> task1
   |               ------------------>           
   |                      |                 |
   |                      <-----------------
   |                      |                 |
   |                      ------------------>
   ...
(輪流交替,直到兩邊 for 迴圈完成)