---
title: Nats筆記
---
# Nats筆記
###### tags: `Golang` `Optimize` `infrastructure`
[Toc]
## Golang
發布訂閱模式: https://github.com/ronnielin8862/go-practice/tree/master/cmd/mq/nat
# JetStream
為何需要在程式中使用JetStream? 除了更多功能以外,最重要的是Nats 本身不提供訊息持久化,當pub沒被sub接走,隨即被丟棄了。 JetStream即是這個功能的補全。 其他細節看參考文章
參考文章:https://marco79423.net/articles/%E6%B7%BA%E8%AB%87-natsstan-%E5%92%8C-jetstream-%E5%85%A9%E4%B8%89%E4%BA%8B
### up
docker run -p 4222:4222 nats -js
### 重點
1. 透過jetStreamPublish方法預執行,再運行sub,可以發現會一次讀取到之前被存儲的消息
2. one stream can be many subject, but one subject can be only one stream. if you want change subject in stream, restart nats
3.
# Nats-streaming
### 特點:
1. 已被棄用
2. 頻道訂閱無法使用模糊匹配
3. 沒有斷線重連機制,需搭配使用原生nats的重連機制。 簡的來說也就是比較多工
4. 推薦使用 JetStream 達到消息持久化
## Install
1. docker-compose.yaml配置
```
version: "2"
services:
nats-server:
#image: provide/nats-server:latest
image: nats:latest
volumes:
- ./nats-server.conf:/nats-server.conf
ports:
- 4222:4222
```
2. 配置文件nats-server.conf
```
# Client port of 4222 on all interfaces
port: 4222
# HTTP monitoring port
monitor_port: 8222
# This is for clustering multiple servers together.
cluster {
# Route connections to be received on any interface on port 6222
port: 6222
# Routes are protected, so need to use them with --routes flag
# e.g. --routes=nats-route://ruser:T0pS3cr3t@otherdockerhost:6222
authorization {
user: ruser
password: T0pS3cr3t
timeout: 2
}
# Routes are actively solicited and connected to from this server.
# This Docker image has none by default, but you can pass a
# flag to the gnatsd docker image to create one to an existing server.
routes = []
}
```
3. up
```
docker-compose -f up/restart/start/stop
```
## 代碼 (含純nats, jetstream, stan三種)
https://github.com/ronnielin8862/go-practice/tree/master/cmd/mq/nat
max_payload 上限調整:
```
# # 最大的有效负载
max_payload: 10485760
```
透過conf檔設定重啟: ```nats-streaming-server -st FILE -dir natslog -c /etc/nats/nats.conf```
## 踩坑、與重要概念:
1. 重要交易ack:
sub 預設 收到msg auto ack, 如果處理失敗不會再次被處理。 處理高敏資料時這裡需要改成手動ack, 交易處理完成後才ack, 遇到失敗時nats可以讓下一個訂閱再次處理,確保交易完成
2. sub.ackWait 參數達到設定的時間,nats會視為timeout並且重送。 實務上遇到的狀況是,系統消化mq較慢。 在一般情況下ackwait設定得當,系統會逐步消化mq,達到流量肖峰; 但因為ackwait設定為五秒(某位天才同事設定的,默認值是30秒),nats五秒後又重送mq到系統,重複增加更多的待消化mq在memory,最終導致了server oom整個崩潰。
當時我們面對OOM的方式是將訂閱的sub.maxInflight設定為1,nats不會堆積消息在server memory,但是這副作用是每次處理完一個mq, ack後nata才發送新的mq,反覆的在網路io等待,降低了處理效能,導致消息自publish以後更久才獲得處理,系統顯示的數據比實際情況慢上許多。 再加上最後還會處理到nats認為timeout重複發送的消息,讓server重複處理數據,再次降低消化mq的效能。
3. 訂閱上要留意ackWait, maxInflight之間的搭配,調適的好,可以讓系統效能最大化;處理不好會拖垮整個系統、消息重複消費,等等各式各樣的問題
4. 訂閱如果採用queueSub mode, 也須要留意不同suber 因手上積壓mq過多,消費能力較弱時所導致重複消費mq的問題,遇到金額等高敏資料絕對出人命
## Nats 掉消息 測試
Publish 如果採用async mode, 且推送之間短於1ms,這種情境下訂閱方極容易掉消息(實測掉率超過2/3)
sync:確保消息被處理,可採sync模式會避免上述問題,但是會有極大的效能瓶頸;
async: 只要消息發送結束前有等待一毫秒,經過實測3次100萬條消息沒有遺失過
經果整理以後的心得:異步發送結束前之所以需要等待一毫秒,是因為go語言的特性,在結束主協程後會將子也一併關閉,不像Java等語言,主線程關閉前會等待子線程結束。 因此在實務使用上,遇到接收api觸發多獨立協程的情境,需要留意別讓協程關閉太快,以免丟失訊息
測試代碼:
https://github.com/ronnielin8862/go-practice/tree/master/cmd/mq/nat_stan/loss_msg
## goroutine 處理測試
由於官方文檔沒有細緻說明到library處理的細節,因此自行測試
1. 測試不同channel之間是否是不同goroutine
測試結果: 不同channel之間,一般沒有分別訂閱下,是相同的goroutine
2. 測試同個channel能否活用不同goruotine
測試結果: 同channel為同個goroutine,但可以透過goroutine同時處理。 搭配最大goroutine數控制,可以活用cpu最大化消化mq (如果是常態性的大量處理mq服務,可以考慮使用協程池,減少超高頻率的反覆創建、與關閉斜程,增加系統效能)
3. 根據平常使用經驗以及測試結果可以得知: 訂閱頻道時用a goroutine,收到訊息時就會用a處理,因此如果希望分goroutine 處理消息,可以在訂閱時開啟goroutine
4. 如果單一goroutine處理單channel消化不及,cpu有餘裕時可以在訂閱到消息後再開goroutine加速消化,將容易堆積mq的訂閱,開啟提高消化速度,
本次工作上待調整重點:
1. 訂閱的 maxInflight 增大 - 減少每次ack才取得mq的網路io
2. ackWait 增加 - 避免消化不及server重送消息
3. 獨立的go routine處理獨立的訂閱, 在mq數量龐大的指數訂閱,在消費後再開啟goroutine,用多協程加速處理,根據核心數決定協程數
4. 留意推送是否快到短於1ms,修正這段