<style> .textleft { text-align:left; } .reveal, .reveal h1, .reveal h2, .reveal h3, .reveal h4, .reveal h5, .reveal h6 { font-family:Arial, Microsoft JhengHei;} .small-font { font-size: 20px !important; } .reveal .progress { height: 20px !important; transform: scale(1,2); } .progress span { background: url() repeat-x !important; } .progress span:after, .progress span.nyancat { content: ""; background: url() ; width: 36px !important; height: 21px !important; border: none !important; float: right; margin-top: -7px; margin-right: -10px; transform: scale(2,1); } .fullimg { height: 550px !important; } .reveal section img { background: white; } </style> <!-- .slide: data-transition="slide" --> # RabbitMQ @Kais(VagrantPi) ###### tags: `slide`, `簡報`, `RabbitMQ` --- <!-- .slide: data-transition="slide" --> ## Agenda - Hello World! - Work queues - Publish/Subscribe - Routing - Topics - RPC - sample code link --- <!-- .slide: data-transition="slide" --> ### Hello World! ![](https://www.rabbitmq.com/img/tutorials/python-one.png) ---- <!-- .slide: data-transition="convex" --> send.js(producer) ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = 'hello'; var msg = 'Hello World!'; // 先戳一下 queue,如果不存在就建立 // 也可以在建立 queue 時帶入參數 // durable: if true, the queue will survive broker restarts channel.assertQueue(queue, { durable: false }); channel.sendToQueue(queue, Buffer.from(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { connection.close(); process.exit(0); }, 500); }); ``` ---- <!-- .slide: data-transition="convex" --> receive.js(consumers) ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = 'hello'; // 需要注意的是,所有 assertQueue 'hello' queue 帶的參數要一樣,不然會噴 error channel.assertQueue(queue, { durable: false }); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); channel.consume(queue, function(msg) { console.log(" [x] Received %s", msg.content.toString()); }, { noAck: true }); }); }); ``` --- <!-- .slide: data-transition="slide" --> ### Work queues ![](https://www.rabbitmq.com/img/tutorials/python-two.png) ``` 預設值情況下,RabbitMQ 會輪著分配,所以在偶數 tack 情況下,每個 worker 分配到的工作是一樣的,這稱為:round-robin ``` ---- <!-- .slide: data-transition="convex" --> #### acknowledgment ```javascript noAck: true channel.consume(queue, function(msg) { .... }, {noAck: false});// 開啟 acknowledgment ``` ``` 當 worker 拿到 task 掛掉後,RabbitMQ 可以透過 acknowledgment 來發 現,並將 task 在分派下去,原理及 worker 接收到後向 RabbitMQ 發送訊息 所已經取得了 `channel.ack(msg);` ``` ---- <!-- .slide: data-transition="convex" --> #### Message durability ``` 上面的 acknowledgment 使 consumer 掛掉後,任務不會遺失,但假設 RabbitMQ 主機掛掉,task 還是會遺失,因此需要設定 queue 為 durable ,然後傳送的 message 必須設定 persistent ``` ```javascript channel.assertQueue('hello',{ durable:true }); ``` ``` 但是,assertQueue 原本以建立的 Queue 餵的參數不能跟當初建立時的 不同 ``` ```javascript channel.sendToQueue(q, Buffer.from(msg), {persistent: true}); ``` ---- <!-- .slide: data-transition="convex" --> #### Fair dispatch ![](https://www.rabbitmq.com/img/tutorials/prefetch-count.png) ```javascript channel.prefetch(1); ``` ``` 前面有提到預設採用 round-robin 的分配 task 方式,所以不管 worker 是 否還在執行 task,他還是會往他那邊塞,prefetch=1 的意思為 worker 同 一時間只會接收一個 task ``` ---- <!-- .slide: data-transition="convex" --> new_task.js(producer) ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = 'task_queue'; var msg = process.argv.slice(2).join(' ') || "Hello World!"; channel.assertQueue(queue, { durable: true }); channel.sendToQueue(queue, Buffer.from(msg), { persistent: true }); console.log(" [x] Sent '%s'", msg); }); setTimeout(function() { connection.close(); process.exit(0); }, 500); }); ``` ---- <!-- .slide: data-transition="convex" --> worker.js(consumers) ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error, connection) { connection.createChannel(function(error, channel) { var queue = 'task_queue'; channel.assertQueue(queue, { durable: true }); channel.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); channel.consume(queue, function(msg) { var secs = msg.content.toString().split('.').length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); channel.ack(msg); }, secs * 1000); }, { noAck: false }); }); }); ``` --- <!-- .slide: data-transition="slide" --> ### Publish/Subscribe ![](https://www.rabbitmq.com/img/tutorials/exchanges.png) ``` 要實作一個 log 記錄器,在寫入硬碟的同時(worker1),並輸出至螢幕(worker2) ``` ---- <!-- .slide: data-transition="convex" --> #### Exchanges ``` 這邊在架構上引入一個新角色 - Exchanges,producer 透過 Exchanges 分排到你預期的 queue 中,可以使其更加彈性,缺點則是當單一 Exchanges 故障,正個 service 就會停擺,Exchanges 提供 direct, topic, headers, fanout 不同模式,該範例以 fanout 為主 ``` ---- <!-- .slide: data-transition="convex" --> #### Temporary queues 對於一些需求上,可能不需要明確的建立 queue(只需要暫存的) ```javascript channel.assertQueue('',{ // exclusive 為 true 時,只會讓一個 connection 連接,並在結束時會刪除該 queue exclusive:true }); ``` ---- <!-- .slide: data-transition="convex" --> #### Bindings ![](https://www.rabbitmq.com/img/tutorials/bindings.png) ``` 建立完 Exchange 和 Queue 後,需要綁定兩者之間的關係 ``` ---- <!-- .slide: data-transition="convex" --> #### Putting it all together ![](https://www.rabbitmq.com/img/tutorials/python-three-overall.png) ---- <!-- .slide: data-transition="convex" --> emit_log.js ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var exchange = 'logs'; var msg = process.argv.slice(2).join(' ') || 'Hello World!'; channel.assertExchange(exchange, 'fanout', { durable: false }); channel.publish(exchange, '', Buffer.from(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { connection.close(); process.exit(0); }, 500); }); ``` ---- <!-- .slide: data-transition="convex" --> receive_logs.js ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var exchange = 'logs'; channel.assertExchange(exchange, 'fanout', { durable: false }); channel.assertQueue('', { exclusive: true }, function(error2, q) { if (error2) { throw error2; } console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); // channel.bindQueue(queue, source, pattern, [args, [function(err, ok) {...}]]) channel.bindQueue(q.queue, exchange, ''); channel.consume(q.queue, function(msg) { if (msg.content) { console.log(" [x] %s", msg.content.toString()); } }, { noAck: true }); }); }); }); ``` ---- <!-- .slide: data-transition="convex" --> ``` ./receive_logs.js > logs_from_rabbit.log ``` ``` ./receive_logs.js ``` ``` ./emit_log.js ``` --- <!-- .slide: data-transition="slide" --> ### Routing ![](https://www.rabbitmq.com/img/tutorials/python-four.png) ---- <!-- .slide: data-transition="convex" --> #### Bindings ```javascript channel.bindQueue(q.queue, exchange, ''); ``` bindQueue 可以簡單理解成該 queue 只對什麼 exchanges 訊息有興趣 ---- <!-- .slide: data-transition="convex" --> #### Direct exchange ![](https://www.rabbitmq.com/img/tutorials/direct-exchange.png) message(routing key) -> direct + binding key ---- <!-- .slide: data-transition="convex" --> #### Multiple bindings ![](https://www.rabbitmq.com/img/tutorials/direct-exchange-multiple.png) ---- <!-- .slide: data-transition="convex" --> emit_log_direct.js ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var exchange = 'direct_logs'; var args = process.argv.slice(2); var msg = args.slice(1).join(' ') || 'Hello World!'; var severity = (args.length > 0) ? args[0] : 'info'; channel.assertExchange(exchange, 'direct', { durable: false }); channel.publish(exchange, severity, Buffer.from(msg)); console.log(" [x] Sent %s: '%s'", severity, msg); }); setTimeout(function() { connection.close(); process.exit(0) }, 500); }); ``` ---- <!-- .slide: data-transition="convex" --> receive_logs_direct.js ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); var args = process.argv.slice(2); if (args.length == 0) { console.log("Usage: receive_logs_direct.js [info] [warning] [error]"); process.exit(1); } amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var exchange = 'direct_logs'; channel.assertExchange(exchange, 'direct', { durable: false }); channel.assertQueue('', { exclusive: true }, function(error2, q) { if (error2) { throw error2; } console.log(' [*] Waiting for logs. To exit press CTRL+C'); args.forEach(function(severity) { channel.bindQueue(q.queue, exchange, severity); }); channel.consume(q.queue, function(msg) { console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString()); }, { noAck: true }); }); }); }); ``` ---- <!-- .slide: data-transition="convex" --> ![](https://www.rabbitmq.com/img/tutorials/python-four.png) ``` ./receive_logs_direct.js warning error > logs_from_rabbit.log ``` ``` ./receive_logs_direct.js info warning error ``` ``` ./emit_log_direct.js error "Run. Run. Or it will explode." ``` --- <!-- .slide: data-transition="slide" --> ### Topics ![](https://www.rabbitmq.com/img/tutorials/python-five.png) ---- <!-- .slide: data-transition="convex" --> 又回到我們的 log 服務上,log 不只有各種 severity level 的差別 (info/warn/crit...) 也會有其他像 facility (auth/cron/kern...). ---- <!-- .slide: data-transition="convex" --> #### Topic exchange ``` 設定廣播的 topic 都用 `.` 隔開,ex: "stock.usd.nyse", "nyse.vmw" , "quick.orange.rabbit(最多可到 255 bytes) binding 的方式也跟 direct 很像,選定要訂閱的 topic 就好 - `*` 可代替一個單字 - `#` 可代替 0 或多個單字 ex: *.orange.*", "*.*.rabbit", "lazy.#" ``` ---- <!-- .slide: data-transition="convex" --> 範例 ``` Queue1: "*.orange.*" Queue2: "*.*.rabbit", "lazy.#" routing key: "quick.orange.rabbit" ※ 如果送出的 lazy.orange.male.rabbit 這種長度跟 "*.orange.*", "*.*.rabbit" 的合,不過跟 "lazy.#" 有 match,所以只會傳到 Queue2 ``` ---- <!-- .slide: data-transition="convex" --> emit_log_topic.js ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var exchange = 'topic_logs'; var args = process.argv.slice(2); var key = (args.length > 0) ? args[0] : 'anonymous.info'; var msg = args.slice(1).join(' ') || 'Hello World!'; channel.assertExchange(exchange, 'topic', { durable: false }); channel.publish(exchange, key, Buffer.from(msg)); console.log(" [x] Sent %s:'%s'", key, msg); }); setTimeout(function() { connection.close(); process.exit(0) }, 500); }); ``` ---- <!-- .slide: data-transition="convex" --> receive_logs_topic.js: ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); var args = process.argv.slice(2); if (args.length == 0) { console.log("Usage: receive_logs_topic.js <facility>.<severity>"); process.exit(1); } amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var exchange = 'topic_logs'; channel.assertExchange(exchange, 'topic', { durable: false }); channel.assertQueue('', { exclusive: true }, function(error2, q) { if (error2) { throw error2; } console.log(' [*] Waiting for logs. To exit press CTRL+C'); args.forEach(function(key) { channel.bindQueue(q.queue, exchange, key); }); channel.consume(q.queue, function(msg) { console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString()); }, { noAck: true }); }); }); }); ``` --- <!-- .slide: data-transition="slide" --> ### Remote procedure call (RPC) ![](https://www.rabbitmq.com/img/tutorials/python-six.png) ---- <!-- .slide: data-transition="convex" --> #### Callback queue ```javascript channel.assertQueue('', { exclusive: true }); channel.sendToQueue('rpc_queue', Buffer.from('10'), { replyTo: queue_name }); # ... then code to read a response message from the callback queue ... ``` ---- <!-- .slide: data-transition="convex" --> #### Message properties ``` AMQP 0-9-1 protocol predefines a set of 14 properties * persistent: tutorial 2 有提到,可以確保 restart 後資料還在 * content_type: mime-type,ex: `application/json`. * reply_to: callback queue 的名字 * correlation_id: 識別 Useful to correlate RPC responses with requests. ``` ---- <!-- .slide: data-transition="convex" --> #### Correlation Id 如果每次的 RPC callback 都建立一個 callback queue 這樣沒什麼效率,所以好的方法是只建立一個 callback 的 queue,不過為了分辨這次 callback 是哪一次哪一個 producer 傳的,所以需要一個 Correlation Id 來做識別 ---- <!-- .slide: data-transition="convex" --> rpc_client.js ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); var args = process.argv.slice(2); if (args.length === 0) { console.log("Usage: rpc_client.js num"); process.exit(1); } amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } channel.assertQueue('', { exclusive: true }, function(error2, q) { if (error2) { throw error2; } var correlationId = generateUuid(); var num = parseInt(args[0]); console.log(' [x] Requesting fib(%d)', num); channel.consume(q.queue, function(msg) { if (msg.properties.correlationId === correlationId) { console.log(' [.] Got %s', msg.content.toString()); setTimeout(function() { connection.close(); process.exit(0); }, 500); } }, { noAck: true }); channel.sendToQueue('rpc_queue', Buffer.from(num.toString()), { correlationId: correlationId, replyTo: q.queue }); }); }); }); function generateUuid() { return Math.random().toString() + Math.random().toString() + Math.random().toString(); } ``` ---- <!-- .slide: data-transition="convex" --> ```javascript #!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = 'rpc_queue'; channel.assertQueue(queue, { durable: false }); channel.prefetch(1); console.log(' [x] Awaiting RPC requests'); channel.consume(queue, function reply(msg) { var n = parseInt(msg.content.toString()); console.log(" [.] fib(%d)", n); var r = fibonacci(n); channel.sendToQueue(msg.properties.replyTo, Buffer.from(r.toString()), { correlationId: msg.properties.correlationId }); channel.ack(msg); }); }); }); function fibonacci(n) { if (n === 0 || n === 1) return n; else return fibonacci(n - 1) + fibonacci(n - 2); } ``` --- <!-- .slide: data-transition="slide" --> ## sample code - [sample code](https://bitbucket.org/kaislin/bolt-queue/src/master/) - [官方 tutorials code](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/javascript-nodejs) - [node.js amqplib sample code](https://github.com/squaremo/amqp.node/tree/master/examples/tutorials)
{"metaMigratedAt":"2023-06-14T21:42:57.409Z","metaMigratedFrom":"Content","title":"RabbitMQ","breaks":true,"contributors":"[{\"id\":\"69ade472-3ed3-499d-8a69-767243a31621\",\"add\":28103,\"del\":3888}]"}
    482 views