---
tags: linux2022
---
# 2022q1 Homework4 (quiz4)
contributed by < `SmallHanley` >
> [測驗題](https://hackmd.io/@sysprog/linux2022-quiz4)
## 測驗 `1`
:::success
解釋上述程式碼運作原理
:::
延伸[第 3 週測驗題的測驗](https://hackmd.io/@sysprog/linux2022-quiz3) `7`,已知輸入必為大於 `0` 的數值 (即 `x > 0`),以下程式碼可計算 $\lceil log(x) \rceil$,也就是 [ceil](https://man7.org/linux/man-pages/man3/ceil.3.html) 和 [log2](https://man7.org/linux/man-pages/man3/log2.3.html) 的組合並轉型為整數:
```c
int ceil_log2(uint32_t x)
{
uint32_t r, shift;
x--;
r = (x > 0xFFFF) << 4;
x >>= r;
shift = (x > 0xFF) << 3;
x >>= shift;
r |= shift;
shift = (x > 0xF) << 2;
x >>= shift;
r |= shift;
shift = (x > 0x3) << 1;
x >>= shift;
return (r | shift | x >> 1) + 1;
}
```
一開始的 `x--` 和最後 return 時的 `+1`,是用來計算 cell 的。若將上述兩者去掉,則整段程式可以計算 $\lfloor log(x) \rfloor$ (即 [floor](https://man7.org/linux/man-pages/man3/floor.3.html) 和 log2 的組合並轉型為整數)。剩餘的程式則是用 binary search,每次比較一半的位元數 (假設為 n,即與 $2^n - 1$ 比較),若比較大,則將 r 的第 ($log_2 n$) bit set。
:::success
改進程式碼,使其得以處理 `x = 0` 的狀況,並仍是 branchless
:::
一種方法是在編譯時期就找出使用 constant 0 傳入,定義以下巨集 (參考 [bit-field](https://hackmd.io/@sysprog/c-bitfield)):
```c
#define CHECK_NOT_ZERO(e) (sizeof(struct { int : (!!(e) -1); }))
```
另一種方式是雖然數學上 $log(x)$,$x$ 需要傳入大於 0 的數,我們另外定義當函式傳入 0 時,回傳 0。觀察原程式傳入 0 的回傳值是 1,因此我們只要在回傳時改成加 `!!x`,便可以處理參數為 0 的特例。
```c
int ceil_log2(uint32_t x)
{
uint32_t r, shift;
r--;
r = (x > 0xFFFF) << 4;
x >>= r;
shift = (x > 0xFF) << 3;
x >>= shift;
r |= shift;
shift = (x > 0xF) << 2;
x >>= shift;
r |= shift;
shift = (x > 0x3) << 1;
x >>= shift;
return (r | shift | x >> 1) + !!x;
}
```
---
## 測驗 `2`
:::success
解釋上述程式碼運作原理,並迴避 while, for, goto 等關鍵字以改寫出功能等價的實作
:::
複習〈[你所不知道的 C 語言: bitwise 操作](https://hackmd.io/@sysprog/c-bitwise)〉,改寫[第 3 週測驗題](https://hackmd.io/@sysprog/linux2022-quiz3)的測驗 `11` 裡頭的 fls 函式 (fls 意謂 “find last set”),使其得以計算 [Find first set](https://en.wikipedia.org/wiki/Find_first_set):
```c
#define BITS_PER_BYTE 8
#define BITS_PER_LONG (sizeof(unsigned long) * BITS_PER_BYTE)
#include <stddef.h>
static inline size_t ffs(unsigned long x)
{
if (x == 0)
return 0;
size_t o = 1;
unsigned long t = ~0UL;
size_t shift = BITS_PER_LONG;
shift >>= 1;
t >>= shift;
while (shift) {
if ((x & t) == 0) {
x >>= shift;
o += shift;
}
shift >>= 1;
t >>= shift;
}
return o;
}
```
上述程式碼是尋找一個 unsigned long 的資料中,第一個 set 的 bit 的位址 (由 LSB 開始從 0 開始算)。`t` 的作用是 bitmask,一開始從 16 個 bits 開始 (之後是依序為 8、4、2、1),每次檢查右半邊是否全為 0,類似於 [linux2022-quiz4#測驗-1](https://hackmd.io/@sysprog/linux2022-quiz4#測驗-1) 的作法,因此,我們可 loop unrolling,迴避 while, for, goto 等操作:
```c
#include <stddef.h>
static inline size_t ffs(unsigned long x)
{
if (x == 0)
return 0;
size_t o = 1;
// unsigned long in LP64 is 64 bits
if ((x & 0xffffffff) == 0) {
x >> 32;
o += 32;
}
if ((x & 0xffff) == 0) {
x >> 16;
o += 16;
}
if ((x & 0xff) == 0) {
x >> 8;
o += 8;
}
if ((x & 0xf) == 0) {
x >> 4;
o += 4;
}
if ((x & 0x3) == 0) {
x >> 2;
o += 2;
}
if ((x & 0x1) == 0) {
o += 1;
}
return o;
}
```
或是使用 gcc built in function `__builtin_ctzl`:
```c
#include <stddef.h>
static inline size_t ffs(unsigned long x)
{
// Avoid undefined behavior
if (x == 0)
return 0;
return __builtin_ctzl(x);
}
```
:::success
在 Linux 核心原始程式碼的 [include/asm-generic/bitops](https://github.com/torvalds/linux/tree/master/include/asm-generic/bitops) 目錄找出相關程式碼,並探討應用案例
:::
在 [include/asm-generic/bitops](https://github.com/torvalds/linux/tree/master/include/asm-generic/bitops) 有 4 個相關的實作,分別為 `ffs.h`, `builtin-ffs.h`, `__ffs.h`, `builtin-__ffs.h`,含有 `builtin` 前綴的代表使用 gcc built in function。另外,沒有底線的是 32 bits 的實作,含有底線的是有考量 unsigned long 在不同平台的資料寬度,可能為 32 或是 64 bits。
在 [linux2022-quiz2#測驗-4](https://hackmd.io/@sysprog/linux2022-quiz2#%E6%B8%AC%E9%A9%97-4) 中有介紹到 bitmap,Linux 核心原始程式碼就有許多地方用到 bitmap,像是 RT 排程器就是使用 bitmap 來表示哪一個 priority 的 queue 非空,相關程式在 [kernel/sched/sched.h](https://elixir.bootlin.com/linux/latest/source/kernel/sched/sched.h#L254):
```c
/*
* This is the priority-queue data structure of the RT scheduling class:
*/
struct rt_prio_array {
DECLARE_BITMAP(bitmap, MAX_RT_PRIO+1); /* include 1 bit for delimiter */
struct list_head queue[MAX_RT_PRIO];
};
```
我們可以用 ffs 找到最高 priority 且非空的 queue,如下:
```c
/*
* Every architecture must define this function. It's the fastest
* way of searching a 100-bit bitmap. It's guaranteed that at least
* one of the 100 bits is cleared.
*/
static inline int sched_find_first_bit(const unsigned long *b)
{
#if BITS_PER_LONG == 64
if (b[0])
return __ffs(b[0]);
return __ffs(b[1]) + 64;
#elif BITS_PER_LONG == 32
if (b[0])
return __ffs(b[0]);
if (b[1])
return __ffs(b[1]) + 32;
if (b[2])
return __ffs(b[2]) + 64;
return __ffs(b[3]) + 96;
#else
#error BITS_PER_LONG not defined
#endif
}
```
## 測驗 `3`
___
## 測驗 `4`
:::spoiler 原始程式
```c
/* Implementing coroutines with setjmp/longjmp */
#include <setjmp.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "list.h"
struct task {
jmp_buf env;
struct list_head list;
};
static LIST_HEAD(tasklist);
static void (**tasks)(void *);
static int ntasks;
static jmp_buf sched;
static void task_add(struct list_head *tasklist, jmp_buf env)
{
struct task *t = malloc(sizeof(*t));
memcpy(t->env, env, sizeof(jmp_buf));
INIT_LIST_HEAD(&t->list);
list_add_tail(&t->list, tasklist);
}
static void task_switch(struct list_head *tasklist)
{
jmp_buf env;
if (!list_empty(tasklist)) {
struct task *t = list_first_entry(tasklist, struct task, list);
list_del(&t->list);
memcpy(env, t->env, sizeof(jmp_buf));
free(t);
longjmp(env, 1);
}
}
static void task_join(struct list_head *tasklist)
{
jmp_buf env;
while (!list_empty(tasklist)) {
struct task *t = list_first_entry(tasklist, struct task, list);
list_del(&t->list);
memcpy(env, t->env, sizeof(jmp_buf));
free(t);
longjmp(env, 1);
}
}
void schedule(void)
{
static int i;
srand(0xCAFEBABE ^ (uintptr_t) &schedule); /* Thanks to ASLR */
setjmp(sched);
while (ntasks-- > 0) {
int n = rand() % 5;
tasks[i++](&n);
printf("Never reached\n");
}
task_join(&tasklist);
}
/* A task yields control n times */
void task0(void *arg)
{
jmp_buf env;
static int n;
n = *(int *) arg;
printf("Task 0: n = %d\n", n);
if (setjmp(env) == 0) {
task_add(&tasklist, env);
// Jump back to scheduler
longjmp(sched, 1);
}
for (int i = 0; i < n; i++) {
if (setjmp(env) == 0) {
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 0: resume\n");
}
printf("Task 0: complete\n");
longjmp(sched, 1);
}
void task1(void *arg)
{
jmp_buf env;
static int n;
n = *(int *) arg;
printf("Task 1: n = %d\n", n);
if (setjmp(env) == 0) {
task_add(&tasklist, env);
// Jump back to scheduler
longjmp(sched, 1);
}
for (int i = 0; i < n; i++) {
if (setjmp(env) == 0) {
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 1: resume\n");
}
printf("Task 1: complete\n");
longjmp(sched, 1);
}
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
int main(void)
{
void (*registered_task[])(void *) = {task0, task1};
tasks = registered_task;
ntasks = ARRAY_SIZE(registered_task);
schedule();
return 0;
}
```
:::
<br>
:::success
解釋上述程式碼運作原理,舉出上述 [coroutine](https://en.wikipedia.org/wiki/Coroutine) 實作的限制 (注意 stack)
:::
### 運作原理
`struct task` 有兩個 member,分別為用於 `setjmp()` 的 `jmp_buf` `env` 以及用於排程的 `list`。`main` 函式中,`registered_task` 是 function pointer 陣列,其中 function 的傳入參數是 generic type void pointer。在 `schedule` 函式中,利用 [ASLR](https://en.wikipedia.org/wiki/Address_space_layout_randomization) 設定亂數種子。接著呼叫 `setjmp(sched)`,每當新的任務加進排程,會呼叫 `longjmp(sched, 1)`,讓 `schedule()` 可以繼續將新任務加進排程。
而 `task0` 和 `task1` 的結構一樣,有三大部份,第一部份根據 `schedule()` 設定的參數決定迴圈次數,將 task 加入排程後呼叫 `longjmp(sched, 1)`,讓 `schedule()` 可以繼續將新任務加進排程。當所有任務被加入排程後,`schedule()` 會呼叫 ` task_join(&tasklist)`,其中,會根據 list 的 first entry `longjmp` 回該 task 的 `setjmp` 位置:
```c
jmp_buf env;
static int n;
n = *(int *) arg;
printf("Task 0: n = %d\n", n);
if (setjmp(env) == 0) {
task_add(&tasklist, env);
longjmp(sched, 1);
}
```
第二部份是兩個 task 交互排程,每次執行一個 loop 後,呼叫 `task_add()` 重新將 task 加入 list 的尾端,並呼叫 `task_switch` 指定 list 的 first task 執行:
```c
for (int i = 0; i < n; i++) {
if (setjmp(env) == 0) {
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 0: resume\n");
}
```
第三部份完成該 task,會呼叫 `longjmp(sched, 1)` 跳到 `schedule()`,接著會執行 `task_join(&tasklist)` 執行尚未執行完的 task:
```c
printf("Task 1: complete\n");
longjmp(sched, 1);
```
### 上述 coroutine 實作的限制
我根據老師的提示將 [fibdrv](https://hackmd.io/@sysprog/linux2022-fibdrv) 的 `fib_sequence()` 引入,讓兩個 task 交錯執行,一個執行奇數項,一個執行偶數項,並在 `schedule()` 中指定 `task0` 和 `task1` 的參數為 90,改完後 task 的第二部份實作如下:
```c=98
// task0
for (int i = 0; i < n; i += 2) {
if (setjmp(env) == 0) {
long long res = fib_sequence(i);
printf("fib(%d) = %lld\n", i, res);
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 0: resume\n");
}
```
```c=125
// task1
for (int i = 1; i < n; i += 2) {
if (setjmp(env) == 0) {
long long res = fib_sequence(i);
printf("fib(%d) = %lld\n", i, res);
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 0: resume\n");
}
```
預期上述改寫可以讓 `task0` 和 `task1` 交錯呼叫 `fib_sequence()`,並且輸出 0 ~ 89 的結果,實際結果如下:
```bash
Task 0: n = 90
Task 1: n = 90
fib(0) = 0
fib(1) = 1
Task 0: resume
fib(3) = 2
Task 1: resume
fib(5) = 5
Task 0: resume
fib(7) = 13
Task 1: resume
fib(9) = 34
Task 0: resume
fib(11) = 89
Task 1: resume
fib(13) = 233
Task 0: resume
fib(15) = 610
Task 1: resume
...
```
可以發現執行結果和預期不同,第 2, 4, 6, ... 項結果被跳過,使用 GDB 追蹤發生錯誤的地方:
```c=
0x00005555555557c9 in task0 (arg=0x555555558010 <tasklist>) at test.c:100
100 if (setjmp(env) == 0) {
(gdb) p i
$12 = 1
(gdb) p &i
$13 = (int *) 0x7fffffffdca4
(gdb) n
106 printf("Task 0: resume\n");
(gdb) n
Task 0: resume
99 for (int i = 0; i < n; i += 2) {
(gdb) n
100 if (setjmp(env) == 0) {
(gdb) p i
$14 = 3
(gdb) p &i
$15 = (int *) 0x7fffffffdca4
(gdb) n
101 long long res = fib_sequence(i);
(gdb) n
102 printf("fib(%d) = %lld\n", i, res);
(gdb) n
fib(3) = 2
103 task_add(&tasklist, env);
(gdb) n
104 task_switch(&tasklist);
(gdb) n
0x0000555555555915 in task1 (arg=0x555555558010 <tasklist>) at test.c:127
127 if (setjmp(env) == 0) {
(gdb) p i
$16 = 3
(gdb) p &i
$17 = (int *) 0x7fffffffdca4
(gdb) n
133 printf("Task 1: resume\n");
(gdb) n
Task 1: resume
126 for (int i = 1; i < n; i += 2) {
(gdb) n
127 if (setjmp(env) == 0) {
(gdb) p i
$18 = 5
(gdb) p &i
$19 = (int *) 0x7fffffffdca4
(gdb) n
128 long long res = fib_sequence(i);
(gdb) n
129 printf("fib(%d) = %lld\n", i, res);
(gdb) n
fib(5) = 5
130 task_add(&tasklist, env);
(gdb) n
131 task_switch(&tasklist);
```
其中上面 GDB 輸出結果片段的第 1 行為 `task0` 的第 2 次 for 迴圈,可以發現 longjmp 回來後 `i` 的值是 1,且位址為 `0x7fffffffdca4`,恰好與 `task1` 的 `i` 位址相同。根據 [setjmp(3) ](https://man7.org/linux/man-pages/man3/setjmp.3.html):
> The setjmp() function saves various information about the calling environment (typically, the stack pointer, the instruction pointer, possibly the values of other registers and the signal mask) in the buffer env for later use by longjmp().
這裡 `task0` 的 `i` 存取到的剛好是之前 `task1` 的 `i` 的位址,如果將 `task0()` 動一點手腳,改成:
```c
for (int i = 0, r = 0; i < n; i += 2) {
r++;
if (setjmp(env) == 0) {
long long res = fib_sequence(i);
printf("fib(%d) = %lld\n", i, res);
task_add(&tasklist, env);
task_switch(&tasklist);
}
printf("Task 0: resume\n");
}
```
可以發現行為完全不一樣,GDB 片段如下:
```c
0x000055555555577c in task0 (arg=0x555555558010 <tasklist>) at test.c:94
94 if (setjmp(env) == 0) {
(gdb)
99 for (int i = 0, r = 0; i < n; i += 2) {
(gdb)
100 r++;
(gdb) p &r
$1 = (int *) 0x7fffffffdca4
(gdb) n
101 if (setjmp(env) == 0) {
(gdb) p &i
$2 = (int *) 0x7fffffffdca0
.
.
.
0x00005555555558d9 in task1 (arg=0x555555558010 <tasklist>) at test.c:122
122 if (setjmp(env) == 0) {
(gdb) n
127 for (int i = 1; i < n; i += 2) {
(gdb) n
128 if (setjmp(env) == 0) {
(gdb) p &i
$3 = (int *) 0x7fffffffdca4
```
此時 `task1` 的 `i` 存取到的是上一次 `task0` 的 `r`,而 `task0` 的 `i` 也是存取到 `task1` 上一次 function stack 中的某一位址。我們可以這樣理解,`task0` 呼叫 `task_switch` 的 `longjmp()` 後,該 stack 被 pop 掉,但是原本存放在該 stack 空間的值沒有被更改到。`longjmp` 到 `task1` 後,只有當初 `setjmp` 保存的 `sp`、`pc` 和某一些暫存器以及 signal mask 被 restored,其餘 local variable 存取為未定義行為。
:::info
不過這裡我有地方沒搞懂,man page 中提到 `possibly the values of other registers` 會包含哪些暫存器,不確定怎麼做可以保證特定暫存器在呼叫 `setjmp()` 時也被保存起來。
:::
既然 local variable 會產生以上結果,我們可以將 `i` 改成 static data,使得 `i` 不是存在 stack 上,`longjmp` 後得以保存原來的值。
還有一種方式是將必要的資料保存在 `struct task` 中 (類似於 task control block)。缺點是這個 `struct tack` 要不宣告成 global,要不就宣告成 static data。因此,每個 task 都需要一個對應的函式,函式設計上沒辦法設計成可重入 ([Reentrancy](https://en.wikipedia.org/wiki/Reentrancy_(computing)))。
另一個嘗試同上,也是將必要的資料保存在 `struct task`,不過利用 `setjmp` 的回傳值和 `longjmp` 的參數傳遞 `struct task` 指標指向的位址 (此方法只限於用 32 位元來編譯,因為上述兩函式的參數傳遞 type 是 int):
```c
struct task {
jmp_buf env;
struct list_head list;
char task_name[10];
int n;
int i;
};
...
for (task->i = 0; task->i < task->n; task->i += 2) {
>>>> if ((tmp = setjmp(task->env)) == 0) {
long long res = fib_sequence(task->i);
printf("fib(%d) = %lld\n", task->i, res);
task_add(task);
task_switch();
}
task = (struct task *) tmp;
printf("%s: resume\n", task->task_name);
}
```
但是此方法沒有成功,會在呼叫 `setjmp()` 時出現 segfault,似乎會跳到奇怪的地方,尚未解決:
```
(gdb) bt -full
#0 0x0000005a in ?? ()
No symbol table info available.
#1 0x56556755 in task0 (arg=0x56559008 <tasklist>) at test.c:100
tmp = 1448452512
task = 0x5655a1a0
#2 0x565564f6 in schedule () at test.c:62
i = 3
#3 0x56556adf in main () at test.c:158
registered_task = {0x5655665d <task0>, 0x5655665d <task0>, 0x56556839 <task1>}
arg0 = {n = 90, task_name = 0x5655704c "Task 0"}
arg1 = {n = 90, task_name = 0x56557053 "Task 1"}
arg2 = {n = 100, task_name = 0x5655705a "Task 2"}
registered_arg = {{n = 90, task_name = 0x5655704c "Task 0"}, {n = 90, task_name = 0x56557053 "Task 1"}, {n = 100,
task_name = 0x5655705a "Task 2"}}
```
還有一種方法 (參考 FreeRTOS 的 `pxCurrentTCB`),將 `cur_task` 宣告在全域,每當重新決定新的要執行的 task 時,就更新指標指向的內容,longjmp 回去後將內容寫回 stack。以下為參考程式,有兩個 task 函式,分別執行 fib_sequence() 和輸出 0 ~ 99。並且 create 三個 task,前兩個 task 分別輸出費氏數列的偶數項和奇數項,第三個 task 負責執行 `task1()`。
:::spoiler 參考程式碼
```c
#include <setjmp.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "list.h"
struct task {
jmp_buf env;
struct list_head list;
char task_name[10];
int n;
int i;
};
struct arg {
int n;
int i;
char *task_name;
};
static LIST_HEAD(tasklist);
static void (**tasks)(void *);
static struct arg *args;
static int ntasks;
static jmp_buf sched;
static struct task *cur_task;
static void task_add(struct task *task)
{
list_add_tail(&task->list, &tasklist);
}
static void task_switch()
{
if (!list_empty(&tasklist)) {
struct task *t = list_first_entry(&tasklist, struct task, list);
list_del(&t->list);
cur_task = t;
longjmp(t->env, 1);
}
}
static void task_join()
{
while (!list_empty(&tasklist)) {
struct task *t = list_first_entry(&tasklist, struct task, list);
list_del(&t->list);
cur_task = t;
longjmp(t->env, 1);
}
}
void schedule(void)
{
static int i;
setjmp(sched);
while (ntasks-- > 0) {
struct arg arg = args[i];
tasks[i++](&arg);
printf("Never reached\n");
}
task_join(&tasklist);
}
static long long fib_sequence(long long k)
{
/* FIXME: use clz/ctz and fast algorithms to speed up */
long long f[k + 2];
f[0] = 0;
f[1] = 1;
for (int i = 2; i <= k; i++) {
f[i] = f[i - 1] + f[i - 2];
}
return f[k];
}
/* A task yields control n times */
void task0(void *arg)
{
struct task *task = malloc(sizeof(struct task));
strcpy(task->task_name, ((struct arg *) arg)->task_name);
task->n = ((struct arg *) arg)->n;
task->i = ((struct arg *) arg)->i;
INIT_LIST_HEAD(&task->list);
printf("%s: n = %d\n", task->task_name, task->n);
if (setjmp(task->env) == 0) {
task_add(task);
longjmp(sched, 1);
}
task = cur_task;
for (; task->i < task->n; task->i += 2) {
if (setjmp(task->env) == 0) {
long long res = fib_sequence(task->i);
printf("%s fib(%d) = %lld\n", task->task_name, task->i, res);
task_add(task);
task_switch();
}
task = cur_task;
printf("%s: resume\n", task->task_name);
}
printf("%s: complete\n", task->task_name);
longjmp(sched, 1);
}
void task1(void *arg)
{
struct task *task = malloc(sizeof(struct task));
strcpy(task->task_name, ((struct arg *) arg)->task_name);
task->n = ((struct arg *) arg)->n;
task->i = ((struct arg *) arg)->i;
INIT_LIST_HEAD(&task->list);
printf("%s: n = %d\n", task->task_name, task->n);
if (setjmp(task->env) == 0) {
task_add(task);
longjmp(sched, 1);
}
task = cur_task;
for (; task->i < task->n; task->i++) {
if (setjmp(task->env) == 0) {
printf("%s %d\n", task->task_name, task->i);
task_add(task);
task_switch();
}
task = cur_task;
printf("%s: resume\n", task->task_name);
}
printf("%s: complete\n", task->task_name);
longjmp(sched, 1);
}
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
int main(void)
{
void (*registered_task[])(void *) = {task0, task0, task1};
struct arg arg0 = {.n = 90, .i = 0, .task_name = "Task 0"};
struct arg arg1 = {.n = 90, .i = 1, .task_name = "Task 1"};
struct arg arg2 = {.n = 100, .i = 0, .task_name = "Task 2"};
struct arg registered_arg[] = {arg0, arg1, arg2};
tasks = registered_task;
args = registered_arg;
ntasks = ARRAY_SIZE(registered_task);
schedule();
return 0;
}
```
:::
<br>
:::success
對比 [concurrent-programs](https://github.com/sysprog21/concurrent-programs) 裡頭的 coroutine 程式碼,如 [tinync](https://github.com/sysprog21/concurrent-programs/tree/master/tinync) 和 [preempt_sched](https://github.com/sysprog21/concurrent-programs/tree/master/preempt_sched),嘗試引入 UNIX Signal 來做到 preemptive scheduling
:::
研讀 [preempt_sched](https://github.com/sysprog21/concurrent-programs/tree/master/preempt_sched) 的實作方式,先嘗試將上述程式碼改為 signal trigger (由 caller raise `SIGSEGV` signal) 的方式,方便使用 GDB 追蹤程式。
:::spoiler 參考程式碼
```c
/* task_sched: preemptive multitasking in userspace based on SIGALRM signal.
*
* This program starts 3 sorting routines, execution of each is preempted by
* SIGALRM signal, simulating an OS timer interrupt. Each routine is an
* execution context, which can do a voluntary scheduling (calling schedule()
* directly) or be preempted by a timer, and in that case nonvoluntary
* scheduling occurs.
*
* The default time slice is 10ms, that means that each 10ms SIGALRM fires and
* next context is scheduled by round robin algorithm.
*/
#define _GNU_SOURCE
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>
#include <unistd.h>
#include "list.h"
static void local_irq_save(sigset_t *sig_set)
{
sigset_t block_set;
sigfillset(&block_set);
sigdelset(&block_set, SIGINT);
sigprocmask(SIG_BLOCK, &block_set, sig_set);
}
static void local_irq_restore(sigset_t *sig_set)
{
sigprocmask(SIG_SETMASK, sig_set, NULL);
}
typedef void(task_callback_t)(void *arg);
struct task_struct {
struct list_head list;
ucontext_t context;
void *stack;
task_callback_t *callback;
void *arg;
bool reap_self;
};
static struct task_struct *task_current, task_main;
static LIST_HEAD(task_reap);
static void task_init(void)
{
INIT_LIST_HEAD(&task_main.list);
task_current = &task_main;
}
static struct task_struct *task_alloc(task_callback_t *func, void *arg)
{
struct task_struct *task = calloc(1, sizeof(*task));
task->stack = calloc(1, 1 << 20);
task->callback = func;
task->arg = arg;
return task;
}
static void task_destroy(struct task_struct *task)
{
list_del(&task->list);
free(task->stack);
free(task);
}
static void task_switch_to(struct task_struct *from, struct task_struct *to)
{
task_current = to;
swapcontext(&from->context, &to->context);
}
static void schedule(void)
{
sigset_t set;
local_irq_save(&set);
struct task_struct *next_task =
list_first_entry(&task_current->list, struct task_struct, list);
if (next_task) {
if (task_current->reap_self)
list_move(&task_current->list, &task_reap);
task_switch_to(task_current, next_task);
}
struct task_struct *task, *tmp;
list_for_each_entry_safe (task, tmp, &task_reap, list) /* clean reaps */
task_destroy(task);
local_irq_restore(&set);
}
union task_ptr {
void *p;
int i[2];
};
static void local_irq_restore_trampoline(struct task_struct *task)
{
sigdelset(&task->context.uc_sigmask, SIGSEGV);
local_irq_restore(&task->context.uc_sigmask);
}
__attribute__((noreturn)) static void task_trampoline(int i0, int i1)
{
union task_ptr ptr = {.i = {i0, i1}};
struct task_struct *task = ptr.p;
/* We switch to trampoline with blocked timer. That is safe.
* So the first thing that we have to do is to unblock timer signal.
* Paired with task_add().
*/
local_irq_restore_trampoline(task);
task->callback(task->arg);
task->reap_self = true;
schedule();
__builtin_unreachable(); /* shall not reach here */
}
static void task_add(task_callback_t *func, void *param)
{
struct task_struct *task = task_alloc(func, param);
if (getcontext(&task->context) == -1)
abort();
task->context.uc_stack.ss_sp = task->stack;
task->context.uc_stack.ss_size = 1 << 20;
task->context.uc_stack.ss_flags = 0;
task->context.uc_link = NULL;
union task_ptr ptr = {.p = task};
makecontext(&task->context, (void (*)(void)) task_trampoline, 2, ptr.i[0],
ptr.i[1]);
/* When we switch to it for the first time, timer signal must be blocked.
* Paired with task_trampoline().
*/
sigaddset(&task->context.uc_sigmask, SIGSEGV);
list_add_tail(&task->list, &task_main.list);
}
static void signal_handler(int signo, siginfo_t *info, ucontext_t *ctx)
{
schedule();
}
static void signal_init(void)
{
struct sigaction sa = {.sa_handler = (void (*)(int)) signal_handler,
.sa_flags = SA_SIGINFO};
sigfillset(&sa.sa_mask);
sigaction(SIGSEGV, &sa, NULL);
}
static long long fib_sequence(long long k)
{
/* FIXME: use clz/ctz and fast algorithms to speed up */
long long f[k + 2];
f[0] = 0;
f[1] = 1;
for (int i = 2; i <= k; i++) {
f[i] = f[i - 1] + f[i - 2];
}
return f[k];
}
/* A task yields control n times */
struct arg {
int n;
int i;
char *task_name;
};
static void task0(void *arg)
{
char *name = ((struct arg *) arg)->task_name;
int i = ((struct arg *) arg)->i;
int n = ((struct arg *) arg)->n;
printf("%s: n = %d\n", name, n);
for (; i < n; i += 2) {
long long res = fib_sequence(i);
printf("%s fib(%d) = %lld\n", name, i, res);
raise(SIGSEGV);
}
printf("%s: complete\n", name);
}
static void task1(void *arg)
{
char *name = ((struct arg *) arg)->task_name;
int i = ((struct arg *) arg)->i;
int n = ((struct arg *) arg)->n;
printf("%s: n = %d\n", name, n);
for (; i < n; i++) {
printf("%s %d\n", name, i);
usleep(1);
raise(SIGSEGV);
}
printf("%s: complete\n", name);
}
int main()
{
signal_init();
task_init();
struct arg arg0 = {.n = 90, .i = 0, .task_name = "Task 0"};
struct arg arg1 = {.n = 90, .i = 1, .task_name = "Task 1"};
struct arg arg2 = {.n = 100, .i = 0, .task_name = "Task 2"};
task_add(task0, (void *) &arg0);
task_add(task0, (void *) &arg1);
task_add(task1, (void *) &arg2);
while (!list_empty(&task_main.list) || !list_empty(&task_reap)) {
raise(SIGSEGV);
}
return 0;
}
```
:::
不過使用 GDB 追蹤程式會遇到一個問題,就是沒辦法單步執行到 task 所在的 context,以下擷取 GDB 片段,當執行到 main 函式的 `raise(SIGSEGV)` 時,會跳至之前註冊的 signal handler (即 `timer_handler()`):
```c
...
279 raise(SIGSEGV);
(gdb) n
Program received signal SIGSEGV, Segmentation fault.
__GI_raise (sig=<optimised out>) at ../sysdeps/unix/sysv/linux/raise.c:50
50 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) n
timer_handler (signo=0, info=0x5555555545f1, ctx=0x199260000) at test2.c:170
170 {
(gdb) bt
#0 timer_handler (signo=0, info=0x5555555545f1, ctx=0x199260000) at test2.c:170
#1 <signal handler called>
#2 __GI_raise (sig=<optimised out>) at ../sysdeps/unix/sysv/linux/raise.c:50
#3 0x0000555555555e62 in main () at test2.c:279
(gdb) info frame
Stack level 0, frame at 0x7fffffffd580:
rip = 0x555555555964 in timer_handler (test2.c:170); saved rip = 0x7ffff7e020c0
called by frame at 0x7fffffffdc40
source language c.
Arglist at 0x7fffffffd570, args: signo=0, info=0x5555555545f1, ctx=0x199260000
Locals at 0x7fffffffd570, Previous frame's sp is 0x7fffffffd580
Saved registers:
rip at 0x7fffffffd578
...
```
繼續執行,會發現 info frame 是由以下四個 frame 輪流出現 (0x7fffffffd580, 0x7ffff7dbd7c0, 0x7ffff7cbc7c0, 0x7ffff7bbb7c0)。且根據 [Frame-Info](https://sourceware.org/gdb/onlinedocs/gdb/Frame-Info.html) 的描述,info frame 命令缺少 `caller of this frame` 的資訊 (這是 callback function,caller 不在此程式中,GDB 沒有相關資訊)。至於為什是四個 frame,推測是 main 函式加上三個 task 的 signal handler 部份。上述 GDB 片段屬於 main 函式的,若是其餘三個 task 的 backtrace 結果如下:
```c
(gdb) info frame
Stack level 0, frame at 0x7ffff7cbc7c0:
rip = 0x555555555964 in timer_handler (test2.c:170); saved rip = 0x7ffff7e020c0
called by frame at 0x7ffff7cbce60
source language c.
Arglist at 0x7ffff7cbc7b0, args: signo=11, info=0x7ffff7cbc8f0, ctx=0x7ffff7cbc7c0
Locals at 0x7ffff7cbc7b0, Previous frame's sp is 0x7ffff7cbc7c0
Saved registers:
rip at 0x7ffff7cbc7b8
(gdb) bt
#0 timer_handler (signo=11, info=0x7ffff7cbc8f0, ctx=0x7ffff7cbc7c0) at test2.c:170
#1 <signal handler called>
#2 __GI_raise (sig=<optimised out>) at ../sysdeps/unix/sysv/linux/raise.c:50
#3 0x0000555555555cc6 in task0 (arg=0x7fffffffdd70) at test2.c:238
#4 0x000055555555584a in task_trampoline (i0=1431672480, i1=21845) at test2.c:137
#5 0x00007ffff7e1a510 in ?? () at ../sysdeps/unix/sysv/linux/x86_64/__start_context.S:91 from /lib/x86_64-linux-gnu/libc.so.6
#6 0x0000000000000000 in ?? ()
...
(gdb) info frame
Stack level 0, frame at 0x7ffff7bbb7c0:
rip = 0x555555555964 in timer_handler (test2.c:170); saved rip = 0x7ffff7e020c0
called by frame at 0x7ffff7bbbe70
source language c.
Arglist at 0x7ffff7bbb7b0, args: signo=11, info=0x7ffff7bbb8f0, ctx=0x7ffff7bbb7c0
Locals at 0x7ffff7bbb7b0, Previous frame's sp is 0x7ffff7bbb7c0
Saved registers:
rip at 0x7ffff7bbb7b8
(gdb) bt
#0 timer_handler (signo=11, info=0x7ffff7bbb8f0, ctx=0x7ffff7bbb7c0) at test2.c:170
#1 <signal handler called>
#2 __GI_raise (sig=<optimised out>) at ../sysdeps/unix/sysv/linux/raise.c:50
#3 0x0000555555555d7c in task1 (arg=0x7fffffffdd80) at test2.c:255
#4 0x000055555555584a in task_trampoline (i0=1431673504, i1=21845) at test2.c:137
#5 0x00007ffff7e1a510 in ?? () at ../sysdeps/unix/sysv/linux/x86_64/__start_context.S:91 from /lib/x86_64-linux-gnu/libc.so.6
#6 0x0000000000000000 in ?? ()
...
(gdb) info frame
Stack level 0, frame at 0x7ffff7dbd7c0:
rip = 0x555555555964 in timer_handler (test2.c:170); saved rip = 0x7ffff7e020c0
called by frame at 0x7ffff7dbde60
source language c.
Arglist at 0x7ffff7dbd7b0, args: signo=11, info=0x7ffff7dbd8f0, ctx=0x7ffff7dbd7c0
Locals at 0x7ffff7dbd7b0, Previous frame's sp is 0x7ffff7dbd7c0
Saved registers:
rip at 0x7ffff7dbd7b8
(gdb) bt
#0 timer_handler (signo=11, info=0x7ffff7dbd8f0, ctx=0x7ffff7dbd7c0) at test2.c:170
#1 <signal handler called>
#2 __GI_raise (sig=<optimised out>) at ../sysdeps/unix/sysv/linux/raise.c:50
#3 0x0000555555555cc6 in task0 (arg=0x7fffffffdd60) at test2.c:238
#4 0x000055555555584a in task_trampoline (i0=1431671456, i1=21845) at test2.c:137
#5 0x00007ffff7e1a510 in ?? () at ../sysdeps/unix/sysv/linux/x86_64/__start_context.S:91 from /lib/x86_64-linux-gnu/libc.so.6
#6 0x0000000000000000 in ?? ()
```
如預期的,GDB 的輸出結果會對應到三個不同的 task,概念上是 main 函式和三個 task 在執行時,觸發 signal,跳到 signal handler,而 handler 會呼叫 `schedule()` 將當前 context 保存並切換至下個欲執行的 context,形成四個不同的 context 輪流切換。
:::info
這裡不清楚 GDB 有無辦法 step into 或是 step over 到 task 函式內部,每次呼叫 `task_switch_to()` 之後都會直接跳到下一個 task 接收到 signal 之後:
```
106 task_switch_to(task_current, next_task);
(gdb) n
Program received signal SIGSEGV, Segmentation fault.
__GI_raise (sig=<optimised out>) at ../sysdeps/unix/sysv/linux/raise.c:50
50 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) n
timer_handler (signo=11, info=0x7fffffffd6b0, ctx=0x7fffffffd580) at test2.c:170
170 {
```
:::
若要將程式碼改成 preemptive scheduling,需要借助 timer。參考 [alarm(2)](https://man7.org/linux/man-pages/man2/alarm.2.html) 以及 [ualarm(3)](https://man7.org/linux/man-pages/man3/ualarm.3.html),使得 timer 在指定時間間隔以後送出 `SIGALRM` signal。若沒有進行訊號處理,預設是會使得程式中止,我們需要將上述程式碼 `SIGSEGV` 改為 `SIGALRM`,做對應的訊號處理。
**[preempt_sched](https://github.com/sysprog21/concurrent-programs/tree/master/preempt_sched) 實作缺陷**
* time slice 過小會發生錯誤。
:::success
對比〈[A brief history of the Linux Kernel’s process scheduler: The very first scheduler, v0.01](https://dev.to/satorutakeuchi/a-brief-history-of-the-linux-kernel-s-process-scheduler-the-very-first-scheduler-v0-01-9e4)〉,解說早期 Linux 核心 CPU 排程器的行為,並利用上述程式碼來模擬
:::
### The very first scheduler
在 linux 核心早期 (v0.01~v2.4.x) 在這篇文章中稱作 "The very first scheduler",只使用了 20 行左右的程式碼。在 v0.01 中,所有 task 用一個陣列表示,同時作為 run queue (這裡 run queue 應該是指陣列中所有可執行的 task 的集合) 使用,陣列長度為 64 (即最大容許的 task 數量為 64)。若陣列的某個 entry 為空,則表示成 NULL。
排程器的 time slice 為 150 ms,而 interval timer 每 10 ms 會發出一次中斷,對應的 handler function 會減少目前執行的 task `Current` 的 time slice,當 time slice 歸零時,排程器會從 run queue 中找出下一個 task。
在後續的版本,time slice 和 timer 的 interval 都有做一些調整,這篇文章不會深入討論。
### The scheduling algorithm
以下是 Linux v0.01 的排程器策略:
1. 從 run queue 中以倒序找出第一個可執行且剩餘 time slice 不為 0 的 task (這邊原文描述有錯誤,應修正為找出 run queue 中剩餘 time slice 最大且狀態為 runnable 的 task)。
2. 如果所有可執行的 task 的 time slice 歸零,則 reset 所有 task 的 time slice。這裡排程器會給所有 task 150 ms 的 time slice,並且額外給 sleeping task 一半的 current time slice (後者是為了要讓剛被喚醒的 task 更早被排程,增加 interactivity)。
假設某一時間陣列的狀態如下,time slice 的單位是 10 ms:
![](https://i.imgur.com/8wJMUsQ.png)
排程器從 run queue 中以倒序找出剩餘 time slice 最大的 task,挑選出 `t1`:
![](https://i.imgur.com/DZqZ5VF.png)
`t1` 一直執行直到 time slice 用完:
![](https://i.imgur.com/kKznmZ6.png)
同理,排程器從 run queue 中以倒序找出剩餘 time slice 最大的 task。如果 run queue 中有多個 tasks 同時擁有最大的剩餘 time slice,則選取第一個找到的 task,因此挑選出 `t2`:
![](https://i.imgur.com/mpEwKgp.png)
最後,所有可執行的 task 都用盡自己的 time slice:
![](https://i.imgur.com/BZKCgjS.png)
則 reset 所有 task 的 time slice,runnable 的 task 增加 150 ms 的 time slice,sleep 的 task 增加 150 ms 加上原本剩餘 time slice 一半的 time slice (`t4` 為 sleep,reset 後的 time slice 為 150 + 120 / 2 = 210):
![](https://i.imgur.com/rwrbdFG.png)
相關程式碼可以在 [v0.01/include/linux/sched.h](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/include/linux/sched.h) 以及 [v0.01/kernel/sched.c](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/kernel/sched.c) 找到:
```c
// include/linux/sched.h
#define NR_TASKS 64
#define FIRST_TASK task[0]
#define LAST_TASK task[NR_TASKS-1]
```
```c=
// kernel/sched.c
/*
* 'schedule()' is the scheduler function. This is GOOD CODE! There
* probably won't be any reason to change this, as it should work well
* in all circumstances (ie gives IO-bound processes good response etc).
* The one thing you might take a look at is the signal-handler code here.
*
* NOTE!! Task 0 is the 'idle' task, which gets called when no other
* tasks can run. It can not be killed, and it cannot sleep. The 'state'
* information in task[0] is never used.
*/
void schedule(void)
{
int i, next, c;
struct task_struct ** p;
/* check alarm, wake up any interruptible tasks that have got a signal */
for(p = &LAST_TASK ; p > &FIRST_TASK ; --p)
if (*p) {
if ((*p)->alarm && (*p)->alarm < jiffies) {
(*p)->signal |= (1<<(SIGALRM-1));
(*p)->alarm = 0;
}
if ((*p)->signal && (*p)->state==TASK_INTERRUPTIBLE)
(*p)->state=TASK_RUNNING;
}
/* this is the scheduler proper: */
while (1) {
c = -1;
next = 0;
i = NR_TASKS;
p = &task[NR_TASKS];
while (--i) {
if (!*--p)
continue;
if ((*p)->state == TASK_RUNNING && (*p)->counter > c)
c = (*p)->counter, next = i;
}
if (c)
break;
for(p = &LAST_TASK ; p > &FIRST_TASK ; --p)
if (*p)
(*p)->counter = ((*p)->counter >> 1) + (*p)->priority;
}
switch_to(next);
}
```
`task` 是一個指向 `struct task_struct` 的指標的陣列,大小為 64,`FIRST_TASK` 和 `LAST_TASK` 巨集分別代表陣列的頭和尾。
程式碼第 17 ~ 25 行將所有 interruptible 並且有收到 signal 的 task 喚醒,從 waiting 狀態變成 running 的狀態 (注意,這裡的 running 狀態不代表執行的狀態,而是 runnable 的狀態,即 ready to run)。
:::info
這裡不清楚 19 行的 `jiffies` 在這裡的用途 (`(*p)->alarm < jiffies`)。`jiffies` 直覺想到是跟 timer 有關,如果 grep 原始程式碼,會發現 `jiffies` 的修改是在 [kernel/system_call.s:158](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/kernel/system_call.s)。該檔案除了 system-call low-level handling routines,也包括 timer-interrupt handler。其中有使用到 `incl _jiffies` 指令,用來計算 timer interrupt 發生的次數。
:::
程式碼第 27 行以後是排程器本身。程式碼第 32 ~ 37 對應到上面排程器策略的第 1 點,以倒序找出 run queue 中剩餘 time slice 最大的 task,並用 `next` 代表該 task 的編號 (慣用 nr)。如果所有 runnable 的 task 的 time slice 用光,會執行第 40 ~ 42 行的程式,對應到排程器策略的第 2 點。這裡是增加 `(*p)->priority` 的 time slice,似乎不是固定 150 ms time slice,而是可以做調整,參考以下程式碼:
```c
int sys_nice(long increment)
{
if (current->priority-increment>0)
current->priority -= increment;
return 0;
}
```
### 利用上述程式碼來模擬
參考 [v0.01/include/linux/sched.h](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/include/linux/sched.h) 以及 [v0.01/kernel/sched.c](https://kernel.googlesource.com/pub/scm/linux/kernel/git/nico/archive/+/v0.01/kernel/sched.c)
## Reference
[Blocking Signals](https://www.gnu.org/software/libc/manual/html_node/Blocking-Signals.html)