--- title: Nats筆記 --- # Nats筆記 ###### tags: `Golang` `Optimize` `infrastructure` [Toc] ## Golang 發布訂閱模式: https://github.com/ronnielin8862/go-practice/tree/master/cmd/mq/nat # JetStream 為何需要在程式中使用JetStream? 除了更多功能以外,最重要的是Nats 本身不提供訊息持久化,當pub沒被sub接走,隨即被丟棄了。 JetStream即是這個功能的補全。 其他細節看參考文章 參考文章:https://marco79423.net/articles/%E6%B7%BA%E8%AB%87-natsstan-%E5%92%8C-jetstream-%E5%85%A9%E4%B8%89%E4%BA%8B ### up docker run -p 4222:4222 nats -js ### 重點 1. 透過jetStreamPublish方法預執行,再運行sub,可以發現會一次讀取到之前被存儲的消息 2. one stream can be many subject, but one subject can be only one stream. if you want change subject in stream, restart nats 3. # Nats-streaming ### 特點: 1. 已被棄用 2. 頻道訂閱無法使用模糊匹配 3. 沒有斷線重連機制,需搭配使用原生nats的重連機制。 簡的來說也就是比較多工 4. 推薦使用 JetStream 達到消息持久化 ## Install 1. docker-compose.yaml配置 ``` version: "2" services: nats-server: #image: provide/nats-server:latest image: nats:latest volumes: - ./nats-server.conf:/nats-server.conf ports: - 4222:4222 ``` 2. 配置文件nats-server.conf ``` # Client port of 4222 on all interfaces port: 4222 # HTTP monitoring port monitor_port: 8222 # This is for clustering multiple servers together. cluster { # Route connections to be received on any interface on port 6222 port: 6222 # Routes are protected, so need to use them with --routes flag # e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222 authorization { user: ruser password: T0pS3cr3t timeout: 2 } # Routes are actively solicited and connected to from this server. # This Docker image has none by default, but you can pass a # flag to the gnatsd docker image to create one to an existing server. routes = [] } ``` 3. up ``` docker-compose -f up/restart/start/stop ``` ## 代碼 (含純nats, jetstream, stan三種) https://github.com/ronnielin8862/go-practice/tree/master/cmd/mq/nat max_payload 上限調整: ``` # # 最大的有效负载 max_payload: 10485760 ``` 透過conf檔設定重啟: ```nats-streaming-server -st FILE -dir natslog -c /etc/nats/nats.conf``` ## 踩坑、與重要概念: 1. 重要交易ack: sub 預設 收到msg auto ack, 如果處理失敗不會再次被處理。 處理高敏資料時這裡需要改成手動ack, 交易處理完成後才ack, 遇到失敗時nats可以讓下一個訂閱再次處理,確保交易完成 2. sub.ackWait 參數達到設定的時間,nats會視為timeout並且重送。 實務上遇到的狀況是,系統消化mq較慢。 在一般情況下ackwait設定得當,系統會逐步消化mq,達到流量肖峰; 但因為ackwait設定為五秒(某位天才同事設定的,默認值是30秒),nats五秒後又重送mq到系統,重複增加更多的待消化mq在memory,最終導致了server oom整個崩潰。 當時我們面對OOM的方式是將訂閱的sub.maxInflight設定為1,nats不會堆積消息在server memory,但是這副作用是每次處理完一個mq, ack後nata才發送新的mq,反覆的在網路io等待,降低了處理效能,導致消息自publish以後更久才獲得處理,系統顯示的數據比實際情況慢上許多。 再加上最後還會處理到nats認為timeout重複發送的消息,讓server重複處理數據,再次降低消化mq的效能。 3. 訂閱上要留意ackWait, maxInflight之間的搭配,調適的好,可以讓系統效能最大化;處理不好會拖垮整個系統、消息重複消費,等等各式各樣的問題 4. 訂閱如果採用queueSub mode, 也須要留意不同suber 因手上積壓mq過多,消費能力較弱時所導致重複消費mq的問題,遇到金額等高敏資料絕對出人命 ## Nats 掉消息 測試 Publish 如果採用async mode, 且推送之間短於1ms,這種情境下訂閱方極容易掉消息(實測掉率超過2/3) sync:確保消息被處理,可採sync模式會避免上述問題,但是會有極大的效能瓶頸; async: 只要消息發送結束前有等待一毫秒,經過實測3次100萬條消息沒有遺失過 經果整理以後的心得:異步發送結束前之所以需要等待一毫秒,是因為go語言的特性,在結束主協程後會將子也一併關閉,不像Java等語言,主線程關閉前會等待子線程結束。 因此在實務使用上,遇到接收api觸發多獨立協程的情境,需要留意別讓協程關閉太快,以免丟失訊息 測試代碼: https://github.com/ronnielin8862/go-practice/tree/master/cmd/mq/nat_stan/loss_msg ## goroutine 處理測試 由於官方文檔沒有細緻說明到library處理的細節,因此自行測試 1. 測試不同channel之間是否是不同goroutine 測試結果: 不同channel之間,一般沒有分別訂閱下,是相同的goroutine 2. 測試同個channel能否活用不同goruotine 測試結果: 同channel為同個goroutine,但可以透過goroutine同時處理。 搭配最大goroutine數控制,可以活用cpu最大化消化mq (如果是常態性的大量處理mq服務,可以考慮使用協程池,減少超高頻率的反覆創建、與關閉斜程,增加系統效能) 3. 根據平常使用經驗以及測試結果可以得知: 訂閱頻道時用a goroutine,收到訊息時就會用a處理,因此如果希望分goroutine 處理消息,可以在訂閱時開啟goroutine 4. 如果單一goroutine處理單channel消化不及,cpu有餘裕時可以在訂閱到消息後再開goroutine加速消化,將容易堆積mq的訂閱,開啟提高消化速度, 本次工作上待調整重點: 1. 訂閱的 maxInflight 增大 - 減少每次ack才取得mq的網路io 2. ackWait 增加 - 避免消化不及server重送消息 3. 獨立的go routine處理獨立的訂閱, 在mq數量龐大的指數訂閱,在消費後再開啟goroutine,用多協程加速處理,根據核心數決定協程數 4. 留意推送是否快到短於1ms,修正這段