websocket 與gorilla 和 melody
===
###### tags: `技術分享` `Golang`
## 名詞說明
### [gorilla](https://github.com/gorilla/websocket)
gorilla 是golang著名的websocket package
### [melody](https://github.com/olahol/melody)
melody 是基於gorilla,再包裝一層的package
### [echo](https://github.com/labstack/echo)
http server framework
### [gin](https://github.com/gin-gonic/gin)
http server framework
<br/><br/><br/>
## 問題說明
### 下面是某個應用echo的method,介接gorilla websocket連線
``` go
import "github.com/gorilla/websocket"
func WSController(c echo.Context) error {
wsupgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
//建立websocket連線
ws, err := wsupgrader.Upgrade(c.Response(), c.Request(), nil)
//錯誤處理
if err != nil {
log.Println(err)
}
// websocket 連線資料處理
socketConnecting(ws, c)
// 連線關閉
// ws.Close()
return nil
}
```
### 問題點
websocket在建立連線後
(http server 路由已經處理,導轉到目標的method,如已進到上面的WSController())
1. 何時有message進來,並且可以即時處理?
1. 若使用者直接關閉網頁,如何知道已經斷線?
1. 若要定時主動送message給client,應該怎麼做?
<br/><br/><br/>
## 原本的問題處理方式
### 程式碼
``` go
func socketConnecting(ws *websocket.Conn, c echo.Context) bool {
var (
// 3分 Timeout 斷線
idleLimit = 3 * time.Minute
timeoutTimer = time.NewTimer(idleLimit)
// 10秒限制自動觸發
conditionLimit = 10 * time.Second
conditionActionTimer = time.NewTimer(conditionLimit)
// Logout channel
logoutChan = make(chan bool, 1)
// 斷線channel
disConnChan = make(chan bool)
)
// 關閉 channel
defer timeoutTimer.Stop()
defer close(logoutChan)
defer conditionActionTimer.Stop()
defer close(disConnChan)
// 做個gorutine 用channel 控制流程
go func() {
for {
select {
// 固定時間到了,定期推送訊息到client
case <-conditionActionTimer.C:
err := ws.WriteMessage(websocket.BinaryMessage, []byte("condtion msg"))
if err != nil {
log.Println(err)
}
// websocket連線timeout,關閉websocket
// 這個關了,讀取就會失敗,所以 timeoutTimer.C >> disConnChan >> logoutChan >> 離開最下面的select
case <-timeoutTimer.C:
ws.Close()
// 收到斷線channel訊息後,等待收到登出的channel訊息
case <-disConnChan:
select {
case logoutChan <- true:
return
}
}
}
}()
// 做個gorutine 持續監聽client是否有送訊息過來
go func() {
for {
// 讀取client送過來的訊息,如果有錯誤,將判斷為已經斷線,送出斷線channel訊息
_, clientData, err := ws.ReadMessage()
if err != nil {
disConnChan <- true
return
}
// 重設timeout的timer(用意是卡在for迴圈的某個地方,3分鐘內來不及重設timeoutTimer的時間就會錯誤嗎?)
// 正常情形會一直不斷對這個timer重設,這樣好嗎?
timeoutTimer.Reset(idleLimit)
// 對clientData做處理
// 處理處理
clientData2 := clientData
// 送訊息到client
ws.WriteMessage(websocket.TextMessage, []byte(clientData2))
}
}()
// 登出
select {
case <-logoutChan:
// 做登出的處理
}
// 不知道又關一次的用意
ws.Close()
return true
}
```
### 結論
1. 有些巧思,有timeout的設計,記得關閉channel和websocket,避免了leak。
1. 都開兩個gorutine了,也許考慮用context串接起來,會比較完整。
1. disconn依靠read拿到錯誤,channel的傳接順序讓閱讀的人不直覺,設計上耦合度太高,個人不喜歡。
<br/><br/><br/>
## 改用melody處理
### 程式碼
``` go
WSServer := melody.New()
// 收到client的訊息
WSServer.HandleMessage(func(s *melody.Session, msg []byte) {
})
// 連線關閉
WSServer.HandleDisconnect(func(s *melody.Session) {
})
```
### 說明
1. melody提供兩個method,一個是收到client的訊息,一個是收到連線關閉的狀態,使用這兩個method就能夠輕易的解決問題點1和問題點2的問題。
1. 原先timer的方式比較像polling,而melody卻是event trigger,兩者高判立下。
1. 該使用方式也能夠把『讀取訊息』和『偵測到關閉連線』兩件事情解耦合,本來兩個業務就應該獨立,而非『偵測連線關閉』要依賴在『讀取錯誤時發生錯誤』。收到錯誤,隨之而來的應該是錯誤處理的考量,而非視為下個觸發機關的要件,這麼做會導致該錯誤處理納入正流程的規劃上,『意外發生』變成了『應該如此發生』。
### 底層mining
如果不知道polling怎麼轉成event trigger的,下次沒有melody可用,我會的可能只是寫出更好的polling罷了....
```go
// 在自己的檔案file底下
WSServer := melody.New()
// 連線關閉
WSServer.HandleDisconnect(func(s *melody.Session) {
log.Println("做關閉的處理")
})
```
#### step 1. 將handler function 傳入
看來它是將disconnectHandler 這個handler綁定我們要做的function而已
```go
// 在 melody.go 底下
// HandleDisconnect fires fn when a session disconnects.
func (m *Melody) HandleDisconnect(fn func(*Session)) {
m.disconnectHandler = fn
}
```
#### step 2. 觀察Melody整個struct
繼續看disconnectHandler所對應的handleSessionFunc
```go
// Melody implements a websocket manager.
type Melody struct {
Config *Config
Upgrader *websocket.Upgrader
messageHandler handleMessageFunc
messageHandlerBinary handleMessageFunc
messageSentHandler handleMessageFunc
messageSentHandlerBinary handleMessageFunc
errorHandler handleErrorFunc
closeHandler handleCloseFunc
connectHandler handleSessionFunc
disconnectHandler handleSessionFunc
pongHandler handleSessionFunc
hub *hub
}
```
#### step 3. 看來handleSessionFunc 只是單純一個type宣告
```go
type handleSessionFunc func(*Session)
```
#### step 4. 接下來應該找Session
step 1 ~ 3 都是綁定的相關步驟,實體資料處理流程在別處
Session 的struct 看起來就很複雜
```go
// 在 session.go 底下
// Session wrapper around websocket connections.
type Session struct {
Request *http.Request
Keys map[string]interface{}
conn *websocket.Conn
output chan *envelope
melody *Melody // 此處竟然綁了一個Melody Struct,真的十分有趣
open bool
rwmutex *sync.RWMutex
}
```
#### step 5.既然有struct,就會有實體化的地方,回頭找 melody.New()
但是New()裡面看不到有跟Session實體化相關的部分,newConfig()沒有,只剩hub要找了
```go
// 在 melody.go 底下
// New creates a new melody instance with default Upgrader and Config.
func New() *Melody {
upgrader := &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
hub := newHub()
go hub.run()
return &Melody{
Config: newConfig(),
Upgrader: upgrader,
messageHandler: func(*Session, []byte) {},
messageHandlerBinary: func(*Session, []byte) {},
messageSentHandler: func(*Session, []byte) {},
messageSentHandlerBinary: func(*Session, []byte) {},
errorHandler: func(*Session, error) {},
closeHandler: nil,
connectHandler: func(*Session) {},
disconnectHandler: func(*Session) {},
pongHandler: func(*Session) {},
hub: hub,
}
}
```
#### step 6. 來找hub
sessions、register、unregister 都有Session指標的影子
但光憑這樣還無法把事情前後對起來
```go
// 在 hun.go 底下
type hub struct {
sessions map[*Session]bool
broadcast chan *envelope
register chan *Session
unregister chan *Session
exit chan *envelope
open bool
rwmutex *sync.RWMutex
}
func newHub() *hub {
return &hub{
sessions: make(map[*Session]bool),
broadcast: make(chan *envelope),
register: make(chan *Session),
unregister: make(chan *Session),
exit: make(chan *envelope),
open: true,
rwmutex: &sync.RWMutex{},
}
}
```
#### step 7. 看看 hub.run(),這玩意本身是開goruntine下去跑
case s := <-h.register: 看起來能夠從這裡拿到Session結構的東西,並且放入sessions這個map
下一步,找出哪邊塞入它的!
```go
func (h *hub) run() {
loop:
for {
select {
case s := <-h.register:
h.rwmutex.Lock()
h.sessions[s] = true
h.rwmutex.Unlock()
case s := <-h.unregister:
if _, ok := h.sessions[s]; ok {
h.rwmutex.Lock()
delete(h.sessions, s)
h.rwmutex.Unlock()
}
case m := <-h.broadcast:
h.rwmutex.RLock()
for s := range h.sessions {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)
}
} else {
s.writeMessage(m)
}
}
h.rwmutex.RUnlock()
case m := <-h.exit:
h.rwmutex.Lock()
for s := range h.sessions {
s.writeMessage(m)
delete(h.sessions, s)
s.Close()
}
h.open = false
h.rwmutex.Unlock()
break loop
}
}
}
```
#### step 8. 結果稍微有點意外的,發現有塞入channel的地方,要回到melody.go
```go
// 我自己的檔案,綁定gin的router,websocket會進來的地方
// WSConn webscoket 連線
func WSConn() func(c *gin.Context) {
return func(c *gin.Context) {
m := melody.New()
m.HandleRequest(c.Writer, c.Request)
}
}
// 在 melody.go 底下
// HandleRequest upgrades http requests to websocket connections and dispatches them to be handled by the melody instance.
func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) error {
return m.HandleRequestWithKeys(w, r, nil)
}
// HandleRequestWithKeys does the same as HandleRequest but populates session.Keys with keys.
func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, keys map[string]interface{}) error {
// 先看hub有沒有被關掉
if m.hub.closed() {
return errors.New("melody instance is closed")
}
// 一樣也是走Upgrader,跟原本的gorilla使用方式沒有什麼不同,只是包裝在這邊
// conn 的type也是gorilla提供的
conn, err := m.Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
// 從這裡開始,就是決勝負的關鍵,我們原先處理的方式對於拿到的conn只會讀寫,和close
// melody做了什麼事情,而可以主動知道有message進來,並且主動知道連線已經中斷!
session := &Session{
Request: r,
Keys: keys,
conn: conn, // gorilla的websocket連線!
output: make(chan *envelope, m.Config.MessageBufferSize),
melody: m, // 把自己組裝進Session,成為Session的一份子,OMG這也太酷了
open: true,
rwmutex: &sync.RWMutex{},
}
// 在這裡把session送入hub,hub已經是屬於melody的一份子
// 所以melody裡面有session們,而每個session裡面又有melody(的指標)
m.hub.register <- session
// 觸發connectHandler,所以connectHandler就接得到這個事件發生
m.connectHandler(session)
// 開goruntine 處理對於這個session,寫message到client的行為
go session.writePump()
// 裡面有個無窮迴圈,所以會卡住,直到讀取時發生error
session.readPump()
if !m.hub.closed() {
m.hub.unregister <- session
}
session.close()
// 觸發disconnectHandler,所以disconnectHandler就接得到這個事件發生
// 這裡就是引發了disconnect的事件!
m.disconnectHandler(session)
return nil
}
```
#### step 9. 分析 session.readPump()
```go
func (s *Session) readPump() {
s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
s.conn.SetPongHandler(func(string) error {
s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
s.melody.pongHandler(s)
return nil
})
if s.melody.closeHandler != nil {
s.conn.SetCloseHandler(func(code int, text string) error {
return s.melody.closeHandler(s, code, text)
})
}
// 這裡的處理方式跟原先程式的處理方式幾乎一模一樣
// 利用無限迴圈,read發現錯誤,而能夠知道已經斷線
// 之前也沒什麼寫過socket-client的程式,也許都是同樣一套搞法?我不知道
for {
t, message, err := s.conn.ReadMessage()
if err != nil {
s.melody.errorHandler(s, err)
break
}
if t == websocket.TextMessage {
s.melody.messageHandler(s, message)
}
if t == websocket.BinaryMessage {
s.melody.messageHandlerBinary(s, message)
}
}
}
```
### 結論
1. 我不知道偵測disconnect是不是一定要靠read拿到error,不論原本的程式做法或者使用melody,兩者的本質是一樣的。
1. 拿到read的方式也是一樣的,用個無窮迴圈持續去要,大神寫的package本質與我們原先的處理方式並無太大差異。
1. 但是包裝成package的閱讀性大幅提高,可以專注於read和disconn兩個事件處理。
1. 使用handler的方式,也不用將處理邏輯綁在接收channel訊號的底下,能夠自由輕鬆地使用。
<br/><br/><br/>
## 總結
1. 以前原本的寫法而言,websocket連線建立後會開3個gorutine,而melody因為read的無窮迴圈跟本身request handler是在一起的,所以只開兩個gorutine,當然以全體的角度而言,還多了一個hub在跑。
1. 套件裡面的Broadcast,與其他部分我並沒有研究,我注重的重點並不一定是這個套件的精髓。
1. 雖然說本質發現read和disconn的方式一致,但是melody的handler寫法讓我有許多觸發,因為研究這個package,讓我明白handler和event trigger的寫法在golang怎麼寫,以及是什麼回事。
1. 另外很有收穫的是,melody竟然把自己的struct包進了每個session,melody擁有的hub,卻又管理著session們,三個struct因此連成一個圈。
1. 每個session負責自己的write和read gorutine,又能夠在適當的時機觸發綁在melody的事件。