websocket 與gorilla 和 melody === ###### tags: `技術分享` `Golang` ## 名詞說明 ### [gorilla](https://github.com/gorilla/websocket) gorilla 是golang著名的websocket package ### [melody](https://github.com/olahol/melody) melody 是基於gorilla,再包裝一層的package ### [echo](https://github.com/labstack/echo) http server framework ### [gin](https://github.com/gin-gonic/gin) http server framework <br/><br/><br/> ## 問題說明 ### 下面是某個應用echo的method,介接gorilla websocket連線 ``` go import "github.com/gorilla/websocket" func WSController(c echo.Context) error { wsupgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} //建立websocket連線 ws, err := wsupgrader.Upgrade(c.Response(), c.Request(), nil) //錯誤處理 if err != nil { log.Println(err) } // websocket 連線資料處理 socketConnecting(ws, c) // 連線關閉 // ws.Close() return nil } ``` ### 問題點 websocket在建立連線後 (http server 路由已經處理,導轉到目標的method,如已進到上面的WSController()) 1. 何時有message進來,並且可以即時處理? 1. 若使用者直接關閉網頁,如何知道已經斷線? 1. 若要定時主動送message給client,應該怎麼做? <br/><br/><br/> ## 原本的問題處理方式 ### 程式碼 ``` go func socketConnecting(ws *websocket.Conn, c echo.Context) bool { var ( // 3分 Timeout 斷線 idleLimit = 3 * time.Minute timeoutTimer = time.NewTimer(idleLimit) // 10秒限制自動觸發 conditionLimit = 10 * time.Second conditionActionTimer = time.NewTimer(conditionLimit) // Logout channel logoutChan = make(chan bool, 1) // 斷線channel disConnChan = make(chan bool) ) // 關閉 channel defer timeoutTimer.Stop() defer close(logoutChan) defer conditionActionTimer.Stop() defer close(disConnChan) // 做個gorutine 用channel 控制流程 go func() { for { select { // 固定時間到了,定期推送訊息到client case <-conditionActionTimer.C: err := ws.WriteMessage(websocket.BinaryMessage, []byte("condtion msg")) if err != nil { log.Println(err) } // websocket連線timeout,關閉websocket // 這個關了,讀取就會失敗,所以 timeoutTimer.C >> disConnChan >> logoutChan >> 離開最下面的select case <-timeoutTimer.C: ws.Close() // 收到斷線channel訊息後,等待收到登出的channel訊息 case <-disConnChan: select { case logoutChan <- true: return } } } }() // 做個gorutine 持續監聽client是否有送訊息過來 go func() { for { // 讀取client送過來的訊息,如果有錯誤,將判斷為已經斷線,送出斷線channel訊息 _, clientData, err := ws.ReadMessage() if err != nil { disConnChan <- true return } // 重設timeout的timer(用意是卡在for迴圈的某個地方,3分鐘內來不及重設timeoutTimer的時間就會錯誤嗎?) // 正常情形會一直不斷對這個timer重設,這樣好嗎? timeoutTimer.Reset(idleLimit) // 對clientData做處理 // 處理處理 clientData2 := clientData // 送訊息到client ws.WriteMessage(websocket.TextMessage, []byte(clientData2)) } }() // 登出 select { case <-logoutChan: // 做登出的處理 } // 不知道又關一次的用意 ws.Close() return true } ``` ### 結論 1. 有些巧思,有timeout的設計,記得關閉channel和websocket,避免了leak。 1. 都開兩個gorutine了,也許考慮用context串接起來,會比較完整。 1. disconn依靠read拿到錯誤,channel的傳接順序讓閱讀的人不直覺,設計上耦合度太高,個人不喜歡。 <br/><br/><br/> ## 改用melody處理 ### 程式碼 ``` go WSServer := melody.New() // 收到client的訊息 WSServer.HandleMessage(func(s *melody.Session, msg []byte) { }) // 連線關閉 WSServer.HandleDisconnect(func(s *melody.Session) { }) ``` ### 說明 1. melody提供兩個method,一個是收到client的訊息,一個是收到連線關閉的狀態,使用這兩個method就能夠輕易的解決問題點1和問題點2的問題。 1. 原先timer的方式比較像polling,而melody卻是event trigger,兩者高判立下。 1. 該使用方式也能夠把『讀取訊息』和『偵測到關閉連線』兩件事情解耦合,本來兩個業務就應該獨立,而非『偵測連線關閉』要依賴在『讀取錯誤時發生錯誤』。收到錯誤,隨之而來的應該是錯誤處理的考量,而非視為下個觸發機關的要件,這麼做會導致該錯誤處理納入正流程的規劃上,『意外發生』變成了『應該如此發生』。 ### 底層mining 如果不知道polling怎麼轉成event trigger的,下次沒有melody可用,我會的可能只是寫出更好的polling罷了.... ```go // 在自己的檔案file底下 WSServer := melody.New() // 連線關閉 WSServer.HandleDisconnect(func(s *melody.Session) { log.Println("做關閉的處理") }) ``` #### step 1. 將handler function 傳入 看來它是將disconnectHandler 這個handler綁定我們要做的function而已 ```go // 在 melody.go 底下 // HandleDisconnect fires fn when a session disconnects. func (m *Melody) HandleDisconnect(fn func(*Session)) { m.disconnectHandler = fn } ``` #### step 2. 觀察Melody整個struct 繼續看disconnectHandler所對應的handleSessionFunc ```go // Melody implements a websocket manager. type Melody struct { Config *Config Upgrader *websocket.Upgrader messageHandler handleMessageFunc messageHandlerBinary handleMessageFunc messageSentHandler handleMessageFunc messageSentHandlerBinary handleMessageFunc errorHandler handleErrorFunc closeHandler handleCloseFunc connectHandler handleSessionFunc disconnectHandler handleSessionFunc pongHandler handleSessionFunc hub *hub } ``` #### step 3. 看來handleSessionFunc 只是單純一個type宣告 ```go type handleSessionFunc func(*Session) ``` #### step 4. 接下來應該找Session step 1 ~ 3 都是綁定的相關步驟,實體資料處理流程在別處 Session 的struct 看起來就很複雜 ```go // 在 session.go 底下 // Session wrapper around websocket connections. type Session struct { Request *http.Request Keys map[string]interface{} conn *websocket.Conn output chan *envelope melody *Melody // 此處竟然綁了一個Melody Struct,真的十分有趣 open bool rwmutex *sync.RWMutex } ``` #### step 5.既然有struct,就會有實體化的地方,回頭找 melody.New() 但是New()裡面看不到有跟Session實體化相關的部分,newConfig()沒有,只剩hub要找了 ```go // 在 melody.go 底下 // New creates a new melody instance with default Upgrader and Config. func New() *Melody { upgrader := &websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, } hub := newHub() go hub.run() return &Melody{ Config: newConfig(), Upgrader: upgrader, messageHandler: func(*Session, []byte) {}, messageHandlerBinary: func(*Session, []byte) {}, messageSentHandler: func(*Session, []byte) {}, messageSentHandlerBinary: func(*Session, []byte) {}, errorHandler: func(*Session, error) {}, closeHandler: nil, connectHandler: func(*Session) {}, disconnectHandler: func(*Session) {}, pongHandler: func(*Session) {}, hub: hub, } } ``` #### step 6. 來找hub sessions、register、unregister 都有Session指標的影子 但光憑這樣還無法把事情前後對起來 ```go // 在 hun.go 底下 type hub struct { sessions map[*Session]bool broadcast chan *envelope register chan *Session unregister chan *Session exit chan *envelope open bool rwmutex *sync.RWMutex } func newHub() *hub { return &hub{ sessions: make(map[*Session]bool), broadcast: make(chan *envelope), register: make(chan *Session), unregister: make(chan *Session), exit: make(chan *envelope), open: true, rwmutex: &sync.RWMutex{}, } } ``` #### step 7. 看看 hub.run(),這玩意本身是開goruntine下去跑 case s := <-h.register: 看起來能夠從這裡拿到Session結構的東西,並且放入sessions這個map 下一步,找出哪邊塞入它的! ```go func (h *hub) run() { loop: for { select { case s := <-h.register: h.rwmutex.Lock() h.sessions[s] = true h.rwmutex.Unlock() case s := <-h.unregister: if _, ok := h.sessions[s]; ok { h.rwmutex.Lock() delete(h.sessions, s) h.rwmutex.Unlock() } case m := <-h.broadcast: h.rwmutex.RLock() for s := range h.sessions { if m.filter != nil { if m.filter(s) { s.writeMessage(m) } } else { s.writeMessage(m) } } h.rwmutex.RUnlock() case m := <-h.exit: h.rwmutex.Lock() for s := range h.sessions { s.writeMessage(m) delete(h.sessions, s) s.Close() } h.open = false h.rwmutex.Unlock() break loop } } } ``` #### step 8. 結果稍微有點意外的,發現有塞入channel的地方,要回到melody.go ```go // 我自己的檔案,綁定gin的router,websocket會進來的地方 // WSConn webscoket 連線 func WSConn() func(c *gin.Context) { return func(c *gin.Context) { m := melody.New() m.HandleRequest(c.Writer, c.Request) } } // 在 melody.go 底下 // HandleRequest upgrades http requests to websocket connections and dispatches them to be handled by the melody instance. func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) error { return m.HandleRequestWithKeys(w, r, nil) } // HandleRequestWithKeys does the same as HandleRequest but populates session.Keys with keys. func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, keys map[string]interface{}) error { // 先看hub有沒有被關掉 if m.hub.closed() { return errors.New("melody instance is closed") } // 一樣也是走Upgrader,跟原本的gorilla使用方式沒有什麼不同,只是包裝在這邊 // conn 的type也是gorilla提供的 conn, err := m.Upgrader.Upgrade(w, r, nil) if err != nil { return err } // 從這裡開始,就是決勝負的關鍵,我們原先處理的方式對於拿到的conn只會讀寫,和close // melody做了什麼事情,而可以主動知道有message進來,並且主動知道連線已經中斷! session := &Session{ Request: r, Keys: keys, conn: conn, // gorilla的websocket連線! output: make(chan *envelope, m.Config.MessageBufferSize), melody: m, // 把自己組裝進Session,成為Session的一份子,OMG這也太酷了 open: true, rwmutex: &sync.RWMutex{}, } // 在這裡把session送入hub,hub已經是屬於melody的一份子 // 所以melody裡面有session們,而每個session裡面又有melody(的指標) m.hub.register <- session // 觸發connectHandler,所以connectHandler就接得到這個事件發生 m.connectHandler(session) // 開goruntine 處理對於這個session,寫message到client的行為 go session.writePump() // 裡面有個無窮迴圈,所以會卡住,直到讀取時發生error session.readPump() if !m.hub.closed() { m.hub.unregister <- session } session.close() // 觸發disconnectHandler,所以disconnectHandler就接得到這個事件發生 // 這裡就是引發了disconnect的事件! m.disconnectHandler(session) return nil } ``` #### step 9. 分析 session.readPump() ```go func (s *Session) readPump() { s.conn.SetReadLimit(s.melody.Config.MaxMessageSize) s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait)) s.conn.SetPongHandler(func(string) error { s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait)) s.melody.pongHandler(s) return nil }) if s.melody.closeHandler != nil { s.conn.SetCloseHandler(func(code int, text string) error { return s.melody.closeHandler(s, code, text) }) } // 這裡的處理方式跟原先程式的處理方式幾乎一模一樣 // 利用無限迴圈,read發現錯誤,而能夠知道已經斷線 // 之前也沒什麼寫過socket-client的程式,也許都是同樣一套搞法?我不知道 for { t, message, err := s.conn.ReadMessage() if err != nil { s.melody.errorHandler(s, err) break } if t == websocket.TextMessage { s.melody.messageHandler(s, message) } if t == websocket.BinaryMessage { s.melody.messageHandlerBinary(s, message) } } } ``` ### 結論 1. 我不知道偵測disconnect是不是一定要靠read拿到error,不論原本的程式做法或者使用melody,兩者的本質是一樣的。 1. 拿到read的方式也是一樣的,用個無窮迴圈持續去要,大神寫的package本質與我們原先的處理方式並無太大差異。 1. 但是包裝成package的閱讀性大幅提高,可以專注於read和disconn兩個事件處理。 1. 使用handler的方式,也不用將處理邏輯綁在接收channel訊號的底下,能夠自由輕鬆地使用。 <br/><br/><br/> ## 總結 1. 以前原本的寫法而言,websocket連線建立後會開3個gorutine,而melody因為read的無窮迴圈跟本身request handler是在一起的,所以只開兩個gorutine,當然以全體的角度而言,還多了一個hub在跑。 1. 套件裡面的Broadcast,與其他部分我並沒有研究,我注重的重點並不一定是這個套件的精髓。 1. 雖然說本質發現read和disconn的方式一致,但是melody的handler寫法讓我有許多觸發,因為研究這個package,讓我明白handler和event trigger的寫法在golang怎麼寫,以及是什麼回事。 1. 另外很有收穫的是,melody竟然把自己的struct包進了每個session,melody擁有的hub,卻又管理著session們,三個struct因此連成一個圈。 1. 每個session負責自己的write和read gorutine,又能夠在適當的時機觸發綁在melody的事件。