# Go 語言開發實戰從入門到進階: Consumer 實作
###### tags: `後端`
## 說明
### 架構
* 使用者透過input添加多個task進入consumer中,再由consumer分配給底下的worker去做實現
* 加入graceful showdown處理,可以優雅的終止排程
* 總之就是:分批排隊處理大量task

## 建置
### 資料夾結構
```
go-traning.test/consumer
│
├── helper
│ │
│ └── helper.go
│
├── queue
│ │
│ ├── queue.go
│ │
│ ├── consumer.go
│ │
│ └── worker.go
│
└── main.go
```
### Step0. 建立Queue物件
1. 定義 job
`queue.go`
```go
// job for adding new task func
type job func()
```
2. 定義排程結構
`queue.go`
```go
type Queue struct {
inputs chan job // 任務輸入channel
tasks chan job // 任務分配channel
workerNum int // 欲啟動的worker數量
}
```
3. new一個Queue
`queue.go`
```go
func New() *Queue {
q := &Queue{
inputs: make(chan job, 10),
tasks: make(chan job, 1024),
workerNum: 4,
}
return q
}
```
`main.go`
```go
func main() {
q := queue.New()
//q.workerNum:4
}
```
4. new一個可以寫入設定的Queue
`queue.go`
```go
type Option func(*Queue)
func WithWorkNum(num int) Option {
return func(q *Queue) {
q.workerNum = num
}
}
func New(opts ...Option) *Queue {
// 定義預設值
q := &Queue{
inputs: make(chan job, 10),
tasks: make(chan job, 1024), // buffer需依任務量調整
workerNum: 4,
}
// 如果有option,套用它
for _, opt := range opts {
opt(q)
}
return q
}
```
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
//q.workerNum:2
}
```
### Step1. 建立Consumer
`consumer.go`
先於背景啟動StartConsumer,開始接收來自inputs頻道的task
再以for迴圈用AddTask將task塞入inputs頻道
若inputs或tasks頻道的buffer滿了,會進入default
```go
func (q *Queue) StartConsumer() {
for input := range q.inputs {
select {
case q.tasks <- input:
default:
log.Println("job has been reached the limitation")
}
}
}
func (q *Queue) AddTask(task job) error {
select {
case q.inputs <- task:
default:
return errors.New("queue has been reached the limitation")
}
return nil
}
```
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
go q.StartConsumer()
for i := 0; i < 10; i++ {
if err := q.AddTask(i); err != nil {
log.Println(err)
}
}
}
```
### Step2. 建立 Worker
1. 定義worker功能,用於讀取tasks頻道並實現func task()
為模擬task的處理時間,睡個幾毫秒。
`helper.go`
```go
func Sleep() {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(1000) // n will be between 0 and 10
log.Printf("Sleeping %d Millisecond...\n", n)
time.Sleep(time.Duration(n) * time.Millisecond)
}
```
`worker.go`
```go
func (q *Queue) worker(num int) {
for task := range q.tasks {
log.Println("worker:", num) // 我是幾號worker?
task()
// 睡覺
helper.Sleep()
}
}
```
2. 依據設定的 workerNum 來啟動 worker
`worker.go`
```go
func (q *Queue) StartWorker() {
for i := 0; i < q.workerNum; i++ {
go q.worker(i)
}
}
```
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
go q.StartConsumer()
q.StartWorker()
for i := 0; i < 10; i++ {
go func(num int) {
if err := q.AddTask(func() {
log.Println("output:", num)
}); err != nil {
log.Println(err)
}
}(i)
}
}
```
3. 執行
...咦?怎麼沒輸出東西呢
```shell
$ go run main.go
>
$
```
4. 因為AddTask後,worker來不及執行完每個task(耗時0~1000毫秒),程式就結束了
在 main.go 尾停個一秒,就有部分的 task 能被處理完。
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
go q.StartConsumer()
q.StartWorker()
for i := 0; i < 10; i++ {
go func(num int) {
if err := q.AddTask(func() {
log.Println("output:", num)
}); err != nil {
log.Println(err)
}
}(i)
}
// 如果沒有sleep,tasks被執行完前 程式就會結束
time.Sleep(time.Second)
}
```
執行後,會看到 log
```shell
$ go run main.go
2022/07/01 10:40:32 worker: 1
2022/07/01 10:40:32 output: 4
2022/07/01 10:40:32 Sleeping 470 Millisecond...
2022/07/01 10:40:32 worker: 0
2022/07/01 10:40:32 output: 1
2022/07/01 10:40:32 Sleeping 753 Millisecond...
2022/07/01 10:40:33 worker: 1
2022/07/01 10:40:33 output: 0
2022/07/01 10:40:33 Sleeping 86 Millisecond...
2022/07/01 10:40:33 worker: 1
2022/07/01 10:40:33 output: 6
2022/07/01 10:40:33 Sleeping 616 Millisecond...
2022/07/01 10:40:33 worker: 0
2022/07/01 10:40:33 output: 5
2022/07/01 10:40:33 Sleeping 206 Millisecond...
2022/07/01 10:40:33 worker: 0
2022/07/01 10:40:33 output: 7
2022/07/01 10:40:33 Sleeping 245 Millisecond...
$
```
### Step3. 加入Waiting Group
為了確保每個 goroutine (worker) 都能被執行完畢,需要加入 sync.WaitGroup
1. Queue結構加入 sync.WaitGroup 物件,New() 那邊也要初始化它
```go
type Queue struct {
inputs chan job
tasks chan job
workerNum int
wg *sync.WaitGroup
}
```
2. 改寫 StarkWorker
```go
func (q *Queue) StartWorker() {
for i := 0; i < q.workerNum; i++ {
q.wg.Add(1)
go func(i int) {
defer q.wg.Done()
q.worker(i)
}(i)
}
}
```
或是以下更好的寫法:
```go
func (q *Queue) run(fn func()) {
q.wg.Add(1)
go func() {
defer q.wg.Done()
fn()
}()
}
func (q *Queue) StartWorker() {
for i := 0; i < q.workerNum; i++ {
num := i
q.run(func() {
q.worker(num)
})
}
}
```
3. 建立 Wait func,並在 func main() 做等待
`queue.go`
```go
func (q *Queue) Wait() {
q.wg.Wait()
}
```
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
...
q.Wait() // wait for goroutine done
log.Println("執行完畢。")
}
```
4. 執行後...deadlock!!
問題出在 main.go 的迴圈把taks全數丟進inputs後,Consumer 仍在嘗試讀取其中 inputs 的值
在持續讀取未關閉的空頻道而造成程式無法繼續往下執行時,就會發生 deadlock。
[可以參考這個簡單範例](https://go.dev/play/p/M4isWrX3g-I)
此情形於 tasks channel 亦同
```shell
$ go run main.go
> fatal error: all goroutines are asleep - deadlock!
```
### Step4. 關閉 channel
1. 關閉 inputs channel
`queue.go`
```go
// 新增關閉 chan 功能
func (q *Queue) Release() {
close(q.inputs)
}
```
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
...
q.Release() // 關閉 chan
q.Wait()
log.Println("執行完畢。")
}
```
2. 關閉 tasks channel
inputs 關閉後,就不會(也不能)繼續向 tasks 塞值
tasks 也必須關閉否則一樣會 deadlock
`consumer.go`
```go
func (q *Queue) StartConsumer() {
for input := range q.inputs {
select {
case q.tasks <- input:
default:
log.Println("job has been reached the limitation")
}
}
close(q.tasks)
}
```
3. 執行
此時除非一次塞入太多 task 造成 input 的 buffer 滿載,程式會在 task 都被執行完畢後才結束。
```shell
$ go run main.go
2022/07/01 11:40:42 worker: 0
2022/07/01 11:40:42 output: 1
2022/07/01 11:40:42 worker: 1
2022/07/01 11:40:42 Sleeping 364 Millisecond...
2022/07/01 11:40:42 output: 0
2022/07/01 11:40:42 Sleeping 496 Millisecond...
2022/07/01 11:40:43 worker: 0
2022/07/01 11:40:43 output: 2
2022/07/01 11:40:43 Sleeping 824 Millisecond...
2022/07/01 11:40:43 worker: 1
2022/07/01 11:40:43 output: 3
2022/07/01 11:40:43 Sleeping 602 Millisecond...
2022/07/01 11:40:43 worker: 1
2022/07/01 11:40:43 output: 4
2022/07/01 11:40:43 Sleeping 960 Millisecond...
2022/07/01 11:40:44 worker: 0
2022/07/01 11:40:44 output: 5
2022/07/01 11:40:44 Sleeping 106 Millisecond...
2022/07/01 11:40:44 worker: 0
2022/07/01 11:40:44 output: 6
2022/07/01 11:40:44 Sleeping 444 Millisecond...
2022/07/01 11:40:44 worker: 0
2022/07/01 11:40:44 output: 7
2022/07/01 11:40:44 Sleeping 163 Millisecond...
2022/07/01 11:40:44 worker: 0
2022/07/01 11:40:44 output: 8
2022/07/01 11:40:44 Sleeping 842 Millisecond...
2022/07/01 11:40:44 worker: 1
2022/07/01 11:40:44 output: 9
2022/07/01 11:40:44 Sleeping 698 Millisecond...
2022/07/01 11:40:45 執行完畢。
```
### Step5. graceful-shutdown
但我們的目的是希望,程式能持續處理大量的 task,且能由外部控制何時終止
而不是處理完一批後就結束程序。
這時就需要借助 context.Context 來實作 graceful-shutdown 的機制,
控制 goroutine 何時該繼續讀寫 channel 或停止。
1. 建立 context WithCancel
2. 將 ctx 傳入 StartConsumer、StartWorker
`main.go`
```go
func main() {
q := queue.New(
queue.WithWorkNum(2),
)
ctx, cancel := context.WithCancel(context.Background())
// 等待系統停止的訊號
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(c)
select {
case <-c:
cancel()
// 把原本最後面的東西移到這邊
q.Release() // 這個可以拿掉
q.Wait()
log.Println("執行完畢。")
}
}()
// 傳入 context
go q.StartConsumer(ctx)
q.StartWorker(ctx)
for i := 0; i < 10; i++ {
func(num int) {
if err := q.AddTask(func() {
log.Println("output:", num)
}); err != nil {
log.Println(err)
}
}(i)
}
}
```
`consumer.go`
將 for range 接收單一 channel 的方式改為 select,接收<-ctx.Done()訊號
移除 close(tasks)
```go
func (q *Queue) StartConsumer(ctx context.Context) {
for {
select {
case input := <-q.inputs:
select {
case q.tasks <- input:
default:
log.Println("job has been reached the limitation")
}
case <-ctx.Done():
return
}
}
}
```
`worker.go`
將 ctx 從 StartWorker 傳入 worker,worker 進行同上處理
```go
func (q *Queue) worker(ctx context.Context, num int) {
for {
select {
case task := <-q.tasks:
log.Println("worker:", num)
task()
helper.Sleep()
case <-ctx.Done():
return
}
}
}
func (q *Queue) StartWorker(ctx context.Context) {
for i := 0; i < q.workerNum; i++ {
q.wg.Add(1)
go func(num int) {
defer q.wg.Done()
q.worker(ctx, num)
}(i)
}
}
```
3. 建立一 unbuffer channel 用於等待程式執行結束
使用空結構體,用法可以參考[這裡](http://35.194.182.75:3000/H1eXgMilQa2sieq28YAHyA?view#%E7%A9%BAstruct%E7%9A%84%E7%89%B9%E6%AE%8A%E7%94%A8%E6%B3%95)
`main.go`
```go
func main() {
// make unbuffer struct{} cahnnel
finished := make(chan struct{})
q := queue.New(
queue.WithWorkNum(2),
)
ctx, cancel := context.WithCancel(context.Background())
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(c)
select {
case <-c:
cancel()
q.Wait()
// while queue finished, close finished channel
close(finished)
log.Println("執行完畢。")
}
}()
go q.StartConsumer(ctx)
q.StartWorker(ctx)
for i := 0; i < 10; i++ {
func(num int) {
if err := q.AddTask(func() {
log.Println("output:", num)
}); err != nil {
log.Println(err)
}
}(i)
}
// wait for progress end
// 如果不等,task未被執行時程序就結束了(跟上面的sleep是一樣的道理)
<-finished
}
```
4. 執行,會於 `ctrl+C` 後才結束程序
### Step6. 最後整理
可以把 context 那段獨立出去
`main.go`
```go
func withContext(fn func()) context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(c)
select {
case <-c:
cancel()
log.Println("接收到關閉訊號...")
fn()
}
}()
return ctx
}
func main() {
finished := make(chan struct{})
q := queue.New(
queue.WithWorkNum(2),
)
ctx := withContext(func() {
q.Wait()
close(finished)
log.Println("執行完畢。")
})
go q.StartConsumer(ctx)
q.StartWorker(ctx)
for i := 0; i < 10; i++ {
func(num int) {
if err := q.AddTask(func() {
log.Println("output:", num)
}); err != nil {
log.Println(err)
}
}(i)
}
<-finished
}
```
從執行結果可以看到:接收到關閉訊號後,會將處理中的task結束後才會結束整個程序
```shell
$ go run main.go
2022/07/01 17:47:25 worker: 1
2022/07/01 17:47:25 output: 0
2022/07/01 17:47:25 worker: 0
2022/07/01 17:47:25 Sleeping 396 Millisecond...
2022/07/01 17:47:25 output: 1
2022/07/01 17:47:25 Sleeping 621 Millisecond...
2022/07/01 17:47:25 worker: 1
2022/07/01 17:47:25 output: 2
2022/07/01 17:47:25 Sleeping 475 Millisecond...
2022/07/01 17:47:25 worker: 0
2022/07/01 17:47:25 output: 3
2022/07/01 17:47:25 Sleeping 612 Millisecond...
2022/07/01 17:47:26 worker: 1
2022/07/01 17:47:26 output: 4
2022/07/01 17:47:26 Sleeping 598 Millisecond...
^C2022/07/01 17:47:26 接收到關閉訊號...
2022/07/01 17:47:26 worker: 0
2022/07/01 17:47:26 output: 5
2022/07/01 17:47:26 Sleeping 951 Millisecond...
2022/07/01 17:47:26 worker: 1
2022/07/01 17:47:26 output: 6
2022/07/01 17:47:26 Sleeping 527 Millisecond...
2022/07/01 17:47:27 執行完畢。
```
## 參考資料
1. [講師的課程解答檔案](https://drive.google.com/file/d/1CPPR6EIu4UyTwf2FABHhuEevTYnbpzOt/view?usp=sharing) 路徑:workshop-20220527-homework/02-graceful-shutdown
2. [[Go 教學] graceful shutdown with multiple workers](https://blog.wu-boy.com/2020/02/graceful-shutdown-with-multiple-workers/)