# 應用 Golang channel 處理執行緒安全問題 ## 理論 ### 什麼是執行緒安全問題 (Thread Safety) - 多個執行緒可以安全的存取共享資源 - 避免 - deadlock - resource leak - race condition - 換句話說就是在解決**競爭**問題 ### 處理執行緒安全問題的幾個方法 - Immutable Object - Thread-Safe Containers (e.g. Concurrent Queue/Map) - Mutex, Atomic - Message Passing - Software transactional memory (STM) ### Immutable Object - 記憶體消耗大 - 對大型物件,需要考量複製的效能 - Golang沒有原生支援這個作法 ### Thread-Safe Containers - sync.Map, sync.Pool - 該物件所提供的操作保證了執行緒安全 - store, load, delete, range... - 不擅長處理競爭問題 - 遊戲中的HP血條邏輯處理 - 對接API,需要登入取得JWT Token,並保存下來重複使用 ### Atomic - 限制: 通常只能保護一個64bit的資料 - 複雜度: 需要使用支援的atomic指令對資料進行操作,實作更複雜 - 更加偏向底層操作,帶來高效能的同時,也更難撰寫出正確的程式 - memory model將是使用atomic時的必備知識 ### Mutex - 最常見的解法,直接以一把巨型鎖處理 - 巨型鎖的競爭問題會更嚴重 - 細粒度鎖由於鎖數量的增加,可能引入dead lock問題 - Golang的sync.Mutex是不可重入的(不可重複呼叫Lock) ### Golang Channel - 透過Channel通訊,取代了對同一個資源的競爭 - 以底層實作來理解的話,競爭問題被轉移到了內部處理 ### Channel + Select - 避免競爭 一次只處理一個channel的資料 - 組合多種不同用途的channel來處理複雜的系統 - 直觀 撰寫簡單 - 性能差 (不會有multi reader, single writer) ## 常見應用 ### 定時任務 ```go= ticker := time.NewTicker(time.Second) for range ticker.C { doTask() } ``` ### 可中斷的睡眠 應用於需要定時任務的場景,但程式退出時能馬上離開,不用等待下次喚醒 ```go func IntSleep(ctx context.Context, d time.Duration) { select { case <-ctx.Done(): case <-time.After(d): } } ``` ### 緩衝 緩衝區滿時或每秒寫入 (buffer不再需要加鎖) ```go buffer := make([]string, 0, 128) for { select { case log := <-logChan: buffer = append(buffer, log) if len(buffer) >= 128 { flush(buffer) } case <-time.After(time.Second): flush(buffer) } } ``` ## 進階應用 ### 如何以channel方式設計 - Timer我們也當作是一種IO來分類 - 思考系統中的IO Task和Computation Task - 一個goroutine負責一到多個IO Task - Computation Task在channel的send或receive端處理 - __兩個Computation Task之間不要透過Channel溝通__ ### Code Example 1: Quote Receiver - 每分鐘dump一次:一分鐘內收到的封包量 - 收到資料就直接更新redis的內容 - 收到資料後根據條件寫入到資料庫 - 緩衝區已滿 - 每秒寫入 ```go flushTicker := time.NewTicker(time.Second) tickChan := c.subscribeTick(exchange, symbol) for { select { case <-ctx.Done(): c.flush() return case <-time.After(time.Minute): c.dumpMetrics() case tick := <-tickChan: c.writeToRedis(tick) c.addToBuffer(tick) case <-flushTicker.C: c.flush() } } ``` ### Code Example 2: WebSocket Hub Hub是一個廣播者,加入的client可以收到所有廣播訊息 #### client 利用channel將自己的連線新增到hub中 ```go func(x *Controller) webSocketHandler(c *gin.Context) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Println(err) return } x.hub.register <- conn <-c.Done() } ``` #### hub ```go func (h *Hub) run() { for { select { case conn := <-h.register: client := NewClient(conn) h.clients[client] = struct{}{} case client := <-h.unregister: h.remove(client) case message := <-h.broadcast: for client := range h.clients { select { case client.send <- message: default: h.remove(client) } } } } } ``` ### Code Example3: Trade Engine engine內含一個狀態機,用來控制目前是否可用,狀態有可能是(paused, ready, running, halted) 外部有可能對engine觸發以下操作: - Hot-Reload 程式設定 - 與其他服務斷線,因此暫停Engine - 中止Engine - 重置異常狀態 ```go for { select { case order := <-e.orderCh: e.placeOrder(order) case report := <-e.reportCh: e.handleReport(report) case fn := <-e.opCh: if e.canDoOperation() { fn(e) } else { e.putToQueue(fn) } case marketStatus := <-e.marketStatusCh: e.handleMarketStatus(marketStatus) } e.nextState() } ``` #### 查詢engine狀態的實作方式 ```go func (e *Engine) QueryStatus() *Status { var wg sync.WaitGroup var status Status wg.Add(1) fn := func(e *Engine) { defer wg.Done() e.CopyStatus(&status) } e.opCh <- fn wg.Wait() return &status } ``` - Engine可以延遲操作,只有狀態滿足條件才被執行 (e.g. 等到變為ready時) - 不用考慮每個操作需要對哪些資料加鎖,因為所有操作都在單執行緒下執行 ### API Design Example1: config watcher - 從資料庫中讀取設定 (Net IO + Compute) - 每分鐘重讀一次 (Timer) - 收到通知時重讀一次 (Net IO) ```go= func NewConfigWatcher(db *gorm.DB) *ConfigWatcher func (c *ConfigWatcher) WatchChan(context.Context) <-chan Config ``` ### API Design Example2: DB writer - 新增記錄到資料庫中,由於每次有資料都新增的速度太慢,我們想要批次新增 - 資料不會暫存超過一秒 - 如果資料蒐集100筆就直接新增 - 如果判斷該資料已經新增過就過濾 (假設沒有其他人會寫入,因此可以在程式內先過濾) ```go= func NewDBWriter(db *gorm.DB) *DBWriter func (w *DBWriter) createHashTable() func (w *DBWriter) Start(context.Context) func (w *DBWriter) Chan() chan<- Record ```