# MIT6.s081 Lab: Multithreading ###### tags: `mit6.s081` `作業系統 Operating System` 本實驗對應 xv6 CH7:Scheduling,要求學員實作 - user-level threading 系統的上下文切換機制 - 使用 **pthread** API 平行化 hash table - 實作 Barrier 同步機制 前半段為 CH7 的筆記整理,實驗部份請從 Uthread: switching between threads 開始閱讀 ## Threads 現代的處理器大多為多核心,我們希望盡可能的利用這些 CPU 資源,例如: - CPU 在同一時間不只運行單一任務 - 將程序拆分,並在不同 CPU 上執行其中的一個部份,加速程式運行 執行緒提供了一個抽象,讓我們在處理這類 CPU 平行/並行運算的問題時,能更輕鬆的撰寫程式碼。執行緒可以視為佔用單一 CPU 並序列執行指令的單元。此外,執行緒還具有狀態,透過保存/恢復這些狀態,我們可以隨時地暫停及恢復執行緒的執行: - 程式計數器 (Program Counter) - Callee-Saved Registers - 執行緒的 stack pointer 作業系統的職責是讓多個執行緒能夠分時復用(Multiplexing) CPU 資源,而且這個機制對 user 行程來說應該是透明的,每個程式都認為自己獨占了 CPU 的資源,達到 CPU 的虛擬化 xv6 的多執行緒運行策略與其他作業系統無異,執行緒會運行在所有可用的 CPU 上,而每個 CPU 會切換多個不同執行緒來執行。另外,xv6 所有的 kernel threads 共享虛擬地址 (還記得 xv6 只有一張 kernel page table 嗎),不同 user threads 則使用獨立的地址空間 實現多執行緒系統會面臨以下的挑戰: - 執行緒切換,停止一個執行緒並啟動另一個執行緒的過程,一般稱為執行緒調度 (Scheduling)。xv6 為每個 CPU 運行一個調度器 (Scheduler,每個 CPU 有自己的 CPU 執行緒及 CPU stack) - 切換執行緒時,運行狀態的保存及恢復必需透明化。另外,多核心 CPU 會並行地切換所有進程,而 kernel 又共用虛擬地址空間,如何確保多核心切換的正確性,也是重要的議題 - 運算密集的執行緒處理 (compute-bound thread)。執行緒切換通常分為自願與非自願: - **自願式排程(voluntary scheduling)** 發生在進程要等待某個資源或條件,如 **I/O**、**等待子進程退出**等,執行緒可以透過 **sleep** 主動放棄 CPU,稍後條件滿足時在透過 **wakeup** 重新喚醒 - 對於**計算密集**的程式來說,可能我們正在執行一個需要耗費數小時甚至數天的任務,這種情況執行緒是不會願意讓出 CPU 的,作業系統需要一套機制,從計算密集的任務取回 CPU 的控制權。這種機制通常稱為**搶佔式排程(pre-emptive scheduling)** 針對計算密集的執行緒,從 xv6 CH5 Interrupt 我們知道每個 CPU 都有一個計時器,該計時器每過一段時間會發出計時器中斷,並利用 trap 機制陷入 kernel,透過計時器中斷,我們可以將 CPU 的控制權由 user thread 轉交給 kernel thread。timer interrupt handler 會呼叫 **yield** 告知調度器可以讓其他進程運行(藉由修改進程狀態為 RUNNABLE),並讓出 CPU 資源 所以在 xv6 中 CPU-bound 執行緒的調度其實包含了自願及搶佔式排程:定時器中斷會強制地讓 kernel 搶佔 CPU 資源。稍後 kernel 會代表 user 進程自願地掛起進程 (RUNNABLE) 並將資源讓給調度器 執行緒一般有以下幾種狀態,讓作業系統知道該如何調度: - RUNNING:正在某個 CPU 上運行 - RUNNABLE:目前沒有在運行,但一旦有閒置的 CPU 立刻可以運行 - SLEEPING:暫時不想運行的執行緒,可能在等待 I/O 或其他事件 每一個 CPU 核心在一個時間只會運行一個執行緒,要麼是 user thread,或著是 kernel thread,又或著是 CPU 調度器的執行緒。執行緒的切換,創造了多個不同的執行緒同時運行在一個 CPU 的假象 ## Thread Switching in xv6 與 trap 機制類似,若要從一個進程切換至另一個進程,我們必需保存原進程的上下文 (也就是運行狀態和暫存器),從可被執行的進程中挑選一個,恢復該進程的上下文,然後開始運行新的進程 xv6 切換進程的整個流程遵循相同的概念。假設某個時刻定時器中斷發生,進程從 user space 陷入 kernel,trap handler 發現這是一個定時器中斷,如果作業系統決定要切換運行的進程: 1. 保存 P1 進程 kernel thread 的上下文 (user thread 的狀態保存在 trapframe 中,而 kernel thread 的狀態保存在 context 對象中) 2. 切換為 CPU 對應的上下文,也就是調度器的狀態,包含暫存器及 stack pointer (每個 CPU 的調度器都有自己的 stack,在 xv6 啟動過程中配置) 3. 選擇要切換的進程 P2 4. 切換為 P2 進程 kernel thread 的上下文 5. 最後透過 trap 機制回到 user space,運行新的進程 詳細的架構與流程可參考下圖 ![](https://i.imgur.com/XyG9zc6.png) 這邊要特別說明,一般我們慣用的上下文切換,是指一個執行緒切換成另一個執行緒,或著是兩個 user 進程切換的完整過程。而在這節課當中是說 kernel thread 與調度器 thread 之間的切換 ## Code:Context Switching xv6 透過調用 **swtch** 切換上下文,**a0** 及 **a1** 暫存器的值分別對應第一及第二個參數。程式上半段儲存 **a0** 指向的 context 對象,下半段則將 **a1** 指向的 context 對象載入暫存器中 kernel thread 的上下文保存在進程的結構體 p->context,CPU 調度執行緒的上下文則保存在 cpu->context。值得注意的是,無論是 kernel 或著 cpu,context 都沒有包含程式計數器的資訊,因為切換的當下一定運行在 **swtch** 函數中,程式計數器並沒有提供有用的訊息。我們關心的是從哪個地方調用進入 **swtch** 的,所以我們保存返回地址暫存器 **ra** 及 stack pointer 暫存器 **sp**,當我們從 **swtch** 返回後,透過 **ra** 回到新進程調用 swtch 的地方繼續執行程式 另外,由於 swtch 是按照一般的函式來呼叫,Caller-saved 暫存器會被保存在 stack 上,函式返回時這些暫存器的值會自動被載回,所以 swtch 只保存了 **Callee-saved** 暫存器 ```cpp kernel/swtch.S # Context switch # # void swtch(struct context *old, struct context *new); # # Save current registers in old. Load from new. .globl swtch swtch: sd ra, 0(a0) sd sp, 8(a0) sd s0, 16(a0) sd s1, 24(a0) sd s2, 32(a0) sd s3, 40(a0) sd s4, 48(a0) sd s5, 56(a0) sd s6, 64(a0) sd s7, 72(a0) sd s8, 80(a0) sd s9, 88(a0) sd s10, 96(a0) sd s11, 104(a0) ld ra, 0(a1) ld sp, 8(a1) ld s0, 16(a1) ld s1, 24(a1) ld s2, 32(a1) ld s3, 40(a1) ld s4, 48(a1) ld s5, 56(a1) ld s6, 64(a1) ld s7, 72(a1) ld s8, 80(a1) ld s9, 88(a1) ld s10, 96(a1) ld s11, 104(a1) ret ``` ## Code: Scheduling 當計時器中斷發生進入 trap 程序時,首先會呼叫 **yield**,**yield** 做的事情很簡單,將進程的狀態改為 RUNNABLE 代表要讓出 CPU 資源,再調用 **sched** 準備切換上下文。雖然 RUNNABLE 代表進程沒有在運行,但狀態變更的瞬間進程仍在運行中,若其他 CPU thread 看到該進程的狀態並且調度執行,那麼就有兩個 CPU 在運行同一個進程,可能導致嚴重的錯誤,因此這段程式碼加鎖的原因之一,就是為了避免這種狀況 ```cpp kernel/proc.c // Give up the CPU for one scheduling round. void yield(void) { struct proc *p = myproc(); acquire(&p->lock); p->state = RUNNABLE; sched(); release(&p->lock); } ``` **sched** 會做一些 sanity check,首先再次確認是否持有 p->lock,接著透過檢查 CPU 中斷嵌套的層數 **noff** 檢查是否還有 p->lock 以外的鎖。然後檢查進程的狀態和中斷檢查,最後呼叫 **swtch** 切換至 CPU 的調度執行緒 檢查 **noff** 的原因是,如果 kernel thread 擁有除了 p->lock 以外的鎖,進入調度器並切換成另一個 kernel thread,若新的 kernel thread 也要獲取該鎖的話,因為 acquire 會關閉中斷(避免例如驅動程式的 deadlock),沒辦法透過計時器中斷跳回持有 lock 的進程,所以整個流程會卡死導致 deadlock ```cpp kernel/proc.c // Switch to scheduler. Must hold only p->lock // and have changed proc->state. Saves and restores // intena because intena is a property of this // kernel thread, not this CPU. It should // be proc->intena and proc->noff, but that would // break in the few places where a lock is held but // there's no process. void sched(void) { int intena; struct proc *p = myproc(); if(!holding(&p->lock)) panic("sched p->lock"); if(mycpu()->noff != 1) panic("sched locks"); if(p->state == RUNNING) panic("sched running"); if(intr_get()) panic("sched interruptible"); intena = mycpu()->intena; swtch(&p->context, &mycpu()->context); mycpu()->intena = intena; } ``` 切換的過程中swtch 一共呼叫了兩次,而 **scheduler** 函數是連接這兩次呼叫的橋樑。p1 從 yield 取得 lock 之後進入 sched,sched 呼叫 swtch 切換到調度器執行緒,因為 p1 的運行已經結束,所以我們將 c->proc 設為 0,然後 p1 釋放 lock。接著調度器回到循環取得其他進程的 p->lock 並檢查進程是否可被運行,若找到可運行的 p2 則切換進程狀態為 RUNNING,cpu->proc = p2,再呼叫 swtch 切換為 p2 kernel thread 的上下文,接著返回 p2 上次 yield 呼叫 shced 的地方,會後釋放 p2->lock 透過 trap 機制回到 user space ![](https://i.imgur.com/FoNw6VK.png) ```cpp kernel/proc.c // Per-CPU process scheduler. // Each CPU calls scheduler() after setting itself up. // Scheduler never returns. It loops, doing: // - choose a process to run. // - swtch to start running that process. // - eventually that process transfers control // via swtch back to the scheduler. void scheduler(void) { struct proc *p; struct cpu *c = mycpu(); c->proc = 0; for(;;){ // Avoid deadlock by ensuring that devices can interrupt. intr_on(); int nproc = 0; for(p = proc; p < &proc[NPROC]; p++) { acquire(&p->lock); if(p->state != UNUSED) { nproc++; } if(p->state == RUNNABLE) { // Switch to chosen process. It is the process's job // to release its lock and then reacquire it // before jumping back to us. p->state = RUNNING; c->proc = p; swtch(&c->context, &p->context); // Process is done running for now. // It should have changed its p->state before coming back. c->proc = 0; } release(&p->lock); } if(nproc <= 2) { // only init and sh exist intr_on(); asm volatile("wfi"); } } } ``` 上述流程的一個例外是在 xv6 創建進程的時候。xv6 切換進程時,會從一個進程的 swtch 調用切換到另一個進程的 swtch 調用,但新的進程並沒有從計時器中斷進入排程,當新的進程被調度時,若完全照搬上面的流程,在 swtch 切換上下文後,我們將跳轉到一個奇怪的地方,因為我們並非從 swtch 進入 scheduler 函數,而且進程的上下文是全新的。所以我們需要一個特殊的函數 **forkret**,讓進程第一次呼叫 swtch 時,可以正確地返回 user space ![](https://i.imgur.com/mnoqRnx.png) 在初始化 user space 或是 fork 建立子進程時都會呼叫 allocproc,allocproc 下半段的程式碼會重新設置進程的上下文,我們將 ra 暫存器設為 **forkret** 的地址,當新的進程被調度時,swtch 切換上下文後會跳轉到 forkret,forkret 呼叫 usertrapret 返回 user space。另外進程需要有自己的 stack,所以 sp 暫存器也需要設置。由於進程是第一次運行,其他暫存器的內容不重要也沒有意義 ```cpp kernel/proc.c static struct proc* allocproc(void) { ... // Set up new context to start executing at forkret, // which returns to user space. memset(&p->context, 0, sizeof(p->context)); p->context.ra = (uint64)forkret; p->context.sp = p->kstack + PGSIZE; return p; } // A fork child's very first scheduling by scheduler() // will swtch to forkret. void forkret(void) { static int first = 1; // Still holding p->lock from scheduler. release(&myproc()->lock); if (first) { // File system initialization must be run in the context of a // regular process (e.g., because it calls sleep), and thus cannot // be run from main(). first = 0; fsinit(ROOTDEV); } usertrapret(); } ``` 在調度中,鎖扮演重要的角色,我們知道切換上下文的過程有非常多步驟: - 變更進程的狀態 - 保存進程的上下文 - 切換 kernel stack 進程在這個時間點處在一個非穩定的中間態,而完成這三件事情需要一段時間,因此我們需要鎖確保這三件事情的原子性,要麼全部發生,要麼全部不發生,另外鎖也可以確保其他 CPU 調度器無法看見該進程,避免發生進程剛切換狀態為 RUNNABLE,被另一個 CPU 調度器排程,導致一個進程同時在兩個 CPU 上執行的情況 但如果在這個過程中發生計時器中斷呢?假設我們在 scheduler 切換進程的狀態為 RUNNING 後,呼叫 swtch 切換上下文,我們尚未完全載入進程的暫存器,不幸地此時發生了計時器中斷,我們又將沒有載入完全的暫存器存入上下文中,導致進程部份的上下文被覆蓋,因此 **acquire** 會關閉中斷,確保整個調度過程的原子性 ## Code: mycpu and myproc xv6 為每個 CPU 維護一個結構體 **struct cpu**,包含 CPU 執行的進程、調度器的上下文以及中斷管理的相關訊息 ```cpp kernel/proc.c // Per-CPU state. struct cpu { struct proc *proc; // The process running on this cpu, or null. struct context context; // swtch() here to enter scheduler(). int noff; // Depth of push_off() nesting. int intena; // Were interrupts enabled before push_off()? }; ``` 執行進程的 CPU 編號透過 **tp** 暫存器取得,在讀取 **tp** 時必需關閉中斷,避免因計時器中斷將進程切換到不同 CPU 執行,導致返回錯誤的 CPU 編號。myproc 同樣地必需關閉中斷,避免在取出進程結構體之後,切換到不同的 CPU 上執行 ```cpp kernel/proc.c // Return this CPU's cpu struct. // Interrupts must be disabled. struct cpu* mycpu(void) { int id = cpuid(); struct cpu *c = &cpus[id]; return c; } // Must be called with interrupts disabled, // to prevent race with process being moved // to a different CPU. int cpuid() { int id = r_tp(); return id; } // Return the current struct proc *, or zero if none. struct proc* myproc(void) { push_off(); struct cpu *c = mycpu(); struct proc *p = c->proc; pop_off(); return p; } ``` ## Sleep & Wakeup 有些時候程式要等待特定的事件發生,或著不同執行緒、進程之間需要協同工作,例如: - 某個執行緒在讀取 Pipe 的資訊,但 Pipe 目前沒有任何數據,因此執行緒必需等待 Pipe 非空的事件發生 - 進程對 disk 發出請求讀取特定的 block 時,必需等待 disk 讀取完成的事件 - Unix 進程可以呼叫 wait 函數,等待子進程退出的事件發生 這些特定的事件可能來自 I/O 或其他進程,而且事件發生的時間可能是幾個 ms 甚至好幾分鐘以後,若採用 busy-waiting 的方式等待,會耗費大量的 CPU 資源在空轉。因此我們需要一套協同的機制,讓等待事件發生的進程先讓出 CPU,並在關心的事件發生時重新取得 CPU 的使用權。xv6 採用 **sleep** & **wakeup** 建立進程間的協同工作,一個進程呼叫 **sleep** 讓出 CPU 資源並等待事件發生,由另一個進程在事件發生後調用 **wakeup** 喚醒等待中的進程,這種模型稱為 **Conditional Synchronization** 課堂上教授重新撰寫了 **uartwrite** 及 **uartintr** 函數來展示 sleep & wakeup 接口的使用方式。uartwrite 迭代地將字元寫入緩衝區(UART 一次只能接受一個字元的傳輸),我們向 UART 寫入一個字元後,必需等待 UART 發出傳輸完成中斷,才能再寫下一個字元。而傳輸的速度可能非常緩慢,字元間的等待時間或許是毫秒等級,因此在另一個進程表明傳輸完成之前(透過 tx_done 表示),會呼叫 sleep 進入睡眠狀態 UART 觸發傳輸完成中斷後,會進入中斷處理程序 **uartintr**,uartintr 會讀取 UART 的控制暫存器,檢查表明傳輸完成的標誌位,如果傳輸完成則將 tx_done 設為 1,並調用 wakeup 喚醒等待的進程,而在 uartwrite 等待的進程恢復執行,開始發送新的字元 由 uartwrite & uartintr,我們可以觀察到 sleep 函數必需完成幾件事情 - 在睡眠期間不得持有鎖,否則無法進入 wakeup 進程的 critical section - 改變進程的狀態,確保進程在睡眠期間不會被排程 - 被喚醒之後必需重新持有鎖,因為通常被喚醒之後會進入 critical section 稍後我們會看到 sleep & wakeup 的實現 ```cpp void uartwrite(char buf[], int n) { acquire(&uart_tx_lock); int i = 0; while(i < n){ while(tx_done == 0){ // UART is busy sending a character. // wait for it to interrupt sleep(&tx_chan, &uart_tx_lock); } WriteReg(THR, buf[i]); i += 1; tx_done = 0; } release(&uart_tx_lock); } void uartintr(void) { acquire(&uart_tx_lock); if(ReadReg(LSR) & LSR_TX_IDLE){ // UART finished transmitting; wake up any sending thread. tx_done = 1; wakeup(&tx_chan); } release(&uart_tx_lock); // read and process incoming characters. while(1){ int c = uartgetc(); if(c == -1) break; consoleintr(c); } // send buffered characters. acquire(&uart_tx_lock); uartstart(); release(&uart_tx_lock); } ``` 由上述的內容我們知道 sleep 函數會在睡眠前將鎖釋放,並在被喚醒後重新取得鎖,這樣感覺用一般的鎖就可以達成 sleep 的機制了,為什麼我們需要額外的 sleep & wakeup 接口?考慮以下 **uartwrite** 修改的程式碼 ```cpp void uartwrite(char buf[], int n) { acquire(&uart_tx_lock); int i = 0; while(i < n){ while(tx_done == 0){ // UART is busy sending a character. // wait for it to interrupt // //sleep(&tx_chan, &uart_tx_lock); release(&uart_tx_lock); broken_sleep(&tx_chan); acquire(&uart_tx_lock); } WriteReg(THR, buf[i]); i += 1; tx_done = 0; } release(&uart_tx_lock); } ``` 程式碼看起來沒有什麼問題對吧?但假設不幸地我們在釋放鎖之後立刻發生中斷會發生什麼事呢?以 UART 傳輸的例子來說,很有可能資料傳輸完成後 UART 發出中斷,而此時鎖被寫進程釋放了,所以我們能夠調用 wakeup,但實際上寫進程根本就還沒進入睡眠狀態,所以中斷處理程序沒有喚醒任何進程。稍後寫進程繼續執行,呼叫 broken_sleep 將進程設為 SLEEPING,但因為中斷已經發生過,wakeup 也已經被調用,導致這次的 broken_sleep 將不會被喚醒,這個問題稱為 **Lost Wakup** ```cpp release(&uart_tx_lock); // Innterrupt Right Here broken_sleep(&tx_chan); acquire(&uart_tx_lock); ``` 為了解決 lost wakeup,我們必需確保 sleep 可以原子地(atomic)將進程狀態設為 SLEEPING 並釋放鎖,因此 sleep 的實現比想像的還要稍微複雜一點。首先我們必需在修改進程狀態前,先取得進程的鎖,避免進程狀態被 wakeup 看到而修改,並釋放條件鎖讓 wakeup 可以被調用。確定取得進程鎖之後,我們可以放心修改進程的狀態,並紀錄進程睡在哪個 channel 上,然後呼叫 sched 讓出 CPU 資源並釋放進程鎖 ```cpp kernel/proc.c // Atomically release lock and sleep on chan. // Reacquires lock when awakened. void sleep(void *chan, struct spinlock *lk) { struct proc *p = myproc(); // Must acquire p->lock in order to // change p->state and then call sched. // Once we hold p->lock, we can be // guaranteed that we won't miss any wakeup // (wakeup locks p->lock), // so it's okay to release lk. if(lk != &p->lock){ //DOC: sleeplock0 acquire(&p->lock); //DOC: sleeplock1 release(lk); } // Go to sleep. p->chan = chan; p->state = SLEEPING; sched(); // Tidy up. p->chan = 0; // Reacquire original lock. if(lk != &p->lock){ release(&p->lock); acquire(lk); } } ``` wakeup 的實現比較單純,遍歷進程表找出正在睡眠且等待在 chan 的進程,將進程的狀態設為 RUNNBALE。sleep 會先取得進程的鎖再釋放條件鎖,所以即便我們調用了 wakeup,仍然無法看到進程的狀態。然後 sleep 會修改進程狀態並調用 sched,接著調度器會釋放進程的鎖後開始排程,直到這個時候 wakeup 才能取得進程的鎖,並發現進程處於 SLEEPING 狀態而喚醒它 ```cpp kernel/proc.c // Wake up all processes sleeping on chan. // Must be called without any p->lock. void wakeup(void *chan) { struct proc *p; for(p = proc; p < &proc[NPROC]; p++) { acquire(&p->lock); if(p->state == SLEEPING && p->chan == chan) { p->state = RUNNABLE; } release(&p->lock); } } ``` 對於生產者進程(sleep 的調用者)來說,可能持有進程鎖或條件鎖或兩個鎖皆持有,而消費者進程(wakeup 的調用者) 在迭代檢查進程狀態時,必需同時持有兩個鎖才能對進程的狀態進行修改 另外,也有多個進程睡眠在同一個 channel 的狀況,例如多個進程讀取同一個 pipe。由於 wakeup 會檢查整個進程表,因此正在睡眠的進程都會被喚醒,而第一個被喚醒的進程會重新取得條件鎖,進行相應的處理之後才會釋放條件鎖,因此其他較晚被喚醒的進程會阻塞在 acquire(lk)。當條件鎖被第一個進程釋放後,其他進程取得鎖發現條件不滿足,又繼續陷入睡眠狀態,因此 sleep 調用總是搭配 while loop 使用,檢查條件是否真的滿足 ## Code: Pipes xv6 很多的系統呼叫都透過 sleep & wakeup 機制實作,首先我們來看 xv6 如何利用這兩個接口解決 pipe 生產者與消費者的同步問題。pipe 的結構體定義如下,包含了一個 spinlock、緩衝區、讀寫字元數的計數器以及讀寫端是否開啟的標誌位 ```cpp kernel/pipe.c struct pipe { struct spinlock lock; char data[PIPESIZE]; uint nread; // number of bytes read uint nwrite; // number of bytes written int readopen; // read fd is still open int writeopen; // write fd is still open }; ``` pipe 讀取端的函數為 piperead,piperead 首先會取得 pipe 的鎖,並判斷 pipe 裡頭是否有待讀取的資料,如果 buffer 為空則讓進程睡眠,否則我們可以從 buffer 讀取 n 個字元、複製到 user space,喚醒寫進程然後釋放鎖 ```cpp kernel/pipe.c int piperead(struct pipe *pi, uint64 addr, int n) { int i; struct proc *pr = myproc(); char ch; acquire(&pi->lock); while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty if(pr->killed){ release(&pi->lock); return -1; } sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep } for(i = 0; i < n; i++){ //DOC: piperead-copy if(pi->nread == pi->nwrite) break; ch = pi->data[pi->nread++ % PIPESIZE]; if(copyout(pr->pagetable, addr + i, &ch, 1) == -1) break; } wakeup(&pi->nwrite); //DOC: piperead-wakeup release(&pi->lock); return i; } ``` pipewrite 同樣先取得 pipe 的鎖,每寫入一個字元之前,都先判斷 buffer 是否是滿的,若 pipe 有空間的話,我們先調用 wakeup 喚醒 pipe 讀取端的進程消化 buffer,再讓自己進入睡眠狀態。寫完所有字元之後, pipewrite 喚醒等待讀取的進程並釋放 pipe 鎖 ```cpp kernel/pipe.c int pipewrite(struct pipe *pi, uint64 addr, int n) { int i; char ch; struct proc *pr = myproc(); acquire(&pi->lock); for(i = 0; i < n; i++){ while(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full if(pi->readopen == 0 || pr->killed){ release(&pi->lock); return -1; } wakeup(&pi->nread); sleep(&pi->nwrite, &pi->lock); } if(copyin(pr->pagetable, &ch, addr + i, 1) == -1) break; pi->data[pi->nwrite++ % PIPESIZE] = ch; } wakeup(&pi->nread); release(&pi->lock); return i; } ``` ## Code: Code: Wait, exit, and kill 另一個與 sleep & wakeup 相關的議題,是如何正確地關閉一個進程。退出一個進程包含許多的步驟,我們要釋放 user space 的記憶體空間、釋放 page table、釋放 trapframe 及修改進程狀態為 REUSABLE 等,這裡有兩個重要的問題需要處理: - 待中止的進程此時有可能運行在其他 CPU 上、在 kernel 中持有了鎖,或著正在更新 kernel 某個複雜的資料結構,我們不能直接將另一個進程殺掉 - 另一個問題是,即便是進程自己調用 exit 退出,進程運行仍有一些必要的資源例如 stack、進程表中的位置等。當進程還在執行時,不能輕易地將這些資源釋放,所以我們需要一個機制讓進程最後能釋放某些對於執行程式碼來說關鍵的資源 在使用 fork 時,常見的組合是父進程呼叫 wait 等待子進程 exit,而 wait & exit 也是 sleep & wakeup 交互模型的一個應用。 exit 首先關閉進程開啟的所有文件,並釋放對目錄的引用,將該進程的子進程交由 init 進程管理,設置進程狀態設為 ZOMBIE,最後呼叫 sched 讓出 CPU 資源。在變更進程狀態前,我們會先取得進程原本的父進程,原因是父進程有可能同時在 exit,導致兩者的父進程都變成 init,兩個進程會爭搶 init 進程的鎖造成 dead lock。另外,在喚醒父進程之前,我們必需先持有父進程的鎖,避免 lost wakeup 此時我們尚未釋放進程所有的資源,雖然進程因為狀態被設置為 ZOMBIE 不再被調度,因此我們能確保進程不會再運行了,但進程還不能被重新使用,稍後我們會在 wait 函數看到該如何進行後續的處理 ```cpp kernel/proc.c void exit(int status) { struct proc *p = myproc(); if(p == initproc) panic("init exiting"); // Close all open files. for(int fd = 0; fd < NOFILE; fd++){ if(p->ofile[fd]){ struct file *f = p->ofile[fd]; fileclose(f); p->ofile[fd] = 0; } } begin_op(); iput(p->cwd); end_op(); p->cwd = 0; // we might re-parent a child to init. we can't be precise about // waking up init, since we can't acquire its lock once we've // acquired any other proc lock. so wake up init whether that's // necessary or not. init may miss this wakeup, but that seems // harmless. acquire(&initproc->lock); wakeup1(initproc); release(&initproc->lock); // grab a copy of p->parent, to ensure that we unlock the same // parent we locked. in case our parent gives us away to init while // we're waiting for the parent lock. we may then race with an // exiting parent, but the result will be a harmless spurious wakeup // to a dead or wrong process; proc structs are never re-allocated // as anything else. acquire(&p->lock); struct proc *original_parent = p->parent; release(&p->lock); // we need the parent's lock in order to wake it up from wait(). // the parent-then-child rule says we have to lock it first. // 在變更狀態前,我們必需讓父進程等在 wait 函數掃描進程表的迴圈外頭,當我們改完狀態之後, // 才能讓父進程進入迴圈,遍歷進程表 acquire(&original_parent->lock); // 當然進程自己的鎖也必需持有,避免在修改的過程中進程對調度器可見 acquire(&p->lock); // Give any children to init. reparent(p); // Parent might be sleeping in wait(). wakeup1(original_parent); p->xstate = status; p->state = ZOMBIE; release(&original_parent->lock); // Jump into the scheduler, never to return. sched(); panic("zombie exit"); } ``` wakeup1 是 wakeup 的特殊版本,僅喚醒特定的進程 ```cpp kernel/proc.c static void wakeup1(struct proc *p) { if(!holding(&p->lock)) panic("wakeup1"); if(p->chan == p && p->state == SLEEPING) { p->state = RUNNABLE; } } ``` wait 首先取得自己的鎖作為條件鎖,避免 lost wakeup。然後進入一個大型的迴圈,持續掃描進程表,如果進程的父進程是自己,而且子進程的狀態為 ZOMBIE,則將子進程的退出狀態複製到 wait 傳入的地址,並呼叫 freeproc 清除子進程的資源及進程結構,最後將子進程及自己的鎖釋放掉返回。如果子進程都在運行中,則進程會呼叫 sleep 讓自己進入睡眠狀態,等待其中一個子進程把自己喚醒 一個值得注意的地方是,wait 在檢查進程的父進程是否為自己時,沒有先取得進程的鎖,原因是進程 np 有可能是目前進程的父進程,如果此時我們嘗試取得 np->lock,就違反了 xv6 先對父進程再對子進程上鎖的規則。而且這裡不上鎖也沒有關係,在 xv6 中只有父進程能改變子進程的 parent 對象 ```cpp kernel/proc.c int wait(uint64 addr) { struct proc *np; int havekids, pid; struct proc *p = myproc(); // hold p->lock for the whole time to avoid lost // wakeups from a child's exit(). acquire(&p->lock); for(;;){ // Scan through table looking for exited children. havekids = 0; for(np = proc; np < &proc[NPROC]; np++){ // this code uses np->parent without holding np->lock. // acquiring the lock first would cause a deadlock, // since np might be an ancestor, and we already hold p->lock. if(np->parent == p){ // np->parent can't change between the check and the acquire() // because only the parent changes it, and we're the parent. acquire(&np->lock); havekids = 1; if(np->state == ZOMBIE){ // Found one. pid = np->pid; if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate, sizeof(np->xstate)) < 0) { release(&np->lock); release(&p->lock); return -1; } freeproc(np); release(&np->lock); release(&p->lock); return pid; } release(&np->lock); } } // No point waiting if we don't have any children. if(!havekids || p->killed){ release(&p->lock); return -1; } // Wait for a child to exit. sleep(p, &p->lock); //DOC: wait-sleep } } static void freeproc(struct proc *p) { if(p->trapframe) kfree((void*)p->trapframe); p->trapframe = 0; if(p->pagetable) proc_freepagetable(p->pagetable, p->sz); p->pagetable = 0; p->sz = 0; p->pid = 0; p->parent = 0; p->name[0] = 0; p->chan = 0; p->killed = 0; p->xstate = 0; p->state = UNUSED; } ``` 另一個關閉進程的方法為 kill 系統呼叫,kill 讓進程能夠終止另一個進程,與 exit 一樣待終止的進程可能正在進行一些重要的運算,我們不能直接將進程 kill 掉,所以 kill 函式做的事情非常有限,遍歷進程表並找到 pid 對應的進程,找到之後設置進程的 killed 狀態,如果進程處於睡眠狀態則將狀態改為 RUNNABLE 讓調度器進行排程 待終止的進程因為某些原因進入 trap 時,usertrap 會停在在能安全停止運行的位置(如系統呼叫之前、trap handler 完成之後等),檢查 p->killed 是否為真,若為真則呼叫 exit 終止進程。所以實際上 kill 不是立刻停止進程的運行,而是在下一次因系統呼叫或中斷陷入 kernel space 時,進程才會開始退出流程,所以 kill 很可能有明顯的延時 以 piperead 為例,當進程被設置為 RUNNBALE,進程將從睡眠狀態中喚醒並重新進入排程,並返回 sleep 函數的呼叫處。當我們回到 piperead 繼續 while loop,很有可能 buffer 中仍然沒有資料,接著 piperead 會判斷進程是否被 kill,如果是則返回 -1 並跳轉到 usertrap syscall 的位置 (piperead 是系統呼叫)。最後 trap handler 檢查 p->killed 並呼叫 exit ```cpp kernel/proc.c int kill(int pid) { struct proc *p; for(p = proc; p < &proc[NPROC]; p++){ acquire(&p->lock); if(p->pid == pid){ p->killed = 1; if(p->state == SLEEPING){ // Wake process from sleep(). p->state = RUNNABLE; } release(&p->lock); return 0; } release(&p->lock); } return -1; } ``` 還有一些情況,進程在 SLEEPING 狀態被 kill 時不能直接退出,通常是與 file system 相關的操作,因為我們希望保持 file system 的持久性與一致性,所以在完成檔案相關的操作之前,進程不能退出。例如以下硬碟驅動的程式碼片段,我們不會在 while loop 內檢查 p->killed,而是等到整個系統呼叫完成後,再進行檢查及退出的動作 ```cpp kernel/virtio_disk.c // Wait for virtio_disk_intr() to say request has finished. while(b->disk == 1) { sleep(b, &disk.vdisk_lock); } ``` 最後我們來看一下 init 進程,init 是 xv6 創建的第一個 user 進程,init 會不斷地呼叫 wait,好讓被託管給 init 的子進程在未來被 kill 時,資源可以被正常釋放 ```cpp user/init.c int main(void) { int pid, wpid; if(open("console", O_RDWR) < 0){ mknod("console", CONSOLE, 0); open("console", O_RDWR); } dup(0); // stdout dup(0); // stderr for(;;){ printf("init: starting sh\n"); pid = fork(); if(pid < 0){ printf("init: fork failed\n"); exit(1); } if(pid == 0){ exec("sh", argv); printf("init: exec sh failed\n"); exit(1); } for(;;){ // this call to wait() returns if the shell exits, // or if a parentless process exits. wpid = wait((int *) 0); if(wpid == pid){ // the shell exited; restart it. break; } else if(wpid < 0){ printf("init: wait returned an error\n"); exit(1); } else { // it was a parentless process; do nothing. } } } } ``` ## Uthread: switching between threads (moderate) `TODO` ## Using threads (moderate) `TODO` ## Barrier(moderate) `TODO` ## Reference 1. [xv6: a simple, Unix-like teaching operating system](https://pdos.csail.mit.edu/6.828/2020/xv6/book-riscv-rev1.pdf) 2. [Lab: Multithreading](https://pdos.csail.mit.edu/6.S081/2020/labs/thread.html) 3. [Chapter 7: Scheduling](https://zhuanlan.zhihu.com/p/353580321) 4. [MIT6.S081 課程翻譯](https://mit-public-courses-cn-translatio.gitbook.io/mit6-s081/)