應用 Golang channel 處理執行緒安全問題

理論

什麼是執行緒安全問題 (Thread Safety)

  • 多個執行緒可以安全的存取共享資源
  • 避免
    • deadlock
    • resource leak
    • race condition
  • 換句話說就是在解決競爭問題

處理執行緒安全問題的幾個方法

  • Immutable Object
  • Thread-Safe Containers (e.g. Concurrent Queue/Map)
  • Mutex, Atomic
  • Message Passing
  • Software transactional memory (STM)

Immutable Object

  • 記憶體消耗大
  • 對大型物件,需要考量複製的效能
  • Golang沒有原生支援這個作法

Thread-Safe Containers

  • sync.Map, sync.Pool
  • 該物件所提供的操作保證了執行緒安全
    • store, load, delete, range
  • 不擅長處理競爭問題
    • 遊戲中的HP血條邏輯處理
    • 對接API,需要登入取得JWT Token,並保存下來重複使用

Atomic

  • 限制: 通常只能保護一個64bit的資料
  • 複雜度: 需要使用支援的atomic指令對資料進行操作,實作更複雜
  • 更加偏向底層操作,帶來高效能的同時,也更難撰寫出正確的程式
    • memory model將是使用atomic時的必備知識

Mutex

  • 最常見的解法,直接以一把巨型鎖處理
  • 巨型鎖的競爭問題會更嚴重
  • 細粒度鎖由於鎖數量的增加,可能引入dead lock問題
  • Golang的sync.Mutex是不可重入的(不可重複呼叫Lock)

Golang Channel

  • 透過Channel通訊,取代了對同一個資源的競爭
  • 以底層實作來理解的話,競爭問題被轉移到了內部處理

Channel + Select

  • 避免競爭 一次只處理一個channel的資料
  • 組合多種不同用途的channel來處理複雜的系統
  • 直觀 撰寫簡單
  • 性能差 (不會有multi reader, single writer)

常見應用

定時任務

ticker := time.NewTicker(time.Second) for range ticker.C { doTask() }

可中斷的睡眠

應用於需要定時任務的場景,但程式退出時能馬上離開,不用等待下次喚醒

func IntSleep(ctx context.Context, d time.Duration) {
    select {
    case <-ctx.Done():
    case <-time.After(d):
    }
}

緩衝

緩衝區滿時或每秒寫入 (buffer不再需要加鎖)

buffer := make([]string, 0, 128)
for {
    select {
    case log := <-logChan:
        buffer = append(buffer, log)
        if len(buffer) >= 128 {
            flush(buffer)
        }
    case <-time.After(time.Second):
        flush(buffer)
    }
}

進階應用

如何以channel方式設計

  • Timer我們也當作是一種IO來分類
  • 思考系統中的IO Task和Computation Task
  • 一個goroutine負責一到多個IO Task
  • Computation Task在channel的send或receive端處理
  • 兩個Computation Task之間不要透過Channel溝通

Code Example 1: Quote Receiver

  • 每分鐘dump一次:一分鐘內收到的封包量
  • 收到資料就直接更新redis的內容
  • 收到資料後根據條件寫入到資料庫
    • 緩衝區已滿
    • 每秒寫入
flushTicker := time.NewTicker(time.Second)
tickChan := c.subscribeTick(exchange, symbol)
for {
    select {
    case <-ctx.Done():
        c.flush()
        return
    case <-time.After(time.Minute):
        c.dumpMetrics()
    case tick := <-tickChan:
        c.writeToRedis(tick)
        c.addToBuffer(tick)
    case <-flushTicker.C:
        c.flush()
    }
}

Code Example 2: WebSocket Hub

Hub是一個廣播者,加入的client可以收到所有廣播訊息

client

利用channel將自己的連線新增到hub中

func(x *Controller) webSocketHandler(c *gin.Context) {
    conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        log.Println(err)
        return
    }
    
    x.hub.register <- conn
    <-c.Done()
}

hub

func (h *Hub) run() {
    for {
        select {
        case conn := <-h.register:
            client := NewClient(conn)
            h.clients[client] = struct{}{}
        case client := <-h.unregister:
            h.remove(client)
        case message := <-h.broadcast:
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    h.remove(client)
                }
            }
        }
    }
}

Code Example3: Trade Engine

engine內含一個狀態機,用來控制目前是否可用,狀態有可能是(paused, ready, running, halted)
外部有可能對engine觸發以下操作:

  • Hot-Reload 程式設定
  • 與其他服務斷線,因此暫停Engine
  • 中止Engine
  • 重置異常狀態
for {
    select {
    case order := <-e.orderCh:
        e.placeOrder(order)
    case report := <-e.reportCh:
        e.handleReport(report)
    case fn := <-e.opCh:
        if e.canDoOperation() {
            fn(e)
        } else {
            e.putToQueue(fn)
        }
    case marketStatus := <-e.marketStatusCh:
        e.handleMarketStatus(marketStatus)
    }
    
    e.nextState()
}

查詢engine狀態的實作方式

func (e *Engine) QueryStatus() *Status {
    var wg sync.WaitGroup
    var status Status
    wg.Add(1)
    fn := func(e *Engine) {
        defer wg.Done()
        e.CopyStatus(&status)
    }
    e.opCh <- fn
    wg.Wait()
    return &status
} 
  • Engine可以延遲操作,只有狀態滿足條件才被執行 (e.g. 等到變為ready時)
  • 不用考慮每個操作需要對哪些資料加鎖,因為所有操作都在單執行緒下執行

API Design Example1: config watcher

  • 從資料庫中讀取設定 (Net IO + Compute)
  • 每分鐘重讀一次 (Timer)
  • 收到通知時重讀一次 (Net IO)
func NewConfigWatcher(db *gorm.DB) *ConfigWatcher func (c *ConfigWatcher) WatchChan(context.Context) <-chan Config

API Design Example2: DB writer

  • 新增記錄到資料庫中,由於每次有資料都新增的速度太慢,我們想要批次新增
    • 資料不會暫存超過一秒
    • 如果資料蒐集100筆就直接新增
    • 如果判斷該資料已經新增過就過濾 (假設沒有其他人會寫入,因此可以在程式內先過濾)
func NewDBWriter(db *gorm.DB) *DBWriter func (w *DBWriter) createHashTable() func (w *DBWriter) Start(context.Context) func (w *DBWriter) Chan() chan<- Record