# 2024q1 Homework5 (assessment) contributed by < `lintin528` > ## 紀錄閱讀〈[因為自動飲料機而延畢的那一年](https://blog.opasschang.com/the-story-of-auto-beverage-machine-1/)〉的啟發 看完這篇文章的第一個想法是佩服作者的恆心,更可貴的是他儘管在一開始就遇到冰塊無法分開的這個沒想到會遇過的問題時,能夠不斷地想出自己的解決方法並真的去嘗試可行性,就我的觀點而言,在計畫的一開始就遇到這種無法預見且來自於一個小地方的問題時,多次嘗試解決卻又無果時,肯定會產生很大的無力感,畢竟這只是初期準備而已,誰能知道後面機器運作、 pipeline 流水線規劃 、電路上又會有多少問題,如果是我的話或許在這一階段就有很大的機會放棄了。 「你不能現在就放棄,要是現在就放棄的話,你這輩子日後遇到這種等級的困難,就只會想逃避而已。」 這句話帶給我很大的激勵,或許很多任務都不是可以簡單地被規劃出解決方法的,這種時候嘗試就是最好的解決方法,我認為漫無目的地尋找能一口氣完成的方式,逐步進行試錯更能帶給我有在前進的感覺,而不會產生原地踏步的絕望感,而且這更能使自己熟悉所做的所有細節。學習也是這樣,總是需要先建立好基礎,才能構築出更完整的專案,我覺得自己總在學習基礎時感到迷茫,看不見學習完能實際表現出來的成果,或許就是因為缺少了這種實作精神,紙上談兵永遠都只是想像的,就像是這幾個作業,僅僅提到 "效能提高" 可能是不負責任的,但一想到那些數學證明或統計,我往往會有些退縮,但這就是真正能成就、驗證什麼的人會去做的,看完這篇文章,我看見了作者做出決定前不會去害怕這麼做是否會一無所獲,嘗試未知的勇氣,我自認基礎不足,非常不足,但我也不該將學習僅僅侷限在單方面的接納知識,更該驗證自己的想法基於我所吸收的是否相同,這些疑惑我都該自己嘗試去分析、解決,就像作者勇於嘗試找出每個問題屬於自己的解法一樣。 ## 從前 4 週的測驗題選出 3 題改進 ## 紀錄第 1 到第 6 週「課程教材」和 CS:APP 3/e 心得和提問 ### 第四次作業中的[第三周測驗題二](https://hackmd.io/@lintin528/rJEt94H10) ```c d0 = q & 0b1; d1 = q & 0b11; d2 = q & 0b111; q = ((((tmp >> 3) + (tmp >> 1) + tmp) << 3) + d0 + d1 + d2) >> 7; r = tmp - (((q << 2) + q) << 1); ``` 這段程式碼有一些疑惑,主要是對於 `d0, d1, d2` 的功用,以及他的重複位元該怎麼處理,另外是這一段與最後的 `divmod_10` 之間的關係也不是很明白。 ## 簡述想投入的專案 對於排程或是並行程式這些不是直接透過改善任務演算法,而是透過分配硬體運算資源改善效能的方式較感興趣,希望能透過這次專題接觸實務上是如何分配資源,將先前作業系統相關的知識能自己嘗試做出分析。 ### 並行程式設計 >回顧〈並行和多執行緒程式設計〉教材和相關測驗題,強化對延伸問題的掌握 ### Linux 排程器研究 >探討 Linux 排程器內部設計,改進《Demystifying the Linux CPU Scheduler》,並尋求貢獻程式碼到 Linux 核心的機會 另外在這幾次的作業以及測驗中看到了許多利用 `GNU extension` 的位元操作來改善如 `ilog2` 、除法、平方根等等運算的效能,若能熟悉且合理的運用在其他專案上的話,期望在效能改善方面有幫助,我認為這是一個容易被忽視但卻是基本且有效的,若是自己在其他專案因為能力不足無法完成一定成果的話,位元操作是相對容易理解並實作的,所以選擇這個主題。 ### 位元操作 >第 2 週測驗題嘗試檢驗學員對位元操作的認知,本任務強化相關練習。 --- `x % 4` <-> `x & 0b11` `x % 8` <-> `x & 0b111` TODO: 第六次作業 (simrupt + ttt) 並改進現有同學的實作 (vax-r) > vax-r 程式碼的缺陷: user <-> kernel 之間的通訊用大量的文字 (棋盤), workqueue (CMWQ) 無法善用 CPU 資源, 展現的方式 (原本在 Linux 核心模組繪圖,同步的成本), AI 演算法有改進空間 > https://hackmd.io/@sysprog/linux2024-quiz4 (用數字表達下棋的狀況,單一的 atomic 操作即可保證內容正確) ### 改善作業三實作中棋盤的紀錄方式及勝利判斷 參閱 `vax-r` 同學之 [作業3](https://hackmd.io/@vax-r/linux2024-homework1#Basic-setups) 後,可以看到他所設定的井字遊戲棋盤並不是只有 `3*3` ,即他的 `BOARD_SIZE` 是可以被設定為三或以上的數字,並且在 `game.h` 中還另外設定了結構體 ```c typedef struct { int i_shift, j_shift; int i_lower_bound, j_lower_bound, i_upper_bound, j_upper_bound; } line_t; ``` 用以在之後判斷對局是否結束,這邊看到在使用這個結構體時,會先建立一個 `lines` 數組 ```c const line_t lines[4] = { {1, 0, 0, 0, BOARD_SIZE - GOAL + 1, BOARD_SIZE}, // ROW {0, 1, 0, 0, BOARD_SIZE, BOARD_SIZE - GOAL + 1}, // COL {1, 1, 0, 0, BOARD_SIZE - GOAL + 1, BOARD_SIZE - GOAL + 1}, // PRIMARY {1, -1, 0, GOAL - 1, BOARD_SIZE - GOAL + 1, BOARD_SIZE}, // SECONDARY }; ``` 這裡分別代表四個方向 (橫向、直向、左上右下、右上左下) 的線段,`GOAL` 為判斷勝利的長度,在一般 `3*3` 的對局中即為 `3` ,之後經過呼叫 `check_win` ,進行這四種獲勝條件方向的檢查。 ```c char check_win(char *t) { for (int i_line = 0; i_line < 4; ++i_line) { line_t line = lines[i_line]; for (int i = line.i_lower_bound; i < line.i_upper_bound; ++i) { for (int j = line.j_lower_bound; j < line.j_upper_bound; ++j) { char win = check_line_segment_win(t, i, j, line); if (win != ' ') return win; } } } for (int i = 0; i < N_GRIDS; i++) if (t[i] == ' ') return ' '; return 'D'; } ``` 在 `check_line_segment_win` 內 ```c for (int k = 1; k < GOAL; k++) { if (last != t[GET_INDEX(i + k * line.i_shift, j + k * line.j_shift)]) { return ' '; } ``` 可以觀察到這個實作方式的優缺點,優點是這個勝利判定方式在棋盤為大於 `3*3` 的方形棋盤時也同樣可以適用,缺點在於這個方式做了過多的 `for` 迴圈,即在所有有可能達成勝利條件的起始點都進行一次的 `check_line_segment_win` ,因此在每一次落子之後,至少都需要耗費 $O(N^2)$ 的計算量,其中 $N$ 為棋盤寬度;另外就是在資料存儲與傳輸的方面來看,這個實作方式需要一個大小為 `N_GRIDS` 的字串去儲存每一個格子內目前的狀況 `( ' ' / 'O' / 'X' )` ```c char table[N_GRIDS]; ``` 在之後 user 與 kernel 之間的傳輸將會大幅提高 buffer 的附載量,因此這裡參考測驗四中的 [tic-tac-toe](https://gist.github.com/jserv/9088f6f72a35a1fcaaf1c932399a5624) 實作方式。 這邊使用 `masks` 來分別代表棋盤中的九個格子,每當一方棋手在此處落子,則需要將紀錄自己落子情況的 `uint32_t boards[2]` 與對應棋盤位置的 `mask`進行 OR 操作,以更新旗手的所有落子情形,如 `boards[0]` 為一號棋手的棋盤狀況,則當其選擇落在 A2 的位置,即對應到 `move_masks[3]` ,則須做 `boards[0] |= move_masks[3]` 。 ```c static const uint32_t move_masks[9] = { 0x40040040, 0x20004000, 0x10000404, 0x04020000, 0x02002022, 0x01000200, 0x00410001, 0x00201000, 0x00100110, }; ``` 在這邊的每一步後,都需要去記錄棋盤中哪幾個位置是被下過的,以避免重複落子,範例中使用了以下方式來應對: ```c uint32_t available_moves[9] = {0, 1, 2, 3, 4, 5, 6, 7, 8}; for (uint32_t n_moves = 9; n_moves > 0; n_moves--) { /* Get board of player */ uint32_t board = boards[player - 1]; /* Choose random move */ uint32_t i = fastmod(xorshift32(), n_moves); uint32_t move = available_moves[i]; /* Delete move from available moves */ available_moves[i] = available_moves[n_moves - 1]; ... ``` 這邊很巧妙的利用一個方式使每一次的選擇中不重複選到同樣的位置,當在 `move = available_moves[i];` 記錄完本次選擇之後, `available_moves[i] = available_moves[n_moves - 1];` 會將此數組的最後一個元素去覆蓋掉本次所選擇的位置,這樣在 `for` 迴圈的下一次迭代時,會在前 `n_moves` 個元素做選擇,而被選取過的元素一定會被覆蓋掉,若是數組最後一個元素,也不會進入選擇範圍。 但這裡就遇到第一個問題:這個例子是以隨機選擇落子位置為前提而設計的,實際上棋手需要選擇特定的落子位置,目前想到的方式還是需要以一個大小為 `N_GRIDS` 的數組用以紀錄棋盤的落子情形,與先前的做法不同之處是在資料傳遞的過程中,並不會將此記錄整個棋盤狀況的數組也跟著傳遞,而是只透過 `boards[2]` 來表示兩位棋手目前的落子情形,以此降低 user <-> kernel 之間通訊的負擔。 再來就是勝利條件的檢查 ```c static inline uint32_t is_win(uint32_t player_board) { return (player_board + 0x11111111) & 0x88888888; } ``` 回想先前的 `boards` 設計,使用 `uint32_t` 變數中每 `4 bits` 的後 `3 bits` 用以紀錄這 `8` 條連線方式 (三個橫排、三個直排、兩條斜線) 的分佈情況,由此可知,當其中一條直線有三個落子時, `player_board + 0x11111111` 就會有其中一個 `4 bits` 為 `1000` ,因此與 `0x88888888` 做 AND 運算即可得到非零值。 ㄇ 第二個遇到的問題就是若想要套aㄎ用 `masks` 的紀錄方式,目前還是被限制在 `3*3` 棋盤的條件下,若要套用於 `4*4` 的棋盤時,則總共會有 `16` 條三個棋子的連線方式,那麼還是可以透過 `uint64_t (16*4) boards[2]` 來記錄落子情形,且新的 `move_masks[16]` 需要重新設定,以對應到這 `16` 條連線,但如果需要更大的棋盤 `(5*5)` ,建立 `move_masks[25]` 變得過於大型,複雜程度也將提升許多。 ### 分析第六次作業中 `simrupt.c` 中的實作與資料流向 在嘗試解析 `vax-r` 同學實作的 [simrupt.c](https://github.com/vax-r/KMLdrv/blob/main/simrupt.c#L405) 之前,有必要先理解 `CMWQ、workqueues、tasklet、softirqs` 這幾個用於 `deferred interrupts` 的機制,以及他們之間相互的差異與關係。 在 Linux Kernel 中,會希望中斷事件的處理時間能夠盡量的短,以提高系統的響應能力,且避免堆積過多重要事件,降低系統穩定性,因此為了使某些耗費計算量較大的中斷事件能夠滿足上述需求,則會利用到 `deferred interrupts` 的機制,將中斷事件中必要的部分 `top-half` 先處理、較為耗時的工作 `bottom-half` 利用 `workqueues、tasklet、softirqs` 執行。 #### workqueue 在 `workqueue` 中,目標函式 `work items` 允許 `block` 與 `sleep` ,對於一些更複雜或耗時的操作更有利,與 `tasklet` 不同,每個 CPU 會分別建立自己的執行緒 `workers` 來執行 `work items` ,因此在 `workqueue` 中的任務可以分配到不同的 CPU 內執行,在本專案中, `drawboard_work_func` 、 `ai_one_work_func` 等等需要同步記憶體空間訊息的操作,會使用 `workqueue` 的方式來處理。 在多執行緒 `multi-threaded` 的環境下每個 CPU 都有自己的執行緒,但傳統的 `workqueue` 中並行處理效率並不是太好,因為靜態分配執行緒的關係,並不能根據各個 CPU 的負載來動態調整執行緒的運行狀況,相對的, `CMWQ` 提供一些改善,引入管理與調度機制來使得各 CPU 能更有效率的分配資源,且使用全局的 `worker-pool` ,以避免一個執行緒僅執行特定 `workqueue` 中的任務,另外還有 CPU affinity 的調整等等,在保有原本 `workqueue API` 的提供更好的資源分配效率。 #### tasklet 與 softirq `softirq` 透過 `kernel thread` 執行較輕量且緊急的任務,在中斷發生後更新狀態或進行簡單的處理,而 `tasklet` 是基於 `softirq` 的機制,提供更高層的接口,讓使用者可以簡單的使用,且同一類型的 `tasklet` 只可以在同一個 CPU 上執行,相對於 `softirq` 是系統啟動時靜態分配,`tasklet` 可以在使用者需要時創建,都是延遲中斷的處理機制。 #### simrupt.c 實作 ```c struct kmldrv_attr { char display; char resume; char end; rwlock_t lock; }; ``` 首先定義虛擬 `file sys` 中的結構 ```c static void produce_board(void) { unsigned int len = kfifo_in(&rx_fifo, draw_buffer, sizeof(draw_buffer)); if (unlikely(len < sizeof(draw_buffer)) && printk_ratelimit()) pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(draw_buffer) - len); pr_debug("kmldrv: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` 這邊用於將計算出的棋盤,包括方格全部儲存的 `draw_buffer` 寫入 `rx_fifo` 並檢查有無缺失,之後再透過 `read()` 讀取模組文件時就可以將 `kernel space` 內的數據讀出至 `user space`。 ```c static int draw_board(char *table) ``` 這個函式將棋盤 `table[]` 加上棋格的所有字符寫入 `draw_buffer` 內,這裡就遇到了 `user <-> kernel` 之間的通訊負載過重的問題,這部分可以嘗試僅將所有棋格內棋子分布情況,即 `table[]` 讀出,並在 `user space` 再對讀出的 `table` 後才畫出棋盤,避免先將整個棋盤化為字串在整個傳出。另外若成功套用第三次作業的改善方式,則輸出的棋盤分布狀況 `table` 又能夠更加精簡 (僅需兩個 uint_32)。 以下開始定義此專案中的 `work items` : ```c static void drawboard_work_func(struct work_struct *w) { int cpu; /* This code runs from a kernel thread, so softirqs and hard-irqs must * be enabled. */ WARN_ON_ONCE(in_softirq()); WARN_ON_ONCE(in_interrupt()); /* Pretend to simulate access to per-CPU data, disabling preemption * during the pr_info(). */ cpu = get_cpu(); pr_info("kmldrv: [CPU#%d] %s\n", cpu, __func__); put_cpu(); read_lock(&attr_obj.lock); if (attr_obj.display == '0') { read_unlock(&attr_obj.lock); return; } read_unlock(&attr_obj.lock); mutex_lock(&producer_lock); draw_board(table); mutex_unlock(&producer_lock); /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); produce_board(); mutex_unlock(&consumer_lock); wake_up_interruptible(&rx_wait); } ``` 這邊可以看到 `WARN_ON_ONCE` 檢查這個函式不該是在中斷時被執行的,並且在 `cpu = get_cpu();` 與 `put_cpu();` 之間的所有動作,都將會是在同一個 CPU 內被執行,在這裡僅為了保證輸出 CPU 相關資訊時不要被搶佔,並且在這之後,進行棋盤的繪製 `draw_board` 以及輸出 `produce_board` ,可以看到此 `work item` 即是在 `timer` 中斷後負責一個相對複雜的工作,因此使用 `workqueue` 的方式延遲中斷。 ```c static void ai_one_work_func(struct work_struct *w) { ktime_t tv_start, tv_end; s64 nsecs; int cpu; WARN_ON_ONCE(in_softirq()); WARN_ON_ONCE(in_interrupt()); cpu = get_cpu(); pr_info("kmldrv: [CPU#%d] start doing %s\n", cpu, __func__); tv_start = ktime_get(); mutex_lock(&producer_lock); int move; WRITE_ONCE(move, mcts(table, 'O')); smp_mb(); if (move != -1) WRITE_ONCE(table[move], 'O'); WRITE_ONCE(turn, 'X'); WRITE_ONCE(finish, 1); smp_wmb(); mutex_unlock(&producer_lock); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("kmldrv: [CPU#%d] doing %s for %llu usec\n", cpu, __func__, (unsigned long long) nsecs >> 10); put_cpu(); } ``` 這邊是棋手一透過蒙地卡羅演算法 `WRITE_ONCE(move, mcts(table, 'O'));` 下一手棋,並在下一步 `WRITE_ONCE(table[move], 'O');` 更新棋盤狀況,這裡要特別注意到使用了記憶體屏障 `smp_mb()` 以此保證隔開的兩個區塊前後,記憶體能夠正確的儲存完畢,以避免值令執行順序被優化而導致 `race condition` ,同理,使用巨集來寫入或讀區資料,如 `WRITE_ONCE` 也是為了使這個操作變成 `atomic` ,以保證資料的同步。 ```c static void game_tasklet_func(unsigned long __data) { ktime_t tv_start, tv_end; s64 nsecs; WARN_ON_ONCE(!in_interrupt()); WARN_ON_ONCE(!in_softirq()); tv_start = ktime_get(); READ_ONCE(finish); READ_ONCE(turn); smp_rmb(); if (finish && turn == 'O') { WRITE_ONCE(finish, 0); smp_wmb(); queue_work(kmldrv_workqueue, &ai_one_work); } else if (finish && turn == 'X') { WRITE_ONCE(finish, 0); smp_wmb(); queue_work(kmldrv_workqueue, &ai_two_work); } queue_work(kmldrv_workqueue, &mcts_calc_load_work); queue_work(kmldrv_workqueue, &drawboard_work); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("kmldrv: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(), __func__, (unsigned long long) nsecs >> 10); } ``` 這個 `tasklet` 負責在 `timer interrupt` 觸發後,決定由哪位棋手下棋,並計算負載且輸出棋盤,這裡我發現 `workqueue` 是被這個 `tasklet` 所觸發的,也符合這兩種機制先前討論的特性,由 `tasklet` 進行需要在短時間內完成的任務分配、系統回應之後將複雜的任務交給 `work items` 去處理。 ```c static void ai_game(void) { WARN_ON_ONCE(!irqs_disabled()); pr_info("kmldrv: [CPU#%d] doing AI game\n", smp_processor_id()); pr_info("kmldrv: [CPU#%d] scheduling tasklet\n", smp_processor_id()); tasklet_schedule(&game_tasklet); } ``` 在 `timer_handler` 觸發時將會呼叫此 `ai_game` ,來調用 `game_tasklet`。 :::warning 不太明白這邊為何需要特別創建一個 function 來調度而不是直接在 timer 內呼叫 `tasklet_schedule(&game_tasklet);` ::: ```c static void timer_handler(struct timer_list *__timer){ ... local_irq_disable(); ... char win = check_win(table); if (win == ' ') { ai_game(); mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); } else { read_lock(&attr_obj.lock); if (attr_obj.display == '1') { int cpu = get_cpu(); pr_info("kmldrv: [CPU#%d] Drawing final board\n", cpu); put_cpu(); mutex_lock(&producer_lock); draw_board(table); mutex_unlock(&producer_lock); /* Store data to the kfifo buffer */ mutex_lock(&consumer_lock); produce_board(); mutex_unlock(&consumer_lock); wake_up_interruptible(&rx_wait); } ... local_irq_enable(); } ``` 這個 function 就是在 `kernel module` 被啟動時在初始化會呼叫的計時器,每隔一段時間 (`msecs_to_jiffies(delay)`) 就會執行一次,並且在計時器中斷時會先將原有的 `softirq` 先無效化,以模擬設定的中斷執行環境 `tasklet` ,每次執行該 `handler` 時即檢查遊戲是否結束,若沒有就呼叫 `ai_game()` 啟動 `game_tasklet` 並接著呼叫 ```c queue_work(kmldrv_workqueue, &ai_two_work); or &ai_one_work ... queue_work(kmldrv_workqueue, &mcts_calc_load_work); queue_work(kmldrv_workqueue, &drawboard_work); ``` 若結束則將最後的棋盤輸出。 最後透過 `kmldrv_read` 裡面呼叫得 `ret = kfifo_to_user(&rx_fifo, buf, count, &read);` 將 `kernel` 中的 `buffer rx_fifo` 內部資料傳輸到 `user space` 的 `buffer buf` ,達成 `user <-> kernel` 之間的訊息傳輸。