Try   HackMD

Linux 核心專題:排程器原理

執行人: jeremy90307

解說影片

錄影解說 : Linux 核心專題:排程器原理 (youtube)

任務簡介

強化對並行程式設計的認知。

Reviewed by eleanorLYJ

討論 false sharing 的敘述不夠精確,應強調 CPU0 與 CPU1共享了同一塊的 cacheline,另外還有因為 cache coherence 機制,才導致效能降低

圖中有兩執行緒,當 CPU0 不斷的改變變數 X , CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能

我確實在 false sharing 這裡解釋不夠完善,我會再將其補進我的另個筆記,但在這裡我主要是放上我在閱讀教材時遇到的問題,感謝同學的指教。
jeremy90307

TODO: 閱讀〈並行和多執行緒程式設計〉系列講座並紀錄問題和提出改進

因系列講座內容較多,因此另外開個筆記紀錄 -> 研讀〈並行和多執行緒程式設計〉系列講座

集中於本頁面,專注於「紀錄問題」和「改進/討論」,已在教材出現的內容就不用建立副本

概念

多工處理

  • 循序式程式(sequential program)
    • 時間與程式的執行同一方向,在一特定時間僅有一項工作進行
  • 前景/背景式程式(foreground / background program)
    • 具備中斷
    • 須確保資料的一致性問題
  • 多工程式 (multi-tasking program)
    • 並行程式設計,在系統中若包含多個 CPU 或不對等 (heterogeneous) 的執行單元時,必須將工作分散至各執行單元同時執行
    • 理想情況上若各子工作可以完全獨立運行,則系統將可達到其最高效率。但各執行單元間彼此仍需共用系統資料(如記憶體、I/O 等)

問題 1
不對等 (heterogeneous) 的執行單元,代表的是什麼意思? 在系統中會有不對等是指優先順序嗎?

問題 2
同進進行的說法是否有瑕疵?因為在並行程式中使用快速切換來達到看似同時切換而已。

我整理多重程式、多工、多執行緒、多重處理的表格如下

Multiprogramming 多重程式 Multitasking 多工 Multithreading 多執行緒 Multiprocessing 多重處理
定義 在單一 CPU 上執行多個程式 在單一 CPU 上執行多個工作 在單一工作上運行多個執行緒 在多個 CPU 上運行多個行程
共享資源 資源(CPU、記憶體)在程式之間共享 資源(CPU、記憶體)在工作之間共享 資源(CPU、記憶體)在執行緒之間共享 每個行程都有自己一組的資源(CPU、記憶體)
排程 使用循環或基於優先權的排程為程式分配 CPU 時間 使用基於優先權或時間分割的調度為工作分配 CPU 時間 使用基於優先權或時間分割的調度為執行緒分配 CPU 時間 每個行程可以有自己的排程演算法
記憶體管理 每個程式都有自己的記憶體空間 每個工作都有自己的記憶體空間 執行緒在工作內共享記憶體空間 每個行程都有自己的記憶體空間
工作切換 需要工作切換才能在程式之間切換 需要工作切換才能在工作之間切換 需要工作切換才能在執行緒之間切換 需要工作切換才能在行程之間切換
Inter-Process Communication (IPC) 使用訊息傳遞或共享記憶體進行 IPC 使用訊息傳遞或共享記憶體進行 IPC 使用 IPC 的執行緒同步機制(例如 lock、semaphores) 使用 IPC 的進程間通訊機制(例如 pipes、sockets)

coroutine 的實作是否涵蓋多工與多執行緒,雖然單一 CPU 上執行多個工作,但工作內容(如 qsort_r )不是應該確保 thread safe 的情況嗎?

下圖為單一 CPU 同一時間只執行一個任務,但利用快速切換的方式,看似一次執行多的任務。

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

頻繁的快速切換帶來的優勢?

排程器原理

Atomics 操作

簡述 race condition 的問題

假設兩執行緒各自將全域變數的值 +1

Thread 1 Thread 2 value
0
read <- 0
increase 0
write back -> 1
read <- 1
increase 1
write back -> 2

若沒有使用 lock 或是同步機制,可能遇到下面問題

Thread 1 Thread 2 value
0
read <- 0
read <-
increase 0
increase 0
write back -> 1
write back -> 1

結果並非預期的 2 而是 1。

上述問題的解決辦法有使用

  • 阻塞式同步 : mutex lock
    • 須考慮到 priority inversion (優先權反轉)的問題
    • 解決方法:Priority Inheritance 、 Priority Ceiling
  • 非阻塞式同步 : Atomic 指令操作,比如 compare-and-swap (CAS)
int CAS(int *reg, int old, int new)
{
    ATOMIC();
    int tmp = *reg;
    if(tmp == old)
        *reg = new;
    END_ATOMIC();
    
    return tmp;
}

若有 Atomic 指令來確保資料的一致性,為何還需要用到 mutex lock 等方式?能非阻塞就不會影響到其他執行緒的進行不是更有效率嗎?

false sharing (偽共享)

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

圖中有兩執行緒,當 CPU0 不斷的改變變數 X , CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能。

問題

如何做到抑制 false sharing ? 共筆中有提到使用對齊的解決方法,但為何對齊 16 就可以避免 false sharing ?

考慮以下程式碼:

alignas(16) int x;

printBinary(&x);
printf("%ld\n", alignof(x));

對應的執行輸出:

_0000_0000_0000_0000_0000_0000_1010_1101_1001_0100_1111_1111_1111_1000_1101_0000
16

因為 int x 變數的地址會對齊 16 (單位:位元組_,亦即

24,在其二進位表示中,低位元處 4 個位元皆為 0,此處使用 alignas 以避免 false sharing。

TODO : 尚未閱讀 Memory Ordering 和 Barrier、處理器架構和其 Memory Order、wait-free & lock-free

POSIX Thread

在此章節主要跟著教材學習 pthread 相關程式設計。

研讀〈並行和多執行緒程式設計〉_POSIX Thread 連結有相關實作說明

〈實作輕量級的 Mutex Lock〉

測試和驗證中補充了些筆記即註解,使這個實作更加易懂

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

mutex/example/main.c

模擬情境:
給定 16 個工作節點

n0~
n16
,他們彼此間具備從屬關係的:
nk
的親代節點是
nk1
。每個節點需要都等待親代節點就緒 (ready) 方可進行下一階段的工作 (
n0
沒有親代節點,因此無此限制)。

16 個節點共享一個 clockclock 從 tick = 1 開始累計,節點在每個 tick 都有必須完成的事:在偶數 tick 時想像成是完成階段任務,可改變自身狀態為就緒並通知其子節點繼續後續任務,而在奇數 tick 時則推動時間讓 tick += 1。

結構體

struct clock {
    mutex_t mutex;
    cond_t cond;
    int ticks;
};

struct node {
    struct clock *clock;   // Each node points to a shared clock
    struct node *parent;   // Point to the parent node
    mutex_t mutex;
    cond_t cond;
    bool ready;            // Proceed to the next node only when ready is true
};
  • clock 中的 ticks 決定是否繼續下一步驟,同步上使用 condition variable + mutex 機制去通知此事件,且通知應該是用 broadcast 方式讓所有正在等待的執行緒得知

在 POSIX_Thread 中 condition variable 的實作中因只有一個執行緒在等待,使用 pthread_cond_signal

  • nodeready 主要讓結點能進行下個工作,因此當親代節點就緒時,主動通知此事。同樣需要 condition variable + mutex 機制,但以 signal 方式通知子節點就好。

thread_func(void *ptr)

static void *thread_func(void *ptr)
{
    struct node *self = ptr;
    bool bit = false;

    for (int i = 1; clock_wait(self->clock, i); ++i) {
        printf("Thread [%c] | i : %d\n", self->name, i);
        if (self->parent){
            printf("Thread [%c] wait parent\n", self->name);
            node_wait(self->parent);
        }

        if (bit) {
            printf("Thread [%c] send signal\n", self->name);
            node_signal(self);
        } else {
            printf("Thread [%c] trigger clock\n", self->name);
            clock_tick(self->clock);
        }
        bit = !bit;
    }

    node_signal(self);
    return NULL;
}

main(void)

int main(void)
{
    struct clock clock;
    clock_init(&clock);

#define N_NODES 3
    struct node nodes[N_NODES];
    node_init(&clock, NULL, &nodes[0]);
    for (int i = 1; i < N_NODES; ++i)
        node_init(&clock, &nodes[i - 1], &nodes[i]);

    printf("\nParent releationship : NULL ");
    for(int i = 0; i < N_NODES; ++i)
        printf(" -> [%c] ", nodes[i].name);
    printf("\n");

    pthread_t threads[N_NODES];
    for (int i = 0; i < N_NODES; ++i) {
        if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0)
            return EXIT_FAILURE;
    }
    printf("Tick start~\n");
    clock_tick(&clock);
    clock_wait(&clock, 1 << N_NODES);
    clock_stop(&clock);
    printf("\nTick stop~\n");

    for (int i = 0; i < N_NODES; ++i) {
        if (pthread_join(threads[i], NULL) != 0)
            return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

main 執行緒呼叫第一個 clock_tick 來讓 tick 變為 1,這樣其他執行緒就可以開始根據 tick 逐步進行。而這裡的 clock_wait 會一直等待 tick1 << N_NODES 之後再用 clock_stop 來讓節點的執行緒得以結束。

稍微改寫 main.c 使其更容易了解每個執行緒的進行過程,以下為輸出結果:

在執行程式時會出現下面錯誤,想請問為何會有這樣的問題?

FATAL: ThreadSanitizer: unexpected memory mapping 0x5c9bd4d2b000-0x5c9bd4d4b000

輸入 sudo sysctl vm.mmap_rnd_bits=28 來解決問題

參考 FATAL: ThreadSanitizer: unexpected memory mapping when running on Linux Kernels 6.6+

Parent releationship : NULL  -> [A]  -> [B]  -> [C] 
Tick start~

============clock_tick() tick : 1============
Thread [C] | i : 1
Thread [C] wait parent
Thread [B] | i : 1
Thread [B] wait parent
Thread [A] | i : 1
Thread [A] trigger clock

============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock

============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock

============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock

============clock_tick() tick : 5============
Thread [B] wait parent
Thread [A] | i : 5
Thread [A] trigger clock
Thread [C] | i : 2
Thread [C] wait parent

============clock_tick() tick : 6============
Thread [A] | i : 6
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock

============clock_tick() tick : 7============
Thread [B] | i : 4
Thread [B] wait parent
Thread [A] | i : 7
Thread [A] trigger clock

============clock_tick() tick : 8============
Thread [A] | i : 8
Thread [A] send signal

Tick stop~
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] becomes not ready.
Thread [C] send signal
  • 一開始 main 執行緒 clock_tick 先將 tick 增加 1 接著進入 clock_wait 中進行等待,一直到 clock->ticks 等於
    2NNODES
  • 假設
    NNODES
    為 3 ,分別有 Thread [A]Thread [B]Thread [C] 來執行這三個 node ,且他們之間的關係為 NULL -> [A] -> [B] -> [C]
  • 接著每個 node 進入 thread_func ,且預設的 bitfalse ,每次迭代都會進行反轉,使其執行不同工作內容
    • Thread [A] 因沒有親代節點關係,跳過等待親代節點 ready 的過程,也因此在每次 tick 增加後都能看 Thread [A] 不斷的執行
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock

在 tick : 2 中

  • Thread [A] 發送節點的訊號喚醒正在等待的 Thread [B] (喚醒時 Thread [A] 轉變為 ready 因此 Thread [B] 才能執行工作,此時 Thread [A] 交棒給 Thread [B] 後再次轉變為 not ready)
  • 此時的 Thread [B] 因第一次執行工作 bit = false ,所以進行 tick 加一的工作
============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock

在 tick : 3 中

  • Thread [A] 尚未 ready ,因此 Thread [B] 再次進入等待親代節點的狀態
  • Thread [A] 不受親代節點影響繼續執行工作
============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock

在 tick : 4 中

  • Thread [A] 發送節點的訊號喚醒正在等待的 Thread [B] (喚醒時 Thread [A] 轉變為 ready 因此 Thread [B] 才能執行工作,此時 Thread [A] 交棒給 Thread [B] 後再次轉變為 not ready)
  • Thread [B] 因為 bit 的關係工作內容變成發送節點訊號喚醒正在等待的 Thread [C]

後續都是相似的步驟,因此不再贅述

最後在 tick : 8 中, clock->ticks 等於

2NNODES 喚醒 main 執行緒,接著進入 clock_stop(&clock) 將 ticks 的值設為 -1,導致在執行緒函式中 for 迴圈中止。

為何在 thread_func 最後需要在進行 node_signal(self)

補充筆記:實作 Priority-inheritance mutex
mutex/pi-test/main.c 函式中主要建立一個 Priority inversion 的情況

定義一個具集 TRY(f) 用來檢視錯誤,其中 #f 將參數 f 轉換為字符串,這樣可以在錯誤訊息中顯示出具體是哪個函式失敗。

#define TRY(f)                                                  \
    do {                                                        \
        int __r;                                                \
        if ((__r = (f != 0))) {                                 \
            fprintf(stderr, "Run function %s = %d\n", #f, __r); \
            return __r;                                         \
        }                                                       \
    } while (0)

ctx_init 函式中的 mutexattr_setprotocol 設置了優先級繼承,這樣可以避免優先級反轉。

static void ctx_init(struct ctx *ctx)
{
    mutexattr_t mattr;
    mutexattr_setprotocol(&mattr, PRIO_INHERIT);
    mutex_init(&ctx->m0, &mattr);
}
#include <pthread.h>

int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol);

attr : 指向互斥鎖屬性的指標
protocol : 協議屬性

  • PTHREAD_PRIO_INHERIT : 使用優先級繼承
  • PTHREAD_PRIO_PROTECT : 使用優先級上限
  • PTHREAD_PRIO_NONE : 不使用優先級繼承或上限的策略

執行

  • 必須透過 sudo 執行,因為牽涉對執行緒優先權的調整,需要 root 權限
  • 必須使用 taskset 將程式鎖定在單一 CPU 上,否則多 CPU 資源的情況下,就沒辦法讓 middle 佔據唯一的 CPU。
sudo taskset -c 1 

問題 1
在研讀的過程中 pthread_create_with_prio 函式主要用作創建一個具有優先及的執行緒,當中 sched_param 的結構體又是從何而來?在完整程式碼中看起來沒有引用到排程器相關標頭檔。
部份程式碼

static int pthread_create_with_prio(pthread_t *thread,
                                    pthread_attr_t *attr,
                                    void *(*start_routine)(void *),
                                    void *arg,
                                    int prio)
{
    struct sched_param param;
    param.sched_priority = prio;

    TRY(pthread_attr_setschedparam(attr, &param));
    TRY(pthread_create(thread, attr, start_routine, arg));

    return 0;
}

pthread_setschedparam(3) — Linux manual page 找到 sched_param 結構體

struct sched_param {
    int sched_priority;     /* Scheduling priority */
};

pthread_attr_setschedpolicy(&attr, SCHED_FIFO)
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)
理解這兩 API 代表意思

int pthread_attr_setinheritsched(pthread_attr_t *attr,int inheritsched);

attr : 指向執行緒屬性的指標
inheritsched : 是否繼承父執行緒的排程屬性

  • PTHREAD_EXPLICIT_SCHED : 不繼承,新執行緒使用由 pthread_attr_setschedparampthread_attr_setschedpolicy 設置的排程屬性,執行緒
  • PTHREAD_INHERIT_SCHED : 繼承父執行緒的排程屬性

返回值:成功返回 0 ,失敗返回非 0 值

int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);

attr : 指向執行緒屬性的指標
policy : 排程屬性

  • SCHED_FIFO : 搶佔式排程,即會被優先即更高的執行緒搶佔
  • SCHED_RR : 輪詢式排程,擁有同樣優先權的執行緒會以一固定時間量,又稱「時間配量」(time slice),用輪詢方式執行。
  • SCHED_OTHER : 為 Linux 中默認。僅用於靜態優先級為 0 的執行緒(i.e., threads under real-time policies always have priority over SCHED_OTHER processes),從靜態優先級 0 列表中選擇要運行的執行緒,由動態優先級決定(即 nice 值)。
    每個時間段中,當執行緒準備好運行但被排程器拒絕時,動態優先級會增加。這確保了所有 SCHED_OTHER 執行緒之間的公平進展。

返回值:成功返回 0 ,失敗返回非 0 值

sched(7) — Linux manual page

int pthread_attr_setschedparam(pthread_attr_t *restrict attr,
                              const struct sched_param *restrict param);

主要用作設定優先級

返回值:成功返回 0 ,失敗返回非 0 的值

問題 2
請問若沒有給定優先權的級別,系統的判定如何決定下個執行的執行緒?

TODO : 理解 nice 值、CFS

測試
主要測試排程策略相關問題。

完整測試程式 -> mutex/test_priority

基於問題 2 進行相關測試。設定三個執行緒,Thread 1 與 Thread 2 將優先級設置在 1 (其中優先級設置範圍 1 ~ 99),Thread 使用預設屬性(即 SCHED_OTHER),並進行兩種排程屬性:SCHED_FIFOSCHED_RR 測試

  • SCHED_FIFO
$ make run 
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1

在搶佔式的排程中,Thread 1 並沒有讓出資源給 Thread 2 執行。

問題 3
當這種靜態優先級與動態優先級同時出現時,如何決定動態優先級的執行緒何時能執行?在 SCHED_FIFO 的結果中看到 Thread 3 並沒有成功執行。

  • SCHED_RR
$ make run 
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2

可以看到 Thread 1 與 Thread 2 將資源的平均分配給不同執行緒,Thread 3 的非即時執行緒根據動態優先級偶爾也能分配到資源。

〈建立相容於 POSIX Thread 的實作〉

  • Chain Blocking : 一個高優先權執行緒為了取得 n 個資源,被 n 個低優先權執行緒執行緒 block 住,這種情形就稱為 Chained Blocking
  • Deadlock : 不同優先級的執行緒們之間相互取得各自已經取得的資源,並且彼此等待對方釋放資源造成的 Deadlock

實作 priority protection mutex

priority protection mutex 設定 :
muthread_mutexattr_setprioceiling: 設定 mutex 的 prioceiling

    muthread_mutexattr_setprotocol(&mattr, TBTHREAD_PRIO_INHERIT);
    muthread_mutexattr_setprioceiling(&mattr, 25);
    muthread_mutex_init(&mutex_normal, &mattr);
    muthread_mutex_init(&mutex_normal_2, &mattr);

測驗程式 Tests/test-06-priority-inversion-PP.c 中,我們在已經設定好會發生 priority inversion 的情況下去實作出 priority protection mutex 是否會過於不實際,若面對到更複雜的情況下使用 PP 是否對效能真的有更多提升?該如何驗證?

我在問題中提到的不實際是因為我們已知任務的優先順序,故將某任務的優先級提升至某執行緒優先級之上,在系統中是否是常見的作法?

TODO: 針對〈並行程式設計:排程器原理〉設計相關實驗

以隨堂測驗和作業題目 (如 Timsort) 為展示標的,設計 preemptive coroutine 整合的相關實驗至少二個,達成並行處理並確認執行結果符合預期

在整合相關實作之前,先嘗試了解在 並行程式設計: 排程器原理 中 preemptive coroutine 的實作方法,才能延伸至其他案例中。

範例 : preempt_sched

  • preempt_count 用作決定是否允許搶佔,當大於 0 表示禁止搶佔

中斷處理

在了解中斷處理時出現幾個陌生的函式及結構體。

sigset_t 用來表示訊號集,其初始化方式有兩種:

  1. 使用 sigemptyset (sigset_t * set ) 指定訊號集為空
  2. 或者使用 sigfillset (sigset_t * set ) 指定訊號集為滿(即阻塞所有訊號),然後使用 sigdelset 單獨刪除指定的訊號(即不阻塞指定訊號),反之加入使用 sigaddset

sigprocmask -> ???

int sigprocmask(int how, const sigset_t *_Nullable restrict set,
                           sigset_t *_Nullable restrict oldset);

sigprocmask() is used to fetch and/or change the signal mask of the calling thread. The signal mask is the set of signals whose delivery is currently blocked for the caller

不太能理解這裡所謂的遮罩意思,描述裡提及可以透過 how 來改變訊號集的狀態

  • SIG_BLOCK:「阻塞」這些信號
  • SIG_UNBLOCK:「解除阻塞」這些信號
  • SIG_SETMASK:將當前的信號遮罩集設置為 set 指向的信號集,即「覆蓋」當前的信號遮罩集。
static void local_irq_save(sigset_t *sig_set)
{
    sigset_t block_set;
    sigfillset(&block_set);
    sigdelset(&block_set, SIGINT);
    sigprocmask(SIG_BLOCK, &block_set, sig_set);
}

-> 用作阻塞除了 SIGINT 以外的所有訊號,並保存原先的訊號狀態 sig_set

static void local_irq_restore(sigset_t *sig_set)
{
    sigprocmask(SIG_SETMASK, sig_set, NULL);
}

-> 恢復至原來的訊號狀態

sigprocmask 的第三個參數 oldset 為 NULL 時,表示不需要保存之前的訊號狀態

task
task_add() 加入一個新任務

首先使用 getcontext 來取得當前的 context,接著在函式中呼叫 makecontext 前必須先設置新的 stack 及下個 context 的地址,如下:

makecontext(3) — Linux manual page

    task->context.uc_stack.ss_sp = task->stack;
    task->context.uc_stack.ss_size = 1 << 20;
    task->context.uc_stack.ss_flags = 0;
    task->context.uc_link = NULL;

ss_flags -> 用處為何?

uc_link 為當前 context 執行結束後要執行的下個 context ,若為 NULL 則當前 context 執行完後結束。

在呼叫 makecontext 之前為何需要先設置 stack 大小 ss_size ?

Stack Overflow 當中解答提到 Because stack on some machine "grows toward numerically lower addresses" (like it does on x86 architecture and wiki x86) the makecontext needs/wants to place it data on the end of the stack. So it needs to determinate the end of stack and this is what ss_size is used for.

    makecontext(&task->context, (void (*)(void)) task_trampoline, 2, ptr.i[0],
                ptr.i[1]);

makecontext 先設置要輸入給指定函式 task_trampoline 的參數數量 2 及參數 ptr.i[0], ptr.i[1],使 context switch 時能執行特定函式。

注意:確保當新任務第一次被切換時,其 timer signal 是被阻塞的

sigaddset(&task->context.uc_sigmask, SIGALRM);

trampoline

Trampolines (sometimes referred to as indirect jump vectors) are memory locations holding addresses pointing to interrupt service routines, I/O routines, etc. Execution jumps into the trampoline and then immediately jumps out, or bounces, hence the term trampoline.

__attribute__((noreturn)) static void task_trampoline(int i0, int i1)
{
    union task_ptr ptr = {.i = {i0, i1}};
    struct task_struct *task = ptr.p;

    /* We switch to trampoline with blocked timer.  That is safe.
     * So the first thing that we have to do is to unblock timer signal.
     * Paired with task_add().
     */
    local_irq_restore_trampoline(task);
    task->callback(task->arg);
    task->reap_self = true;
    schedule();

    __builtin_unreachable(); /* shall not reach here */
}

noreturn 關鍵字告訴編譯器此函式不返回原來調用者函式的位置。

這裡問題有點大

task->callback(task->arg) ??? 貌似跑去執行排序任務一直到完成排序,不能理解為何

排程

static void schedule(void)
{
    sigset_t set;
    local_irq_save(&set);

    struct task_struct *next_task =
        list_first_entry(&task_current->list, struct task_struct, list);
    if (next_task) {
        if (task_current->reap_self)
            list_move(&task_current->list, &task_reap);
        task_switch_to(task_current, next_task);
    }

    struct task_struct *task, *tmp;
    list_for_each_entry_safe (task, tmp, &task_reap, list) /* clean reaps */
        task_destroy(task);

    local_irq_restore(&set);
}

next_task 使用 list_first_entry 選擇下一個被排程的任務(即為當前任務的下一個鏈節串列節點),若 next_task 存在使用 task_switch_to() 來進行 context switch 。

static void task_switch_to(struct task_struct *from, struct task_struct *to)
{
    task_current = to;
    swapcontext(&from->context, &to->context);
}

swapcontext 將任務從 from->context 切換到 to->context ,且 swapcontext 會保存當前 CPU 狀態。

swapcontext(3) - Linux man page

task_current->reap_self 表示任務是否完成,若完成則將其至於一個待清除的鏈節串列。

timer
在此案例中創建一個每 10 ms 發送 SIGALRM 訊號給行程來模擬中斷

static void timer_create(unsigned int usecs)
{
    ualarm(usecs, usecs);
}

ualarm(3) — Linux manual page

ualarm 第一個參數為第一次發送 SIGALRM 的時間間隔,第二個參數為間隔多久發送 SIGALRM

timer_cancel() 中設置 ualarm(0, 0),這樣 timer 會被取消。

static void timer_init(void)
{
    struct sigaction sa = {
        .sa_handler = (void (*)(int)) timer_handler,
        .sa_flags = SA_SIGINFO,
    };
    sigfillset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, NULL);
}

sigaction 設置處理函式為 timer_handler

sigaction(2) — Linux manual page

問題:

看到 sa_flags 被設置為 SA_SIGINFO 時,查了他的作用卻還是不能理解這段話的意思,在結構體中為何不直接設置 sa_sigaction ,而要使用這種替代方式?

If SA_SIGINFO is specified in sa_flags, then sa_sigaction
(instead of sa_handler) specifies the signal-handling function
for signum. This function receives three arguments, as described
below.

           struct sigaction {
               void     (*sa_handler)(int);
               void     (*sa_sigaction)(int, siginfo_t *, void *);
               sigset_t   sa_mask;
               int        sa_flags;
               void     (*sa_restorer)(void);
           };
static void timer_wait(void)
{
    sigset_t mask;
    sigprocmask(0, NULL, &mask);
    sigdelset(&mask, SIGALRM);
    sigsuspend(&mask);
}

timer_wait() -> 等待 SIGALRM 訊號,當接收到訊號後進入 timer_handler 再進入 schedule

  • sigprocmask 將當前的訊號遮罩集保存至 mask 中,並使用 sigdelset 刪除 SIGALRM 訊號,使其不會被阻塞。
  • sigsuspend 暫時將 mask 給定的遮罩取代執行緒的訊號遮罩,直到接收到 SIGALRM 訊號,使執行緒恢復執行。

sigsuspend(2) — Linux manual page
sigsuspend() temporarily replaces the signal mask of the calling thread with the mask given by mask and then suspends the thread until delivery of a signal whose action is to invoke a signal handler or to terminate a process.

static void timer_handler(int signo, siginfo_t *info, ucontext_t *ctx)
{
    if (preempt_count) /* once preemption is disabled */
        return;

    /* We can schedule directly from sighandler because Linux kernel cares only
     * about proper sigreturn frame in the stack.
     */
    schedule();
}

SIGALRM 訊號到達時,使用 schedule 進行排程

main

int main()
{
    timer_init();
    task_init();

    task_add(sort, "1"), task_add(sort, "2"), task_add(sort, "3");

    preempt_disable();
    timer_create(10000); /* 10 ms */

    while (!list_empty(&task_main.list) || !list_empty(&task_reap)) {
        preempt_enable();
        timer_wait();
        preempt_disable();
    }

    preempt_enable();
    timer_cancel();

    return 0;
}

執行流程:

  1. 初始化 timer 及 task
  2. 新增三個任務,每個任務都執行隨機數列的排序,並將其加入到任務的鏈節串列。需注意到 makecontext 設置了新任務的入口點為 task_trampoline
  3. 禁止搶佔,保持創建 timer 的 Atomic,且每隔 10000 usecs 發送一次 SIGARLM 訊號
  4. while 循環中反覆等待 SIGARLM 訊號,每次接收到訊號進行 context switch
  5. 在等待的過程 timer_wait 中允取發生搶佔

    當任務 1 收到 SIGARLM 訊號後將工作讓給任務 2 或 任務 3 皆是有可能?

  6. timer_wait 接收到訊號後,交給 schedule 函式進行 context switch
  7. 若當任務首次搶佔時,task_trampoline 會恢復 timer 的訊號,因原先會阻塞 SIGARLM ,避免創建資料的搶佔造成資料不一致?

    task_add 函式中可以看到

  8. 最後 timer_cancel 取消 timer

問題:

  • 如何決定排程器的下個任務?
    • 當接收到 SIGALRM 訊號時,會觸發 timer_handler ,在此函式中使用 schedule 函式進行排程
  • 清除的時機 ?

案例 1 : list_sort 整合 preemptive coroutine

起步時遇到的問題:

在整合的過程中遇到一些困難,在 preempt_sched 中的我不太能理解其 qsort_r 實作的方式,因此我該如何將 quicksort 改成能支持 coroutine 的形式,並整合至 preemptive coroutine 之中?

我原先案例 1 要使用 quicksort 整合 preemptive coroutine

我先將 lab0-c 中實作過得 list_sort 整合至 preemptive coroutine ,主要因為 list_sort 能找到完整的程式碼,了解其運作原理。

整合中的問題:

  1. random_shuffle() 的意義,相較使用 random()
  2. 思考何時該禁止搶佔的發生?
    目前已知在創建鏈結串列的節點、打印訊息時禁止搶佔,但禁止搶佔的依據是什麼?我一直無法確定。
  3. 在 coroutine 中我是否應該注意我的 list_sort 是否 thread safe ?
  4. 請問該如何寫好 README

實作
完整程式碼 -> github

參考 preempt_sched 的模擬搶佔情境,並將 qsort_r 更改成 list_sort ,其中 list_sort 取自 linux kernel ,是一種 merge sort 的實作方式改良,更多說明可以參考 lab0-c

list_sort 為雙向的鏈結串列,將節點包裝成 element_t 的結構體,結構體包含 value 與可以連接至上下節點的 list ,其中每個節點都有隨機的 value 值,每個 task 的工作即將各自的鏈結串列完成排序。

+ typedef struct {
+     uint32_t value;
+     struct list_head list;
+ } element_t;

比較函式 cmp

先看原程式碼中使用到的比較技巧

static int cmp_u32(const void *a, const void *b, void *arg)
{
    uint32_t x = *(uint32_t *) a, y = *(uint32_t *) b;
    uint32_t diff = x ^ y;
    if (!diff)
        return 0; /* *a == *b */
    diff = diff | (diff >> 1);
    diff |= diff >> 2;
    diff |= diff >> 4;
    diff |= diff >> 8;
    diff |= diff >> 16;
    diff ^= diff >> 1;
    return (x & diff) ? 1 : -1;
}

舉例 x = 199y = 39

x 1100 0111
| y 0010 0111
= diff 1110 0111
| diff >> 1 0111 0011
= diff 1111 0111
| diff >> 2 0011 1101
= diff 1111 1111
| diff >> 4 0000 1111
= diff 1111 1111
diff 1111 1111
^ diff >> 1 0111 1111
= diff 1000 0000

最後 diff 可以得到有差異的最高位,(x & diff) ? 1 : -1 即可比較出兩數字之間的大小。

將其改寫成可以連接鏈結串列

static int cmp(void *priv,
               const struct list_head *a,
               const struct list_head *b)
{
    element_t *node_a = list_entry(a, element_t, list);
    element_t *node_b = list_entry(b, element_t, list);
    uint32_t diff = node_a->value ^ node_b->value;
    if(!diff)
        /* *a == *b */
        return 0;
    diff = diff | (diff >> 1);
    diff |= diff >> 2;
    diff |= diff >> 4;
    diff |= diff >> 8;
    diff |= diff >> 16;
    diff ^= diff >> 1;
    return (node_a->value & diff) ? 1 : -1;
}

在加入 list_sort 之前需重新定義

#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

typedef int
    __attribute__((nonnull(2, 3))) (*list_cmp_func_t)(void *,
                                                      const struct list_head *,
                                                      const struct list_head *);
typedef uint8_t u8;

最後改寫 sort() 使其能與鏈結串列結合並進行排序的工作。

static void sort(void *arg)
{
    char *name = arg;

    preempt_disable();
-   uint32_t *arr = malloc(ARR_SIZE * sizeof(uint32_t));
+   struct list_head head;
+   INIT_LIST_HEAD(&head);

+   uint32_t r = getpid();

+   for (int i = 0; i < ARR_SIZE; i++) {
+       element_t *node = malloc(sizeof(element_t));
+       node->value = (r = random_shuffle(r));
+       list_add_tail(&node->list, &head);
+   }
    preempt_enable();

    task_printf("[%s] %s: begin\n", name, __func__);

-   uint32_t r = getpid();
-   for (int i = 0; i < ARR_SIZE; i++)
-       arr[i] = (r = random_shuffle(r));

    task_printf("[%s] %s: start sorting\n", name, __func__);

-   qsort_r(arr, ARR_SIZE, sizeof(uint32_t), cmp_u32, name);
+   list_sort(NULL, &head, cmp);

-   for (int i = 0; i < ARR_SIZE - 1; i++)
-       if (arr[i] > arr[i + 1]) {
-           task_printf("[%s] %s: failed: a[%d]=%u, a[%d]=%u\n", name, __func__,
-                       i, arr[i], i + 1, arr[i + 1]);
-           abort();
-       }
+   element_t *node, *safe;
+   list_for_each_entry_safe(node, safe, &head, list)
+   {
+       if (&safe->list != &head && node->value > safe->value) {
+           task_printf("[%s] %s: failed: node->value=%u, safe->value=%u\n", name, __func__,
+                       node->value, safe->value);
+           abort();
+       }
+   }
    task_printf("[%s] %s: end\n", name, __func__);

    preempt_disable();
+   struct list_head *pos, *tmp;
+   list_for_each_safe (pos, tmp, &head) {
+       element_t *node = list_entry(pos, element_t, list);
+       free(node);
+   }   
-   free(arr);
    preempt_enable();
}

輸出結果:

$ make
cc -O2 -Wall -std=gnu99 -o task_sched task_sched.c

$ ./task_sched 
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[3] sort: end
[2] sort: end
[1] sort: end

$ ./task_sched 
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[2] sort: end
[3] sort: end
[1] sort: end

為何會造成 3 個 task 工作結束的不一致

  • 排序值的隨機性,造成處理時間不一樣
  • 任務切換

案例 2 : Timsort 整合 preemptive coroutine

回顧第一周的測驗 2 來了解 Timsort 在 Merge sort 的基礎上做了哪些改進

Timsort 結合 Merge sort 和 Insertion Sort,改進的目的有以下:

  1. 加速 merge 的過程
  2. 減少合併次數
  3. 在處理有部分已排序的資料下,有更好的表現

值得一提的是, Timsort 將資料由 run 所構成,這裡的 run 由部分已排列的資料所構成。
那 run 的長度該怎麼決定,定義一個 minrun (最小運行長度),用以表示每個 run 的最小長度。有幾點需要我們考慮到:

  • minrun 不宜太長,會造成花費更多時間在執行插入排序
  • minrun 不宜太短,否則在進行 merge 時次數變多
  • 理想情況下 run 的數量等於或者略小於 2 的冪,效率會最高

find_run 函式會尋找符合遞增順序的元素,並組成一個 run ,接著使用 merge_collapse 來讓 run 的數量保持平衡,且須滿足以下條件:

  • 假設有三個 run 分別為 A、B、C
    • A > B + C
    • B > C

只要有一項不符合則將 B 與 A、C 中較小者進行合併,舉例說明:


合併分成兩種:

  • 在這我們假設有 run A 及 run B
A : [3, 7, 11, 12, 13, 31, 45]
B : [21, 22, 24, 24, 29, 300, 400]
  1. merge sort 從 A、 B 的開頭逐一比較來進行合併,稱為 one-pair-at-a-time
  2. Timsort 的合併基於 merge sort 做出改進,首先將 A 與 B 中長度較短者放入一塊臨時記憶區 temp (這裡 A 為較短者)
A : []
B : [21, 22, 24, 24, 29, 300, 400]
temp : [3, 7, 11, 12, 13, 31, 45]

接著 B 的第一個元素

B0 會去找在 A 的何處 (
A4
A5
之間),並將
A0
~
A4
放回 A 數組中

A : [3, 7, 11, 12, 13]
B : [21, 22, 24, 24, 29, 300, 400]
temp : [31, 45]

接著將 temp[0] 比對 B ,介於 B[4] 與 B[5] 之間

A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29]
B : [300, 400]
temp : [31, 45]

以此類推

A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29, 31, 45]
B : [300, 400]
temp : []

==>

A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29, 31, 45, 300, 400]
B : []
temp : []

這種合併方法被稱為 Galloping mode 。


TODO : 待整合