# 應用 Golang channel 處理執行緒安全問題
## 理論
### 什麼是執行緒安全問題 (Thread Safety)
- 多個執行緒可以安全的存取共享資源
- 避免
- deadlock
- resource leak
- race condition
- 換句話說就是在解決**競爭**問題
### 處理執行緒安全問題的幾個方法
- Immutable Object
- Thread-Safe Containers (e.g. Concurrent Queue/Map)
- Mutex, Atomic
- Message Passing
- Software transactional memory (STM)
### Immutable Object
- 記憶體消耗大
- 對大型物件,需要考量複製的效能
- Golang沒有原生支援這個作法
### Thread-Safe Containers
- sync.Map, sync.Pool
- 該物件所提供的操作保證了執行緒安全
- store, load, delete, range...
- 不擅長處理競爭問題
- 遊戲中的HP血條邏輯處理
- 對接API,需要登入取得JWT Token,並保存下來重複使用
### Atomic
- 限制: 通常只能保護一個64bit的資料
- 複雜度: 需要使用支援的atomic指令對資料進行操作,實作更複雜
- 更加偏向底層操作,帶來高效能的同時,也更難撰寫出正確的程式
- memory model將是使用atomic時的必備知識
### Mutex
- 最常見的解法,直接以一把巨型鎖處理
- 巨型鎖的競爭問題會更嚴重
- 細粒度鎖由於鎖數量的增加,可能引入dead lock問題
- Golang的sync.Mutex是不可重入的(不可重複呼叫Lock)
### Golang Channel
- 透過Channel通訊,取代了對同一個資源的競爭
- 以底層實作來理解的話,競爭問題被轉移到了內部處理
### Channel + Select
- 避免競爭 一次只處理一個channel的資料
- 組合多種不同用途的channel來處理複雜的系統
- 直觀 撰寫簡單
- 性能差 (不會有multi reader, single writer)
## 常見應用
### 定時任務
```go=
ticker := time.NewTicker(time.Second)
for range ticker.C {
doTask()
}
```
### 可中斷的睡眠
應用於需要定時任務的場景,但程式退出時能馬上離開,不用等待下次喚醒
```go
func IntSleep(ctx context.Context, d time.Duration) {
select {
case <-ctx.Done():
case <-time.After(d):
}
}
```
### 緩衝
緩衝區滿時或每秒寫入 (buffer不再需要加鎖)
```go
buffer := make([]string, 0, 128)
for {
select {
case log := <-logChan:
buffer = append(buffer, log)
if len(buffer) >= 128 {
flush(buffer)
}
case <-time.After(time.Second):
flush(buffer)
}
}
```
## 進階應用
### 如何以channel方式設計
- Timer我們也當作是一種IO來分類
- 思考系統中的IO Task和Computation Task
- 一個goroutine負責一到多個IO Task
- Computation Task在channel的send或receive端處理
- __兩個Computation Task之間不要透過Channel溝通__
### Code Example 1: Quote Receiver
- 每分鐘dump一次:一分鐘內收到的封包量
- 收到資料就直接更新redis的內容
- 收到資料後根據條件寫入到資料庫
- 緩衝區已滿
- 每秒寫入
```go
flushTicker := time.NewTicker(time.Second)
tickChan := c.subscribeTick(exchange, symbol)
for {
select {
case <-ctx.Done():
c.flush()
return
case <-time.After(time.Minute):
c.dumpMetrics()
case tick := <-tickChan:
c.writeToRedis(tick)
c.addToBuffer(tick)
case <-flushTicker.C:
c.flush()
}
}
```
### Code Example 2: WebSocket Hub
Hub是一個廣播者,加入的client可以收到所有廣播訊息
#### client
利用channel將自己的連線新增到hub中
```go
func(x *Controller) webSocketHandler(c *gin.Context) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println(err)
return
}
x.hub.register <- conn
<-c.Done()
}
```
#### hub
```go
func (h *Hub) run() {
for {
select {
case conn := <-h.register:
client := NewClient(conn)
h.clients[client] = struct{}{}
case client := <-h.unregister:
h.remove(client)
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
h.remove(client)
}
}
}
}
}
```
### Code Example3: Trade Engine
engine內含一個狀態機,用來控制目前是否可用,狀態有可能是(paused, ready, running, halted)
外部有可能對engine觸發以下操作:
- Hot-Reload 程式設定
- 與其他服務斷線,因此暫停Engine
- 中止Engine
- 重置異常狀態
```go
for {
select {
case order := <-e.orderCh:
e.placeOrder(order)
case report := <-e.reportCh:
e.handleReport(report)
case fn := <-e.opCh:
if e.canDoOperation() {
fn(e)
} else {
e.putToQueue(fn)
}
case marketStatus := <-e.marketStatusCh:
e.handleMarketStatus(marketStatus)
}
e.nextState()
}
```
#### 查詢engine狀態的實作方式
```go
func (e *Engine) QueryStatus() *Status {
var wg sync.WaitGroup
var status Status
wg.Add(1)
fn := func(e *Engine) {
defer wg.Done()
e.CopyStatus(&status)
}
e.opCh <- fn
wg.Wait()
return &status
}
```
- Engine可以延遲操作,只有狀態滿足條件才被執行 (e.g. 等到變為ready時)
- 不用考慮每個操作需要對哪些資料加鎖,因為所有操作都在單執行緒下執行
### API Design Example1: config watcher
- 從資料庫中讀取設定 (Net IO + Compute)
- 每分鐘重讀一次 (Timer)
- 收到通知時重讀一次 (Net IO)
```go=
func NewConfigWatcher(db *gorm.DB) *ConfigWatcher
func (c *ConfigWatcher) WatchChan(context.Context) <-chan Config
```
### API Design Example2: DB writer
- 新增記錄到資料庫中,由於每次有資料都新增的速度太慢,我們想要批次新增
- 資料不會暫存超過一秒
- 如果資料蒐集100筆就直接新增
- 如果判斷該資料已經新增過就過濾 (假設沒有其他人會寫入,因此可以在程式內先過濾)
```go=
func NewDBWriter(db *gorm.DB) *DBWriter
func (w *DBWriter) createHashTable()
func (w *DBWriter) Start(context.Context)
func (w *DBWriter) Chan() chan<- Record
```