Try  HackMD Logo HackMD

Linux 核心專題: 重作 kxo

執行人: HeLunWu0317
期末專題影片

Reviewed by wurrrrrrrrrr

系統為每顆 CPU 建立兩個 worker-pool(normal 與 high-priority)那我要如何知道他們執行緒的區別,要觀察什麼值?

根據 kernel/workqueue.c 的 header 中, There are two worker pools for each CPU (one for normal work items and the other for high priority ones) and some extra pools for workqueues which are not bound to any specific CPU
所以每顆 CPU 兩個 worker-pool,而在 kernel 文件的 workqueue 部分中,有這一段
WQ_HIGHPRI Work items of a highpri wq are queued to the highpri worker-pool of the target cpu. Highpri worker-pools are served by worker threads with elevated nice level.
所以我們再回到 workqueue.c 中,可以找到format_worker_id(),函式中有寫:pool->attrs->nice < 0 ? "H" : "",如果該 worker 所屬的 pool 之 nice 值是負的,表示它是 high-priority pool。這時 thread 名稱尾端會加上 H,而根據老師之前課堂上介紹 nice 值,值越小,優先權越大,知道這些後,就可以透過指令去觀察, 下指令 ps -eLo pid,nice,comm | grep kworker,可以觀察 kernel worker 中所有process 的 PID 和 nice 值以及名稱。內容舉例:289 -20 kworker/9:1H-events_highpri ,289 為 pid ,-20為 nice 值(為最高優先),最後是名稱,名稱後面可以看到 大寫 H ,如 workqueue.c 程式碼內容所述,high-priority 的 process 在名稱後面會加上 H ,而寫上 R 的 process 是 unbound workqueue,都沒寫就是 normal pool (都可在 workqueue.c中觀察)。 HeLunWu0317

Reviewed by thwuedwin

「平均 read size 從 128 -> 4bytes 下降了 97% ,字串格式轉換成位元運算節省了大量運算。

這個 128 沒有給單位,若單位是 bytes 的話原本的專案中也不是這個數字,這數字是如何得出來的?你後續的討論也圍繞著 4 bytes,但你的實作是

3×3 的棋盤,你的計算也給出 18 bits,為何是 4 bytes?
另外,錯字非常多,至少有五處錯字。commit 3091c1a的超連結格式跑掉了不能直接點,標題也有錯字。

關於 128 byts 其實是筆記沒有更新的錯誤,我在後續使用 strace 作分析時,有發現和我原先預想的不同,而在根據 game.h 中的 DRAWBUFFER_SIZE 發現,此設計是動態調整的,當今天的棋盤是 4 x 4 的時候,會是 66 bytes 左右。
我會使用 4bytes 的原因最主要是要將資料放入 kfifo buffer 中的話,對於 CPU 來說 4bytes(32bits)可以 Alignment,而且此專案中多數設置參數為 u32 或 u64 ,以上理由,使用 4 bytes 我認為對於通訊效能會更好。 HeLunWu0317

Reviewed by Mike1117

但這裡會遇到一個問題,該如何判斷輪到誰執行?單純用 mutex lock 是不足夠的,因為有可能某方連續兩次都搶到該 lock ,使得它可以連續下兩步
所以當 CPU0 下完棋,解除 mutex(lock) ,但是此時可能還有剩餘的 time slice 以及處於執行狀態,導致在 queue 中的 CPU0 有可能被 CFS 放在靠前的位置,之後又被 schedule選中 。

最後是如何解決這個問題的?

Reviewed by liangchingyun

有兩處 "kxo" 後面接中文時沒有空一格:

  1. 自 GitHub kxo進行 fork
  2. 原本 kxo棋盤繪製在核心模式

已修正。 HeLunWu0317

Reviewed by weiso131

但這裡會遇到一個問題,該如何判斷輪到誰執行?單純用 mutex lock 是不足夠的,因為有可能某方連續兩次都搶到該 lock ,使得它可以連續下兩步
所以當 CPU0 下完棋,解除 mutex(lock) ,但是此時可能還有剩餘的 time slice 以及處於執行狀態,導致在 queue 中的 CPU0 有可能被 CFS 放在靠前的位置,之後又被 schedule選中 。

關於如何確保不會連續下兩步
觀察 ai_one_work_func ,可以發現 mutex_lock 鎖住的區域包含:使用 mcts 決定該下哪裡並更新到棋盤, 將 turn 換成 'X' ,這能確保這段 critical section 之後, game_tasklet_func 絕對會將 ai_two_work 放進 workqueue ,而 queuework 能確保同個 work 不會在已經存在於任意 workqueue 的情況下,再次被排入 workqueue (workqueue 的文件沒有直接寫到,但如果遇到這個情況會回傳 false) ,因此能保證不會有一個 work_func 連續搶到 lock 。

Reviewed by alexc-313

程式碼區塊皆未註記程式語言,開頭的系統資訊應該放在同一個程式碼區塊中,並正確標示為 bash
此外,請注意排版,錯誤的標記標題會導致讀者無法正確的使用索引。

因此使用 strace 分析使用的平均資料大小,根據呼叫次數以及總使用量,得到的平勳數可以看出仍然降低了通訊成本。

請問有附上 strace 的測試結果嗎?

已附上。 HeLunWu0317

Reviewed by leonnig

目前修改了 frame 大小,不再是 4byte大小 ,因此使用 strace 分析使用的平均資料大小,根據呼叫次數以及總使用量,得到的平勳數可以看出仍然降低了通訊成本。

想請問 frame 的 size 是修改為多少,有實際通訊成本降低的幅度嗎?

這一段詳細說明應該是我接收資料後,有兩個可能 4 byte的資料,那一定是棋盤,如果不是,我將它放到另一個較大的 buffer 中,此 buffer 設置為 1024 bytes。 HeLunWu0317

Reviewed by MuChengChen

Simulation : 計算此 path 的勝率,也就是從目前心節點開始的 path 隨機的把此 path 走到底,得到勝(1)或負(0)。

「心節點」是「新節點」吧 ?

已修正。 HeLunWu0317

任務描述

第三份作業為基礎、搭配課堂討論和觀摩學員的成果,重新投入 kxo 開發。過程中可參照其他學員的成果 (但要實驗和指出其缺失),務必清楚標示。

TODO: 重作 kxo

除了原本題目的要求,注意定點數運算誤差的分析和消除

作業要求:

  • 自 GitHub kxo 進行 fork,你的任務是縮減使用者和核心層級的通訊成本,並允許並行的對奕

  • 參照〈並行程式設計:排程器原理〉,引入若干 coroutine (注意:不得使用 POSIX Thread,採取教材提到的實作方式),分別對應到 AI1, AI2 (具備玩井字遊戲的人工智慧程式,採取不同的演算法,可指定運作於使用者/核心層級) 和鍵盤事件處理,意味著使用者可連續觀看多場「電腦 vs. 電腦」的對弈,當按下 Ctrl-P 時暫停或回復棋盤畫面更新 (但運算仍持續)、當按下 Ctrl-Q 時離開「電腦 vs. 電腦」的對弈,注意不該使用 (n)curses 函式庫。當離開「電腦 vs. 電腦」的對弈時,ttt 應顯示多場對弈的過程,亦即類似 Moves: B2 -> C2 -> C3 -> D3 -> D4 的輸出,儘量重用現有的結構體和程式碼。

    關於鍵盤事件和終端機畫面的處理機制,可參見 mazu-editor 原始程式碼和〈Build Your Own Text Editor〉。

  • 在「電腦 vs. 電腦」的對弈模式,比照前述 load average 的計算方式,規範針對下棋 AI 的 load 機制,並運用定點數來統計不同 AI 實作機制對系統負載的使用率,整合到畫面輸出,開發過程中應有對應的數學分析和羅列實作考量因素。

  • 對弈的過程中,要在螢幕顯示當下的時間 (含秒數),並持續更新。

  • 原本 kxo 棋盤繪製在核心模式,你應該修改程式碼,使畫面呈現的部分全部在使用者層級,且善用 bitops,降低核心和使用者層級之間的通訊成本,應給予相關量化

  • jserv/ttt 移植 reinforcement learning (RL) 到 kxo 核心模組,使其得以動態切換。

ksort: 處理並行排序的 Linux 核心模組

$ uname -r

6.11.0-21-generic

sudo apt install linux-headers-`uname -r`

linux-headers-6.11.0-21-generic is already the newest version (6.11.0-21.21~24.04.1).

$ ls -l /dev/sort
$ cat /sys/class/sort/sort/dev

511:0

觀察 sort_mod.c

搜尋與 dev 也就是 device 相關程式碼,對應函式為 sort_init()

if (alloc_chrdev_region(&dev, 0, 1, DEVICE_NAME) < 0)
        return -1;

其中 alloc_chrdev_region,根據 google 搜尋,功用是向linux kernel申請設備號。

  • 給定次設備號和名字,告訴linux要申請幾個設備號,讓linux直接幫你尋找主設備號
  • 而我的核心分配給我 511.0

ksort 核心模組內部

觀察程式如何與 kernel 互動

user.c

ksort 設計為一個 character device 能夠循序存取檔案,定義相關的函式,可利用存取檔案的系統呼叫以存取 (即 open, read, write, mmap 等等)。因此,使用者層級 (user-level 或 userspace) 的程式可透過 write 將資料傳入核心空間,並透過 read 系統呼叫讀取排序後的結果。

int fd = open(KSORT_DEV, O_RDWR);
    
    size_t n_elements = 1000;
    size_t size = n_elements * sizeof(int);
    int *inbuf = malloc(size);
....
    for (size_t i = 0; i < n_elements; i++)
        inbuf[i] = rand() % n_elements;
....
    ssize_t w_sz = write(fd, inbuf, size);
....
    ssize_t r_sz = read(fd, inbuf, size);
    if (r_sz != size) {
        perror("Failed to read character device");
        ret = EXIT_FAILURE;
        goto error_rw;
    }
  • 首先,透過 系統呼叫 open() 開啟裝置。
  • 接下來準備 1000個待排序的資料,malloc() 記憶體空間後,產生的是 0~999的亂序隨機整數放入 inbuf[]
  • 透過 系統呼叫 write() 寫入 /dev/sort
  • 最後以 read() 讀回以排序好的陣列。

sort_mod.c

當使用者執行 read 操作時,會觸發 fops 中自定義的 read 函式 sort_read。該函式先檢查 size 是否與之前透過 write 寫入的資料長度一致,若不一致則直接返回。接著透過 kmemdup 建立一份排序用的緩衝區,並呼叫實際的排序函式 sort_main(目前僅支援整數陣列,使用 num_compare 作為比較器)。排序完成後,再利用 Linux 核心的 copy_to_user 將排序結果回傳至使用者空間。

ssize_t sort_write():

tmp = kmalloc(size, GFP_KERNEL);
  • 以設置的 size 做 kmalloc(),與 malloc() 相似,以位元組大小的區塊形式取得 kernel 記憶體,對於大多數核心分配來說,kmalloc()介面是首選,是kernel空間專用的記憶體分配函式。
len = copy_from_user(tmp, buf, size);
  • 透過 copy_from_user() 將資料從使用者層面複製進 kernel.
kfree(user_data);
user_data = tmp;
user_data_size = size;
  • user_data 指向了最近一次 write() 的資料,kfree() 將資料釋放,將 user_data 指向 tmp ,也就是新的資料,並且更改 user_data_size 為新的 size.
return size;
  • 最後將資料的 size 做回傳。

ssize_t sort_read:

sort_buffer = kmemdup(user_data, size, GFP_KERNEL);
  • kernel 函式 kmemdup() 複製 user_data 的資料做排序,避免直接修改共用資料,防止其他程序同時執行導致資料錯誤。
sort_main(sort_buffer, size / es, es, num_compare);
  • 開始呼叫排序 sort_main()
len = copy_to_user(buf, sort_buffer, size);
  • 將結果回傳到使用者層 copy_to_user().
kfree(sort_buffer);
  • 最後釋放記憶體 kfree().

Linux 核心的並行處理

simrupt

simrupt 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件。

IRQ(Interupt Request):

  • 硬體裝置發送中斷訊號給中斷控制器(Interrupt Controller),中斷控制器根據來源產生一個中斷 ID。CPU 停止目前執行中的程式,將其狀態儲存到 PCB(Process Control Block),然後跳轉執行對應的 ISR(Interrupt Service Routine,中斷服務程式)以處理中斷事件。

在 Linux 中,中斷處理分為兩個階段:

  1. Top-half ,處理較緊急的事情,由硬體中斷觸發,將資料放進 fast_buf 中。
  2. Bottom Half,延後處理不緊急的工作。
  • Tasklet 屬於 softirq 的一種實作,做 將剩餘不緊急的任務作排程(也就是實作後續的處理(Bottom half))。
  • workqueue 將較花時間的任務都放在這裡作,可以作較複雜的任務,使用queue_work()排入 kernel thread 處理。

如何模擬 IRQ :

  • 設定 timer ,在 timer_handler() 中做週期性的 timer 進行中斷。

    目標是模擬 hard-irq,所以必須確保目前是在 softirq context,欲模擬在 interrupt context 中處理中斷,所以針對該 CPU disable interrupts。

  • process_data()中模擬 softirq ,使用 fast_buf_put()快速將資料放入 buffer 中,並且設定 tasklet 程序排程 tasklet_schedule().

  • simrupt_tasklet_func() 中,使用 queue_work() 將後續需要較長處理時間的任務委派給 workqueue。tasklet 本身不能進行會睡眠或阻塞的操作,因此將工作交給 workqueue 執行.

  • 建立新的 workqueue ,simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE),而根據官方文件可以了解:

    There are two worker-pools (normal / high-pri) for each CPU and extra pools for unbound queues; the number is dynamic

    以及

    Special-purpose threads, called
    kworkers, execute the functions off of the queue … These worker threads are managed in worker-pools

系統為每顆 CPU 建立兩個 worker-pool(normal 與 high-priority),此外也會為 unbound queue 建立額外的 global worker-pools,其數量是動態調整的。

這些任務最終會由稱為 kworker 的特殊 kernel thread 執行。這些 kworker 是由 worker-pool 管理的,並會負責從 queue 中取出任務並執行。

  • alloc_workqueue() 建立一個 unbound worker-pool,假如沒有 worker (pool 應該都跟著一個 worker),呼叫kthread_create(),給予一個 kthread.

  • workqueue 將需要複雜運算的任務交給從 worker-pool 中動態指派的 kthread,將 fast_buf (top_half資料)的東西則交給 kfifo.

  • 接下來在 simrupt_read() ,假如 queue 非空,則 read 內容給使用者,否則就持續 sleep.

對奕的核心模組

但這裡會遇到一個問題,該如何判斷輪到誰執行?單純用 mutex lock 是不足夠的,因為有可能某方連續兩次都搶到該 lock ,使得它可以連續下兩步

所以當 CPU0 下完棋,解除 mutex(lock) ,但是此時可能還有剩餘的 time slice 以及處於執行狀態,導致在 queue 中的 CPU0 有可能被 CFS 放在靠前的位置,之後又被 schedule選中 。

MCTS

主要目的:在 tree 中找 path ,哪一條 path 的勝率越高,就是最佳走法。

  • Node: 遊戲局面/狀態。

  • subnode: 在該局面採取的下一步。

  • UCT(Upper Confidence Bound for Trees):

    UCT(i)=wini+ClnNni

    • i:要評分的 subnode.
    • wi
      : 我方獲勝次數。
    • ni
      : 該 node 造訪次數。
    • N : parent(當前node)的造訪次數。
    • C : hyper-parameter ,常取平方根做計算經驗值。

演算法有以下步驟:

  1. Selection:為了從 tree 中找到最值得選的 path node ,從 root 開始重複挑 UCT 值高的 node 直到遇到未展開(expansion)的 node .
  2. Expansion: 展開所選 node 們,此時是隨機挑選的。
  3. Simulation : 計算此 path 的勝率,也就是從目前新節點開始的 path 隨機的把此 path 走到底,得到勝(1)或負(0)。
  4. Back-propagation: 把 Simulation 的結果回饋到 path 上的每個 node ,也就是假如每個 node 的造訪次數是
    ni
    則若模擬時為勝利,則
    wi
    ++.

重複以上步驟,直到時間或迴圈次數結束,最後在挑造訪次數或是勝率最高的 subnode 做實際的下手。

negamax演算法

在了解 negamax 之前,要先了解 minimax 演算法。

所謂的 minimax ,就是最大化自己的利益以及最小化對手的利益,分為 min 與 max 層。

  • min 層:由對手選擇,使我方利益最小的選擇。
  • max 層:由我方選擇,使我方利益最大的選擇。
Func minmax(node)
    if node is a terminal node //目前可推斷上限層數
        return value of node
    if node is in min_layer
        best_VALUE = + ∞
        for all child_node
            best_VALUE = min(best_VALUE, minmax(child_node))
    else
        best_VALUE = - ∞
        for all child_node //in max layer
            best_VALUE = max(best_VALUE, minmax(child_node))
    return best_VAαβLUE

在此延伸出了 αβ with minimax 演算法,新增了 αβ 參數:

  • α:我方使用,每此我方找到我方最大利益值,將其存入α中。
  • β:對方使用,每次對方找到我方最小利益值,將其存入β。
  • 剪枝:當 α >= β ,有就是這條分支對對手(MIN)來說已經夠壞,最多只能迫使根玩家(MAX)拿到 ≤ β 的分數,而 MAX 已確定至少能拿到 ≥ α;既然 α 已不小於 β,對手沒必要再深入搜尋其它兄弟節點,因為無論怎麼下都不會讓 MAX 得分低於 β。於是可直接停止遍歷後續兄弟節點(剪枝)。
Func ABminimax(node, alpha, beta)
    if node is a terminal node //目前可推斷上限層數
        return value of node
    if node is in min_layer    //對手
        best_VALUE = + ∞
        for all child_node
            value = ABminimax(child_node, alpha, beta)
            best_VALUE = min(best_VALUE, value)
            beta = min(beta, best_VALUE)
            if alpha >= beta //剪枝條件
                break        //剪掉右子樹
    if node is in max_layer    //我方
        best_VALUE = - ∞
        for all child_node
            value = ABminimax(child_node, alpha, beta)
            best_VALUE = max(best_VALUE, value)
            alpha = max(beta, best_VALUE)
            if alpha >= beta //剪枝條件
                break        //剪掉右子樹

而所謂的 negamax 演算法,就是 minimax 演算法的一種,差別在於:

  • 每一層都是 max 層。
  • 每一層在比較利益前,將每個數值都加上負號,才開始決定哪一個值利益較大。
  • 此時 max(原min) 層為負值,到下一層 max 層,返回值又取負號一次,變為正,可發現 max 層還是原來的 max 層。
Func negamax(node)  
    if node is a terminal node //目前可推斷上限層數
        return value of node
    best_VALUE = - ∞
    for all child_node
        best_VALUE = max(best_VALUE, -negamax(child, depth-1, -color))

可發現程式碼更加簡潔,只需作單一迴圈判斷,並且在 遞迴返回值的時候加上負號,在程式碼中再加上 color 輔助判斷目前是我方還是對手。

而在此之上延伸出 negamax with αβ,新增 α 和 β,由於 negamax 沒有區分 min 和 max 層,因此 α 和 β 的區分是以目前根是誰的視角為依據去判斷,也就是 root_player:

  • α:root_player 找出的目前最大利益值,也就是到目前為止,已經找到的最佳下限。
  • β:root_player 找出的目前最小利益值,也就是到目前為止,已經確定的最差上限。
  • 剪枝:當 α >= β ,有就是這條分支對對手(MIN)來說已經夠壞,最多只能迫使根玩家(MAX)拿到 ≤ β 的分數,而 MAX 已確定至少能拿到 ≥ α;既然 α 已不小於 β,對手沒必要再深入搜尋其它兄弟節點,因為無論怎麼下都不會讓 MAX 得分低於 β。於是可直接停止遍歷後續兄弟節點(剪枝)。
  • 其餘和 negamax 相同。
func ABNegamax(node, alpha, beta)
    if node is a terminal node //目前可推斷上限層數
        return value of node
    best_VALUE = - ∞
    for each child_node
        value = -ABNegamax(child_node, -beta, -max(alpha, best_VALUE))
        best_VALUE = max(best_VALUE, value)
    if best_VALUE >= beta
        break
    return best_VALUE

kxo 作業需求

作業目標 : 實作coroutine

coroutine 是為了不依賴多執行緒,甚至單一 CPU 執行多工任務。

當一個任務正在執行時,另一個任務到來,於是作業系統 (早期的術語稱為 monitor) 就會判斷是否切換到新的任務,若是,作業系統會保存目前的 context,然後切換到另一個任務。

根據專案 coro,使用 setjmp/longjmp 實現 context switch 。

task架構

task 架構中,設定自己的專屬 jmp_buf 使之後切換時可以儲存狀態,將暫存器和 stack pointer 儲存起來(之後要 return 更簡單),接下來導入 list 架構,用於 tasklist,n 為迴圈上限,i 為迴圈次數紀錄,task_name 紀錄任務名稱。

task_add

新增任務,將 task 架構中的 list 也就是 task->list 接入 tasklist 的尾端。也就是 task 進入 readyqueue.

task_switch()

當 task_list 非空,以 FIFO 得方法從 task_list 抓取任務,並且以 cur_task 紀錄,接下來就可以使用 longjmp 跳 coroutine (目前不確定如何執行),在任務生成後,必定有作 setjmp 因此,在這裡就可以作 longjmp 跳到指定的 coroutine.

schedule()

setjmp(sched),根據 setjmp 的功用,此時應該是在儲存 schedule 本身的 program counter 以及 stack pointer,方便之後可以跳回來,之後只要任何的 coroutine 執行 longjmp 即可跳到目前儲存的 setjmp ,while (ntasks-- > 0)表示目前啟動執行的任務為 0 ,則馬上取 tasks[i] 以及 argsi,呼叫啟動,作一系列 setjmp/task_add,最後 longjmp 到 scheduler ,當未啟動執行的 task 都做好後,開始呼叫 task_switch() 從 list 中挑下一個運行的 coroutine。
也就是分成兩次,第一次進入 scheduler 啟動任務,之後會 longjmp(sched,1),回 scheduler 並且 task_switch() 做下一個 task.

任務函式 - task0

以 task 架構做記憶體 malloc() ,設置 setjmp(task->env),最後將自己加入 task_list 後做 longjmp(ched,1).
當 scheduler 又 longjmp 回來後進入 for 迴圈開始執行任務內容 Fibonacci.
在迴圈中,又設置一次 setjmp,印出計算結果,task_add -> task_switch(),自主交出控制權。

任務函式 - task1

邏輯和 task0 相同,只有在迴圈設置上有所不同。

作業目標:顯示當下的時間

在 xo-user.c 中,select() 迴圈中接收棋盤資料,並且印出 draw_buffer ,可在這裡實行時間顯示。
使用函式庫 time.h,取得現在系統時間 time(NULL),設置當地時間資訊 struct tm *tm_info = localtime(&now);,換時區完成,設陣列 buffer ,儲存資訊,並且以 strftime ,轉換為看得懂的格式與字串。
而每一輪的 select() 取得新的棋盤資訊時,也更新系統時間並且同步(與棋盤)更新於螢幕上。

作業目標:將棋盤繪製在核心模式

這一部份詳細要求:

從原本的 ASCLL 轉換成使用 bitops 內容,總共有 9 格,每一個顯示的內容有: (1). O (2). X (3). 空,也就是沒一格需要 2 bits 顯示,共需要18 bits 顯示完整的內容,這樣改動後,平均 read size 從 66 bytes -> 4 bytes 下降了 97% ,字串格式轉換成位元運算節省了大量運算。

  • render_board()做棋盤的輸出,目前資料型態態 4bytes ,轉換成 ASCII 棋盤,也就是可讀棋盤。
    ​​​​static const char symbol[4] = " XO?";
    
    • 設置 sysmbol, 因為是 2bits ,所以有4種可能: X O ? .
    ​​​​printf("\033[3;1H");
    
    • 將棋盤放到第 3 行第 1 欄(第 1 行放時間)
    ​​​​symbol[(frame >> (row * 6 + 0)) & 0x3],   /* row0 */
    ​​​​symbol[(frame >> (row * 6 + 2)) & 0x3],   /* row1 */
    ​​​​symbol[(frame >> (row * 6 + 4)) & 0x3]);  /* row2 */
    
    • 計算方法 : 每一格佔 2bits ,一列有 3 格,第 0 格位移 0,第 1 格位移 2,第 2 格位移 4 ,也就是說一列 3 格共 6bits ,因此位移一列為 6bits,所以第一列為 row*0;第二列為 row*1以此類推

將棋盤繪製轉移到 user-space,也就是說 kernel 不再呼叫 draw_board(),因此 kfifo 的讀寫降低(只需 4 byte)。

  • board_to_mask(): 將9格的棋盤壓成 32bits

    • u32 m = 0 設置初始值 32bits 之 m 為 0,也就是將整個棋盤淨空。
    • 設置迴圈9次每一圈可以處理 1 格,在迴圈中,抓取的 table[i] 可能是 '空格'、'O'、'X',轉換成 2bits 值,
      (00)2
      = '空格'、
      (01)2
      = 'X'、
      (10)2
      = 'O'。
    • 因為 9 個格子共佔 18bits ,因此一輪做完 m 要位移 2,使用 OR 做而不是用累加,可以避免進位。
    • 舉例:
      ​​​​​​​​ X |   | O
      ​​​​​​​​-----------
      ​​​​​​​​   | O | X
      ​​​​​​​​-----------
      ​​​​​​​​   |   |  
      
      i table[i] v (2-bit) shift v<<shift (binary)
      0 'X' 01 0 0000 0000 0000 0001
      1 '空格' 00 2 0000 0000 0000 0000
      2 'O' 10 4 0000 0000 0001 0000
      3 '空格' 00 6 0000 0000 0000 0000
      4 'O' 10 8 0000 0001 0000 0000
      5 'X' 01 10 0000 0100 0000 0000
      6-8 '空格' 00 12–16 0
    • 最後多了一個 第18bits標示輪到誰。
  • produce_board_mask() 將 4byte的棋盤放入 kfifo

根據 main.c 中的,static DECLARE_KFIFO(rx_fifo, unsigned char, 4096),表示 kfifo 是一個 4096 byte 的 buffer,而程式將讀取的資料存進此 buffer 中,也就是 kfifo_in(),之後, 再 kfifo_out()把同樣的 byte 內容(written)輸出到 user-space,而一份棋盤資訊就是 4byte = 1 frame,就直接將 1個 frame 放入 kfifo 中,免得資訊不齊全。

  • 假如發生意外狀況,kfifo 意外剩下的 byte 數,不能跟 4byte 對齊,將 written 設 0 ,並設置dmesg中,輸出 warning (方便檢查驗證)。
  • 在後續 draw_board_func(),取代原始 code 改為 produce_board(),直接套用原本的消費者與生產者 mutex(lock),防止讀寫時拿到不同步的錯誤資料。
    bcab932

作業目標:多場對弈資料的過程

main.c 中,有兩個函式 ai_one_work_func() 以及 ai_two_work_func(),這裡是 AI 對弈的地方,所以我在這裡記錄每一步的對弈,並且呼叫自行設置的 record_move(),儲存進 game_history() 中。

static void record_move(int pos, char turn)
{
    static const char cols[] = "ABC";           
    int row = pos / BOARD_SIZE;                    
    int col = pos % BOARD_SIZE;

    move_pos += scnprintf(game_history[current_game] + move_pos,
                          MAX_MOVES_LEN - move_pos,
                          "%c%d -> ", cols[col], row + 1);
}
  • 為了呈現: Moves: B2 -> C2 -> C3 -> D3 -> D4目前實作的為 3x3 ,所以只有設置 3 行數的字母 ABC ,而自 ai_work_func() 會傳來目前是這一手是下在哪一格,呼叫 record(move,turn) 做傳遞,此時函式會將對戰資訊存入 game_history()

  • 為了紀錄目前的對弈過程的遊戲長度,因為Moves: B2 ->會隨著遊戲進行而變長,為了方便下一次呼叫 reocrd_move() 可以指向正確的位置,使用 scnprintf() 函式,此函式在寫入後會回傳實際寫入字節數,這樣的用意為,舉例:

呼叫次序 寫入前 move_pos 寫入內容 scnprintf 回傳 寫入後 move_pos history現況
1 0 B2 -> 5 5 B2 -> \0
2 5 C2 -> 5 10 B2 -> C2 -> \0
3 10 C3 -> 5 15 B2 -> C2 -> C3 -> \0
  • 此外也設置了最大的移動步數 MAX_MOVES_LEN ,方便 scnprint() 知道目前剩餘的可用的字節空間,防止記憶體爆炸,具體實踐 MAX_MOVES_LEN - move_pos

當一局結束後,會呼叫 end_game_record(),主要功能將 game_history() 轉成文字,並且輸入進 kfifo buffer 中,顯示在使用著畫面上。

static void end_game_record(char win)
/* win = 'X' | 'O' | 'D' (draw) */
{
    if (move_pos >= 4)              /* 去掉最後 " -> " */
        move_pos -= 4;

    move_pos += scnprintf(game_history[current_game] + move_pos,
                          MAX_MOVES_LEN - move_pos,
                          " [%s]\n",
                          win == 'X' ? "X win" :
                          win == 'O' ? "O win" : "D");

    /* 把整局文字送進 kfifo */
    mutex_lock(&consumer_lock);
    kfifo_in(&rx_fifo,
             (const unsigned char *)game_history[current_game],
             move_pos);
    mutex_unlock(&consumer_lock);

    /* 準備下一局 */
    current_game = (current_game + 1) % MAX_GAMES;
    move_pos = 0;
}
  • end game 後,追加輸出結果內容,而根據原有程式碼紀錄的 win 參數,輸出本局結果,舉例:
    A1->B1->....[X win]
  • end game 後,當輸出都結束,將 current_game 清空, move_pos 清0,使用環狀queue的方法做清空,也就是 (current+1) % MAX_GAMES = 0。

在設置完後出現 bug,對戰過程與結果無法出現,最後發現,原本在繪製棋盤到使用者層時,使用 bitmask 方法取代 ASCII,當時只有設 4 byte ,在當時確實就足夠了,但是現在 kfifo 又需要輸出對戰結果的文字,這明先超出 4byte 的限制,因此必須將 4byte 擴大,而在擴大後,也確實解決了問題。

3091c1a

  • 原專案:

    Screenshot from 2025-07-01 21-37-18

  • 目前專案:

    Screenshot from 2025-07-01 21-36-33

目前修改了 frame 大小,不再是 4byte大小 ,因此使用 strace 分析使用的平均資料大小,根據呼叫次數以及總使用量,得到的平均數可以看出仍然降低了通訊成本。

TODO: 觀摩其他學員的成果

紀錄你的啟發、你進行的驗證,以及後續改進