# AEDES 0.46.2 ```shell= npm install aedes@0.46.2 aedes-server-factory aedes-protocol-decoder aedes-persistence-redis redis docker run --name redis -d -p 6379:6379 redis ``` 請去tsconfig新增設定 ```json= "esModuleInterop": true ``` ## aedes宣告 在0.46.2版,aedes是function,所以要用`this.aedesInstance = aedes({});` 而到了0.48.0版,aedes變成了class,會變成要用`this.aedesInstance = new aedes({});` ## example ```typescript= //broker.service.ts import { Injectable, OnModuleInit, OnApplicationBootstrap, } from '@nestjs/common'; import aedes from 'aedes'; import { Aedes } from 'aedes'; import { createServer } from 'aedes-server-factory'; import { protocolDecoder } from 'aedes-protocol-decoder'; import { AuthService } from './auth.service'; @Injectable() export class BrokerService implements OnModuleInit, OnApplicationBootstrap { private aedesInstance: Aedes; constructor(private authService: AuthService) {} onModuleInit() { this.aedesInstance = aedes({}); } onApplicationBootstrap() { createServer(this.aedesInstance, { trustProxy: true, protocolDecoder, }).listen(1883, () => { console.log('MQTT Broker is listening on port 1883'); }); } } ``` <a href="https://docs.nestjs.com/fundamentals/lifecycle-events">NestJS Lifecycle</a> - OnModuleInit 在程式讀到這個service所在的module之後會立刻執行該function ![](https://i.imgur.com/sNaMyUT.png) - OnApplicationBootstrap 在程式執行OnModuleInit之後跟著執行該function 這兩個函式都可以加上async - createServer `createServer(broker, options).listen(port, function)` 建立broker 當trustProxy為true且有設置protocolDecoder時,他會parse http header以及/或parse proxy protocol,然後傳到client.connDetails裡 ![](https://i.imgur.com/UG9KKMG.png) ## Aedes interface ```typescript= // aedes declare export declare function aedes (options?: AedesOptions): Aedes // Aedes interface export interface Aedes extends EventEmitter { id: Readonly<string> connectedClients: Readonly<number> closed: Readonly<boolean> brokers: Readonly<Brokers> handle: (stream: Connection) => Client on (event: 'closed', listener: () => void): this on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout', listener: (client: Client) => void): this on (event: 'clientError' | 'connectionError', listener: (client: Client, error: Error) => void): this on (event: 'connackSent', listener: (packet: ConnackPacket, client: Client) => void): this on (event: 'ping', listener: (packet: PingreqPacket, client: Client) => void): this on (event: 'publish', listener: (packet: AedesPublishPacket, client: Client) => void): this on (event: 'ack', listener: (packet: PublishPacket | PubrelPacket, client: Client) => void): this on (event: 'subscribe', listener: (subscriptions: Subscription[], client: Client) => void): this on (event: 'unsubscribe', listener: (unsubscriptions: string[], client: Client) => void): this publish ( packet: PublishPacket, callback: (error?: Error) => void ): void subscribe ( topic: string, deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void, callback: () => void ): void unsubscribe ( topic: string, deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void, callback: () => void ): void close (callback?: () => void): void preConnect: PreConnectHandler authenticate: AuthenticateHandler authorizePublish: AuthorizePublishHandler authorizeSubscribe: AuthorizeSubscribeHandler authorizeForward: AuthorizeForwardHandler published: PublishedHandler } // Handler type PreConnectHandler = (client: Client, packet: ConnectPacket, callback: (error: Error | null, success: boolean) => void) => void type AuthenticateHandler = ( client: Client, username: Readonly<string>, password: Readonly<Buffer>, done: (error: AuthenticateError | null, success: boolean | null) => void ) => void type AuthorizePublishHandler = (client: Client, packet: PublishPacket, callback: (error?: Error | null) => void) => void type AuthorizeSubscribeHandler = (client: Client, subscription: Subscription, callback: (error: Error | null, subscription?: Subscription | null) => void) => void type AuthorizeForwardHandler = (client: Client, packet: AedesPublishPacket) => AedesPublishPacket | null | void type PublishedHandler = (packet: AedesPublishPacket, client: Client, callback: (error?: Error | null) => void) => void ``` ### id \<string> Default: uuidv4() 回傳該broker的識別碼,預設是用uuid ### connectedClients \<number> Default: 0 回傳連接到這個broker的client的數量 ### closed \<boolean> Default: false 回傳該broker是否已關閉 ### preConnect ```typescript= type PreConnectHandler = ( client: Client, packet: ConnectPacket, callback: (error: Error | null, success: boolean) => void ) => void ... export type ConnectPacket = IConnectPacket & { cmd: 'connect' } ... export interface IConnectPacket extends IPacket { cmd: 'connect' clientId: string protocolVersion?: 4 | 5 | 3 protocolId?: 'MQTT' | 'MQIsdp' clean?: boolean keepalive?: number username?: string password?: Buffer will?: { topic: string payload: Buffer qos?: QoS retain?: boolean properties?: { willDelayInterval?: number, payloadFormatIndicator?: boolean, messageExpiryInterval?: number, contentType?: string, responseTopic?: string, correlationData?: Buffer, userProperties?: UserProperties } } properties?: { sessionExpiryInterval?: number, receiveMaximum?: number, maximumPacketSize?: number, topicAliasMaximum?: number, requestResponseInformation?: boolean, requestProblemInformation?: boolean, userProperties?: UserProperties, authenticationMethod?: string, authenticationData?: Buffer } } ``` 當一個broker收到CONNECT packet時會執行這個handler 通常是client向broker發出連線請求 在callback裡第一個參數放Error,第二個參數決定是否讓該client連線 如果要存ip位置跟timestamp,可以用extends增加property ```typescript= interface IpClient extends Client { ip: string; eventAt: Date; connDetails?; } ... preConnect: ( client: IpClient, packet: ConnectPacket, callback: (error: Error | null, success: boolean) => void, ) => { if (client.connDetails && client.connDetails.ipAddress) { client.ip = client.connDetails.ipAddress.split(':')[3]; } client.eventAt = new Date(); callback(null, true); } ``` ![](https://i.imgur.com/UG9KKMG.png) `client.connDetails.ipAddress.split(':')[3] === 127.0.0.1` ![](https://i.imgur.com/Ke8NvAM.png) 如果ip在blacklist裡,可以拒絕它連線 ### authenticate ```typescript= export enum AuthErrorCode { UNNACCEPTABLE_PROTOCOL = 1, IDENTIFIER_REJECTED = 2, SERVER_UNAVAILABLE = 3, BAD_USERNAME_OR_PASSWORD = 4, NOT_AUTHORIZED = 5 } export type AuthenticateError = Error & { returnCode: AuthErrorCode } type AuthenticateHandler = ( client: Client, username: Readonly<string>, password: Readonly<Buffer>, done: (error: AuthenticateError | null, success: boolean | null) => void ) => void ``` 在preConnect之後接著使用這個handler 用來驗證連到這個broker的client 在這個handler可以驗證帳號跟密碼 callback的第一個參數是error的種類,沒有error就放null 第二個參數是是否通過驗證 通過驗證後會傳CONNACK回去,其return code為0,沒通過驗證的return code預設為5 也可以在2到5之間自己自己設定一個return code return code數字代表的狀態可以參考這個網址 <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.1_-">Connect Return code values</a> 小發現: 使用MQTTBOX如果在這個階段被拒絕連線他會持續發送請packet,而在preConnect階段就被拒絕的話不會持續發送packet ```typescript= // example connectAuth(client, username, password, callback) { // console.log({ // id: client.id, // username, // password: password.toString(), // }); const whitelist = ['device:DEMO0001', 'device:DEMO0002']; if (!whitelist.includes(client.id)) { callback(null, false); } callback(null, true); } ``` ### authorizePublish ```typescript= type AuthorizePublishHandler = ( client: Client, packet: PublishPacket, callback: (error?: Error | null) => void ) => void ... export interface IPublishPacket extends IPacket { cmd: 'publish' qos: 0 | 1 | 2 dup: boolean retain: boolean topic: string payload: string | Buffer properties?: { payloadFormatIndicator?: boolean, messageExpiryInterval?: number, topicAlias?: number, responseTopic?: string, correlationData?: Buffer, userProperties?: UserProperties, subscriptionIdentifier?: number, contentType?: string } } ... export interface IPacket { cmd: PacketCmd messageId?: number length?: number } ``` ![](https://i.imgur.com/U7roQ5B.png) broker收到client publish的packet時會啟用這個handler 可以用來檢查publish的內容,用callback決定要不要讓他publish出去 payload有可能是Buffer 官方文件說建議把前綴是$SYS/的topic通通擋掉避免DoS 小發現: LWT -> client出乎意料地斷連時會傳出LWT 這個時候client可能是null ### authorizeSubscribe ```typescript= type AuthorizeSubscribeHandler = ( client: Client, subscription: Subscription, callback: (error: Error | null, subscription?: Subscription | null) => void ) => void ... export type Subscription = ISubscription & { clientId?: string } export interface ISubscription { topic: string qos: 0 | 1 | 2, nl?: boolean, rap?: boolean, rh?: number } ``` 會觸發這個handler的情況: - 有client subscribe某個topic - 有設置broker persistence,重啟之後他要恢復關掉之前的訂閱狀態,裡面儲存的訂閱資訊送過來? subscription通常會長成這樣`{ topic: hello, qos: 0 }` 可以改動topic跟內容 可以回傳error讓他斷線,或是在callback的subscription欄位設成null,拒絕他訂閱該topic,但這樣不會讓他斷線,疑似還會讓他以為他成功的訂閱了該topic? ### authorizeForward 通常是broker要傳送retained message之前會啟用 可以修改該message的內容 ## 設置redis redis在4.0以後已經支援promise,因此不用把函式都手動promisify ![](https://i.imgur.com/VcJ7Rnx.png) ```typescript= // redis.service.ts ... const connectConfig = { host: '127.0.0.1', port: 6379, database: 0, }; @Injectable() export class RedisService { private client: RedisClientType; constructor() { this.startRedis(); } async startRedis() { this.client = createClient(connectConfig); await this.client.connect(); console.log('redis connected'); } getRedisClient() { return this.client; } } ``` ## AedesOptions - mq - concurrency - persistence - queueLimit - maxClientsIdLength - heartbeatInterval - id - connectTimeout ### mq(mqemitter) 用來傳送訊息給client的middleware,預設為<a href="https://github.com/mcollina/mqemitter">mqemitter</a> 在options裡可以設置如何解讀topic ### concurrency 決定broker用mq最多可以同時傳送多少訊息,預設為100 ### persistence ```typescript= import RedisPersistence from 'aedes-persistence-redis'; ... persistence: RedisPersistence({ port: 6379, host: '127.0.0.1', family: 4, // 4 (IPv4) or 6 (IPv6) db: 15, }), ``` persistence是一個middleware,可以設置儲存QoS>0 data、retained data、will packet及subscription資料的地方 預設是<a href="https://github.com/moscajs/aedes-persistence">aedes-persistence</a>,這裡我們用的是<a href="https://github.com/moscajs/aedes-persistence-redis">aedes-persistence-redis</a> - subscription, retained data 當client連線斷掉的時候,再次連回來可以取回訂閱資訊,順便取得retained data 要開啟這項功能你的client的clean session必須要設為false 這兩項在MQTT5.0會被移除,改由Message Expiry Interval代替 用MQTTBOX有點難重現這個功能,斷線之後原本已經subscribe的視窗會跳掉,重新連線之後介面不會顯示有沒有重新subscribe,只試出設定為non-clean cache的時候重新連線之後再次subscribe已經有retained message的topic的時候會把你踢斷線 ### queueLimit ### maxClientsIdLength 決定clinetId可以有多長,預設為23 ### heartbeatInterval broker每隔一段時間就會傳訊息到topic $SYS/<aedes.id>/heartbeat裡,這裡可以以毫秒為單位決定每多久要傳一次訊息,預設為60000ms(60s) ### id broker id預設為uuidv4() ### connectTimeout 決定等待CONNECT packet的最大時間,預設為30000ms(30s) ## AedesEvent ### publish ```typescript= this.aedesInstance.on('publish', async (packet, client) => { if (client && /^\$/.test(packet.topic)) { const payload = JSON.parse(packet.payload.toString()); // console.log(payload); // const result = Object.keys(payload).map((key) => { // return [key, payload.key]; // }); await this.redisService.getRedisClient().HSET(packet.topic, payload); await this.redisService .getRedisClient() .SETEX(`healthCheck:${client.id}`, 60, Date.now().toString()); } }); ``` 用on監聽事件 當broker publish訊息到subscriber時會觸發 要注意的是不只有client publish給broker轉傳的時候會觸發 broker自己本身也會傳訊息,用的是$SYS的前綴,並且client為null 所以要過濾掉這類訊息最簡單的方法通常是if(client)或用regex來測試