Try   HackMD

Linux 核心專題: 改進 ttt 作為核心模組

執行人: lintin528
解說影片連結

Reviewed by mesohandsome

這個 function 就是在 kernel module 被啟動時在初始化會呼叫的計時器,每隔一段時間 (msecs_to_jiffies(delay)

可以說明一下 jiffies,這邊出現的很突然。

Reviewed by Lccgth

為了使某些耗費計算量較大的中斷事件能夠滿足上述需求,則會利用到 deferred interrupts 的機制,將中斷事件中必要的部分 top-half 先處理、較為耗時的工作 bottom-half 利用 workqueues、tasklet、softirqs 執行。

softirq 透過 kernel thread 執行較輕量且緊急的任務。

這兩段對於 softirq 的敘述有衝突,前者說 softirq 會執行較為耗時的工作,而後者說 softirq 會執行較輕量且緊急的任務。

任務簡介

重作第六次作業的 simrupt,整合 ttt 程式碼和彙整其他學員的成果,建構 Linux 核心模組。

TODO: 整合第三次作業提及的人工智慧程式碼到 simrupt

整合第三次作業提及的人工智慧程式碼,主體應在 Linux 核心內運作,讓二個不同的井字遊戲人工智慧演算法執行在「不同的 CPU」(善用 CMWQ) 並模擬二者的對弈,並允許使用者層級的程式藉由開啟 /dev/simrupt (可適度更名) 來設定二個人工智慧程式的對弈並存取彼此的棋步。
務必在 Linux 核心模組中使用定點數。
其一演算法必是 MCTS,另一者可參照第三次作業或 jserv/ttt 專案近期整合的 ELO rating system。

適度彙整其他學員的成果並予以驗證。

參閱 vax-r 同學之 作業3 後,可以看到他所設定的井字遊戲棋盤並不是只有 3*3 ,即他的 BOARD_SIZE 是可以被設定為三或以上的數字,並且在 game.h 中還另外設定了結構體

typedef struct {
    int i_shift, j_shift;
    int i_lower_bound, j_lower_bound, i_upper_bound, j_upper_bound;
} line_t;

用以在之後判斷對局是否結束,這邊看到在使用這個結構體時,會先建立一個 lines 數組

const line_t lines[4] = {
    {1, 0, 0, 0, BOARD_SIZE - GOAL + 1, BOARD_SIZE},             // ROW
    {0, 1, 0, 0, BOARD_SIZE, BOARD_SIZE - GOAL + 1},             // COL
    {1, 1, 0, 0, BOARD_SIZE - GOAL + 1, BOARD_SIZE - GOAL + 1},  // PRIMARY
    {1, -1, 0, GOAL - 1, BOARD_SIZE - GOAL + 1, BOARD_SIZE},     // SECONDARY
};

這裡分別代表四個方向 (橫向、直向、左上右下、右上左下) 的線段,GOAL 為判斷勝利的長度,在一般 3*3 的對局中即為 3 ,之後經過呼叫 check_win ,進行這四種獲勝條件方向的檢查。

char check_win(char *t)
{
    for (int i_line = 0; i_line < 4; ++i_line) {
        line_t line = lines[i_line];
        for (int i = line.i_lower_bound; i < line.i_upper_bound; ++i) {
            for (int j = line.j_lower_bound; j < line.j_upper_bound; ++j) {
                char win = check_line_segment_win(t, i, j, line);
                if (win != ' ')
                    return win;
            }
        }
    }
    for (int i = 0; i < N_GRIDS; i++)
        if (t[i] == ' ')
            return ' ';
    return 'D';
}

check_line_segment_win

for (int k = 1; k < GOAL; k++) {
        if (last != t[GET_INDEX(i + k * line.i_shift, j + k * line.j_shift)]) {
            return ' ';
        }

可以觀察到這個實作方式的優缺點,優點是這個勝利判定方式在棋盤為大於 3*3 的方形棋盤時也同樣可以適用,缺點在於這個方式做了過多的 for 迴圈,即在所有有可能達成勝利條件的起始點都進行一次的 check_line_segment_win ,因此在每一次落子之後,至少都需要耗費

O(N2) 的計算量,其中
N
為棋盤寬度;另外就是在資料存儲與傳輸的方面來看,這個實作方式需要一個大小為 N_GRIDS 的字串去儲存每一個格子內目前的狀況 ( ' ' / 'O' / 'X' )

char table[N_GRIDS];

在之後 user 與 kernel 之間的傳輸將會大幅提高 buffer 的負載量,因此這裡參考測驗四中的 tic-tac-toe 實作方式。

這邊使用 masks 來分別代表棋盤中的九個格子,每當一方棋手在此處落子,則需要將紀錄自己落子情況的 uint32_t boards[2] 與對應棋盤位置的 mask進行 OR 操作,以更新旗手的所有落子情形,如 boards[0] 為一號棋手的棋盤狀況,則當其選擇落在 A2 的位置,即對應到 move_masks[3] ,則須做 boards[0] |= move_masks[3]

static const uint32_t move_masks[9] = {
    0x40040040, 0x20004000, 0x10000404, 0x04020000, 0x02002022,
    0x01000200, 0x00410001, 0x00201000, 0x00100110,
};

// 0x40040040 -> 0100 0000 0000 0100 0000 0000 0100 0000:
// O |  |
// ---------    
//   |  |
// ---------   
//   |  |

在這邊的每一步後,都需要去記錄棋盤中哪幾個位置是被下過的,以避免重複落子,範例中使用了以下方式來應對:

uint32_t available_moves[9] = {0, 1, 2, 3, 4, 5, 6, 7, 8};
    for (uint32_t n_moves = 9; n_moves > 0; n_moves--) {
        /* Get board of player */
        uint32_t board = boards[player - 1];
        /* Choose random move */
        uint32_t i = fastmod(xorshift32(), n_moves);
        uint32_t move = available_moves[i];
        /* Delete move from available moves */
        available_moves[i] = available_moves[n_moves - 1];
        ...
            
// if choose '2', the array will become      {0, 1, 8, 3, 4, 5, 6, 7, 8}
// in next iteration, move will be chosen in {0, 1, 8, 3, 4, 5, 6, 7}.

這邊很巧妙的利用一個方式使每一次的選擇中不重複選到同樣的位置,當在
move = available_moves[i]; 記錄完本次選擇之後,
available_moves[i] = available_moves[n_moves - 1]; 會將此數組的最後一個元素去覆蓋掉本次所選擇的位置,這樣在 for 迴圈的下一次迭代時,會在前 n_moves 個元素做選擇,而被選取過的元素一定會被覆蓋掉,若是數組最後一個元素,也不會進入選擇範圍。

但這裡就遇到第一個問題:這個例子是以隨機選擇落子位置為前提而設計的,實際上棋手需要選擇特定的落子位置,目前想到的方式還是需要以一個大小為 N_GRIDS 的數組用以紀錄棋盤的落子情形,與先前的做法不同之處是在資料傳遞的過程中,並不會將此記錄整個棋盤狀況的數組也跟著傳遞,而是只透過 boards[2] 來表示兩位棋手目前的落子情形,以此降低 user <-> kernel 之間通訊的負擔。

再來就是勝利條件的檢查

static inline uint32_t is_win(uint32_t player_board)
{
    return (player_board + 0x11111111) & 0x88888888;
}

回想先前的 boards 設計,使用 uint32_t 變數中每 4 bits 的後 3 bits 用以紀錄這 8 條連線方式 (三個橫排、三個直排、兩條斜線) 的分佈情況,由此可知,當其中一條直線有三個落子時, player_board + 0x11111111 就會有其中一個 4 bits1000 ,因此與 0x88888888 做 AND 運算即可得到非零值。

第二個遇到的問題就是若想要套用 masks 的紀錄方式,目前還是被限制在 3*3 棋盤的條件下,若要套用於 4*4 的棋盤時,則總共會有 24 條三個棋子的連線方式,那麼還需要將儲存變數擴增為兩個
uint64_t (16*4) boards 來記錄落子情形,且新的 move_masks[16] 需要重新設定,以對應到這 24 條連線,但如果需要更大的棋盤 (5*5) ,建立 move_masks[25] 變得過於大型,複雜程度也將提升許多。

TODO: 提出程式碼的改進

vax-r 程式碼的缺陷: user <-> kernel 之間的通訊用大量的文字 (棋盤), workqueue (CMWQ) 無法善用 CPU 資源, 展現的方式 (原本在 Linux 核心模組繪圖,同步的成本), AI 演算法有改進空間
https://hackmd.io/@sysprog/linux2024-quiz4 (用數字表達下棋的狀況,單一的 atomic 操作即可保證內容正確)

在嘗試解析 vax-r 同學實作的 simrupt.c 之前,有必要先理解 CMWQ、workqueues、tasklet、softirqs 這幾個用於 deferred interrupts 的機制,以及他們之間相互的差異與關係。

在 Linux Kernel 中,會希望中斷事件的處理時間能夠盡量的短,以提高系統的響應能力,且避免堆積過多重要事件,降低系統穩定性,因此為了使某些耗費計算量較大的中斷事件能夠滿足上述需求,則會利用到 deferred interrupts 的機制,將中斷事件中必要的部分 top-half 先處理、較為耗時的工作 bottom-half 利用 workqueues、tasklet、softirqs 執行。

workqueue

workqueue 中,目標函式 work items 允許 blocksleep ,對於一些更複雜或耗時的操作更有利,與 tasklet 不同,每個 CPU 會分別建立自己的執行緒 workers 來執行 work items ,因此在 workqueue 中的任務可以分配到不同的 CPU 內執行,在本專案中, drawboard_work_funcai_one_work_func 等等需要同步記憶體空間訊息的操作,會使用 workqueue 的方式來處理。

schedule 是「排程」,而非「調度」
務必使用本課程教材規範的術語。

在多執行緒 multi-threaded 的環境下每個 CPU 都有自己的執行緒,但傳統的 workqueue 中並行處理效率並不是太好,因為靜態分配執行緒的關係,並不能根據各個 CPU 的負載來動態調整執行緒的運行狀況,相對的, CMWQ 提供一些改善,引入管理與調度 機制來使得各 CPU 能更有效率的分配資源,且使用全局的 worker-pool ,以避免一個執行緒僅執行特定 workqueue 中的任務,另外還有 CPU affinity 的調整等等,在保有原本 workqueue API 的提供更好的資源分配效率。

tasklet 與 softirq

softirq 透過 kernel thread 執行較輕量且緊急的任務,在中斷發生後更新狀態或進行簡單的處理,而 tasklet 是基於 softirq 的機制,提供更高層的接口,讓使用者可以簡單的使用,且同一類型的 tasklet 只可以在同一個 CPU 上執行,相對於 softirq 是系統啟動時靜態分配,tasklet 可以在使用者需要時創建,都是延遲中斷的處理機制。

simrupt.c 實作

struct kmldrv_attr {
    char display;
    char resume;
    char end;
    rwlock_t lock;
};

首先定義虛擬 file sys 中的結構

static void produce_board(void)
{
    unsigned int len = kfifo_in(&rx_fifo, draw_buffer, sizeof(draw_buffer));
    if (unlikely(len < sizeof(draw_buffer)) && printk_ratelimit())
        pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(draw_buffer) - len);

    pr_debug("kmldrv: %s: in %u/%u bytes\n", __func__, len,
             kfifo_len(&rx_fifo));
}

這邊用於將計算出的棋盤,包括方格全部儲存的 draw_buffer 寫入 rx_fifo 並檢查有無缺失,之後再透過 read() 讀取模組文件時就可以將 kernel space 內的數據讀出至 user space

static int draw_board(char *table)

這個函式將棋盤 table[] 加上棋格的所有字符寫入 draw_buffer 內,這裡就遇到了 user <-> kernel 之間的通訊負載過重的問題,這部分可以嘗試僅將所有棋格內棋子分布情況,即 table[] 讀出,並在 user space 再對讀出的 table 後才畫出棋盤,避免先將整個棋盤化為字串在整個傳出。另外若成功套用第三次作業的改善方式,則輸出的棋盤分布狀況 table 又能夠更加精簡
(僅需兩個 uint_32)。

以下開始定義此專案中的 work items

static void drawboard_work_func(struct work_struct *w)
{
    int cpu;

    /* This code runs from a kernel thread, so softirqs and hard-irqs must
     * be enabled.
     */
    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());

    /* Pretend to simulate access to per-CPU data, disabling preemption
     * during the pr_info().
     */
    cpu = get_cpu();
    pr_info("kmldrv: [CPU#%d] %s\n", cpu, __func__);
    put_cpu();

    read_lock(&attr_obj.lock);
    if (attr_obj.display == '0') {
        read_unlock(&attr_obj.lock);
        return;
    }
    read_unlock(&attr_obj.lock);

    mutex_lock(&producer_lock);
    draw_board(table);
    mutex_unlock(&producer_lock);

    /* Store data to the kfifo buffer */
    mutex_lock(&consumer_lock);
    produce_board();
    mutex_unlock(&consumer_lock);

    wake_up_interruptible(&rx_wait);
}

這邊可以看到 WARN_ON_ONCE 檢查這個函式不該是在中斷時被執行的,並且在
cpu = get_cpu();put_cpu(); 之間的所有動作,都將會是在同一個 CPU 內被執行,在這裡僅為了保證輸出 CPU 相關資訊時不要被搶佔,並且在這之後,進行棋盤的繪製 draw_board 以及輸出 produce_board ,可以看到此 work item 即是在 timer 中斷後負責一個相對複雜的工作,因此使用 workqueue 的方式延遲中斷。

static void ai_one_work_func(struct work_struct *w)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    int cpu;

    WARN_ON_ONCE(in_softirq());
    WARN_ON_ONCE(in_interrupt());

    cpu = get_cpu();
    pr_info("kmldrv: [CPU#%d] start doing %s\n", cpu, __func__);
    tv_start = ktime_get();
    mutex_lock(&producer_lock);
    int move;
    WRITE_ONCE(move, mcts(table, 'O'));

    smp_mb();

    if (move != -1)
        WRITE_ONCE(table[move], 'O');

    WRITE_ONCE(turn, 'X');
    WRITE_ONCE(finish, 1);
    smp_wmb();
    mutex_unlock(&producer_lock);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
    pr_info("kmldrv: [CPU#%d] doing %s for %llu usec\n", cpu, __func__,
            (unsigned long long) nsecs >> 10);
    put_cpu();
}

這邊是棋手一透過蒙地卡羅演算法 WRITE_ONCE(move, mcts(table, 'O')); 下一手棋,並在下一步 WRITE_ONCE(table[move], 'O'); 更新棋盤狀況,這裡要特別注意到使用了記憶體屏障 smp_mb() 以此保證隔開的兩個區塊前後,記憶體能夠正確的儲存完畢,以避免值令執行順序被優化而導致 race condition ,同理,使用巨集來寫入或讀區資料,如 WRITE_ONCE 也是為了使這個操作變成 atomic ,以保證資料的同步。

static void game_tasklet_func(unsigned long __data)
{
    ktime_t tv_start, tv_end;
    s64 nsecs;

    WARN_ON_ONCE(!in_interrupt());
    WARN_ON_ONCE(!in_softirq());

    tv_start = ktime_get();

    READ_ONCE(finish);
    READ_ONCE(turn);
    smp_rmb();

    if (finish && turn == 'O') {
        WRITE_ONCE(finish, 0);
        smp_wmb();
        queue_work(kmldrv_workqueue, &ai_one_work);
    } else if (finish && turn == 'X') {
        WRITE_ONCE(finish, 0);
        smp_wmb();
        queue_work(kmldrv_workqueue, &ai_two_work);
    }
    queue_work(kmldrv_workqueue, &mcts_calc_load_work);
    queue_work(kmldrv_workqueue, &drawboard_work);
    tv_end = ktime_get();

    nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));

    pr_info("kmldrv: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
            __func__, (unsigned long long) nsecs >> 10);
}

這個 tasklet 負責在 timer interrupt 觸發後,決定由哪位棋手下棋,並計算負載且輸出棋盤,這裡我發現 workqueue 是被這個 tasklet 所觸發的,也符合這兩種機制先前討論的特性,由 tasklet 進行需要在短時間內完成的任務分配、系統回應之後將複雜的任務交給 work items 去處理。

static void ai_game(void)
{
    WARN_ON_ONCE(!irqs_disabled());

    pr_info("kmldrv: [CPU#%d] doing AI game\n", smp_processor_id());
    pr_info("kmldrv: [CPU#%d] scheduling tasklet\n", smp_processor_id());
    tasklet_schedule(&game_tasklet);
}

timer_handler 觸發時將會呼叫此 ai_game ,來調用 game_tasklet

不太明白這邊為何需要特別創建一個 function 來調度 而不是直接在 timer 內呼叫 tasklet_schedule(&game_tasklet);

作實驗印證你的想法,避免「舉燭」

static void timer_handler(struct timer_list *__timer){
    
    ...
        
    local_irq_disable();
    
    ...
    char win = check_win(table);
    if (win == ' ') {
        ai_game();
        mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
    } else {
        read_lock(&attr_obj.lock);
        if (attr_obj.display == '1') {
            int cpu = get_cpu();
            pr_info("kmldrv: [CPU#%d] Drawing final board\n", cpu);
            put_cpu();

            mutex_lock(&producer_lock);
            draw_board(table);
            mutex_unlock(&producer_lock);

            /* Store data to the kfifo buffer */
            mutex_lock(&consumer_lock);
            produce_board();
            mutex_unlock(&consumer_lock);

            wake_up_interruptible(&rx_wait);
        }
    ...
    local_irq_enable();
}

這個 function 就是在 kernel module 被啟動時在初始化會呼叫的計時器,每隔一段時間 (msecs_to_jiffies(delay)) 就會執行一次,並且在計時器中斷時會先將原有的 softirq 先無效化,以模擬設定的中斷執行環境 tasklet ,每次執行該 handler 時即檢查遊戲是否結束,若沒有就呼叫 ai_game() 啟動 game_tasklet 並接著呼叫

queue_work(kmldrv_workqueue, &ai_two_work);
                        or   &ai_one_work
...
queue_work(kmldrv_workqueue, &mcts_calc_load_work);
queue_work(kmldrv_workqueue, &drawboard_work);

若結束則將最後的棋盤輸出。

最後透過 kmldrv_read 裡面呼叫 ret = kfifo_to_user(&rx_fifo, buf, count, &read);kernel 中的 buffer rx_fifo 內部資料傳輸到 user spacebuffer buf ,達成
user <-> kernel 之間的訊息傳輸。

在這個實作中其中一個可改善的方式即為透過測驗四中的 tic-tac-toe 實作手法,減少
user <-> kernel 之間的傳輸成本。