---
tags: golang, concurrency, channel
---
# golang concurrency concept
## source
[go_concurrency_example](https://github.com/yuanyu90221/go_concurrency_example)
## refrence:
[linux limit concurrency](https://www.youtube.com/watch?v=jA7aYSRKVTQ)
## introduction
假設現在有100個 job我們希望
透過goroutine的方式 讓每次可以並行執行10個goroutine
原本的寫法是
用waitGroup來把100 job加入
使用一個 integer channel found來接收執行結果
並使用一個size 為10的buffered channel limitCh來接收每次執行的task
然後每次執行完task之後把結果寫入found
並且把waitGroup 的count 減去1, 把limitCh的task讀出
最後的for loop 則在found被close 之前 會一直不斷接收found的結果
直到found close後
才會印出所有的結果result
在這個最出版本的code
有一個問題
就是 當bufferred channel limitCh被寫滿10的時候
limitCh就會被block 無法接收其他寫入的task
那如果這時寫入第11task
就會deadlock 無法運作
```golang
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks
const jobCount = 100
var wg sync.WaitGroup
wg.Add(jobCount)
found := make(chan int)
limitCh := make(chan struct{}, concurrencyProcesses)
for i := 0; i < concurrencyProcesses; i++ {
limitCh <- struct{}{}
go func(val int) {
defer func() {
<-limitCh
wg.Done()
}()
waitTime := rand.Int31n(1000)
fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
time.Sleep(time.Duration(waitTime) * time.Millisecond)
found <- val
}(i)
}
go func() {
wg.Wait()
close(found)
}()
var results []int
for p := range found {
fmt.Println("Finished job:", p)
results = append(results, p)
}
fmt.Println("result:", results)
}
```
想法一:
出問題的點在於 寫入limitCh的地方有可能遭到block
所以 把 寫入 limitCh的地方 也寫成goroutine
讓limitCh <- struct{}{} 也在背景執行
這樣就不會block
也就是:
```golang
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks
const jobCount = 100
var wg sync.WaitGroup
wg.Add(jobCount)
found := make(chan int)
limitCh := make(chan struct{}, concurrencyProcesses)
for i := 0; i < concurrencyProcesses; i++ {
go func () {
limitCh <- struct{}{}
}()
go func(val int) {
defer func() {
<-limitCh
wg.Done()
}()
waitTime := rand.Int31n(1000)
fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
time.Sleep(time.Duration(waitTime) * time.Millisecond)
found <- val
}(i)
}
go func() {
wg.Wait()
close(found)
}()
var results []int
for p := range found {
fmt.Println("Finished job:", p)
results = append(results, p)
}
fmt.Println("result:", results)
}
```
然而 這樣做
由於執行寫入limitCh跟執行task的邏輯都在背景執行
因此 limitCh這個 bufferedChannel便無法達到只限制同時只執行10個task的效果
因此需要保持同時只執行10個的效果 必須使用其他的設計方式
設計如下
寫一個 integer channel queue
首先把所有的 task 逐步寫入 queue這個channel
直到結束 才把queue close
然後透過for loop 一次讀取10個
直到所有queue被讀取完畢
實做如下:
```golang
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks
const jobCount = 100
var wg sync.WaitGroup
wg.Add(jobCount)
found := make(chan int)
queue := make(chan int)
go func(queue chan<- int) {
for i := 0; i < jobCount; i++ {
queue <- i
}
close(queue)
}(queue)
for i := 0; i < concurrencyProcesses; i++ {
go func(val int) {
for val := range queue {
defer wg.Done()
waitTime := rand.Int31n(1000)
fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
time.Sleep(time.Duration(waitTime) * time.Millisecond)
found <- val
}
}(i)
}
go func() {
wg.Wait()
close(found)
}()
var results []int
for p := range found {
fmt.Println("Finished job:", p)
results = append(results, p)
}
fmt.Println("result:", results)
}
```
這樣就能夠保證每次只有10個 task被同時執行
使用這樣的設計 必須要了解到 non-buffered-channel的 讀寫的block特性
還有如何使用waitGroup的概念