執行人: jeremy90307
錄影解說 : Linux 核心專題:排程器原理 (youtube)
強化對並行程式設計的認知。
eleanorLYJ
討論 false sharing 的敘述不夠精確,應強調 CPU0 與 CPU1共享了同一塊的 cacheline,另外還有因為 cache coherence 機制,才導致效能降低
圖中有兩執行緒,當 CPU0 不斷的改變變數 X , CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能
我確實在 false sharing 這裡解釋不夠完善,我會再將其補進我的另個筆記,但在這裡我主要是放上我在閱讀教材時遇到的問題,感謝同學的指教。
jeremy90307
因系列講座內容較多,因此另外開個筆記紀錄 -> 研讀〈並行和多執行緒程式設計〉系列講座
集中於本頁面,專注於「紀錄問題」和「改進/討論」,已在教材出現的內容就不用建立副本
多工處理
問題 1
不對等 (heterogeneous) 的執行單元,代表的是什麼意思? 在系統中會有不對等是指優先順序嗎?
問題 2
同進進行的說法是否有瑕疵?因為在並行程式中使用快速切換來達到看似同時切換而已。
我整理多重程式、多工、多執行緒、多重處理的表格如下
Multiprogramming 多重程式 | Multitasking 多工 | Multithreading 多執行緒 | Multiprocessing 多重處理 | |
---|---|---|---|---|
定義 | 在單一 CPU 上執行多個程式 | 在單一 CPU 上執行多個工作 | 在單一工作上運行多個執行緒 | 在多個 CPU 上運行多個行程 |
共享資源 | 資源(CPU、記憶體)在程式之間共享 | 資源(CPU、記憶體)在工作之間共享 | 資源(CPU、記憶體)在執行緒之間共享 | 每個行程都有自己一組的資源(CPU、記憶體) |
排程 | 使用循環或基於優先權的排程為程式分配 CPU 時間 | 使用基於優先權或時間分割的調度為工作分配 CPU 時間 | 使用基於優先權或時間分割的調度為執行緒分配 CPU 時間 | 每個行程可以有自己的排程演算法 |
記憶體管理 | 每個程式都有自己的記憶體空間 | 每個工作都有自己的記憶體空間 | 執行緒在工作內共享記憶體空間 | 每個行程都有自己的記憶體空間 |
工作切換 | 需要工作切換才能在程式之間切換 | 需要工作切換才能在工作之間切換 | 需要工作切換才能在執行緒之間切換 | 需要工作切換才能在行程之間切換 |
Inter-Process Communication (IPC) | 使用訊息傳遞或共享記憶體進行 IPC | 使用訊息傳遞或共享記憶體進行 IPC | 使用 IPC 的執行緒同步機制(例如 lock、semaphores) | 使用 IPC 的進程間通訊機制(例如 pipes、sockets) |
coroutine 的實作是否涵蓋多工與多執行緒,雖然單一 CPU 上執行多個工作,但工作內容(如 qsort_r
)不是應該確保 thread safe 的情況嗎?
下圖為單一 CPU 同一時間只執行一個任務,但利用快速切換的方式,看似一次執行多的任務。
頻繁的快速切換帶來的優勢?
簡述 race condition 的問題
假設兩執行緒各自將全域變數的值 +1
Thread 1 | Thread 2 | value | |
---|---|---|---|
0 | |||
read | <- | 0 | |
increase | 0 | ||
write back | -> | 1 | |
read | <- | 1 | |
increase | 1 | ||
write back | -> | 2 |
若沒有使用 lock 或是同步機制,可能遇到下面問題
Thread 1 | Thread 2 | value | |
---|---|---|---|
0 | |||
read | <- | 0 | |
read | <- | ||
increase | 0 | ||
increase | 0 | ||
write back | -> | 1 | |
write back | -> | 1 |
結果並非預期的 2 而是 1。
上述問題的解決辦法有使用
int CAS(int *reg, int old, int new)
{
ATOMIC();
int tmp = *reg;
if(tmp == old)
*reg = new;
END_ATOMIC();
return tmp;
}
若有 Atomic 指令來確保資料的一致性,為何還需要用到 mutex lock 等方式?能非阻塞就不會影響到其他執行緒的進行不是更有效率嗎?
false sharing (偽共享)
問題
如何做到抑制 false sharing ? 共筆中有提到使用對齊的解決方法,但為何對齊 16 就可以避免 false sharing ?
考慮以下程式碼:
alignas(16) int x;
printBinary(&x);
printf("%ld\n", alignof(x));
對應的執行輸出:
_0000_0000_0000_0000_0000_0000_1010_1101_1001_0100_1111_1111_1111_1000_1101_0000
16
因為 int x 變數的地址會對齊 16 (單位:位元組_,亦即
TODO : 尚未閱讀 Memory Ordering 和 Barrier、處理器架構和其 Memory Order、wait-free & lock-free
在此章節主要跟著教材學習 pthread 相關程式設計。
研讀〈並行和多執行緒程式設計〉_POSIX Thread 連結有相關實作說明
在測試和驗證中補充了些筆記即註解,使這個實作更加易懂
模擬情境:
給定 16 個工作節點
16 個節點共享一個 clock
,clock
從 tick = 1 開始累計,節點在每個 tick 都有必須完成的事:在偶數 tick 時想像成是完成階段任務,可改變自身狀態為就緒並通知其子節點繼續後續任務,而在奇數 tick 時則推動時間讓 tick += 1。
結構體
struct clock {
mutex_t mutex;
cond_t cond;
int ticks;
};
struct node {
struct clock *clock; // Each node points to a shared clock
struct node *parent; // Point to the parent node
mutex_t mutex;
cond_t cond;
bool ready; // Proceed to the next node only when ready is true
};
clock
中的 ticks
決定是否繼續下一步驟,同步上使用 condition variable + mutex 機制去通知此事件,且通知應該是用 broadcast 方式讓所有正在等待的執行緒得知在 POSIX_Thread 中 condition variable 的實作中因只有一個執行緒在等待,使用
pthread_cond_signal
node
中 ready
主要讓結點能進行下個工作,因此當親代節點就緒時,主動通知此事。同樣需要 condition variable + mutex 機制,但以 signal 方式通知子節點就好。thread_func(void *ptr)
static void *thread_func(void *ptr)
{
struct node *self = ptr;
bool bit = false;
for (int i = 1; clock_wait(self->clock, i); ++i) {
printf("Thread [%c] | i : %d\n", self->name, i);
if (self->parent){
printf("Thread [%c] wait parent\n", self->name);
node_wait(self->parent);
}
if (bit) {
printf("Thread [%c] send signal\n", self->name);
node_signal(self);
} else {
printf("Thread [%c] trigger clock\n", self->name);
clock_tick(self->clock);
}
bit = !bit;
}
node_signal(self);
return NULL;
}
main(void)
int main(void)
{
struct clock clock;
clock_init(&clock);
#define N_NODES 3
struct node nodes[N_NODES];
node_init(&clock, NULL, &nodes[0]);
for (int i = 1; i < N_NODES; ++i)
node_init(&clock, &nodes[i - 1], &nodes[i]);
printf("\nParent releationship : NULL ");
for(int i = 0; i < N_NODES; ++i)
printf(" -> [%c] ", nodes[i].name);
printf("\n");
pthread_t threads[N_NODES];
for (int i = 0; i < N_NODES; ++i) {
if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0)
return EXIT_FAILURE;
}
printf("Tick start~\n");
clock_tick(&clock);
clock_wait(&clock, 1 << N_NODES);
clock_stop(&clock);
printf("\nTick stop~\n");
for (int i = 0; i < N_NODES; ++i) {
if (pthread_join(threads[i], NULL) != 0)
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
main
執行緒呼叫第一個 clock_tick
來讓 tick
變為 1
,這樣其他執行緒就可以開始根據 tick
逐步進行。而這裡的 clock_wait
會一直等待 tick
到 1 << N_NODES
之後再用 clock_stop
來讓節點的執行緒得以結束。
稍微改寫 main.c 使其更容易了解每個執行緒的進行過程,以下為輸出結果:
在執行程式時會出現下面錯誤,想請問為何會有這樣的問題?
FATAL: ThreadSanitizer: unexpected memory mapping 0x5c9bd4d2b000-0x5c9bd4d4b000
輸入 sudo sysctl vm.mmap_rnd_bits=28
來解決問題
參考 FATAL: ThreadSanitizer: unexpected memory mapping when running on Linux Kernels 6.6+
Parent releationship : NULL -> [A] -> [B] -> [C]
Tick start~
============clock_tick() tick : 1============
Thread [C] | i : 1
Thread [C] wait parent
Thread [B] | i : 1
Thread [B] wait parent
Thread [A] | i : 1
Thread [A] trigger clock
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock
============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock
============clock_tick() tick : 5============
Thread [B] wait parent
Thread [A] | i : 5
Thread [A] trigger clock
Thread [C] | i : 2
Thread [C] wait parent
============clock_tick() tick : 6============
Thread [A] | i : 6
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
============clock_tick() tick : 7============
Thread [B] | i : 4
Thread [B] wait parent
Thread [A] | i : 7
Thread [A] trigger clock
============clock_tick() tick : 8============
Thread [A] | i : 8
Thread [A] send signal
Tick stop~
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] becomes not ready.
Thread [C] send signal
clock_tick
先將 tick 增加 1 接著進入 clock_wait
中進行等待,一直到 clock->ticks
等於 Thread [A]
、Thread [B]
、Thread [C]
來執行這三個 node ,且他們之間的關係為 NULL -> [A] -> [B] -> [C]
thread_func
,且預設的 bit
為 false
,每次迭代都會進行反轉,使其執行不同工作內容
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
在 tick : 2 中
Thread [A]
發送節點的訊號喚醒正在等待的 Thread [B]
(喚醒時 Thread [A]
轉變為 ready 因此 Thread [B]
才能執行工作,此時 Thread [A]
交棒給 Thread [B]
後再次轉變為 not ready)Thread [B]
因第一次執行工作 bit = false
,所以進行 tick 加一的工作============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock
在 tick : 3 中
Thread [A]
尚未 ready ,因此 Thread [B]
再次進入等待親代節點的狀態Thread [A]
不受親代節點影響繼續執行工作============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock
在 tick : 4 中
Thread [A]
發送節點的訊號喚醒正在等待的 Thread [B]
(喚醒時 Thread [A]
轉變為 ready 因此 Thread [B]
才能執行工作,此時 Thread [A]
交棒給 Thread [B]
後再次轉變為 not ready)Thread [B]
因為 bit
的關係工作內容變成發送節點訊號喚醒正在等待的 Thread [C]
後續都是相似的步驟,因此不再贅述
最後在 tick : 8 中, clock->ticks
等於 clock_stop(&clock)
將 ticks 的值設為 -1
,導致在執行緒函式中 for
迴圈中止。
為何在 thread_func
最後需要在進行 node_signal(self)
?
補充筆記:實作 Priority-inheritance mutex
在 mutex/pi-test/main.c 函式中主要建立一個 Priority inversion 的情況
定義一個具集 TRY(f)
用來檢視錯誤,其中 #f
將參數 f
轉換為字符串,這樣可以在錯誤訊息中顯示出具體是哪個函式失敗。
#define TRY(f) \
do { \
int __r; \
if ((__r = (f != 0))) { \
fprintf(stderr, "Run function %s = %d\n", #f, __r); \
return __r; \
} \
} while (0)
ctx_init
函式中的 mutexattr_setprotocol
設置了優先級繼承,這樣可以避免優先級反轉。
static void ctx_init(struct ctx *ctx)
{
mutexattr_t mattr;
mutexattr_setprotocol(&mattr, PRIO_INHERIT);
mutex_init(&ctx->m0, &mattr);
}
#include <pthread.h>
int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol);
attr
: 指向互斥鎖屬性的指標
protocol
: 協議屬性
PTHREAD_PRIO_INHERIT
: 使用優先級繼承PTHREAD_PRIO_PROTECT
: 使用優先級上限PTHREAD_PRIO_NONE
: 不使用優先級繼承或上限的策略執行
sudo
執行,因為牽涉對執行緒優先權的調整,需要 root 權限sudo taskset -c 1
問題 1
在研讀的過程中 pthread_create_with_prio
函式主要用作創建一個具有優先及的執行緒,當中 sched_param
的結構體又是從何而來?在完整程式碼中看起來沒有引用到排程器相關標頭檔。
部份程式碼
static int pthread_create_with_prio(pthread_t *thread,
pthread_attr_t *attr,
void *(*start_routine)(void *),
void *arg,
int prio)
{
struct sched_param param;
param.sched_priority = prio;
TRY(pthread_attr_setschedparam(attr, ¶m));
TRY(pthread_create(thread, attr, start_routine, arg));
return 0;
}
在 pthread_setschedparam(3) — Linux manual page 找到 sched_param
結構體
struct sched_param {
int sched_priority; /* Scheduling priority */
};
pthread_attr_setschedpolicy(&attr, SCHED_FIFO)
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)
理解這兩 API 代表意思
int pthread_attr_setinheritsched(pthread_attr_t *attr,int inheritsched);
attr
: 指向執行緒屬性的指標
inheritsched
: 是否繼承父執行緒的排程屬性
PTHREAD_EXPLICIT_SCHED
: 不繼承,新執行緒使用由 pthread_attr_setschedparam
和 pthread_attr_setschedpolicy
設置的排程屬性,執行緒PTHREAD_INHERIT_SCHED
: 繼承父執行緒的排程屬性返回值:成功返回 0
,失敗返回非 0 值
int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
attr
: 指向執行緒屬性的指標
policy
: 排程屬性
SCHED_FIFO
: 搶佔式排程,即會被優先即更高的執行緒搶佔SCHED_RR
: 輪詢式排程,擁有同樣優先權的執行緒會以一固定時間量,又稱「時間配量」(time slice),用輪詢方式執行。SCHED_OTHER
: 為 Linux 中默認。僅用於靜態優先級為 0 的執行緒(i.e., threads under real-time policies always have priority over SCHED_OTHER
processes),從靜態優先級 0 列表中選擇要運行的執行緒,由動態優先級決定(即 nice 值)。SCHED_OTHER
執行緒之間的公平進展。返回值:成功返回 0
,失敗返回非 0 值
int pthread_attr_setschedparam(pthread_attr_t *restrict attr,
const struct sched_param *restrict param);
主要用作設定優先級
返回值:成功返回 0
,失敗返回非 0 的值
問題 2
請問若沒有給定優先權的級別,系統的判定如何決定下個執行的執行緒?
TODO : 理解 nice 值、CFS
測試
主要測試排程策略相關問題。
完整測試程式 -> mutex/test_priority
基於問題 2 進行相關測試。設定三個執行緒,Thread 1 與 Thread 2 將優先級設置在 1 (其中優先級設置範圍 1 ~ 99),Thread 使用預設屬性(即 SCHED_OTHER
),並進行兩種排程屬性:SCHED_FIFO
、SCHED_RR
測試
SCHED_FIFO
$ make run
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
在搶佔式的排程中,Thread 1 並沒有讓出資源給 Thread 2 執行。
問題 3
當這種靜態優先級與動態優先級同時出現時,如何決定動態優先級的執行緒何時能執行?在 SCHED_FIFO
的結果中看到 Thread 3 並沒有成功執行。
SCHED_RR
$ make run
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2
可以看到 Thread 1 與 Thread 2 將資源的平均分配給不同執行緒,Thread 3 的非即時執行緒根據動態優先級偶爾也能分配到資源。
priority protection mutex 設定 :
muthread_mutexattr_setprioceiling
: 設定 mutex 的 prioceiling
muthread_mutexattr_setprotocol(&mattr, TBTHREAD_PRIO_INHERIT);
muthread_mutexattr_setprioceiling(&mattr, 25);
muthread_mutex_init(&mutex_normal, &mattr);
muthread_mutex_init(&mutex_normal_2, &mattr);
在測驗程式 Tests/test-06-priority-inversion-PP.c 中,我們在已經設定好會發生 priority inversion 的情況下去實作出 priority protection mutex 是否會過於不實際,若面對到更複雜的情況下使用 PP 是否對效能真的有更多提升?該如何驗證?
我在問題中提到的不實際是因為我們已知任務的優先順序,故將某任務的優先級提升至某執行緒優先級之上,在系統中是否是常見的作法?
以隨堂測驗和作業題目 (如 Timsort) 為展示標的,設計 preemptive coroutine 整合的相關實驗至少二個,達成並行處理並確認執行結果符合預期
在整合相關實作之前,先嘗試了解在 並行程式設計: 排程器原理 中 preemptive coroutine 的實作方法,才能延伸至其他案例中。
範例 : preempt_sched
preempt_count
用作決定是否允許搶佔,當大於 0 表示禁止搶佔中斷處理
在了解中斷處理時出現幾個陌生的函式及結構體。
sigset_t
用來表示訊號集,其初始化方式有兩種:
sigemptyset (sigset_t * set )
指定訊號集為空sigfillset (sigset_t * set )
指定訊號集為滿(即阻塞所有訊號),然後使用 sigdelset
單獨刪除指定的訊號(即不阻塞指定訊號),反之加入使用 sigaddset
。sigprocmask
-> ???
int sigprocmask(int how, const sigset_t *_Nullable restrict set,
sigset_t *_Nullable restrict oldset);
sigprocmask()
is used to fetch and/or change the signal mask of the calling thread. The signal mask is the set of signals whose delivery is currently blocked for the caller
不太能理解這裡所謂的遮罩意思,描述裡提及可以透過 how
來改變訊號集的狀態
SIG_BLOCK
:「阻塞」這些信號SIG_UNBLOCK
:「解除阻塞」這些信號SIG_SETMASK
:將當前的信號遮罩集設置為 set
指向的信號集,即「覆蓋」當前的信號遮罩集。static void local_irq_save(sigset_t *sig_set)
{
sigset_t block_set;
sigfillset(&block_set);
sigdelset(&block_set, SIGINT);
sigprocmask(SIG_BLOCK, &block_set, sig_set);
}
-> 用作阻塞除了 SIGINT
以外的所有訊號,並保存原先的訊號狀態 sig_set
static void local_irq_restore(sigset_t *sig_set)
{
sigprocmask(SIG_SETMASK, sig_set, NULL);
}
-> 恢復至原來的訊號狀態
當
sigprocmask
的第三個參數oldset
為 NULL 時,表示不需要保存之前的訊號狀態
task
task_add()
加入一個新任務
首先使用 getcontext
來取得當前的 context,接著在函式中呼叫 makecontext
前必須先設置新的 stack 及下個 context 的地址,如下:
task->context.uc_stack.ss_sp = task->stack;
task->context.uc_stack.ss_size = 1 << 20;
task->context.uc_stack.ss_flags = 0;
task->context.uc_link = NULL;
ss_flags
-> 用處為何?
uc_link
為當前 context 執行結束後要執行的下個 context ,若為 NULL
則當前 context 執行完後結束。
在呼叫 makecontext
之前為何需要先設置 stack 大小 ss_size
?
Stack Overflow 當中解答提到 Because stack on some machine "grows toward numerically lower addresses" (like it does on x86 architecture and wiki x86) the makecontext needs/wants to place it data on the end of the stack. So it needs to determinate the end of stack and this is what ss_size is used for.
makecontext(&task->context, (void (*)(void)) task_trampoline, 2, ptr.i[0],
ptr.i[1]);
makecontext
先設置要輸入給指定函式 task_trampoline
的參數數量 2
及參數 ptr.i[0]
, ptr.i[1]
,使 context switch 時能執行特定函式。
注意:確保當新任務第一次被切換時,其 timer signal 是被阻塞的
sigaddset(&task->context.uc_sigmask, SIGALRM);
trampoline
Trampolines (sometimes referred to as indirect jump vectors) are memory locations holding addresses pointing to interrupt service routines, I/O routines, etc. Execution jumps into the trampoline and then immediately jumps out, or bounces, hence the term trampoline.
__attribute__((noreturn)) static void task_trampoline(int i0, int i1)
{
union task_ptr ptr = {.i = {i0, i1}};
struct task_struct *task = ptr.p;
/* We switch to trampoline with blocked timer. That is safe.
* So the first thing that we have to do is to unblock timer signal.
* Paired with task_add().
*/
local_irq_restore_trampoline(task);
task->callback(task->arg);
task->reap_self = true;
schedule();
__builtin_unreachable(); /* shall not reach here */
}
noreturn
關鍵字告訴編譯器此函式不返回原來調用者函式的位置。
這裡問題有點大
task->callback(task->arg)
??? 貌似跑去執行排序任務一直到完成排序,不能理解為何
排程
static void schedule(void)
{
sigset_t set;
local_irq_save(&set);
struct task_struct *next_task =
list_first_entry(&task_current->list, struct task_struct, list);
if (next_task) {
if (task_current->reap_self)
list_move(&task_current->list, &task_reap);
task_switch_to(task_current, next_task);
}
struct task_struct *task, *tmp;
list_for_each_entry_safe (task, tmp, &task_reap, list) /* clean reaps */
task_destroy(task);
local_irq_restore(&set);
}
next_task
使用 list_first_entry
選擇下一個被排程的任務(即為當前任務的下一個鏈節串列節點),若 next_task
存在使用 task_switch_to()
來進行 context switch 。
static void task_switch_to(struct task_struct *from, struct task_struct *to)
{
task_current = to;
swapcontext(&from->context, &to->context);
}
swapcontext
將任務從 from->context
切換到 to->context
,且 swapcontext
會保存當前 CPU 狀態。
task_current->reap_self
表示任務是否完成,若完成則將其至於一個待清除的鏈節串列。
timer
在此案例中創建一個每 10 ms 發送 SIGALRM
訊號給行程來模擬中斷
static void timer_create(unsigned int usecs)
{
ualarm(usecs, usecs);
}
ualarm
第一個參數為第一次發送 SIGALRM
的時間間隔,第二個參數為間隔多久發送 SIGALRM
。
在 timer_cancel()
中設置 ualarm(0, 0)
,這樣 timer 會被取消。
static void timer_init(void)
{
struct sigaction sa = {
.sa_handler = (void (*)(int)) timer_handler,
.sa_flags = SA_SIGINFO,
};
sigfillset(&sa.sa_mask);
sigaction(SIGALRM, &sa, NULL);
}
sigaction
設置處理函式為 timer_handler
問題:
看到 sa_flags
被設置為 SA_SIGINFO
時,查了他的作用卻還是不能理解這段話的意思,在結構體中為何不直接設置 sa_sigaction
,而要使用這種替代方式?
If
SA_SIGINFO
is specified in sa_flags, then sa_sigaction
(instead of sa_handler) specifies the signal-handling function
for signum. This function receives three arguments, as described
below.
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
};
static void timer_wait(void)
{
sigset_t mask;
sigprocmask(0, NULL, &mask);
sigdelset(&mask, SIGALRM);
sigsuspend(&mask);
}
timer_wait()
-> 等待 SIGALRM
訊號,當接收到訊號後進入 timer_handler
再進入 schedule
sigprocmask
將當前的訊號遮罩集保存至 mask
中,並使用 sigdelset
刪除 SIGALRM
訊號,使其不會被阻塞。sigsuspend
暫時將 mask 給定的遮罩取代執行緒的訊號遮罩,直到接收到 SIGALRM
訊號,使執行緒恢復執行。sigsuspend(2) — Linux manual page
sigsuspend() temporarily replaces the signal mask of the calling thread with the mask given by mask and then suspends the thread until delivery of a signal whose action is to invoke a signal handler or to terminate a process.
static void timer_handler(int signo, siginfo_t *info, ucontext_t *ctx)
{
if (preempt_count) /* once preemption is disabled */
return;
/* We can schedule directly from sighandler because Linux kernel cares only
* about proper sigreturn frame in the stack.
*/
schedule();
}
當 SIGALRM
訊號到達時,使用 schedule
進行排程
main
int main()
{
timer_init();
task_init();
task_add(sort, "1"), task_add(sort, "2"), task_add(sort, "3");
preempt_disable();
timer_create(10000); /* 10 ms */
while (!list_empty(&task_main.list) || !list_empty(&task_reap)) {
preempt_enable();
timer_wait();
preempt_disable();
}
preempt_enable();
timer_cancel();
return 0;
}
執行流程:
makecontext
設置了新任務的入口點為 task_trampoline
SIGARLM
訊號while
循環中反覆等待 SIGARLM
訊號,每次接收到訊號進行 context switchtimer_wait
中允取發生搶佔
當任務 1 收到 SIGARLM
訊號後將工作讓給任務 2 或 任務 3 皆是有可能?
timer_wait
接收到訊號後,交給 schedule
函式進行 context switchtask_trampoline
會恢復 timer 的訊號,因原先會阻塞 SIGARLM
,避免創建資料的搶佔造成資料不一致?
在
task_add
函式中可以看到
timer_cancel
取消 timer問題:
SIGALRM
訊號時,會觸發 timer_handler
,在此函式中使用 schedule
函式進行排程起步時遇到的問題:
在整合的過程中遇到一些困難,在 preempt_sched 中的我不太能理解其 qsort_r
實作的方式,因此我該如何將 quicksort 改成能支持 coroutine 的形式,並整合至 preemptive coroutine 之中?
我原先案例 1 要使用 quicksort 整合 preemptive coroutine
我先將 lab0-c 中實作過得 list_sort
整合至 preemptive coroutine ,主要因為 list_sort 能找到完整的程式碼,了解其運作原理。
整合中的問題:
random_shuffle()
的意義,相較使用 random()
?list_sort
是否 thread safe ?README
?實作:
完整程式碼 -> github
參考 preempt_sched 的模擬搶佔情境,並將 qsort_r
更改成 list_sort
,其中 list_sort
取自 linux kernel ,是一種 merge sort 的實作方式改良,更多說明可以參考 lab0-c 。
list_sort
為雙向的鏈結串列,將節點包裝成 element_t
的結構體,結構體包含 value
與可以連接至上下節點的 list
,其中每個節點都有隨機的 value
值,每個 task 的工作即將各自的鏈結串列完成排序。
+ typedef struct {
+ uint32_t value;
+ struct list_head list;
+ } element_t;
比較函式 cmp
先看原程式碼中使用到的比較技巧
static int cmp_u32(const void *a, const void *b, void *arg)
{
uint32_t x = *(uint32_t *) a, y = *(uint32_t *) b;
uint32_t diff = x ^ y;
if (!diff)
return 0; /* *a == *b */
diff = diff | (diff >> 1);
diff |= diff >> 2;
diff |= diff >> 4;
diff |= diff >> 8;
diff |= diff >> 16;
diff ^= diff >> 1;
return (x & diff) ? 1 : -1;
}
舉例 x = 199
與 y = 39
x | 1100 0111 | |
---|---|---|
| | y | 0010 0111 |
= | diff | 1110 0111 |
| | diff >> 1 | 0111 0011 |
= | diff | 1111 0111 |
| | diff >> 2 | 0011 1101 |
= | diff | 1111 1111 |
| | diff >> 4 | 0000 1111 |
= | diff | 1111 1111 |
… | … | |
diff | 1111 1111 | |
^ | diff >> 1 | 0111 1111 |
= | diff | 1000 0000 |
最後 diff
可以得到有差異的最高位,(x & diff) ? 1 : -1
即可比較出兩數字之間的大小。
將其改寫成可以連接鏈結串列
static int cmp(void *priv,
const struct list_head *a,
const struct list_head *b)
{
element_t *node_a = list_entry(a, element_t, list);
element_t *node_b = list_entry(b, element_t, list);
uint32_t diff = node_a->value ^ node_b->value;
if(!diff)
/* *a == *b */
return 0;
diff = diff | (diff >> 1);
diff |= diff >> 2;
diff |= diff >> 4;
diff |= diff >> 8;
diff |= diff >> 16;
diff ^= diff >> 1;
return (node_a->value & diff) ? 1 : -1;
}
在加入 list_sort
之前需重新定義
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
typedef int
__attribute__((nonnull(2, 3))) (*list_cmp_func_t)(void *,
const struct list_head *,
const struct list_head *);
typedef uint8_t u8;
最後改寫 sort()
使其能與鏈結串列結合並進行排序的工作。
static void sort(void *arg)
{
char *name = arg;
preempt_disable();
- uint32_t *arr = malloc(ARR_SIZE * sizeof(uint32_t));
+ struct list_head head;
+ INIT_LIST_HEAD(&head);
+ uint32_t r = getpid();
+ for (int i = 0; i < ARR_SIZE; i++) {
+ element_t *node = malloc(sizeof(element_t));
+ node->value = (r = random_shuffle(r));
+ list_add_tail(&node->list, &head);
+ }
preempt_enable();
task_printf("[%s] %s: begin\n", name, __func__);
- uint32_t r = getpid();
- for (int i = 0; i < ARR_SIZE; i++)
- arr[i] = (r = random_shuffle(r));
task_printf("[%s] %s: start sorting\n", name, __func__);
- qsort_r(arr, ARR_SIZE, sizeof(uint32_t), cmp_u32, name);
+ list_sort(NULL, &head, cmp);
- for (int i = 0; i < ARR_SIZE - 1; i++)
- if (arr[i] > arr[i + 1]) {
- task_printf("[%s] %s: failed: a[%d]=%u, a[%d]=%u\n", name, __func__,
- i, arr[i], i + 1, arr[i + 1]);
- abort();
- }
+ element_t *node, *safe;
+ list_for_each_entry_safe(node, safe, &head, list)
+ {
+ if (&safe->list != &head && node->value > safe->value) {
+ task_printf("[%s] %s: failed: node->value=%u, safe->value=%u\n", name, __func__,
+ node->value, safe->value);
+ abort();
+ }
+ }
task_printf("[%s] %s: end\n", name, __func__);
preempt_disable();
+ struct list_head *pos, *tmp;
+ list_for_each_safe (pos, tmp, &head) {
+ element_t *node = list_entry(pos, element_t, list);
+ free(node);
+ }
- free(arr);
preempt_enable();
}
輸出結果:
$ make
cc -O2 -Wall -std=gnu99 -o task_sched task_sched.c
$ ./task_sched
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[3] sort: end
[2] sort: end
[1] sort: end
$ ./task_sched
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[2] sort: end
[3] sort: end
[1] sort: end
為何會造成 3 個 task 工作結束的不一致
回顧第一周的測驗 2
來了解 Timsort 在 Merge sort 的基礎上做了哪些改進
Timsort 結合 Merge sort 和 Insertion Sort,改進的目的有以下:
值得一提的是, Timsort 將資料由 run 所構成,這裡的 run 由部分已排列的資料所構成。
那 run 的長度該怎麼決定,定義一個 minrun (最小運行長度),用以表示每個 run 的最小長度。有幾點需要我們考慮到:
在 find_run
函式會尋找符合遞增順序的元素,並組成一個 run ,接著使用 merge_collapse
來讓 run 的數量保持平衡,且須滿足以下條件:
只要有一項不符合則將 B 與 A、C 中較小者進行合併,舉例說明:
合併分成兩種:
A : [3, 7, 11, 12, 13, 31, 45]
B : [21, 22, 24, 24, 29, 300, 400]
one-pair-at-a-time
temp
(這裡 A 為較短者)A : []
B : [21, 22, 24, 24, 29, 300, 400]
temp : [3, 7, 11, 12, 13, 31, 45]
接著 B 的第一個元素
A : [3, 7, 11, 12, 13]
B : [21, 22, 24, 24, 29, 300, 400]
temp : [31, 45]
接著將 temp[0] 比對 B ,介於 B[4] 與 B[5] 之間
A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29]
B : [300, 400]
temp : [31, 45]
以此類推
A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29, 31, 45]
B : [300, 400]
temp : []
==>
A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29, 31, 45, 300, 400]
B : []
temp : []
這種合併方法被稱為 Galloping mode 。
TODO : 待整合