# AEDES 0.46.2
```shell=
npm install aedes@0.46.2 aedes-server-factory aedes-protocol-decoder aedes-persistence-redis redis
docker run --name redis -d -p 6379:6379 redis
```
請去tsconfig新增設定
```json=
"esModuleInterop": true
```
## aedes宣告
在0.46.2版,aedes是function,所以要用`this.aedesInstance = aedes({});`
而到了0.48.0版,aedes變成了class,會變成要用`this.aedesInstance = new aedes({});`
## example
```typescript=
//broker.service.ts
import {
Injectable,
OnModuleInit,
OnApplicationBootstrap,
} from '@nestjs/common';
import aedes from 'aedes';
import { Aedes } from 'aedes';
import { createServer } from 'aedes-server-factory';
import { protocolDecoder } from 'aedes-protocol-decoder';
import { AuthService } from './auth.service';
@Injectable()
export class BrokerService implements OnModuleInit, OnApplicationBootstrap {
private aedesInstance: Aedes;
constructor(private authService: AuthService) {}
onModuleInit() {
this.aedesInstance = aedes({});
}
onApplicationBootstrap() {
createServer(this.aedesInstance, {
trustProxy: true,
protocolDecoder,
}).listen(1883, () => {
console.log('MQTT Broker is listening on port 1883');
});
}
}
```
<a href="https://docs.nestjs.com/fundamentals/lifecycle-events">NestJS Lifecycle</a>
- OnModuleInit
在程式讀到這個service所在的module之後會立刻執行該function

- OnApplicationBootstrap
在程式執行OnModuleInit之後跟著執行該function
這兩個函式都可以加上async
- createServer
`createServer(broker, options).listen(port, function)`
建立broker
當trustProxy為true且有設置protocolDecoder時,他會parse http header以及/或parse proxy protocol,然後傳到client.connDetails裡

## Aedes interface
```typescript=
// aedes declare
export declare function aedes (options?: AedesOptions): Aedes
// Aedes interface
export interface Aedes extends EventEmitter {
id: Readonly<string>
connectedClients: Readonly<number>
closed: Readonly<boolean>
brokers: Readonly<Brokers>
handle: (stream: Connection) => Client
on (event: 'closed', listener: () => void): this
on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout', listener: (client: Client) => void): this
on (event: 'clientError' | 'connectionError', listener: (client: Client, error: Error) => void): this
on (event: 'connackSent', listener: (packet: ConnackPacket, client: Client) => void): this
on (event: 'ping', listener: (packet: PingreqPacket, client: Client) => void): this
on (event: 'publish', listener: (packet: AedesPublishPacket, client: Client) => void): this
on (event: 'ack', listener: (packet: PublishPacket | PubrelPacket, client: Client) => void): this
on (event: 'subscribe', listener: (subscriptions: Subscription[], client: Client) => void): this
on (event: 'unsubscribe', listener: (unsubscriptions: string[], client: Client) => void): this
publish (
packet: PublishPacket,
callback: (error?: Error) => void
): void
subscribe (
topic: string,
deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void,
callback: () => void
): void
unsubscribe (
topic: string,
deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void,
callback: () => void
): void
close (callback?: () => void): void
preConnect: PreConnectHandler
authenticate: AuthenticateHandler
authorizePublish: AuthorizePublishHandler
authorizeSubscribe: AuthorizeSubscribeHandler
authorizeForward: AuthorizeForwardHandler
published: PublishedHandler
}
// Handler
type PreConnectHandler = (client: Client, packet: ConnectPacket, callback: (error: Error | null, success: boolean) => void) => void
type AuthenticateHandler = (
client: Client,
username: Readonly<string>,
password: Readonly<Buffer>,
done: (error: AuthenticateError | null, success: boolean | null) => void
) => void
type AuthorizePublishHandler = (client: Client, packet: PublishPacket, callback: (error?: Error | null) => void) => void
type AuthorizeSubscribeHandler = (client: Client, subscription: Subscription, callback: (error: Error | null, subscription?: Subscription | null) => void) => void
type AuthorizeForwardHandler = (client: Client, packet: AedesPublishPacket) => AedesPublishPacket | null | void
type PublishedHandler = (packet: AedesPublishPacket, client: Client, callback: (error?: Error | null) => void) => void
```
### id
\<string> Default: uuidv4()
回傳該broker的識別碼,預設是用uuid
### connectedClients
\<number> Default: 0
回傳連接到這個broker的client的數量
### closed
\<boolean> Default: false
回傳該broker是否已關閉
### preConnect
```typescript=
type PreConnectHandler = (
client: Client,
packet: ConnectPacket,
callback: (error: Error | null, success: boolean) => void
) => void
...
export type ConnectPacket = IConnectPacket & { cmd: 'connect' }
...
export interface IConnectPacket extends IPacket {
cmd: 'connect'
clientId: string
protocolVersion?: 4 | 5 | 3
protocolId?: 'MQTT' | 'MQIsdp'
clean?: boolean
keepalive?: number
username?: string
password?: Buffer
will?: {
topic: string
payload: Buffer
qos?: QoS
retain?: boolean
properties?: {
willDelayInterval?: number,
payloadFormatIndicator?: boolean,
messageExpiryInterval?: number,
contentType?: string,
responseTopic?: string,
correlationData?: Buffer,
userProperties?: UserProperties
}
}
properties?: {
sessionExpiryInterval?: number,
receiveMaximum?: number,
maximumPacketSize?: number,
topicAliasMaximum?: number,
requestResponseInformation?: boolean,
requestProblemInformation?: boolean,
userProperties?: UserProperties,
authenticationMethod?: string,
authenticationData?: Buffer
}
}
```
當一個broker收到CONNECT packet時會執行這個handler
通常是client向broker發出連線請求
在callback裡第一個參數放Error,第二個參數決定是否讓該client連線
如果要存ip位置跟timestamp,可以用extends增加property
```typescript=
interface IpClient extends Client {
ip: string;
eventAt: Date;
connDetails?;
}
...
preConnect: (
client: IpClient,
packet: ConnectPacket,
callback: (error: Error | null, success: boolean) => void,
) => {
if (client.connDetails && client.connDetails.ipAddress) {
client.ip = client.connDetails.ipAddress.split(':')[3];
}
client.eventAt = new Date();
callback(null, true);
}
```

`client.connDetails.ipAddress.split(':')[3] === 127.0.0.1`

如果ip在blacklist裡,可以拒絕它連線
### authenticate
```typescript=
export enum AuthErrorCode {
UNNACCEPTABLE_PROTOCOL = 1,
IDENTIFIER_REJECTED = 2,
SERVER_UNAVAILABLE = 3,
BAD_USERNAME_OR_PASSWORD = 4,
NOT_AUTHORIZED = 5
}
export type AuthenticateError = Error & { returnCode: AuthErrorCode }
type AuthenticateHandler = (
client: Client,
username: Readonly<string>,
password: Readonly<Buffer>,
done: (error: AuthenticateError | null, success: boolean | null) => void
) => void
```
在preConnect之後接著使用這個handler
用來驗證連到這個broker的client
在這個handler可以驗證帳號跟密碼
callback的第一個參數是error的種類,沒有error就放null
第二個參數是是否通過驗證
通過驗證後會傳CONNACK回去,其return code為0,沒通過驗證的return code預設為5
也可以在2到5之間自己自己設定一個return code
return code數字代表的狀態可以參考這個網址
<a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.1_-">Connect Return code values</a>
小發現: 使用MQTTBOX如果在這個階段被拒絕連線他會持續發送請packet,而在preConnect階段就被拒絕的話不會持續發送packet
```typescript=
// example
connectAuth(client, username, password, callback) {
// console.log({
// id: client.id,
// username,
// password: password.toString(),
// });
const whitelist = ['device:DEMO0001', 'device:DEMO0002'];
if (!whitelist.includes(client.id)) {
callback(null, false);
}
callback(null, true);
}
```
### authorizePublish
```typescript=
type AuthorizePublishHandler = (
client: Client,
packet: PublishPacket,
callback: (error?: Error | null) => void
) => void
...
export interface IPublishPacket extends IPacket {
cmd: 'publish'
qos: 0 | 1 | 2
dup: boolean
retain: boolean
topic: string
payload: string | Buffer
properties?: {
payloadFormatIndicator?: boolean,
messageExpiryInterval?: number,
topicAlias?: number,
responseTopic?: string,
correlationData?: Buffer,
userProperties?: UserProperties,
subscriptionIdentifier?: number,
contentType?: string
}
}
...
export interface IPacket {
cmd: PacketCmd
messageId?: number
length?: number
}
```

broker收到client publish的packet時會啟用這個handler
可以用來檢查publish的內容,用callback決定要不要讓他publish出去
payload有可能是Buffer
官方文件說建議把前綴是$SYS/的topic通通擋掉避免DoS
小發現: LWT -> client出乎意料地斷連時會傳出LWT
這個時候client可能是null
### authorizeSubscribe
```typescript=
type AuthorizeSubscribeHandler = (
client: Client,
subscription: Subscription,
callback: (error: Error | null, subscription?: Subscription | null) => void
) => void
...
export type Subscription = ISubscription & { clientId?: string }
export interface ISubscription {
topic: string
qos: 0 | 1 | 2,
nl?: boolean,
rap?: boolean,
rh?: number
}
```
會觸發這個handler的情況:
- 有client subscribe某個topic
- 有設置broker persistence,重啟之後他要恢復關掉之前的訂閱狀態,裡面儲存的訂閱資訊送過來?
subscription通常會長成這樣`{ topic: hello, qos: 0 }`
可以改動topic跟內容
可以回傳error讓他斷線,或是在callback的subscription欄位設成null,拒絕他訂閱該topic,但這樣不會讓他斷線,疑似還會讓他以為他成功的訂閱了該topic?
### authorizeForward
通常是broker要傳送retained message之前會啟用
可以修改該message的內容
## 設置redis
redis在4.0以後已經支援promise,因此不用把函式都手動promisify

```typescript=
// redis.service.ts
...
const connectConfig = {
host: '127.0.0.1',
port: 6379,
database: 0,
};
@Injectable()
export class RedisService {
private client: RedisClientType;
constructor() {
this.startRedis();
}
async startRedis() {
this.client = createClient(connectConfig);
await this.client.connect();
console.log('redis connected');
}
getRedisClient() {
return this.client;
}
}
```
## AedesOptions
- mq
- concurrency
- persistence
- queueLimit
- maxClientsIdLength
- heartbeatInterval
- id
- connectTimeout
### mq(mqemitter)
用來傳送訊息給client的middleware,預設為<a href="https://github.com/mcollina/mqemitter">mqemitter</a>
在options裡可以設置如何解讀topic
### concurrency
決定broker用mq最多可以同時傳送多少訊息,預設為100
### persistence
```typescript=
import RedisPersistence from 'aedes-persistence-redis';
...
persistence: RedisPersistence({
port: 6379,
host: '127.0.0.1',
family: 4, // 4 (IPv4) or 6 (IPv6)
db: 15,
}),
```
persistence是一個middleware,可以設置儲存QoS>0 data、retained data、will packet及subscription資料的地方
預設是<a href="https://github.com/moscajs/aedes-persistence">aedes-persistence</a>,這裡我們用的是<a href="https://github.com/moscajs/aedes-persistence-redis">aedes-persistence-redis</a>
- subscription, retained data
當client連線斷掉的時候,再次連回來可以取回訂閱資訊,順便取得retained data
要開啟這項功能你的client的clean session必須要設為false
這兩項在MQTT5.0會被移除,改由Message Expiry Interval代替
用MQTTBOX有點難重現這個功能,斷線之後原本已經subscribe的視窗會跳掉,重新連線之後介面不會顯示有沒有重新subscribe,只試出設定為non-clean cache的時候重新連線之後再次subscribe已經有retained message的topic的時候會把你踢斷線
### queueLimit
### maxClientsIdLength
決定clinetId可以有多長,預設為23
### heartbeatInterval
broker每隔一段時間就會傳訊息到topic $SYS/<aedes.id>/heartbeat裡,這裡可以以毫秒為單位決定每多久要傳一次訊息,預設為60000ms(60s)
### id
broker id預設為uuidv4()
### connectTimeout
決定等待CONNECT packet的最大時間,預設為30000ms(30s)
## AedesEvent
### publish
```typescript=
this.aedesInstance.on('publish', async (packet, client) => {
if (client && /^\$/.test(packet.topic)) {
const payload = JSON.parse(packet.payload.toString());
// console.log(payload);
// const result = Object.keys(payload).map((key) => {
// return [key, payload.key];
// });
await this.redisService.getRedisClient().HSET(packet.topic, payload);
await this.redisService
.getRedisClient()
.SETEX(`healthCheck:${client.id}`, 60, Date.now().toString());
}
});
```
用on監聽事件
當broker publish訊息到subscriber時會觸發
要注意的是不只有client publish給broker轉傳的時候會觸發
broker自己本身也會傳訊息,用的是$SYS的前綴,並且client為null
所以要過濾掉這類訊息最簡單的方法通常是if(client)或用regex來測試