<style>
.textleft {
text-align:left;
}
.reveal, .reveal h1, .reveal h2, .reveal h3, .reveal h4, .reveal h5, .reveal h6 {
font-family:Arial, Microsoft JhengHei;}
.small-font {
font-size: 20px !important;
}
.reveal .progress {
height: 20px !important;
transform: scale(1,2);
}
.progress span {
background: url() repeat-x !important;
}
.progress span:after, .progress span.nyancat {
content: "";
background: url() ;
width: 36px !important;
height: 21px !important;
border: none !important;
float: right;
margin-top: -7px;
margin-right: -10px;
transform: scale(2,1);
}
.fullimg {
height: 550px !important;
}
.reveal section img {
background: white;
}
</style>
<!-- .slide: data-transition="slide" -->
# RabbitMQ
@Kais(VagrantPi)
###### tags: `slide`, `presentation`, `RabbitMQ`
---
<!-- .slide: data-transition="slide" -->
## Agenda
- Hello World!
- Work queues
- Publish/Subscribe
- Routing
- Topics
- RPC
- sample code link
---
<!-- .slide: data-transition="slide" -->
### Hello World!
![](https://www.rabbitmq.com/img/tutorials/python-one.png)
----
<!-- .slide: data-transition="convex" -->
send.js (producer)
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'hello';
    var msg = 'Hello World!';
    // Assert the queue first: it is created if it does not exist yet.
    // Options can also be passed when declaring the queue, e.g.
    // durable: if true, the queue will survive broker restarts
    channel.assertQueue(queue, {
      durable: false
    });
    channel.sendToQueue(queue, Buffer.from(msg));
    console.log(" [x] Sent %s", msg);
  });
  setTimeout(function() {
    connection.close();
    process.exit(0);
  }, 500);
});
```
----
<!-- .slide: data-transition="convex" -->
receive.js (consumer)
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'hello';
    // Note: every assertQueue call for the 'hello' queue must pass the same
    // options, otherwise the broker raises an error.
    channel.assertQueue(queue, {
      durable: false
    });
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
    channel.consume(queue, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {
      noAck: true
    });
  });
});
```
---
<!-- .slide: data-transition="slide" -->
### Work queues
![](https://www.rabbitmq.com/img/tutorials/python-two.png)
```
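By default, RabbitMQ hands tasks to workers in turn, so with two
workers and an even number of tasks each worker receives the same
number of tasks. This is called round-robin dispatch.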
```
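As a rough illustration of round-robin dispatch, here is a toy model in plain JavaScript with no broker involved. The helper name `roundRobin` and the task labels are made up for this sketch; RabbitMQ does the real dispatching internally.

```javascript
// Toy model of round-robin dispatch: task i goes to worker i % workerCount.
// `roundRobin` is a hypothetical helper, not part of amqplib or RabbitMQ.
function roundRobin(tasks, workerCount) {
  const assigned = Array.from({ length: workerCount }, () => []);
  tasks.forEach((task, i) => {
    assigned[i % workerCount].push(task);
  });
  return assigned;
}

const result = roundRobin(['t1', 't2', 't3', 't4'], 2);
console.log(result); // [ [ 't1', 't3' ], [ 't2', 't4' ] ]
```

With an odd number of tasks the first worker simply ends up with one more task than the second.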
----
<!-- .slide: data-transition="convex" -->
#### acknowledgment
```javascript
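// The Hello World consumer used noAck: true (automatic acknowledgment).
channel.consume(queue, function(msg) {
  // ... handle the task ...
  channel.ack(msg); // tell RabbitMQ the task is done
}, { noAck: false }); // enable manual acknowledgment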
```
```
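If a worker dies after picking up a task, RabbitMQ notices through
acknowledgments and dispatches the task again. The worker confirms a
task it has received and finished by sending an ack back to RabbitMQ
with `channel.ack(msg);`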
```
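The redelivery behaviour described above can be sketched with a small in-memory model. This is only a toy, not the broker's actual logic, and `ToyQueue` with all of its methods is a hypothetical name invented for the example.

```javascript
// Toy in-memory model of manual acknowledgment (not the real broker logic).
// `ToyQueue` and its methods are hypothetical names for this sketch.
class ToyQueue {
  constructor() {
    this.ready = [];          // messages waiting for delivery
    this.unacked = new Map(); // consumer name -> messages delivered but not yet acked
  }
  publish(msg) {
    this.ready.push(msg);
  }
  // Hand the next ready message to a consumer and remember it as unacked.
  deliver(consumer) {
    const msg = this.ready.shift();
    if (msg === undefined) return undefined;
    if (!this.unacked.has(consumer)) this.unacked.set(consumer, []);
    this.unacked.get(consumer).push(msg);
    return msg;
  }
  // The consumer confirms the message, so the queue can forget it.
  ack(consumer, msg) {
    const list = this.unacked.get(consumer) || [];
    const i = list.indexOf(msg);
    if (i !== -1) list.splice(i, 1);
  }
  // The consumer died: everything it had not acked goes back to the front.
  consumerDied(consumer) {
    const list = this.unacked.get(consumer) || [];
    this.ready.unshift(...list);
    this.unacked.delete(consumer);
  }
}

const q = new ToyQueue();
q.publish('task-1');
q.publish('task-2');
q.deliver('worker-A');      // worker-A takes task-1 ...
q.consumerDied('worker-A'); // ... and crashes before acking
const redelivered = q.deliver('worker-B');
console.log(redelivered);   // task-1
```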
----
<!-- .slide: data-transition="convex" -->
#### Message durability
```
Acknowledgments keep a task from being lost when a consumer dies, but
tasks are still lost if the RabbitMQ server itself goes down. To guard
against that, declare the queue as durable and mark every message as
persistent.
```
```javascript
channel.assertQueue('hello',{ durable:true });
```
```
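However, assertQueue on a queue that already exists must use the same
options it was created with. 'hello' was declared with durable: false
earlier, so a durable queue needs a new name, e.g. 'task_queue'.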
```
```javascript
channel.sendToQueue(q, Buffer.from(msg), {persistent: true});
```
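To make the redeclaration rule concrete, here is a toy registry that mimics it. The real check happens inside the broker; `declareQueue` and `declared` are hypothetical names, and only the `durable` option is compared in this sketch.

```javascript
// Toy model of the redeclaration rule (the real check happens inside the broker).
// `declareQueue` and `declared` are hypothetical names for this sketch.
const declared = new Map(); // queue name -> options used at first declaration

function declareQueue(name, options) {
  const durable = Boolean(options && options.durable);
  if (!declared.has(name)) {
    declared.set(name, { durable });
    return 'created';
  }
  if (declared.get(name).durable !== durable) {
    // RabbitMQ reports this case as a PRECONDITION_FAILED channel error.
    throw new Error('PRECONDITION_FAILED: inequivalent arg durable for queue ' + name);
  }
  return 'exists';
}

console.log(declareQueue('hello', { durable: false }));     // created
console.log(declareQueue('hello', { durable: false }));     // exists
try {
  declareQueue('hello', { durable: true });
} catch (err) {
  console.log(err.message);
}
console.log(declareQueue('task_queue', { durable: true })); // created
```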
----
<!-- .slide: data-transition="convex" -->
#### Fair dispatch
![](https://www.rabbitmq.com/img/tutorials/prefetch-count.png)
```javascript
channel.prefetch(1);
```
```
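As mentioned earlier, tasks are dispatched round-robin by default, so
RabbitMQ keeps pushing tasks to a worker whether or not it is still
busy. prefetch(1) means a worker is given only one task at a time and
receives the next one after it acks the current one.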
```
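The difference between the two strategies shows up when task durations are uneven. The following is a toy simulation only: tasks are plain durations in seconds, both function names are hypothetical, and the broker is not actually implemented this way.

```javascript
// Toy simulation comparing round-robin with prefetch(1)-style dispatch.
// Each task is a duration in seconds; busyUntil[i] is when worker i is free.
// Function names are hypothetical; this is not how the broker is implemented.
function roundRobinFinishTimes(durations, workerCount) {
  const busyUntil = new Array(workerCount).fill(0);
  durations.forEach((d, i) => {
    busyUntil[i % workerCount] += d; // pushed to the next worker regardless of load
  });
  return busyUntil;
}

function prefetchOneFinishTimes(durations, workerCount) {
  const busyUntil = new Array(workerCount).fill(0);
  durations.forEach((d) => {
    // The next task goes to whichever worker becomes free (acks) first.
    let next = 0;
    for (let w = 1; w < workerCount; w++) {
      if (busyUntil[w] < busyUntil[next]) next = w;
    }
    busyUntil[next] += d;
  });
  return busyUntil;
}

const durations = [5, 1, 5, 1];
console.log(roundRobinFinishTimes(durations, 2));  // [ 10, 2 ]
console.log(prefetchOneFinishTimes(durations, 2)); // [ 6, 6 ]
```

With round-robin, one worker is busy for 10 seconds while the other idles after 2; with the prefetch-style rule both finish at 6.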
----
<!-- .slide: data-transition="convex" -->
new_task.js (producer)
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'task_queue';
    var msg = process.argv.slice(2).join(' ') || "Hello World!";
    channel.assertQueue(queue, {
      durable: true
    });
    channel.sendToQueue(queue, Buffer.from(msg), {
      persistent: true
    });
    console.log(" [x] Sent '%s'", msg);
  });
  setTimeout(function() {
    connection.close();
    process.exit(0);
  }, 500);
});
```
----
<!-- .slide: data-transition="convex" -->
worker.js (consumer)
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'task_queue';
    channel.assertQueue(queue, {
      durable: true
    });
    channel.prefetch(1);
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
    channel.consume(queue, function(msg) {
      // Each '.' in the message stands for one second of fake work.
      var secs = msg.content.toString().split('.').length - 1;
      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        channel.ack(msg);
      }, secs * 1000);
    }, {
      noAck: false
    });
  });
});
```
---
<!-- .slide: data-transition="slide" -->
### Publish/Subscribe
![](https://www.rabbitmq.com/img/tutorials/exchanges.png)
```
We will build a logger that writes log messages to disk (worker1) and prints them to the screen (worker2) at the same time.
```
----
<!-- .slide: data-transition="convex" -->
#### Exchanges
```
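This introduces a new role into the architecture: the exchange.
Producers publish to an exchange, which routes each message to the
queues you expect, making the setup more flexible. The downside is
that if a single exchange fails, the whole service stalls. Exchanges
come in direct, topic, headers and fanout types; this example uses
fanout.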
```
----
<!-- .slide: data-transition="convex" -->
#### Temporary queues
For some use cases there is no need to create a named queue explicitly; a temporary one is enough.
```javascript
channel.assertQueue('', {
  // When exclusive is true, only one connection may use the queue,
  // and the queue is deleted when that connection closes.
  exclusive: true
});
```
----
<!-- .slide: data-transition="convex" -->
#### Bindings
![](https://www.rabbitmq.com/img/tutorials/bindings.png)
```
After creating the exchange and the queue, bind the two together.
```
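A minimal in-memory sketch of what binding means for a fanout exchange: every bound queue receives a copy of each published message. `FanoutExchange` and the queue names are hypothetical and exist only for this illustration.

```javascript
// Toy in-memory fanout exchange: every bound queue gets a copy of each message.
// `FanoutExchange` is a hypothetical class for illustration only.
class FanoutExchange {
  constructor() {
    this.queues = new Map(); // queue name -> array of delivered messages
  }
  bindQueue(name) {
    if (!this.queues.has(name)) this.queues.set(name, []);
  }
  publish(msg) {
    // A fanout exchange ignores the routing key entirely.
    for (const messages of this.queues.values()) {
      messages.push(msg);
    }
  }
}

const logs = new FanoutExchange();
logs.bindQueue('disk-writer');    // worker1: writes to disk
logs.bindQueue('screen-printer'); // worker2: prints to the screen
logs.publish('first log line');
console.log(logs.queues.get('disk-writer'));    // [ 'first log line' ]
console.log(logs.queues.get('screen-printer')); // [ 'first log line' ]
```

In this model, a message published before any queue is bound reaches nobody.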
----
<!-- .slide: data-transition="convex" -->
#### Putting it all together
![](https://www.rabbitmq.com/img/tutorials/python-three-overall.png)
----
<!-- .slide: data-transition="convex" -->
emit_log.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'logs';
    var msg = process.argv.slice(2).join(' ') || 'Hello World!';
    channel.assertExchange(exchange, 'fanout', {
      durable: false
    });
    channel.publish(exchange, '', Buffer.from(msg));
    console.log(" [x] Sent %s", msg);
  });
  setTimeout(function() {
    connection.close();
    process.exit(0);
  }, 500);
});
```
----
<!-- .slide: data-transition="convex" -->
receive_logs.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'logs';
    channel.assertExchange(exchange, 'fanout', {
      durable: false
    });
    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
      // channel.bindQueue(queue, source, pattern, [args, [function(err, ok) {...}]])
      channel.bindQueue(q.queue, exchange, '');
      channel.consume(q.queue, function(msg) {
        if (msg.content) {
          console.log(" [x] %s", msg.content.toString());
        }
      }, {
        noAck: true
      });
    });
  });
});
```
----
<!-- .slide: data-transition="convex" -->
```
./receive_logs.js > logs_from_rabbit.log
```
```
./receive_logs.js
```
```
./emit_log.js
```
---
<!-- .slide: data-transition="slide" -->
### Routing
![](https://www.rabbitmq.com/img/tutorials/python-four.png)
----
<!-- .slide: data-transition="convex" -->
#### Bindings
```javascript
channel.bindQueue(q.queue, exchange, '');
```
bindQueue can be read simply as: this queue is interested in messages from this exchange.
----
<!-- .slide: data-transition="convex" -->
#### Direct exchange
![](https://www.rabbitmq.com/img/tutorials/direct-exchange.png)
message (routing key) -> direct exchange -> queues whose binding key equals the routing key
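The exact-match rule can be sketched in a few lines. `routeDirect`, the queue names and the colour keys are illustrative assumptions for this example, not part of amqplib.

```javascript
// Toy model of a direct exchange: deliver to queues whose binding key
// equals the message's routing key. `routeDirect` is a hypothetical helper.
function routeDirect(bindings, routingKey) {
  // bindings: array of { queue, bindingKey }
  const targets = bindings
    .filter((b) => b.bindingKey === routingKey)
    .map((b) => b.queue);
  return [...new Set(targets)]; // a queue receives the message at most once
}

const bindings = [
  { queue: 'Q1', bindingKey: 'orange' },
  { queue: 'Q2', bindingKey: 'black' },
  { queue: 'Q2', bindingKey: 'green' },
];

console.log(routeDirect(bindings, 'orange')); // [ 'Q1' ]
console.log(routeDirect(bindings, 'green'));  // [ 'Q2' ]
console.log(routeDirect(bindings, 'purple')); // []
```

A routing key that matches no binding, like `purple` here, means the message is not delivered to any queue.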
----
<!-- .slide: data-transition="convex" -->
#### Multiple bindings
![](https://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png)
----
<!-- .slide: data-transition="convex" -->
emit_log_direct.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'direct_logs';
    var args = process.argv.slice(2);
    var msg = args.slice(1).join(' ') || 'Hello World!';
    var severity = (args.length > 0) ? args[0] : 'info';
    channel.assertExchange(exchange, 'direct', {
      durable: false
    });
    channel.publish(exchange, severity, Buffer.from(msg));
    console.log(" [x] Sent %s: '%s'", severity, msg);
  });
  setTimeout(function() {
    connection.close();
    process.exit(0);
  }, 500);
});
```
----
<!-- .slide: data-transition="convex" -->
receive_logs_direct.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length === 0) {
  console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
  process.exit(1);
}
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'direct_logs';
    channel.assertExchange(exchange, 'direct', {
      durable: false
    });
    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(' [*] Waiting for logs. To exit press CTRL+C');
      args.forEach(function(severity) {
        channel.bindQueue(q.queue, exchange, severity);
      });
      channel.consume(q.queue, function(msg) {
        console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
      }, {
        noAck: true
      });
    });
  });
});
```
----
<!-- .slide: data-transition="convex" -->
![](https://www.rabbitmq.com/img/tutorials/python-four.png)
```
./receive_logs_direct.js warning error > logs_from_rabbit.log
```
```
./receive_logs_direct.js info warning error
```
```
./emit_log_direct.js error "Run. Run. Or it will explode."
```
---
<!-- .slide: data-transition="slide" -->
### Topics
![](https://www.rabbitmq.com/img/tutorials/python-five.png)
----
<!-- .slide: data-transition="convex" -->
Back to our logging service: logs differ not only by severity level (info/warn/crit...) but also by the facility that emitted them (auth/cron/kern...).
----
<!-- .slide: data-transition="convex" -->
#### Topic exchange
```
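Routing keys sent to a topic exchange are lists of words separated by
`.`, e.g. "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" (up to
255 bytes).
Binding works much like direct: pick the topics you want to subscribe to.
- `*` substitutes for exactly one word
- `#` substitutes for zero or more words
e.g. "*.orange.*", "*.*.rabbit", "lazy.#"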
```
----
<!-- .slide: data-transition="convex" -->
Example
```
Queue1: "*.orange.*"
Queue2: "*.*.rabbit", "lazy.#"
routing key: "quick.orange.rabbit" (delivered to both Queue1 and Queue2)
※ "lazy.orange.male.rabbit" has four words, so it matches neither "*.orange.*" nor "*.*.rabbit"; it does match "lazy.#", so it is delivered only to Queue2
```
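The wildcard rules can be checked with a small matcher. This is a sketch of the matching semantics only; `topicMatches` is a hypothetical helper and the broker uses its own implementation.

```javascript
// Sketch of topic-pattern matching: `*` = exactly one word, `#` = zero or more words.
// `topicMatches` is a hypothetical helper; the broker uses its own implementation.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(pi, ki) {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // Try letting `#` absorb 0, 1, 2, ... of the remaining words.
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('*.orange.*', 'quick.orange.rabbit'));     // true
console.log(topicMatches('*.*.rabbit', 'quick.orange.rabbit'));     // true
console.log(topicMatches('*.orange.*', 'lazy.orange.male.rabbit')); // false
console.log(topicMatches('*.*.rabbit', 'lazy.orange.male.rabbit')); // false
console.log(topicMatches('lazy.#', 'lazy.orange.male.rabbit'));     // true
```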
----
<!-- .slide: data-transition="convex" -->
emit_log_topic.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'topic_logs';
    var args = process.argv.slice(2);
    var key = (args.length > 0) ? args[0] : 'anonymous.info';
    var msg = args.slice(1).join(' ') || 'Hello World!';
    channel.assertExchange(exchange, 'topic', {
      durable: false
    });
    channel.publish(exchange, key, Buffer.from(msg));
    console.log(" [x] Sent %s:'%s'", key, msg);
  });
  setTimeout(function() {
    connection.close();
    process.exit(0);
  }, 500);
});
```
----
<!-- .slide: data-transition="convex" -->
receive_logs_topic.js:
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length === 0) {
  console.log("Usage: receive_logs_topic.js <facility>.<severity>");
  process.exit(1);
}
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'topic_logs';
    channel.assertExchange(exchange, 'topic', {
      durable: false
    });
    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(' [*] Waiting for logs. To exit press CTRL+C');
      args.forEach(function(key) {
        channel.bindQueue(q.queue, exchange, key);
      });
      channel.consume(q.queue, function(msg) {
        console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
      }, {
        noAck: true
      });
    });
  });
});
```
---
<!-- .slide: data-transition="slide" -->
### Remote procedure call (RPC)
![](https://www.rabbitmq.com/img/tutorials/python-six.png)
----
<!-- .slide: data-transition="convex" -->
#### Callback queue
```javascript
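channel.assertQueue('', {
  exclusive: true
}, function(error, q) {
  if (error) {
    throw error;
  }
  // q.queue is the server-generated name of the callback queue
  channel.sendToQueue('rpc_queue', Buffer.from('10'), {
    replyTo: q.queue
  });
  // ... then code to read a response message from the callback queue ...
});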
```
----
<!-- .slide: data-transition="convex" -->
#### Message properties
```
AMQP 0-9-1 protocol predefines a set of 14 properties
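* persistent: covered in tutorial 2 (Work queues); marks a message as persistent so it survives a broker restart
* content_type: the mime-type of the payload, e.g. `application/json` (amqplib option: `contentType`)
* reply_to: the name of the callback queue (amqplib option: `replyTo`)
* correlation_id: an identifier used to correlate RPC responses with requests (amqplib option: `correlationId`)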
```
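As a small sketch of how these properties might be set from Node.js, the helper below builds the content and options for a JSON request. `buildRpcRequest`, the queue name `amq.gen-example` and the id `req-1` are made up for the example; the option names follow amqplib's camelCase publish options.

```javascript
// Sketch: build the content and options for an RPC request sent as JSON.
// `buildRpcRequest` is a hypothetical helper; the option names are amqplib's
// camelCase publish options (contentType, replyTo, correlationId, persistent).
function buildRpcRequest(payload, replyQueue, correlationId) {
  return {
    content: Buffer.from(JSON.stringify(payload)),
    options: {
      contentType: 'application/json',
      replyTo: replyQueue,
      correlationId: correlationId,
      persistent: false, // assumption: a short-lived RPC request need not survive a restart
    },
  };
}

const request = buildRpcRequest({ n: 10 }, 'amq.gen-example', 'req-1');
console.log(request.content.toString()); // {"n":10}
console.log(request.options.replyTo);    // amq.gen-example
// With a real channel this would be sent as:
// channel.sendToQueue('rpc_queue', request.content, request.options);
```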
----
<!-- .slide: data-transition="convex" -->
#### Correlation Id
Creating a new callback queue for every RPC request is inefficient. A better approach is to create a single callback queue per client; to tell which request a reply belongs to, each request carries a Correlation Id.
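One way to picture a shared callback queue is a map from correlation id to the callback waiting for that reply. This is a sketch without a broker; `pending`, `registerRequest` and `handleReply` are hypothetical names, and the fake messages only mimic the `{ properties, content }` shape that amqplib delivers.

```javascript
// Sketch: one callback queue shared by many in-flight requests.
// `pending`, `registerRequest` and `handleReply` are hypothetical names.
const pending = new Map(); // correlationId -> callback waiting for that reply
let nextId = 0;

function registerRequest(onReply) {
  const correlationId = 'req-' + (++nextId);
  pending.set(correlationId, onReply);
  return correlationId; // send this along with the request
}

// Called for every message that arrives on the shared callback queue.
// `msg` mimics the shape amqplib delivers: { properties, content }.
function handleReply(msg) {
  const onReply = pending.get(msg.properties.correlationId);
  if (!onReply) return false; // unknown id: ignore the message
  pending.delete(msg.properties.correlationId);
  onReply(msg.content.toString());
  return true;
}

const results = [];
const idA = registerRequest((body) => results.push('A got ' + body));
const idB = registerRequest((body) => results.push('B got ' + body));

// Replies may arrive in any order; the id tells them apart.
handleReply({ properties: { correlationId: idB }, content: Buffer.from('55') });
handleReply({ properties: { correlationId: idA }, content: Buffer.from('8') });
console.log(results); // [ 'B got 55', 'A got 8' ]
```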
----
<!-- .slide: data-transition="convex" -->
rpc_client.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length === 0) {
  console.log("Usage: rpc_client.js num");
  process.exit(1);
}
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      var correlationId = generateUuid();
      var num = parseInt(args[0], 10);
      console.log(' [x] Requesting fib(%d)', num);
      // Only accept the reply whose correlationId matches our request.
      channel.consume(q.queue, function(msg) {
        if (msg.properties.correlationId === correlationId) {
          console.log(' [.] Got %s', msg.content.toString());
          setTimeout(function() {
            connection.close();
            process.exit(0);
          }, 500);
        }
      }, {
        noAck: true
      });
      channel.sendToQueue('rpc_queue',
        Buffer.from(num.toString()), {
          correlationId: correlationId,
          replyTo: q.queue
        });
    });
  });
});
// Not a real UUID: just a random string that is unique enough for the demo.
function generateUuid() {
  return Math.random().toString() +
    Math.random().toString() +
    Math.random().toString();
}
```
----
<!-- .slide: data-transition="convex" -->
rpc_server.js
```javascript
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'rpc_queue';
    channel.assertQueue(queue, {
      durable: false
    });
    channel.prefetch(1);
    console.log(' [x] Awaiting RPC requests');
    channel.consume(queue, function reply(msg) {
      var n = parseInt(msg.content.toString(), 10);
      console.log(" [.] fib(%d)", n);
      var r = fibonacci(n);
      // Send the result to the client's callback queue, echoing its correlationId.
      channel.sendToQueue(msg.properties.replyTo,
        Buffer.from(r.toString()), {
          correlationId: msg.properties.correlationId
        });
      channel.ack(msg);
    });
  });
});
function fibonacci(n) {
  if (n === 0 || n === 1)
    return n;
  else
    return fibonacci(n - 1) + fibonacci(n - 2);
}
```
---
<!-- .slide: data-transition="slide" -->
## sample code
- [sample code](https://bitbucket.org/kaislin/bolt-queue/src/master/)
- [Official tutorials code](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/javascript-nodejs)
- [node.js amqplib sample code](https://github.com/squaremo/amqp.node/tree/master/examples/tutorials)