# 2024q1 quiz8 contribute by `SimonLee0316` ## 測驗1 [題目](https://hackmd.io/@sysprog/linux2024-quiz8) [task](https://gist.github.com/jserv/745dab1d9be33a9a635b193aeb53191e) 是個運用 coroutine 實作的任務排程器,允許搭配 POSIX Thread 使用,使用 C11 Atomics 和 x86-64 組合語言開發。 ## test_task_start ```c void test_task_start(void) { /*...*/ quick_start(co_root_entry, co_cleanup, &(int){99999999}); /*...*/ } ``` 執行 co_root_entry 了,在裡面會不斷的新建 child coroutine。 ```c void co_child_entry(void *udata) { int udid = *(int *) udata; assert(udid == nudid); } void co_root_entry(void *udata) { assert(*(int *) udata == 99999999); assert(*(int *) task_udata() == 99999999); assert(task_info_running() == 1); for (int i = 0; i < NCHILDREN; i++) { printf("start child %d\n",i); assert(started == 1 + i); assert(cleaned == 1 + i - 1); nudid = i; quick_start(co_child_entry, co_cleanup, &(int){i}); } } ``` 運行順序如下。 ``` test_task_start start co_root_entry start child 0 start co_child_entry udid: 0 start child 1 start co_child_entry udid: 1 start child 2 ... start child 99 start co_child_entry udid: 99 ``` 總結來說,`do_test(test_task_start)`,會建立多個 coroutine 。 ## test_task_sleep 透過 quick_start 呼叫 co_sleep 函式執行,會在呼叫 task_sleep。 ```c void co_sleep(void *udata) { assert(*(int *) udata == 99999999); task_sleep(1e8); /* 100ms CPU-based sleep */ } static void task_sleep(int64_t nanosecs) { int64_t start = getnow(); while (getnow() - start < nanosecs) task_yield(); } ``` task_sleep 中的 task_yeild 功能主要是將當前 coroutine 放入 task_yielders 的串列當中,這可以表示成主動放棄 cpu 的 coroutine ,可以想成在 sleep 的狀態並等待排程,將自己放入 task_yielders 串列的末端後,在呼叫task_switch 呼叫在 task_nrunners 排隊的 coroutine 執行。 這裡會不斷的將 coroutine 加入 task_yielders 直到 100ms 時間到。 ## test_task_pause 建立100個 `co_pause_one` coroutine 還有 一個 `co_resume_all` coroutine ,並輪流執行。 ```c void test_task_pause(void) { reset_stats(); for (int i = 0; i < NCHILDREN; i++) { quick_start(co_pause_one, co_cleanup, &i); } quick_start(co_resume_all, co_cleanup, 0); while (task_active()) { task_resume(0); } assert(npaused == 0); } ``` * co_pause_one 建立100個的 `co_pause_one` coroutine,每次都會做一次順序暫停一次逆序暫停,再做一次順序再做一次逆序。 順序暫停的實做如下 ```c void co_pause_one(void *udata) { int index = *(int *) udata; assert(index == npaused); paused[index] = task_id(); /* pause in order */ npaused++; is_paused[index] = true; task_pause(); //return from pause is_paused[index] = false; npaused--; while (!all_resumed) task_yield(); } ``` 將目前被暫停的 coroutine 標記為 true ,並做暫停切換到其他 coroutine (包含 test_task_pause),從暫停回來之後將標記改回 false ,如果還有人在被暫停的話,那就將自己排到`task_yielders` ,等待所有人都恢復。 程式碼是如何做到逆序暫停? 答: 將 id 較小的 coroutine sleep 久一點 這樣 id 較大的 coroutine 就會比較先被暫停。 ```c void co_pause_one(void *udata) { /*...*/ /* pause in reverse */ task_sleep(1e6 * (NCHILDREN - index)); npaused++; is_paused[index] = true; task_pause(); is_paused[index] = false; npaused--; while (!all_resumed) { task_yield(); } } ``` * co_resume_all 有 100 個 `co_pause_one` coroutine 和 1 個 `co_resume_all` 依序交換執行之下,在 `co_pause_one` 裡面每一個逆/順序暫停中,最後都會檢查是否所有人都被恢復執行了才會繼續往下一個逆/順序暫停,而負責恢復執行的 coroutine 就是 `co_resume_all` ,在程式碼中可以看到,在暫停的時候的順序是 1. 順序暫停 2. 逆序暫停 3. 順序暫停 4. 逆序暫停,而恢復執行的順序卻是 1. 順序恢復 2. 順序恢復 3. 順序恢復 4. 逆序恢復 ,這種情況會使就算 coroutine 是早暫停的將會晚恢復執行。 ```c void co_resume_all(void *udata) { (void) udata; /* wait for all to be paused */ while (npaused < NCHILDREN) { task_yield(); } all_resumed = false; /* resume in order */ for (int i = 0; i < NCHILDREN; i++) { task_resume(paused[i]); } while (npaused > 0) { task_yield(); } all_resumed = true; /* resume in order */ /* resume in reverse order */ /* resume in reverse order */ /*...*/ } ``` 注意 `task_yield` 是將自己插入 `task_yielders` 末端,這樣的操作是可以再度被執行的。 但是如果使用 `task_pause` 的話是將自己插入到 `task_paused` 是一個 `task_map` 的結構末端, 也就是直到它被 resume 之前都不會被返回 `task_yielders`。 ## test_task_exit 程式碼運作流程: exitvals[] : 用來紀錄執行順序。 1. 啟動 `co_one` coroutine 2. `co_one` 紀錄 1,並啟動 `co_two` 3. `co_two` 將自己放入 `task_yielders` ,task_sleep(1e7 * 2),它會比 `co_three` 晚被恢復執行 4. 回到 `co_one` ,啟動 `co_three` 5. `co_three` 將自己放入 `task_yielders` ,task_sleep(1e7) 6. 回到 `co_one` ,啟動 `co_four` 7. `co_four` 紀錄 4 將自己放入 `task_yielders` 8. `co_one` 呼叫 `task_exit`,回到 `test_task_exit` 9. `test_task_exit` 確認是否還有 coroutine 沒有執行完,如果有就去執行它 ```c void test_task_exit(void) { /*...*/ while (task_active()) { task_resume(0); } /*...*/ } ``` 10. 恢復執行 `co_three` ,紀錄 3 11. 恢復執行 `co_two` ,紀錄 2 12. test_task_exit 紀錄-2,表示所有任務完成。 至此 exitvals 內的值為 [1,4,-1,3,2,-2] ## test_task_order 測試 coroutine 執行順序。 `quickstart(co_yield)` -> `task_start` -> `coro_start` -> `coro_switch0` -> `coro_switch1` -> `task_entry` ```c static void task_entry(void *udata) { /* Initialize a new coroutine on the user's stack. */ /*...*/ if (task_cur) { /* Reschedule the coroutine that started this one */ task_list_push_back(&task_yielders, co); task_list_push_back(&task_yielders, task_cur); task_nyielders += 2; task_switch(false, false); } task_cur = co; if (task_user_entry) { task_user_entry(udata); } /* This coroutine is finished. Switch to the next coroutine. */ task_switch(false, true); } ``` 1. 啟動 co_yield 是第一個 coroutine 所以沒有 task_cur,接著執行 co_yield task_cur = co_yield task_nrunners = {} , task_nyielders = {} a = [A] quick_start(co_yield1) -> ... ->`task_entry` 2. task_cur 是 co_yield,所以將 co_yeld1 和 co_yeld0 插入 task_nyielders task_nrunners = {} , task_nyielders = { co_yield1 -> co_yield } task_cur = co_yield1 呼叫 `task_switch()` 3. `task_switch()` 現在沒有 runners ,所以將 task_nyielders 全部搬到 runners task_cur = co_yield1 task_nrunners = { co_yield1 -> co_yield } , task_nyielders = {} 並取第一個 runners switch 過去執行,所以是 co_yield1。 4. 啟動 co_yeild1 a = [AB] 調用 `task_yield`,將自己插到 task_nyielders。 task_cur = co_yield1 task_nrunners = { co_yield } , task_nyielders = {co_yield1} 呼叫 `task_switch()`。 5. 從runners 取第一個 task = co_yield task_nrunners = {} , task_nyielders = {co_yield1} a = [ABC] quick_start(co_yield2) -> ... -> `task_entry` 6. task_cur 是 co_yield,所以將 co_yeld2 和 co_yield 插入 task_nyielders task_nrunners = {} , task_nyielders = { co_yield1 -> co_yield2 -> co_yield } 呼叫 `task_switch()` 7. 沒有 runners 所以將 task_nyielders 全部搬近來 task_nrunners = { co_yield1 -> co_yield2 -> co_yield } , task_nyielders = {} task_cur = co_yield1 取第一個 runner 執行 = co_yield1 8. co_yield1 a = [ABCD] 執行結束回到 `task_entry` ,並呼叫 task_switch task_cur = co_yield1 task_nrunners = { co_yield2 -> co_yield } , task_nyielders = {} 並取第一個 runners switch 過去執行,所以是 co_yield2。 task_cur = co_yield2 9. co_yield2 a = [ABCDE] 調用 `task_yield`,將自己插到 task_nyielders。 task_cur = co_yield2 task_nrunners = { co_yield } , task_nyielders = { co_yield2 } 並取第一個 runners switch 過去執行,所以是 co_yield。 task_cur = co_yield 10. co_yield a = [ABCDEF] 調用 `task_yield`,將自己插到 task_nyielders。 task_cur = co_yield task_nrunners = {} , task_nyielders = { co_yield2 -> co_yield } 沒有 runners 所以將 task_nyielders 全部搬近來 task_nrunners = { co_yield2 -> co_yield } , task_nyielders = {} 並取第一個 runners switch 過去執行,所以是 co_yield。 task_cur = co_yield2 11. co_yield2 a = [ABCDEFG] 執行結束回到 `task_entry` ,並呼叫 task_switch task_nrunners = { co_yield } , task_nyielders = {} 並取第一個 runners switch 過去執行,所以是 co_yield。 12. co_yield a = [ABCDEFGH] 執行結束回到 `task_entry` ,並呼叫 task_switch task_nrunners = {} , task_nyielders = {} 結束回到 `test_task_order` 。 ## test_task_detach 建立兩個 pthread 分別執行 `threda0` 和 `thread1` 執行後透過 `pthread_join` 等待 pthread 完成並回到 main。 `threda0` 程式碼分析: 啟動 100 個 coroutine ,並當被暫停的 coroutine 數量為 100 的時後(意思就是 task_paused 串列數量為 100),就開始將 task_paused 的所有 coroutine 搬到 task_detached 串列上。 ```c void *thread0(void *arg) { (void) arg; reset_stats(); for (int i = 0; i < NCHILDREN; i++) { quick_start(co_thread_one, co_cleanup, &i); } while (task_active()) { if (task_info_paused() == NCHILDREN) { /* Detach all paused */ for (int i = 0; i < NCHILDREN; i++) { task_detach(thpaused[i]); } } task_resume(0); } return NULL; } ``` `thread1` 程式碼分析: 如果 所有 coroutine 還沒都被 detached 的會就會卡在迴圈裡面。 離開迴圈後將所有 coroutine 從 task_detached 串列移回到 task_paused 串列,並把他們重新加回去 task_yielders 讓他們可以繼續被執行。 ```c void *thread1(void *arg) { (void) arg; reset_stats(); while (task_info_detached() < NCHILDREN) { /* Wait */ } /* Attach all paused */ for (int i = 0; i < NCHILDREN; i++) { task_attach(thpaused[i]); task_resume(thpaused[i]); } while (task_active()) { task_resume(0); } return NULL; } ``` `co_thread_one` 程式碼分析: 將自己加到 task_yielders 串列直到 時間到。 在將自己放到 task_paused 串列。 ```c void co_thread_one(void *udata) { int index = *(int *) udata; int64_t id = task_id(); thpaused[index] = id; task_sleep(1e6); task_pause(); } ``` `task_detach` 程式碼分析: 給定 要 detach 的 coroutine id ,找到並加到 task_detached 結構。 ```c void task_detach(int64_t id) { struct task *co = task_map_delete(&task_paused, &(struct task){.id = id}); if (co) { task_npaused--; task_lock(); task_map_insert(&task_detached, co); task_ndetached++; task_unlock(); } } ``` `task_attach` 程式碼分析: 與 detach 操作差不多。 ```c void task_attach(int64_t id) { task_lock(); struct task *co = task_map_delete(&task_detached, &(struct task){.id = id}); if (co) { task_ndetached--; } task_unlock(); if (co) { task_map_insert(&task_paused, co); task_npaused++; } } ``` 因為是使用 thread 的關係在操作共享變數的時候,必須要確保同步問題,適當的加入 lock & unlock。 `task_lock` 程式碼分析: 透過 `atomic_compare_exchange_weak` 操作嘗試獲取鎖,如果可以成功更改 `task_locker` 變數則代表可以成功獲取鎖,反之則無法獲取,呼叫 `sched_yield0` 讓出 cpu,並循環嘗試獲取鎖。 ```c static void task_lock(void) { bool expected = false; while (!atomic_compare_exchange_weak(&task_locker, &expected, true)) { expected = false; sched_yield0(); } } ``` `task_unlock`程式碼分析: 使用 `atomic_store(&task_locker, false)` ,確保 對 task_locker 值操作具有原子性。