# Reading Note ## About coroutine ### coro(non-preemptive coroutine) * Using setjmp and longjmp to implement schedule in user space, how? * In the function schedule, it calls all the task functions one time, and then calls the task_switch() function to start the coroutine scheme. ```c while (ntasks-- > 0) { struct arg arg = args[i]; tasks[i++](&arg); printf("Never reached\n"); } task_switch(); ``` * In the function tesk, * we can see that if it is the first time entering the function, it will go back to the schedule function at the top of the if section and set the jump point, so that next time when calling longjmp(t->env, 1), it will return to this if section. At that time, it will jump out of the if and proceed to the for loop. * In the for loop, it first sets the jump point for the next time we want to properly jump back to the desired point. * After setting the jump point, it proceeds to execute the function’s original logic, which in this case is calculating the Fibonacci sequence. * Once the function finishes its task (printing the Fibonacci numbers), it calls task_switch to allow other functions to perform their work. * Up to this point, we can see that this coroutine is non-preemptive — meaning the function (or we can say the "thread" here) needs to actively release the computation resource. Therefore, these threads must cooperate; otherwise, the system will hang. ```c if (setjmp(task->env) == 0) { task_add(task); longjmp(sched, 1); } task = cur_task; for (; task->i < task->n; task->i += 2) { if (setjmp(task->env) == 0) { long long res = fib_sequence(task->i); printf("%s fib(%d) = %lld\n", task->task_name, task->i, res); task_add(task); task_switch(); } task = cur_task; printf("%s: resume\n", task->task_name); } printf("%s: complete\n", task->task_name); free(task); longjmp(sched, 1); ``` * And how tesk_switch behave? * The coroutine scheme is combined from two sections of code: one is task_add(), and the other is task_switch(). * In task_switch, we can see it first deletes the task from the head of the task list. The reason for doing so is because the current task is at the head of the list — so we remove it and then call longjmp to jump into its function. * After the function finishes its work, it calls task_add to add itself back to the task list and then calls task_switch again to activate the next task at the head. * The task_add function puts the task structure at the tail of the task list. ```c task_add(task); task_switch(); ``` ```c static void task_switch() { if (!list_empty(&tasklist)) { struct task *t = list_first_entry(&tasklist, struct task, list); list_del(&t->list); cur_task = t; longjmp(t->env, 1); } } ``` ### preemptive coroutine Than how preemptive be implemented? * Use signal, we use timer to send SIGALRM in the signal handler function, and apply longjump to do context switch. ### Python eventloop Python 內設有 asyncio event loop 機制正是使用協程概念實作,使用場景如伺服器,透過 coro event loop 能夠達到單一執行緒多工的效果,以下解釋其如何運作。 * 首先為 event loop 主要機制 -- 回圈主要由兩個 phase 組成: * Phase 1:取 ntodo = len(_ready),依 FIFO 連續執行 ntodo 個 Handle/Task。本輪新增的排程項會留到下一次 Phase 1。 * Phase 2:selector.select(timeout) 等 I/O/timer;就緒事件與到期 TimerHandle 轉成 callback,放入 _ready 等下一輪。 what can i say.... just brillient. ```nginx while loop.is_running(): ┌─────────────────────────┐ │ 1. 處理 _ready 佇列 │ ← 就緒 callback/Task 在此 FIFO 執行 └─────────────────────────┘ │ ^ 執行中途如果又把新的 Task 加進 _ready (例:create_task) │ | 會立即被同一輪迴圈接著執行,直到 _ready 清空 │ ┌─────────────────────────┐ │ 2. poll I/O selector │ ← epoll/kqueue 等待可讀寫或 timer 到期 └─────────────────────────┘ │ ┌─────────────────────────┐ │ 3. 將新事件放進 _ready │ ← 產生 I/O callback、計時器 callback └─────────────────────────┘ │ └── 回到步驟 1 (下一 round) ``` * 那麼 coro 們如何讓出 cpu? * 注意如一 coro 卡在一個無窮回圈,eventloop 會直接 hang,這是因為 coro 並沒有中斷機制,必須由每一個 coro 維護好排程,讓出資源 * 在 python 的機制中 coro 有三種方式能夠讓出 cpu 資源分別為:await / return / raise。 * **task/future** : 在解釋其餘機制之前,首先需要理解 ready queue 裏存放的結構。 ```python class Future: _state: PENDING | FINISHED | CANCELLED _result / _exception _callbacks: list[Callable[[Future], None]] class Task(Future): _coro: coroutine # 目前要執行的協程對象 _fut_waiter: Future | None # 協程最後一次 yield 的 awaitable def _step(): # 執行協程到下一個 await/return ``` **Future**: * 屬性 _state : 即代表 coro 結束的三種狀況 * _callback : 為當該 future (or task) 結束時所呼叫之函式 list (wake_up().... and so on) **Task** 繼承自 future,多加了以下兩個屬性 (Task (繼承 Future) 與 Handle 是 _ready 能直接執行的單位;純 Future 不會進 _ready) * _coro : 比起 future 額外掛載協程工作 * _fut_waiter:紀錄最近一次註冊之 awaitable object ,可以用於後續取消 task 所用。 * **await** : 此為核心機制 ```python result = await expr #or just await expr ``` * 當一個 coro 內遇到 await 時,會被編譯成 `yield from expr.__await__()`; 協程此刻產生 (yield) 出 awaitable 物件並暫停。 * 接著近一步將 awaitable 物件包裝為 future/task,並在該新 future 之 _callback 註冊此 coro 的 wakeup function,並將 coro._state 設為 pending。 * 注意只有當 inner 轉成新的 Task 時,才會把該 Task 加入 ready queue, * 若 inner 已是 Future/Task,await 僅掛 _wakeup,不會再排程。 * 當 future/task 結束 :`Future.set_result/exception()` 將執行: ```python self._state = FINISHED self._result = result # 或 _exception = exc ``` 並迭代該 future 之 callback list,將 wake_up()、handler() 註冊回 ready queue 進一步喚醒父 coro ```python for cb in self._callbacks: loop.call_soon(cb, self) # 交給事件迴圈排程 self._callbacks.clear() ``` * 等待 I/O: * 高階 API(streams、loop.sock_recv()…)在呼叫點同樣建立new Future。 * 然後把 Future.set_result() 包成 callback,交給 loop.add_reader(fd, cb) (此函式將 fd 註冊進 select) 登記。 * 當 selector 報告 fd 可讀/可寫時,將該 cb 放回 ready queue → Future 完成(Future.set_result()) → 呼叫等待它的 Task 的 _wakeup()。 ```python outer_task._step(): try: inner = coro.send(value) # ─┐ ① 產生 awaitable except StopIteration as e: # | self.set_result(e.value) # | 協程 return(所有 await 皆完成,reach return) else: # | fut = asyncio.ensure_future(inner, loop=self._loop) fut.add_done_callback(self._wakeup) # ② 掛回呼 self._fut_waiter = fut # ③ 暫存 return # ④ outer Task 暫停 ``` ## OS concept * 當呼叫 sys_call 不稱為 context switch,儘管的確 CPU 跳去執行 os 程式 * User stack && kernel stack,一個執行單位皆需要要有這兩個元素前者用於正常程序,後者用於模式切換時使用,CPU 視當下情況改變 SP(Stack pointer) * 當創建執行緒,也會為該執行緒創建此二種堆疊。 ## 問答簡記 #### fork 後父子進程檔案描述符行為 > The child inherits copies of the parent's set of open file descriptors. Each file descriptor in the child refers to the same open file description (see open(2)) as the corresponding file descriptor in the parent. This means that the two file descriptors share open file status flags, file offset, and signal-driven I/O attributes 在 fork 呼叫之後,父子進程都會繼承父進程中所有的 open file description(即系統層級的開啟檔案表)。因此即便作業系統透過 Copy-On-Write(COW)機制能確保兩個進程在記憶體等資源上的互不干擾,對於檔案的操作卻不然:父子進程共享同一份 open file description,因此不僅檔案的偏移量(offset),還有檔案狀態(status flags)等也會被共享,若父子進程在 fork 後各自對該檔案進行讀寫,雙方之檔案偏移量會同時改變。 此情況對於 socket 亦然,父子進程將共享相同的 kernel-level socket 物件,等同共用以下資源: * Socket 狀態(如連線是否建立) * 傳送與接收緩衝區(send/receive buffers) * Socket 設定選項 當父子進程同時對該 socket 物件進行操作(如 `send()` 或 `recv()`),可能導致資料交錯、race condition 或其他非預期行為。 為了避免上述描述符共用所造成的問題,常見的作法之是在 `fork()` 之後,於子進程中立即關閉不需要繼承的共享檔案描述符。 #### dup * `int dup(int oldfd)` 此系統呼叫會回傳一個非負整數,代表呼叫進程中 per-process file descriptor table 中最小的未使用索引值,並讓該索引指向與 oldfd 相同的系統級 open file description。 須注意的一點是,新的描述符號雖與舊描述符共享檔案偏移量、檔案狀態,但不共享檔案描述符旗標(file descriptor flags),即 close-on-exec flag,若此 flag 被設置,當進程呼叫 `execve()` 時,設置此旗幟之檔案描述符將被自動關閉。 `dup()` 預設新描述符此 flag 不會被設置 * `int dup2(int oldfd, int newfd)` 此系統呼叫與 `dup()` 行為基本一致,不同的是此系統呼叫能夠透過參數 `newfd` 指定新描述符之數值。 假使 `newfd` 在呼叫函式前已經被使用,將在使用前關閉原本的描述符 -- `close(newfd)`,須注意 `dup2()` 不回傳任何呼叫 `close()` 時發生之錯誤訊息。 此外,`dup2()` 會將「關閉 `newfd`」與「將其指向 `oldfd`」這兩個步驟以 atomic 方式執行。這一點相當重要,因為若試圖以下列方式自行模擬 `dup2()`: ```c close(newfd); dup(oldfd); ``` 將可能在兩步驟之間發生 race condition: * 如主程式被 signal handler 打斷,該 handler 在此時開啟了一個新的檔案描述元,恰巧佔用了剛關閉的 `newfd` * 或是其他執行緒同時分配了新的檔案描述元 導致 `dup()` 無法取得原本預期的 `newfd` 數值,造成預期之外的行為。因此,若需要精確指定描述元並避免 race,應優先使用 `dup2()`。 * `int dup3(int oldfd, int newfd, int flags)` 此系統呼叫與 `dup2()` 行為基本一致,但能夠針對 `newfd` 設置 close-on-exec flag,此旗幟行為於上述已提及。 此旗幟設置方法有兩個,一個是透過 `fcntl()`,而另一個方式是在開啟檔案時,透過 `open()` 設置,而設計 `dup3()` 的考量與設計 `open()` 能夠設置此旗幟之考量一致 > Note that the use of this flag is essential in some multithreaded programs, because using a separate fcntl(2) F_SETFD operation to set the FD_CLOEXEC flag does not suffice to avoid race conditions where one thread opens a file descriptor and attempts to set its close-on-exec flag using fcntl(2) at the same time as another thread does a fork(2) plus execve(2). Depending on the order of execution, the race may lead to the file descriptor returned by open() being unintentionally leaked to the program executed by the child process created by fork(2). 即在多執行緒環境中,若在建立檔案描述符的同時立即設置 `FD_CLOEXEC` 旗幟,能夠避免 race condition 發生: 當採取先建立描述符再使用 `fcntl()` 設定旗幟,在 `fcntl()` 尚未成功執行前,若其他執行緒同時呼叫 `fork() + execve()`,該描述符將在尚未設定 `FD_CLOEXEC` 就被 fork 出去,導致子進程執行的新程式意外繼承原本應該自動關閉的描述符,造成 file descriptor 洩漏 `dup()` 並不會建立新的 open file description,而是產生一個新的 file descriptor 指向相同的系統層級 entry,因此新舊描述符會共用 file offset 與 open flags,但不共用 `FD_CLOEXEC`;若希望 parent 與 child 在 `fork()` 後能各自操作獨立的檔案 offset,仍需在 `fork()` 後於子進程中重新 open 該檔案,取得新的 open file description。 但對於預計會 `fork()` 的進程來說,設置 `O_CLOEXEC` 能夠安全的確保,子進程在 `execve()` 後能夠自動關閉描述符,避免誤用。