## Goroutine & Channel
### Coroutine
首先要先介紹 `Coroutine (協程)`,coroutine 是相較於 thread 更小的執行單位,但它是純應用端的執行單位,作業系統並不知道它,但它一樣有自己的記憶體空間。
簡單來說,coroutine 可看作是能在執行中**暫停**並在稍後**恢復**執行的函式,這種暫停和恢復並非由作業系統內核進行調度,而是在用戶態由程式設計師或程式語言的執行時 (runtime) 控制。
**協程的核心特性:**
- **使用者層級**: 協程的創建、銷毀和切換都在user space完成,避免了在 kernel mode 與 user mode 之間的切換開銷。
- **協作式多工**: 一個協程需要主動讓出 CPU 執行權,另一個協令才能獲得執行機會,這種方式避免了多執行緒程式設計中常見的 race condition 和 deadlock 問題,因為在同一時間只有一個協程在執行(當只有單核心CPU時)。但現今大都是多核心CPU,所以需考慮 race condition 。
- **輕量級**: 協程的創建和 context switch 成本遠低於 thread,一個 thread 可以承載上萬個協程,因此可以輕鬆實現大規模的concurrency。
- **非阻塞**: 當一個協程遇到 I/O 操作而阻塞時,調度器可以將 CPU 時間切換給其他可執行的協程,從而有效利用 CPU 資源。
### Goroutine
`goroutine` 是 Go 語言中對 `coroutine` 的實作,透過關鍵字 go 啟動,並由 go runtime 調度,每個 Go 程式預設都會建立一個 goroutine,這被稱作是 `main goroutine`,也就是函式 main 中執行的內容。
創建一個 goroutine 的成本非常低,初始 stack 大小僅約 2KB,並且可以根據需要動態擴展。
在多核心系統下,Go runtime會根據可用的邏輯CPU數量(GOMAXPROCS)同時讓多條實體執行緒工作,多個goroutine可以同時(即並行)的執行。
```go=
func main() {
fmt.Println("main execution started")
// set goroutine printHello()
go printHello()
fmt.Println("main execution stopped")
}
func printHello() {
fmt.Println("Hello World")
}
```
>當 main() 函數回傳的時候程式立即退出,它不會等待任何其他非 main 協程的結束,因此 printHello 不一定有機會輸出。
但當 goroutine 被 block 就會把控制權交給其他的 goroutine,因此可以試著用 time.Sleep() 來暫停 main goroutine。
```go=
func main() {
fmt.Println("main started")
// set goroutine printHello()
go printHello()
time.Sleep(2 * 1e9) // sleep for 2 seconds
fmt.Println("main finished")
}
func printHello() {
fmt.Println("Hello Goroutine")
}
```
也可以透過匿名函式建立 goroutine
```go=
func main() {
fmt.Println("main started")
// set anonymous goroutine
go func(){
fmt.Println("Hello Goroutine")
}()
time.Sleep(2 * 1e9) // sleep for 2 seconds
fmt.Println("main finished")
}
```
### Channel
多個 goroutine 之間往往依賴溝通來達成有效率的合作,傳統的溝通有 `shared memory` 與 `message passing` 兩種方式,Go 語言通常是使用後者,並利用一種特殊的類型 `channel (通道)` 來傳遞訊息。
其運作有點類似早期 UNIX 系統所使用的 pipes,一端寫入一端讀取以 FIFO 的方式傳遞資料,建立 channel 時須決定將要傳遞的資料型態以及 buffer size,宣告方式如下:
```go
ch1 := make(chan string) // unbuffered channel with string type
ch2 := make(chan int, 3) // buffered channel size=3 with int type
```
channel 的寫入與讀取分別利用 `ch <-` 與 `<- ch` 來操作,而且是 atomic 的操作,避免了 race condition 問題。
```go=
func main() {
ch := make(chan string)
go sendData(ch)
go getData(ch)
time.Sleep(1e9)
}
func sendData(ch chan string) {
ch <- "Hello"
ch <- "World"
}
func getData(ch chan string) {
var input string
for {
input = <-ch
fmt.Printf("%s ", input)
}
}
```
#### unbuffered channel
當宣告時沒有指定 buffer size 即為 `unbuffered channel`,因此所有的操作都具有 `blocking` 特性,屬於 synchronous
- 當資料傳入 channel 之後 goroutine 會阻塞住,直到有其他的 goroutine 從該 channel 把值讀出來。
- 當要讀取 channel 中的值時,若沒有值可供讀取 goroutine 也會阻塞,直到其他 goroutine 把值寫入 channel 中。
如果 channel 中的資料遲遲無人接收,則無法再傳入其他資料,所以新的輸入無法在通道非空的情況下傳入。
#### buffered channel
channel 可以暫時將資料容納在 `buffer` 裡面,可容納數量的上限為 channel 的 `buffer size` ,當 size 為非 0 時即為 `buffered channel`,此時屬於 asynchronous
- 若資料傳入 channel 時 buffer 尚有空間,此時為 `non-blocking` 狀態,goroutine 可以繼續執行。唯有當 buffer 已滿的時候才會被 block
- 若要讀取 channel 內資料,只要 buffer 非空操作皆為 `non-blocking`。唯有當 buffer 是空的時候才會被 block
可以利用此特性來模擬 semaphore 的 wait & signal 對於系統資源的操作。
```go=
type Empty interface {}
type semaphore chan Empty
// resources numbers = N
sem = make(semaphore, N)
// acquire n resources
func (s semaphore) P(n int) {
e := new(Empty)
for i := 0; i < n; i++ {
s <- e
}
}
// release n resources
func (s semaphore) V(n int) {
for i:= 0; i < n; i++{
<- s
}
}
/* wait & signal */
func (s semaphore) Wait(n int) {
s.P(n)
}
func (s semaphore) Signal() {
s.V(1)
}
```
#### close channel
可以明確的關閉 channel,但也不一定要每次都關閉,只有在告訴接收者不會再提供新的值時才需要關閉。而且關閉通道是 sender 的責任,receiver 不需要負責關閉。
當 channel 被關閉後,如果對這個 channel 繼續寫入,或重複關閉都會導致 panic,但可以讀取已關閉的 channel 直到 buffer 為空。
```go=
ch := make(chan int)
defer close(ch)
if v, ok := <-ch; !ok {
// 表示 channel 內沒有元素且被關閉
}
```
使用 for range 語句取得 channel 內的值,直到 channel 被關閉 (會主動偵測),或是使用 for 迴圈加上判斷 channel 是否關閉。
```go=
for v := range ch {
fmt.Println("The value is %v", v)
}
//-------------------------------------------------
for {
input, open := <-ch
if !open {
break
}
fmt.Println("The value is %v", input)
}
```
### WaitGroup
`WaitGroup` 是 Go 語言中常見的一種等待方式,它允許我們等待一組 goroutine 完成,此用法適合用在需要將單一任務拆成許多次任務,待所有任務完成後才繼續執行的情境。
- `var wg sync.WaitGroup` : 建立 waitgroup,預設 counter 是 0
- `wg.Add(delta int)` : 增加要等待的次數,也可以是負值。
- `defer wg.Done()` : 把要等待的次數減1,通常搭配 defer 使用
- `wg.Wait()` : 把程式暫時阻塞住,直到 counter 歸零,也就是所有 WaitGroup 都呼叫過 done 後才恢復執行
```go=
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("all works done")
}
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Worker %d is working", id)
}
```
這通常用在單純等待任務完成,而不需要從 goroutine 中取得所需資料的情況,如果會需要從 goroutine 中返回資料,使用 channel 將是較好的做法。
### Select
使用 `select` 可以等待多個 goroutine 中的任何一個完成,當程式執行到 `select` 時,會被 block 住直到有任何一個 case 收到 channel 傳來的資料後(除非有用 default)才會 unblock
```go=
func main() {
ch1 := make(chan bool)
ch2 := make(chan bool)
go worker(1, ch1)
go worker(2, ch2)
select {
case <-ch1:
fmt.Println("Worker 1 finished")
case <-ch2:
fmt.Println("Worker 2 finished")
default:
...
}
fmt.Println("work done")
}
func worker(id int, ch chan bool) {
fmt.Printf("Worker %d is working\n", id)
ch <- true
}
```
常用的情況是選擇處理列出的多個 channel 中的一個
- 如果都阻塞了,會等待直到其中一個可以處理
- 如果多個可以處理,隨機選擇一個
- 如果沒有 channel 可以處理並且寫了 default 語句,它就會執行 default 的內容
這實現了一種監聽模式,通常用在(無限)迴圈中,在特定情況下透過 break 語句使迴圈退出。
select 搭配 time 還可以用來判斷 channel 的接收是否超時,但在有多個 case 符合條件時, 對 case 的選擇是偽隨機的,select 可能不會在定時器超時訊號到來時立刻選中對應的 case ,因此 goroutine 可能不會嚴格按照定時器設定的時間結束。
```go=
ch := make(chan error, 1)
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case resp := <-ch
// use resp and reply
case <-time.After(timeoutNs):
// call timed out
break
}
```
---
Reference:
https://github.com/unknwon/the-way-to-go_ZH_CN/blob/master/eBook/14.1.md
https://github.com/unknwon/the-way-to-go_ZH_CN/blob/master/eBook/14.2.md
https://vocus.cc/article/6501b6c7fd897800015c9f30
https://pjchender.dev/golang/go-concurrency-channels/
https://xiang753017.gitbook.io/zixiang-blog/golang/golang-goroutine-concurrency-duo-zhi-hang-xu-qian-tan