# Go 語言開發實戰從入門到進階: Consumer 實作 ###### tags: `後端` ## 說明 ### 架構 * 使用者透過input添加多個task進入consumer中,再由consumer分配給底下的worker去做實現 * 加入graceful showdown處理,可以優雅的終止排程 * 總之就是:分批排隊處理大量task ![](https://i.imgur.com/aRN1B9o.jpg) ## 建置 ### 資料夾結構 ``` go-traning.test/consumer │ ├── helper │ │ │ └── helper.go │ ├── queue │ │ │ ├── queue.go │ │ │ ├── consumer.go │ │ │ └── worker.go │ └── main.go ``` ### Step0. 建立Queue物件 1. 定義 job `queue.go` ```go // job for adding new task func type job func() ``` 2. 定義排程結構 `queue.go` ```go type Queue struct { inputs chan job // 任務輸入channel tasks chan job // 任務分配channel workerNum int // 欲啟動的worker數量 } ``` 3. new一個Queue `queue.go` ```go func New() *Queue { q := &Queue{ inputs: make(chan job, 10), tasks: make(chan job, 1024), workerNum: 4, } return q } ``` `main.go` ```go func main() { q := queue.New() //q.workerNum:4 } ``` 4. new一個可以寫入設定的Queue `queue.go` ```go type Option func(*Queue) func WithWorkNum(num int) Option { return func(q *Queue) { q.workerNum = num } } func New(opts ...Option) *Queue { // 定義預設值 q := &Queue{ inputs: make(chan job, 10), tasks: make(chan job, 1024), // buffer需依任務量調整 workerNum: 4, } // 如果有option,套用它 for _, opt := range opts { opt(q) } return q } ``` `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) //q.workerNum:2 } ``` ### Step1. 建立Consumer `consumer.go` 先於背景啟動StartConsumer,開始接收來自inputs頻道的task 再以for迴圈用AddTask將task塞入inputs頻道 若inputs或tasks頻道的buffer滿了,會進入default ```go func (q *Queue) StartConsumer() { for input := range q.inputs { select { case q.tasks <- input: default: log.Println("job has been reached the limitation") } } } func (q *Queue) AddTask(task job) error { select { case q.inputs <- task: default: return errors.New("queue has been reached the limitation") } return nil } ``` `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) go q.StartConsumer() for i := 0; i < 10; i++ { if err := q.AddTask(i); err != nil { log.Println(err) } } } ``` ### Step2. 建立 Worker 1. 定義worker功能,用於讀取tasks頻道並實現func task() 為模擬task的處理時間,睡個幾毫秒。 `helper.go` ```go func Sleep() { rand.Seed(time.Now().UnixNano()) n := rand.Intn(1000) // n will be between 0 and 10 log.Printf("Sleeping %d Millisecond...\n", n) time.Sleep(time.Duration(n) * time.Millisecond) } ``` `worker.go` ```go func (q *Queue) worker(num int) { for task := range q.tasks { log.Println("worker:", num) // 我是幾號worker? task() // 睡覺 helper.Sleep() } } ``` 2. 依據設定的 workerNum 來啟動 worker `worker.go` ```go func (q *Queue) StartWorker() { for i := 0; i < q.workerNum; i++ { go q.worker(i) } } ``` `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) go q.StartConsumer() q.StartWorker() for i := 0; i < 10; i++ { go func(num int) { if err := q.AddTask(func() { log.Println("output:", num) }); err != nil { log.Println(err) } }(i) } } ``` 3. 執行 ...咦?怎麼沒輸出東西呢 ```shell $ go run main.go > $ ``` 4. 因為AddTask後,worker來不及執行完每個task(耗時0~1000毫秒),程式就結束了 在 main.go 尾停個一秒,就有部分的 task 能被處理完。 `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) go q.StartConsumer() q.StartWorker() for i := 0; i < 10; i++ { go func(num int) { if err := q.AddTask(func() { log.Println("output:", num) }); err != nil { log.Println(err) } }(i) } // 如果沒有sleep,tasks被執行完前 程式就會結束 time.Sleep(time.Second) } ``` 執行後,會看到 log ```shell $ go run main.go 2022/07/01 10:40:32 worker: 1 2022/07/01 10:40:32 output: 4 2022/07/01 10:40:32 Sleeping 470 Millisecond... 2022/07/01 10:40:32 worker: 0 2022/07/01 10:40:32 output: 1 2022/07/01 10:40:32 Sleeping 753 Millisecond... 2022/07/01 10:40:33 worker: 1 2022/07/01 10:40:33 output: 0 2022/07/01 10:40:33 Sleeping 86 Millisecond... 2022/07/01 10:40:33 worker: 1 2022/07/01 10:40:33 output: 6 2022/07/01 10:40:33 Sleeping 616 Millisecond... 2022/07/01 10:40:33 worker: 0 2022/07/01 10:40:33 output: 5 2022/07/01 10:40:33 Sleeping 206 Millisecond... 2022/07/01 10:40:33 worker: 0 2022/07/01 10:40:33 output: 7 2022/07/01 10:40:33 Sleeping 245 Millisecond... $ ``` ### Step3. 加入Waiting Group 為了確保每個 goroutine (worker) 都能被執行完畢,需要加入 sync.WaitGroup 1. Queue結構加入 sync.WaitGroup 物件,New() 那邊也要初始化它 ```go type Queue struct { inputs chan job tasks chan job workerNum int wg *sync.WaitGroup } ``` 2. 改寫 StarkWorker ```go func (q *Queue) StartWorker() { for i := 0; i < q.workerNum; i++ { q.wg.Add(1) go func(i int) { defer q.wg.Done() q.worker(i) }(i) } } ``` 或是以下更好的寫法: ```go func (q *Queue) run(fn func()) { q.wg.Add(1) go func() { defer q.wg.Done() fn() }() } func (q *Queue) StartWorker() { for i := 0; i < q.workerNum; i++ { num := i q.run(func() { q.worker(num) }) } } ``` 3. 建立 Wait func,並在 func main() 做等待 `queue.go` ```go func (q *Queue) Wait() { q.wg.Wait() } ``` `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) ... q.Wait() // wait for goroutine done log.Println("執行完畢。") } ``` 4. 執行後...deadlock!! 問題出在 main.go 的迴圈把taks全數丟進inputs後,Consumer 仍在嘗試讀取其中 inputs 的值 在持續讀取未關閉的空頻道而造成程式無法繼續往下執行時,就會發生 deadlock。 [可以參考這個簡單範例](https://go.dev/play/p/M4isWrX3g-I) 此情形於 tasks channel 亦同 ```shell $ go run main.go > fatal error: all goroutines are asleep - deadlock! ``` ### Step4. 關閉 channel 1. 關閉 inputs channel `queue.go` ```go // 新增關閉 chan 功能 func (q *Queue) Release() { close(q.inputs) } ``` `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) ... q.Release() // 關閉 chan q.Wait() log.Println("執行完畢。") } ``` 2. 關閉 tasks channel inputs 關閉後,就不會(也不能)繼續向 tasks 塞值 tasks 也必須關閉否則一樣會 deadlock `consumer.go` ```go func (q *Queue) StartConsumer() { for input := range q.inputs { select { case q.tasks <- input: default: log.Println("job has been reached the limitation") } } close(q.tasks) } ``` 3. 執行 此時除非一次塞入太多 task 造成 input 的 buffer 滿載,程式會在 task 都被執行完畢後才結束。 ```shell $ go run main.go 2022/07/01 11:40:42 worker: 0 2022/07/01 11:40:42 output: 1 2022/07/01 11:40:42 worker: 1 2022/07/01 11:40:42 Sleeping 364 Millisecond... 2022/07/01 11:40:42 output: 0 2022/07/01 11:40:42 Sleeping 496 Millisecond... 2022/07/01 11:40:43 worker: 0 2022/07/01 11:40:43 output: 2 2022/07/01 11:40:43 Sleeping 824 Millisecond... 2022/07/01 11:40:43 worker: 1 2022/07/01 11:40:43 output: 3 2022/07/01 11:40:43 Sleeping 602 Millisecond... 2022/07/01 11:40:43 worker: 1 2022/07/01 11:40:43 output: 4 2022/07/01 11:40:43 Sleeping 960 Millisecond... 2022/07/01 11:40:44 worker: 0 2022/07/01 11:40:44 output: 5 2022/07/01 11:40:44 Sleeping 106 Millisecond... 2022/07/01 11:40:44 worker: 0 2022/07/01 11:40:44 output: 6 2022/07/01 11:40:44 Sleeping 444 Millisecond... 2022/07/01 11:40:44 worker: 0 2022/07/01 11:40:44 output: 7 2022/07/01 11:40:44 Sleeping 163 Millisecond... 2022/07/01 11:40:44 worker: 0 2022/07/01 11:40:44 output: 8 2022/07/01 11:40:44 Sleeping 842 Millisecond... 2022/07/01 11:40:44 worker: 1 2022/07/01 11:40:44 output: 9 2022/07/01 11:40:44 Sleeping 698 Millisecond... 2022/07/01 11:40:45 執行完畢。 ``` ### Step5. graceful-shutdown 但我們的目的是希望,程式能持續處理大量的 task,且能由外部控制何時終止 而不是處理完一批後就結束程序。 這時就需要借助 context.Context 來實作 graceful-shutdown 的機制, 控制 goroutine 何時該繼續讀寫 channel 或停止。 1. 建立 context WithCancel 2. 將 ctx 傳入 StartConsumer、StartWorker `main.go` ```go func main() { q := queue.New( queue.WithWorkNum(2), ) ctx, cancel := context.WithCancel(context.Background()) // 等待系統停止的訊號 go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(c) select { case <-c: cancel() // 把原本最後面的東西移到這邊 q.Release() // 這個可以拿掉 q.Wait() log.Println("執行完畢。") } }() // 傳入 context go q.StartConsumer(ctx) q.StartWorker(ctx) for i := 0; i < 10; i++ { func(num int) { if err := q.AddTask(func() { log.Println("output:", num) }); err != nil { log.Println(err) } }(i) } } ``` `consumer.go` 將 for range 接收單一 channel 的方式改為 select,接收<-ctx.Done()訊號 移除 close(tasks) ```go func (q *Queue) StartConsumer(ctx context.Context) { for { select { case input := <-q.inputs: select { case q.tasks <- input: default: log.Println("job has been reached the limitation") } case <-ctx.Done(): return } } } ``` `worker.go` 將 ctx 從 StartWorker 傳入 worker,worker 進行同上處理 ```go func (q *Queue) worker(ctx context.Context, num int) { for { select { case task := <-q.tasks: log.Println("worker:", num) task() helper.Sleep() case <-ctx.Done(): return } } } func (q *Queue) StartWorker(ctx context.Context) { for i := 0; i < q.workerNum; i++ { q.wg.Add(1) go func(num int) { defer q.wg.Done() q.worker(ctx, num) }(i) } } ``` 3. 建立一 unbuffer channel 用於等待程式執行結束 使用空結構體,用法可以參考[這裡](http://35.194.182.75:3000/H1eXgMilQa2sieq28YAHyA?view#%E7%A9%BAstruct%E7%9A%84%E7%89%B9%E6%AE%8A%E7%94%A8%E6%B3%95) `main.go` ```go func main() { // make unbuffer struct{} cahnnel finished := make(chan struct{}) q := queue.New( queue.WithWorkNum(2), ) ctx, cancel := context.WithCancel(context.Background()) go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(c) select { case <-c: cancel() q.Wait() // while queue finished, close finished channel close(finished) log.Println("執行完畢。") } }() go q.StartConsumer(ctx) q.StartWorker(ctx) for i := 0; i < 10; i++ { func(num int) { if err := q.AddTask(func() { log.Println("output:", num) }); err != nil { log.Println(err) } }(i) } // wait for progress end // 如果不等,task未被執行時程序就結束了(跟上面的sleep是一樣的道理) <-finished } ``` 4. 執行,會於 `ctrl+C` 後才結束程序 ### Step6. 最後整理 可以把 context 那段獨立出去 `main.go` ```go func withContext(fn func()) context.Context { ctx, cancel := context.WithCancel(context.Background()) go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(c) select { case <-c: cancel() log.Println("接收到關閉訊號...") fn() } }() return ctx } func main() { finished := make(chan struct{}) q := queue.New( queue.WithWorkNum(2), ) ctx := withContext(func() { q.Wait() close(finished) log.Println("執行完畢。") }) go q.StartConsumer(ctx) q.StartWorker(ctx) for i := 0; i < 10; i++ { func(num int) { if err := q.AddTask(func() { log.Println("output:", num) }); err != nil { log.Println(err) } }(i) } <-finished } ``` 從執行結果可以看到:接收到關閉訊號後,會將處理中的task結束後才會結束整個程序 ```shell $ go run main.go 2022/07/01 17:47:25 worker: 1 2022/07/01 17:47:25 output: 0 2022/07/01 17:47:25 worker: 0 2022/07/01 17:47:25 Sleeping 396 Millisecond... 2022/07/01 17:47:25 output: 1 2022/07/01 17:47:25 Sleeping 621 Millisecond... 2022/07/01 17:47:25 worker: 1 2022/07/01 17:47:25 output: 2 2022/07/01 17:47:25 Sleeping 475 Millisecond... 2022/07/01 17:47:25 worker: 0 2022/07/01 17:47:25 output: 3 2022/07/01 17:47:25 Sleeping 612 Millisecond... 2022/07/01 17:47:26 worker: 1 2022/07/01 17:47:26 output: 4 2022/07/01 17:47:26 Sleeping 598 Millisecond... ^C2022/07/01 17:47:26 接收到關閉訊號... 2022/07/01 17:47:26 worker: 0 2022/07/01 17:47:26 output: 5 2022/07/01 17:47:26 Sleeping 951 Millisecond... 2022/07/01 17:47:26 worker: 1 2022/07/01 17:47:26 output: 6 2022/07/01 17:47:26 Sleeping 527 Millisecond... 2022/07/01 17:47:27 執行完畢。 ``` ## 參考資料 1. [講師的課程解答檔案](https://drive.google.com/file/d/1CPPR6EIu4UyTwf2FABHhuEevTYnbpzOt/view?usp=sharing) 路徑:workshop-20220527-homework/02-graceful-shutdown 2. [[Go 教學] graceful shutdown with multiple workers](https://blog.wu-boy.com/2020/02/graceful-shutdown-with-multiple-workers/)