# 2024q1 quiz8
contribute by `SimonLee0316`
## 測驗1
[題目](https://hackmd.io/@sysprog/linux2024-quiz8)
[task](https://gist.github.com/jserv/745dab1d9be33a9a635b193aeb53191e) 是個運用 coroutine 實作的任務排程器,允許搭配 POSIX Thread 使用,使用 C11 Atomics 和 x86-64 組合語言開發。
## test_task_start
```c
void test_task_start(void)
{
/*...*/
quick_start(co_root_entry, co_cleanup, &(int){99999999});
/*...*/
}
```
執行 co_root_entry 了,在裡面會不斷的新建 child coroutine。
```c
void co_child_entry(void *udata)
{
int udid = *(int *) udata;
assert(udid == nudid);
}
void co_root_entry(void *udata)
{
assert(*(int *) udata == 99999999);
assert(*(int *) task_udata() == 99999999);
assert(task_info_running() == 1);
for (int i = 0; i < NCHILDREN; i++) {
printf("start child %d\n",i);
assert(started == 1 + i);
assert(cleaned == 1 + i - 1);
nudid = i;
quick_start(co_child_entry, co_cleanup, &(int){i});
}
}
```
運行順序如下。
```
test_task_start
start co_root_entry
start child 0
start co_child_entry udid: 0
start child 1
start co_child_entry udid: 1
start child 2
...
start child 99
start co_child_entry udid: 99
```
總結來說,`do_test(test_task_start)`,會建立多個 coroutine 。
## test_task_sleep
透過 quick_start 呼叫 co_sleep 函式執行,會在呼叫 task_sleep。
```c
void co_sleep(void *udata)
{
assert(*(int *) udata == 99999999);
task_sleep(1e8); /* 100ms CPU-based sleep */
}
static void task_sleep(int64_t nanosecs)
{
int64_t start = getnow();
while (getnow() - start < nanosecs)
task_yield();
}
```
task_sleep 中的 task_yeild 功能主要是將當前 coroutine 放入 task_yielders 的串列當中,這可以表示成主動放棄 cpu 的 coroutine ,可以想成在 sleep 的狀態並等待排程,將自己放入 task_yielders 串列的末端後,在呼叫task_switch 呼叫在 task_nrunners 排隊的 coroutine 執行。
這裡會不斷的將 coroutine 加入 task_yielders 直到 100ms 時間到。
## test_task_pause
建立100個 `co_pause_one` coroutine 還有 一個 `co_resume_all` coroutine ,並輪流執行。
```c
void test_task_pause(void)
{
reset_stats();
for (int i = 0; i < NCHILDREN; i++) {
quick_start(co_pause_one, co_cleanup, &i);
}
quick_start(co_resume_all, co_cleanup, 0);
while (task_active()) {
task_resume(0);
}
assert(npaused == 0);
}
```
* co_pause_one
建立100個的 `co_pause_one` coroutine,每次都會做一次順序暫停一次逆序暫停,再做一次順序再做一次逆序。
順序暫停的實做如下
```c
void co_pause_one(void *udata)
{
int index = *(int *) udata;
assert(index == npaused);
paused[index] = task_id();
/* pause in order */
npaused++;
is_paused[index] = true;
task_pause();
//return from pause
is_paused[index] = false;
npaused--;
while (!all_resumed)
task_yield();
}
```
將目前被暫停的 coroutine 標記為 true ,並做暫停切換到其他 coroutine (包含 test_task_pause),從暫停回來之後將標記改回 false ,如果還有人在被暫停的話,那就將自己排到`task_yielders` ,等待所有人都恢復。
程式碼是如何做到逆序暫停?
答: 將 id 較小的 coroutine sleep 久一點 這樣 id 較大的 coroutine 就會比較先被暫停。
```c
void co_pause_one(void *udata)
{
/*...*/
/* pause in reverse */
task_sleep(1e6 * (NCHILDREN - index));
npaused++;
is_paused[index] = true;
task_pause();
is_paused[index] = false;
npaused--;
while (!all_resumed) {
task_yield();
}
}
```
* co_resume_all
有 100 個 `co_pause_one` coroutine 和 1 個 `co_resume_all` 依序交換執行之下,在 `co_pause_one` 裡面每一個逆/順序暫停中,最後都會檢查是否所有人都被恢復執行了才會繼續往下一個逆/順序暫停,而負責恢復執行的 coroutine 就是 `co_resume_all` ,在程式碼中可以看到,在暫停的時候的順序是 1. 順序暫停 2. 逆序暫停 3. 順序暫停 4. 逆序暫停,而恢復執行的順序卻是 1. 順序恢復 2. 順序恢復 3. 順序恢復 4. 逆序恢復 ,這種情況會使就算 coroutine 是早暫停的將會晚恢復執行。
```c
void co_resume_all(void *udata)
{
(void) udata;
/* wait for all to be paused */
while (npaused < NCHILDREN) {
task_yield();
}
all_resumed = false;
/* resume in order */
for (int i = 0; i < NCHILDREN; i++) {
task_resume(paused[i]);
}
while (npaused > 0) {
task_yield();
}
all_resumed = true;
/* resume in order */
/* resume in reverse order */
/* resume in reverse order */
/*...*/
}
```
注意 `task_yield` 是將自己插入 `task_yielders` 末端,這樣的操作是可以再度被執行的。
但是如果使用 `task_pause` 的話是將自己插入到 `task_paused` 是一個 `task_map` 的結構末端, 也就是直到它被 resume 之前都不會被返回 `task_yielders`。
## test_task_exit
程式碼運作流程:
exitvals[] : 用來紀錄執行順序。
1. 啟動 `co_one` coroutine
2. `co_one` 紀錄 1,並啟動 `co_two`
3. `co_two` 將自己放入 `task_yielders` ,task_sleep(1e7 * 2),它會比 `co_three` 晚被恢復執行
4. 回到 `co_one` ,啟動 `co_three`
5. `co_three` 將自己放入 `task_yielders` ,task_sleep(1e7)
6. 回到 `co_one` ,啟動 `co_four`
7. `co_four` 紀錄 4 將自己放入 `task_yielders`
8. `co_one` 呼叫 `task_exit`,回到 `test_task_exit`
9. `test_task_exit` 確認是否還有 coroutine 沒有執行完,如果有就去執行它
```c
void test_task_exit(void)
{
/*...*/
while (task_active()) {
task_resume(0);
}
/*...*/
}
```
10. 恢復執行 `co_three` ,紀錄 3
11. 恢復執行 `co_two` ,紀錄 2
12. test_task_exit 紀錄-2,表示所有任務完成。
至此 exitvals 內的值為 [1,4,-1,3,2,-2]
## test_task_order
測試 coroutine 執行順序。
`quickstart(co_yield)` -> `task_start` -> `coro_start` -> `coro_switch0` -> `coro_switch1` -> `task_entry`
```c
static void task_entry(void *udata)
{
/* Initialize a new coroutine on the user's stack. */
/*...*/
if (task_cur) {
/* Reschedule the coroutine that started this one */
task_list_push_back(&task_yielders, co);
task_list_push_back(&task_yielders, task_cur);
task_nyielders += 2;
task_switch(false, false);
}
task_cur = co;
if (task_user_entry) {
task_user_entry(udata);
}
/* This coroutine is finished. Switch to the next coroutine. */
task_switch(false, true);
}
```
1. 啟動 co_yield 是第一個 coroutine 所以沒有 task_cur,接著執行 co_yield
task_cur = co_yield
task_nrunners = {} , task_nyielders = {}
a = [A]
quick_start(co_yield1) -> ... ->`task_entry`
2. task_cur 是 co_yield,所以將 co_yeld1 和 co_yeld0 插入 task_nyielders
task_nrunners = {} , task_nyielders = { co_yield1 -> co_yield }
task_cur = co_yield1
呼叫 `task_switch()`
3. `task_switch()` 現在沒有 runners ,所以將 task_nyielders 全部搬到 runners
task_cur = co_yield1
task_nrunners = { co_yield1 -> co_yield } , task_nyielders = {}
並取第一個 runners switch 過去執行,所以是 co_yield1。
4. 啟動 co_yeild1
a = [AB]
調用 `task_yield`,將自己插到 task_nyielders。
task_cur = co_yield1
task_nrunners = { co_yield } , task_nyielders = {co_yield1}
呼叫 `task_switch()`。
5. 從runners 取第一個 task = co_yield
task_nrunners = {} , task_nyielders = {co_yield1}
a = [ABC]
quick_start(co_yield2) -> ... -> `task_entry`
6. task_cur 是 co_yield,所以將 co_yeld2 和 co_yield 插入 task_nyielders
task_nrunners = {} , task_nyielders = { co_yield1 -> co_yield2 -> co_yield }
呼叫 `task_switch()`
7. 沒有 runners 所以將 task_nyielders 全部搬近來
task_nrunners = { co_yield1 -> co_yield2 -> co_yield } , task_nyielders = {}
task_cur = co_yield1
取第一個 runner 執行 = co_yield1
8. co_yield1
a = [ABCD]
執行結束回到 `task_entry` ,並呼叫 task_switch
task_cur = co_yield1
task_nrunners = { co_yield2 -> co_yield } , task_nyielders = {}
並取第一個 runners switch 過去執行,所以是 co_yield2。
task_cur = co_yield2
9. co_yield2
a = [ABCDE]
調用 `task_yield`,將自己插到 task_nyielders。
task_cur = co_yield2
task_nrunners = { co_yield } , task_nyielders = { co_yield2 }
並取第一個 runners switch 過去執行,所以是 co_yield。
task_cur = co_yield
10. co_yield
a = [ABCDEF]
調用 `task_yield`,將自己插到 task_nyielders。
task_cur = co_yield
task_nrunners = {} , task_nyielders = { co_yield2 -> co_yield }
沒有 runners 所以將 task_nyielders 全部搬近來
task_nrunners = { co_yield2 -> co_yield } , task_nyielders = {}
並取第一個 runners switch 過去執行,所以是 co_yield。
task_cur = co_yield2
11. co_yield2
a = [ABCDEFG]
執行結束回到 `task_entry` ,並呼叫 task_switch
task_nrunners = { co_yield } , task_nyielders = {}
並取第一個 runners switch 過去執行,所以是 co_yield。
12. co_yield
a = [ABCDEFGH]
執行結束回到 `task_entry` ,並呼叫 task_switch
task_nrunners = {} , task_nyielders = {}
結束回到 `test_task_order` 。
## test_task_detach
建立兩個 pthread 分別執行 `threda0` 和 `thread1`
執行後透過 `pthread_join` 等待 pthread 完成並回到 main。
`threda0` 程式碼分析:
啟動 100 個 coroutine ,並當被暫停的 coroutine 數量為 100 的時後(意思就是 task_paused 串列數量為 100),就開始將 task_paused 的所有 coroutine 搬到 task_detached 串列上。
```c
void *thread0(void *arg)
{
(void) arg;
reset_stats();
for (int i = 0; i < NCHILDREN; i++) {
quick_start(co_thread_one, co_cleanup, &i);
}
while (task_active()) {
if (task_info_paused() == NCHILDREN) {
/* Detach all paused */
for (int i = 0; i < NCHILDREN; i++) {
task_detach(thpaused[i]);
}
}
task_resume(0);
}
return NULL;
}
```
`thread1` 程式碼分析:
如果 所有 coroutine 還沒都被 detached 的會就會卡在迴圈裡面。
離開迴圈後將所有 coroutine 從 task_detached 串列移回到 task_paused 串列,並把他們重新加回去 task_yielders 讓他們可以繼續被執行。
```c
void *thread1(void *arg)
{
(void) arg;
reset_stats();
while (task_info_detached() < NCHILDREN) {
/* Wait */
}
/* Attach all paused */
for (int i = 0; i < NCHILDREN; i++) {
task_attach(thpaused[i]);
task_resume(thpaused[i]);
}
while (task_active()) {
task_resume(0);
}
return NULL;
}
```
`co_thread_one` 程式碼分析:
將自己加到 task_yielders 串列直到 時間到。
在將自己放到 task_paused 串列。
```c
void co_thread_one(void *udata)
{
int index = *(int *) udata;
int64_t id = task_id();
thpaused[index] = id;
task_sleep(1e6);
task_pause();
}
```
`task_detach` 程式碼分析:
給定 要 detach 的 coroutine id ,找到並加到 task_detached 結構。
```c
void task_detach(int64_t id)
{
struct task *co = task_map_delete(&task_paused, &(struct task){.id = id});
if (co) {
task_npaused--;
task_lock();
task_map_insert(&task_detached, co);
task_ndetached++;
task_unlock();
}
}
```
`task_attach` 程式碼分析:
與 detach 操作差不多。
```c
void task_attach(int64_t id)
{
task_lock();
struct task *co = task_map_delete(&task_detached, &(struct task){.id = id});
if (co) {
task_ndetached--;
}
task_unlock();
if (co) {
task_map_insert(&task_paused, co);
task_npaused++;
}
}
```
因為是使用 thread 的關係在操作共享變數的時候,必須要確保同步問題,適當的加入 lock & unlock。
`task_lock` 程式碼分析:
透過 `atomic_compare_exchange_weak` 操作嘗試獲取鎖,如果可以成功更改 `task_locker` 變數則代表可以成功獲取鎖,反之則無法獲取,呼叫 `sched_yield0` 讓出 cpu,並循環嘗試獲取鎖。
```c
static void task_lock(void)
{
bool expected = false;
while (!atomic_compare_exchange_weak(&task_locker, &expected, true)) {
expected = false;
sched_yield0();
}
}
```
`task_unlock`程式碼分析:
使用 `atomic_store(&task_locker, false)` ,確保 對 task_locker 值操作具有原子性。