## Goroutine & Channel ### Coroutine 首先要先介紹 `Coroutine (協程)`,coroutine 是相較於 thread 更小的執行單位,但它是純應用端的執行單位,作業系統並不知道它,但它一樣有自己的記憶體空間。 簡單來說,coroutine 可看作是能在執行中**暫停**並在稍後**恢復**執行的函式,這種暫停和恢復並非由作業系統內核進行調度,而是在用戶態由程式設計師或程式語言的執行時 (runtime) 控制。 **協程的核心特性:** - **使用者層級**: 協程的創建、銷毀和切換都在user space完成,避免了在 kernel mode 與 user mode 之間的切換開銷。 - **協作式多工**: 一個協程需要主動讓出 CPU 執行權,另一個協令才能獲得執行機會,這種方式避免了多執行緒程式設計中常見的 race condition 和 deadlock 問題,因為在同一時間只有一個協程在執行(當只有單核心CPU時)。但現今大都是多核心CPU,所以需考慮 race condition 。 - **輕量級**: 協程的創建和 context switch 成本遠低於 thread,一個 thread 可以承載上萬個協程,因此可以輕鬆實現大規模的concurrency。 - **非阻塞**: 當一個協程遇到 I/O 操作而阻塞時,調度器可以將 CPU 時間切換給其他可執行的協程,從而有效利用 CPU 資源。 ### Goroutine `goroutine` 是 Go 語言中對 `coroutine` 的實作,透過關鍵字 go 啟動,並由 go runtime 調度,每個 Go 程式預設都會建立一個 goroutine,這被稱作是 `main goroutine`,也就是函式 main 中執行的內容。 創建一個 goroutine 的成本非常低,初始 stack 大小僅約 2KB,並且可以根據需要動態擴展。 在多核心系統下,Go runtime會根據可用的邏輯CPU數量(GOMAXPROCS)同時讓多條實體執行緒工作,多個goroutine可以同時(即並行)的執行。 ```go= func main() { fmt.Println("main execution started") // set goroutine printHello() go printHello() fmt.Println("main execution stopped") } func printHello() { fmt.Println("Hello World") } ``` >當 main() 函數回傳的時候程式立即退出,它不會等待任何其他非 main 協程的結束,因此 printHello 不一定有機會輸出。 但當 goroutine 被 block 就會把控制權交給其他的 goroutine,因此可以試著用 time.Sleep() 來暫停 main goroutine。 ```go= func main() { fmt.Println("main started") // set goroutine printHello() go printHello() time.Sleep(2 * 1e9) // sleep for 2 seconds fmt.Println("main finished") } func printHello() { fmt.Println("Hello Goroutine") } ``` 也可以透過匿名函式建立 goroutine ```go= func main() { fmt.Println("main started") // set anonymous goroutine go func(){ fmt.Println("Hello Goroutine") }() time.Sleep(2 * 1e9) // sleep for 2 seconds fmt.Println("main finished") } ``` ### Channel 多個 goroutine 之間往往依賴溝通來達成有效率的合作,傳統的溝通有 `shared memory` 與 `message passing` 兩種方式,Go 語言通常是使用後者,並利用一種特殊的類型 `channel (通道)` 來傳遞訊息。 其運作有點類似早期 UNIX 系統所使用的 pipes,一端寫入一端讀取以 FIFO 的方式傳遞資料,建立 channel 時須決定將要傳遞的資料型態以及 buffer size,宣告方式如下: ```go ch1 := make(chan string) // unbuffered channel with string type ch2 := make(chan int, 3) // buffered channel size=3 with int type ``` channel 的寫入與讀取分別利用 `ch <-` 與 `<- ch` 來操作,而且是 atomic 的操作,避免了 race condition 問題。 ```go= func main() { ch := make(chan string) go sendData(ch) go getData(ch) time.Sleep(1e9) } func sendData(ch chan string) { ch <- "Hello" ch <- "World" } func getData(ch chan string) { var input string for { input = <-ch fmt.Printf("%s ", input) } } ``` #### unbuffered channel 當宣告時沒有指定 buffer size 即為 `unbuffered channel`,因此所有的操作都具有 `blocking` 特性,屬於 synchronous - 當資料傳入 channel 之後 goroutine 會阻塞住,直到有其他的 goroutine 從該 channel 把值讀出來。 - 當要讀取 channel 中的值時,若沒有值可供讀取 goroutine 也會阻塞,直到其他 goroutine 把值寫入 channel 中。 如果 channel 中的資料遲遲無人接收,則無法再傳入其他資料,所以新的輸入無法在通道非空的情況下傳入。 #### buffered channel channel 可以暫時將資料容納在 `buffer` 裡面,可容納數量的上限為 channel 的 `buffer size` ,當 size 為非 0 時即為 `buffered channel`,此時屬於 asynchronous - 若資料傳入 channel 時 buffer 尚有空間,此時為 `non-blocking` 狀態,goroutine 可以繼續執行。唯有當 buffer 已滿的時候才會被 block - 若要讀取 channel 內資料,只要 buffer 非空操作皆為 `non-blocking`。唯有當 buffer 是空的時候才會被 block 可以利用此特性來模擬 semaphore 的 wait & signal 對於系統資源的操作。 ```go= type Empty interface {} type semaphore chan Empty // resources numbers = N sem = make(semaphore, N) // acquire n resources func (s semaphore) P(n int) { e := new(Empty) for i := 0; i < n; i++ { s <- e } } // release n resources func (s semaphore) V(n int) { for i:= 0; i < n; i++{ <- s } } /* wait & signal */ func (s semaphore) Wait(n int) { s.P(n) } func (s semaphore) Signal() { s.V(1) } ``` #### close channel 可以明確的關閉 channel,但也不一定要每次都關閉,只有在告訴接收者不會再提供新的值時才需要關閉。而且關閉通道是 sender 的責任,receiver 不需要負責關閉。 當 channel 被關閉後,如果對這個 channel 繼續寫入,或重複關閉都會導致 panic,但可以讀取已關閉的 channel 直到 buffer 為空。 ```go= ch := make(chan int) defer close(ch) if v, ok := <-ch; !ok { // 表示 channel 內沒有元素且被關閉 } ``` 使用 for range 語句取得 channel 內的值,直到 channel 被關閉 (會主動偵測),或是使用 for 迴圈加上判斷 channel 是否關閉。 ```go= for v := range ch { fmt.Println("The value is %v", v) } //------------------------------------------------- for { input, open := <-ch if !open { break } fmt.Println("The value is %v", input) } ``` ### WaitGroup `WaitGroup` 是 Go 語言中常見的一種等待方式,它允許我們等待一組 goroutine 完成,此用法適合用在需要將單一任務拆成許多次任務,待所有任務完成後才繼續執行的情境。 - `var wg sync.WaitGroup` : 建立 waitgroup,預設 counter 是 0 - `wg.Add(delta int)` : 增加要等待的次數,也可以是負值。 - `defer wg.Done()` : 把要等待的次數減1,通常搭配 defer 使用 - `wg.Wait()` : 把程式暫時阻塞住,直到 counter 歸零,也就是所有 WaitGroup 都呼叫過 done 後才恢復執行 ```go= func main() { var wg sync.WaitGroup for i := 1; i <= 3; i++ { wg.Add(1) go worker(i, &wg) } wg.Wait() fmt.Println("all works done") } func worker(id int, wg *sync.WaitGroup) { defer wg.Done() fmt.Println("Worker %d is working", id) } ``` 這通常用在單純等待任務完成,而不需要從 goroutine 中取得所需資料的情況,如果會需要從 goroutine 中返回資料,使用 channel 將是較好的做法。 ### Select 使用 `select` 可以等待多個 goroutine 中的任何一個完成,當程式執行到 `select` 時,會被 block 住直到有任何一個 case 收到 channel 傳來的資料後(除非有用 default)才會 unblock ```go= func main() { ch1 := make(chan bool) ch2 := make(chan bool) go worker(1, ch1) go worker(2, ch2) select { case <-ch1: fmt.Println("Worker 1 finished") case <-ch2: fmt.Println("Worker 2 finished") default: ... } fmt.Println("work done") } func worker(id int, ch chan bool) { fmt.Printf("Worker %d is working\n", id) ch <- true } ``` 常用的情況是選擇處理列出的多個 channel 中的一個 - 如果都阻塞了,會等待直到其中一個可以處理 - 如果多個可以處理,隨機選擇一個 - 如果沒有 channel 可以處理並且寫了 default 語句,它就會執行 default 的內容 這實現了一種監聽模式,通常用在(無限)迴圈中,在特定情況下透過 break 語句使迴圈退出。 select 搭配 time 還可以用來判斷 channel 的接收是否超時,但在有多個 case 符合條件時, 對 case 的選擇是偽隨機的,select 可能不會在定時器超時訊號到來時立刻選中對應的 case ,因此 goroutine 可能不會嚴格按照定時器設定的時間結束。 ```go= ch := make(chan error, 1) go func() { ch <- client.Call("Service.Method", args, &reply) } () select { case resp := <-ch // use resp and reply case <-time.After(timeoutNs): // call timed out break } ``` --- Reference: https://github.com/unknwon/the-way-to-go_ZH_CN/blob/master/eBook/14.1.md https://github.com/unknwon/the-way-to-go_ZH_CN/blob/master/eBook/14.2.md https://vocus.cc/article/6501b6c7fd897800015c9f30 https://pjchender.dev/golang/go-concurrency-channels/ https://xiang753017.gitbook.io/zixiang-blog/golang/golang-goroutine-concurrency-duo-zhi-hang-xu-qian-tan