Try   HackMD

2024q1 Homework5 (assessment)

contributed by < lintin528 >

紀錄閱讀〈因為自動飲料機而延畢的那一年〉的啟發

看完這篇文章的第一個想法是佩服作者的恆心,更可貴的是他儘管在一開始就遇到冰塊無法分開的這個沒想到會遇過的問題時,能夠不斷地想出自己的解決方法並真的去嘗試可行性,就我的觀點而言,在計畫的一開始就遇到這種無法預見且來自於一個小地方的問題時,多次嘗試解決卻又無果時,肯定會產生很大的無力感,畢竟這只是初期準備而已,誰能知道後面機器運作、 pipeline 流水線規劃 、電路上又會有多少問題,如果是我的話或許在這一階段就有很大的機會放棄了。

「你不能現在就放棄,要是現在就放棄的話,你這輩子日後遇到這種等級的困難,就只會想逃避而已。」 這句話帶給我很大的激勵,或許很多任務都不是可以簡單地被規劃出解決方法的,這種時候嘗試就是最好的解決方法,我認為漫無目的地尋找能一口氣完成的方式,逐步進行試錯更能帶給我有在前進的感覺,而不會產生原地踏步的絕望感,而且這更能使自己熟悉所做的所有細節。學習也是這樣,總是需要先建立好基礎,才能構築出更完整的專案,我覺得自己總在學習基礎時感到迷茫,看不見學習完能實際表現出來的成果,或許就是因為缺少了這種實作精神,紙上談兵永遠都只是想像的,就像是這幾個作業,僅僅提到 "效能提高" 可能是不負責任的,但一想到那些數學證明或統計,我往往會有些退縮,但這就是真正能成就、驗證什麼的人會去做的,看完這篇文章,我看見了作者做出決定前不會去害怕這麼做是否會一無所獲,嘗試未知的勇氣,我自認基礎不足,非常不足,但我也不該將學習僅僅侷限在單方面的接納知識,更該驗證自己的想法基於我所吸收的是否相同,這些疑惑我都該自己嘗試去分析、解決,就像作者勇於嘗試找出每個問題屬於自己的解法一樣。

從前 4 週的測驗題選出 3 題改進

紀錄第 1 到第 6 週「課程教材」和 CS:APP 3/e 心得和提問

第四次作業中的第三周測驗題二

d0 = q & 0b1;
d1 = q & 0b11;
d2 = q & 0b111;
q = ((((tmp >> 3) + (tmp >> 1) + tmp) << 3) + d0 + d1 + d2) >> 7;
r = tmp - (((q << 2) + q) << 1);

這段程式碼有一些疑惑,主要是對於 d0, d1, d2 的功用,以及他的重複位元該怎麼處理,另外是這一段與最後的 divmod_10 之間的關係也不是很明白。

簡述想投入的專案

對於排程或是並行程式這些不是直接透過改善任務演算法,而是透過分配硬體運算資源改善效能的方式較感興趣,希望能透過這次專題接觸實務上是如何分配資源,將先前作業系統相關的知識能自己嘗試做出分析。

並行程式設計

回顧〈並行和多執行緒程式設計〉教材和相關測驗題,強化對延伸問題的掌握

Linux 排程器研究

探討 Linux 排程器內部設計,改進《Demystifying the Linux CPU Scheduler》,並尋求貢獻程式碼到 Linux 核心的機會

另外在這幾次的作業以及測驗中看到了許多利用 GNU extension 的位元操作來改善如 ilog2 、除法、平方根等等運算的效能,若能熟悉且合理的運用在其他專案上的話,期望在效能改善方面有幫助,我認為這是一個容易被忽視但卻是基本且有效的,若是自己在其他專案因為能力不足無法完成一定成果的話,位元操作是相對容易理解並實作的,所以選擇這個主題。

位元操作

第 2 週測驗題嘗試檢驗學員對位元操作的認知,本任務強化相關練習。


x % 4 <-> x & 0b11
x % 8 <-> x & 0b111

TODO: 第六次作業 (simrupt + ttt) 並改進現有同學的實作 (vax-r)

vax-r 程式碼的缺陷: user <-> kernel 之間的通訊用大量的文字 (棋盤), workqueue (CMWQ) 無法善用 CPU 資源, 展現的方式 (原本在 Linux 核心模組繪圖,同步的成本), AI 演算法有改進空間
https://hackmd.io/@sysprog/linux2024-quiz4 (用數字表達下棋的狀況,單一的 atomic 操作即可保證內容正確)

改善作業三實作中棋盤的紀錄方式及勝利判斷

參閱 vax-r 同學之 作業3 後,可以看到他所設定的井字遊戲棋盤並不是只有 3*3 ,即他的 BOARD_SIZE 是可以被設定為三或以上的數字,並且在 game.h 中還另外設定了結構體

typedef struct {
    int i_shift, j_shift;
    int i_lower_bound, j_lower_bound, i_upper_bound, j_upper_bound;
} line_t;

用以在之後判斷對局是否結束,這邊看到在使用這個結構體時,會先建立一個 lines 數組

const line_t lines[4] = {
    {1, 0, 0, 0, BOARD_SIZE - GOAL + 1, BOARD_SIZE},             // ROW
    {0, 1, 0, 0, BOARD_SIZE, BOARD_SIZE - GOAL + 1},             // COL
    {1, 1, 0, 0, BOARD_SIZE - GOAL + 1, BOARD_SIZE - GOAL + 1},  // PRIMARY
    {1, -1, 0, GOAL - 1, BOARD_SIZE - GOAL + 1, BOARD_SIZE},     // SECONDARY
};

這裡分別代表四個方向 (橫向、直向、左上右下、右上左下) 的線段,GOAL 為判斷勝利的長度,在一般 3*3 的對局中即為 3 ,之後經過呼叫 check_win ,進行這四種獲勝條件方向的檢查。

char check_win(char *t)
{
    for (int i_line = 0; i_line < 4; ++i_line) {
        line_t line = lines[i_line];
        for (int i = line.i_lower_bound; i < line.i_upper_bound; ++i) {
            for (int j = line.j_lower_bound; j < line.j_upper_bound; ++j) {
                char win = check_line_segment_win(t, i, j, line);
                if (win != ' ')
                    return win;
            }
        }
    }
    for (int i = 0; i < N_GRIDS; i++)
        if (t[i] == ' ')
            return ' ';
    return 'D';
}

check_line_segment_win

for (int k = 1; k < GOAL; k++) {
        if (last != t[GET_INDEX(i + k * line.i_shift, j + k * line.j_shift)]) {
            return ' ';
        }

可以觀察到這個實作方式的優缺點,優點是這個勝利判定方式在棋盤為大於 3*3 的方形棋盤時也同樣可以適用,缺點在於這個方式做了過多的 for 迴圈,即在所有有可能達成勝利條件的起始點都進行一次的 check_line_segment_win ,因此在每一次落子之後,至少都需要耗費

O(N2) 的計算量,其中
N
為棋盤寬度;另外就是在資料存儲與傳輸的方面來看,這個實作方式需要一個大小為 N_GRIDS 的字串去儲存每一個格子內目前的狀況 ( ' ' / 'O' / 'X' )

char table[N_GRIDS];

在之後 user 與 kernel 之間的傳輸將會大幅提高 buffer 的附載量,因此這裡參考測驗四中的 tic-tac-toe 實作方式。

這邊使用 masks 來分別代表棋盤中的九個格子,每當一方棋手在此處落子,則需要將紀錄自己落子情況的 uint32_t boards[2] 與對應棋盤位置的 mask進行 OR 操作,以更新旗手的所有落子情形,如 boards[0] 為一號棋手的棋盤狀況,則當其選擇落在 A2 的位置,即對應到 move_masks[3] ,則須做 boards[0] |= move_masks[3]

static const uint32_t move_masks[9] = {
    0x40040040, 0x20004000, 0x10000404, 0x04020000, 0x02002022,
    0x01000200, 0x00410001, 0x00201000, 0x00100110,
};

在這邊的每一步後,都需要去記錄棋盤中哪幾個位置是被下過的,以避免重複落子,範例中使用了以下方式來應對:

uint32_t available_moves[9] = {0, 1, 2, 3, 4, 5, 6, 7, 8};
    for (uint32_t n_moves = 9; n_moves > 0; n_moves--) {
        /* Get board of player */
        uint32_t board = boards[player - 1];
        /* Choose random move */
        uint32_t i = fastmod(xorshift32(), n_moves);
        uint32_t move = available_moves[i];
        /* Delete move from available moves */
        available_moves[i] = available_moves[n_moves - 1];
        ...

這邊很巧妙的利用一個方式使每一次的選擇中不重複選到同樣的位置,當在
move = available_moves[i]; 記錄完本次選擇之後,
available_moves[i] = available_moves[n_moves - 1]; 會將此數組的最後一個元素去覆蓋掉本次所選擇的位置,這樣在 for 迴圈的下一次迭代時,會在前 n_moves 個元素做選擇,而被選取過的元素一定會被覆蓋掉,若是數組最後一個元素,也不會進入選擇範圍。

但這裡就遇到第一個問題:這個例子是以隨機選擇落子位置為前提而設計的,實際上棋手需要選擇特定的落子位置,目前想到的方式還是需要以一個大小為 N_GRIDS 的數組用以紀錄棋盤的落子情形,與先前的做法不同之處是在資料傳遞的過程中,並不會將此記錄整個棋盤狀況的數組也跟著傳遞,而是只透過 boards[2] 來表示兩位棋手目前的落子情形,以此降低 user <-> kernel 之間通訊的負擔。

再來就是勝利條件的檢查

static inline uint32_t is_win(uint32_t player_board)
{
    return (player_board + 0x11111111) & 0x88888888;
}

回想先前的 boards 設計,使用 uint32_t 變數中每 4 bits 的後 3 bits 用以紀錄這 8 條連線方式 (三個橫排、三個直排、兩條斜線) 的分佈情況,由此可知,當其中一條直線有三個落子時, player_board + 0x11111111 就會有其中一個 4 bits1000 ,因此與 0x88888888 做 AND 運算即可得到非零值。

第二個遇到的問題就是若想要套aㄎ用 masks 的紀錄方式,目前還是被限制在 3*3 棋盤的條件下,若要套用於 4*4 的棋盤時,則總共會有 16 條三個棋子的連線方式,那麼還是可以透過
uint64_t (16*4) boards[2] 來記錄落子情形,且新的 move_masks[16] 需要重新設定,以對應到這 16 條連線,但如果需要更大的棋盤 (5*5) ,建立 move_masks[25] 變得過於大型,複雜程度也將提升許多。

分析第六次作業中 simrupt.c 中的實作與資料流向

在嘗試解析 vax-r 同學實作的 simrupt.c 之前,有必要先理解 CMWQ、workqueues、tasklet、softirqs 這幾個用於 deferred interrupts 的機制,以及他們之間相互的差異與關係。

在 Linux Kernel 中,會希望中斷事件的處理時間能夠盡量的短,以提高系統的響應能力,且避免堆積過多重要事件,降低系統穩定性,因此為了使某些耗費計算量較大的中斷事件能夠滿足上述需求,則會利用到 deferred interrupts 的機制,將中斷事件中必要的部分 top-half 先處理、較為耗時的工作 bottom-half 利用 workqueues、tasklet、softirqs 執行。

workqueue

workqueue 中,目標函式 work items 允許 blocksleep ,對於一些更複雜或耗時的操作更有利,與 tasklet 不同,每個 CPU 會分別建立自己的執行緒 workers 來執行 work items ,因此在 workqueue 中的任務可以分配到不同的 CPU 內執行,在本專案中, drawboard_work_funcai_one_work_func 等等需要同步記憶體空間訊息的操作,會使用 workqueue 的方式來處理。

在多執行緒 multi-threaded 的環境下每個 CPU 都有自己的執行緒,但傳統的 workqueue 中並行處理效率並不是太好,因為靜態分配執行緒的關係,並不能根據各個 CPU 的負載來動態調整執行緒的運行狀況,相對的, CMWQ 提供一些改善,引入管理與調度機制來使得各 CPU 能更有效率的分配資源,且使用全局的 worker-pool ,以避免一個執行緒僅執行特定 workqueue 中的任務,另外還有 CPU affinity 的調整等等,在保有原本 workqueue API 的提供更好的資源分配效率。

tasklet 與 softirq

softirq 透過 kernel thread 執行較輕量且緊急的任務,在中斷發生後更新狀態或進行簡單的處理,而 tasklet 是基於 softirq 的機制,提供更高層的接口,讓使用者可以簡單的使用,且同一類型的 tasklet 只可以在同一個 CPU 上執行,相對於 softirq 是系統啟動時靜態分配,tasklet 可以在使用者需要時創建,都是延遲中斷的處理機制。

simrupt.c 實作

struct kmldrv_attr {
    char display;
    char resume;
    char end;
    rwlock_t lock;
};

首先定義虛擬 file sys 中的結構

static void produce_board(void)
{
    unsigned int len = kfifo_in(&rx_fifo, draw_buffer, sizeof(draw_buffer));
    if (unlikely(len < sizeof(draw_buffer)) && printk_ratelimit())
        pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(draw_buffer) - len);

    pr_debug("kmldrv: %s: in %u/%u bytes\n", __func__, len,
             kfifo_len(&rx_fifo));
}

這邊用於將計算出的棋盤,包括方格全部儲存的 draw_buffer 寫入 rx_fifo 並檢查有無缺失,之後再透過 read() 讀取模組文件時就可以將 kernel space 內的數據讀出至 user space

static int draw_board(char *table)

這個函式將棋盤 table[] 加上棋格的所有字符寫入 draw_buffer 內,這裡就遇到了 user <-> kernel 之間的通訊負載過重的問題,這部分可以嘗試僅將所有棋格內棋子分布情況,即 table[] 讀出,並在 user space 再對讀出的 table 後才畫出棋盤,避免先將整個棋盤化為字串在整個傳出。另外若成功套用第三次作業的改善方式,則輸出的棋盤分布狀況 table 又能夠更加精簡
(僅需兩個 uint_32)。

以下開始定義此專案中的 work items

static void drawboard_work_func(struct work_struct *w)
{
    int cpu;

    /* This code runs from a kernel thread, so softirqs and hard-irqs must
     * be enabled.
     */
    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());

    /* Pretend to simulate access to per-CPU data, disabling preemption
     * during the pr_info().
     */
    cpu = get_cpu();
    pr_info("kmldrv: [CPU#%d] %s\n", cpu, __func__);
    put_cpu();

    read_lock(&attr_obj.lock);
    if (attr_obj.display == '0') {
        read_unlock(&attr_obj.lock);
        return;
    }
    read_unlock(&attr_obj.lock);

    mutex_lock(&producer_lock);
    draw_board(table);
    mutex_unlock(&producer_lock);

    /* Store data to the kfifo buffer */
    mutex_lock(&consumer_lock);
    produce_board();
    mutex_unlock(&consumer_lock);

    wake_up_interruptible(&rx_wait);
}

這邊可以看到 WARN_ON_ONCE 檢查這個函式不該是在中斷時被執行的,並且在
cpu = get_cpu();put_cpu(); 之間的所有動作,都將會是在同一個 CPU 內被執行,在這裡僅為了保證輸出 CPU 相關資訊時不要被搶佔,並且在這之後,進行棋盤的繪製 draw_board 以及輸出 produce_board ,可以看到此 work item 即是在 timer 中斷後負責一個相對複雜的工作,因此使用 workqueue 的方式延遲中斷。

static void ai_one_work_func(struct work_struct *w)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    int cpu;

    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());

    cpu = get_cpu();
    pr_info("kmldrv: [CPU#%d] start doing %s\n", cpu, __func__);
    tv_start = ktime_get();
    mutex_lock(&producer_lock);
    int move;
    WRITE_ONCE(move, mcts(table, 'O'));

    smp_mb();

    if (move != -1)
        WRITE_ONCE(table[move], 'O');

    WRITE_ONCE(turn, 'X');
    WRITE_ONCE(finish, 1);
    smp_wmb();
    mutex_unlock(&producer_lock);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
    pr_info("kmldrv: [CPU#%d] doing %s for %llu usec\n", cpu, __func__,
            (unsigned long long) nsecs >> 10);
    put_cpu();
}

這邊是棋手一透過蒙地卡羅演算法 WRITE_ONCE(move, mcts(table, 'O')); 下一手棋,並在下一步 WRITE_ONCE(table[move], 'O'); 更新棋盤狀況,這裡要特別注意到使用了記憶體屏障 smp_mb() 以此保證隔開的兩個區塊前後,記憶體能夠正確的儲存完畢,以避免值令執行順序被優化而導致 race condition ,同理,使用巨集來寫入或讀區資料,如 WRITE_ONCE 也是為了使這個操作變成 atomic ,以保證資料的同步。

static void game_tasklet_func(unsigned long __data)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    WARN_ON_ONCE(!in_interrupt());
    WARN_ON_ONCE(!in_softirq());

    tv_start = ktime_get();

    READ_ONCE(finish);
    READ_ONCE(turn);
    smp_rmb();

    if (finish && turn == 'O') {
        WRITE_ONCE(finish, 0);
        smp_wmb();
        queue_work(kmldrv_workqueue, &ai_one_work);
    } else if (finish && turn == 'X') {
        WRITE_ONCE(finish, 0);
        smp_wmb();
        queue_work(kmldrv_workqueue, &ai_two_work);
    }
    queue_work(kmldrv_workqueue, &mcts_calc_load_work);
    queue_work(kmldrv_workqueue, &drawboard_work);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("kmldrv: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
}

這個 tasklet 負責在 timer interrupt 觸發後,決定由哪位棋手下棋,並計算負載且輸出棋盤,這裡我發現 workqueue 是被這個 tasklet 所觸發的,也符合這兩種機制先前討論的特性,由 tasklet 進行需要在短時間內完成的任務分配、系統回應之後將複雜的任務交給 work items 去處理。

static void ai_game(void)
{
    WARN_ON_ONCE(!irqs_disabled());

    pr_info("kmldrv: [CPU#%d] doing AI game\n", smp_processor_id());
    pr_info("kmldrv: [CPU#%d] scheduling tasklet\n", smp_processor_id());
    tasklet_schedule(&game_tasklet);
}

timer_handler 觸發時將會呼叫此 ai_game ,來調用 game_tasklet

不太明白這邊為何需要特別創建一個 function 來調度而不是直接在 timer 內呼叫 tasklet_schedule(&game_tasklet);

static void timer_handler(struct timer_list *__timer){
    
    ...
        
    local_irq_disable();
    
    ...
    char win = check_win(table);
    if (win == ' ') {
        ai_game();
        mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
    } else {
        read_lock(&attr_obj.lock);
        if (attr_obj.display == '1') {
            int cpu = get_cpu();
            pr_info("kmldrv: [CPU#%d] Drawing final board\n", cpu);
            put_cpu();

            mutex_lock(&producer_lock);
            draw_board(table);
            mutex_unlock(&producer_lock);

            /* Store data to the kfifo buffer */
            mutex_lock(&consumer_lock);
            produce_board();
            mutex_unlock(&consumer_lock);

            wake_up_interruptible(&rx_wait);
        }
    ...
    local_irq_enable();
}

這個 function 就是在 kernel module 被啟動時在初始化會呼叫的計時器,每隔一段時間 (msecs_to_jiffies(delay)) 就會執行一次,並且在計時器中斷時會先將原有的 softirq 先無效化,以模擬設定的中斷執行環境 tasklet ,每次執行該 handler 時即檢查遊戲是否結束,若沒有就呼叫 ai_game() 啟動 game_tasklet 並接著呼叫

queue_work(kmldrv_workqueue, &ai_two_work);
                        or   &ai_one_work
...
queue_work(kmldrv_workqueue, &mcts_calc_load_work);
queue_work(kmldrv_workqueue, &drawboard_work);

若結束則將最後的棋盤輸出。

最後透過 kmldrv_read 裡面呼叫得 ret = kfifo_to_user(&rx_fifo, buf, count, &read);kernel 中的 buffer rx_fifo 內部資料傳輸到 user spacebuffer buf ,達成
user <-> kernel 之間的訊息傳輸。