@Kais(VagrantPi)
slide
, 簡報
, RabbitMQ
send.js(producer)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Producer: declares the non-durable 'hello' queue and sends one message to it.
amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const queueName = 'hello';
    const text = 'Hello World!';

    // Assert the queue first: it is created if it does not exist yet.
    // Options can also be passed when the queue is created, e.g.
    // durable: if true, the queue will survive broker restarts.
    ch.assertQueue(queueName, { durable: false });
    ch.sendToQueue(queueName, Buffer.from(text));
    console.log(" [x] Sent %s", text);
  });

  // Close the connection and exit 500 ms after the connection was opened.
  setTimeout(() => {
    conn.close();
    process.exit(0);
  }, 500);
});
receive.js(consumers)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Consumer: declares the 'hello' queue and prints every message delivered to it.
amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const queueName = 'hello';
    const printMessage = (delivery) => {
      console.log(" [x] Received %s", delivery.content.toString());
    };

    // Note: every assertQueue call for the 'hello' queue must pass the same
    // options, otherwise an error is raised.
    ch.assertQueue(queueName, { durable: false });

    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queueName);
    ch.consume(queueName, printMessage, { noAck: true });
  });
});
預設情況下,RabbitMQ 會輪著分配,所以在偶數 task 情況下,每個
worker 分配到的工作是一樣的,這稱為:round-robin
noAck: true
// Consume from `queue` with noAck set to false; the handler body is elided here.
channel.consume(queue, function(msg) {
....
}, {noAck: false});// enable acknowledgment
當 worker 拿到 task 掛掉後,RabbitMQ 可以透過 acknowledgment 來發
現,並將 task 再分派下去,原理即 worker 處理完後向 RabbitMQ 發送訊息
表示已經處理完成 `channel.ack(msg);`
上面的 acknowledgment 使 consumer 掛掉後,任務不會遺失,但假設
RabbitMQ 主機掛掉,task 還是會遺失,因此需要設定 queue 為 durable
,然後傳送的 message 必須設定 persistent
// Declare the 'hello' queue with durable set to true.
channel.assertQueue('hello',{ durable:true });
但是,assertQueue 對原本已建立的 Queue 餵的參數不能跟當初建立時的
不同
// Send msg to queue q with the persistent option set to true.
channel.sendToQueue(q, Buffer.from(msg), {persistent: true});
// prefetch(1): a worker receives only one task at a time.
channel.prefetch(1);
前面有提到預設採用 round-robin 的分配 task 方式,所以不管 worker 是
否還在執行 task,他還是會往他那邊塞,prefetch=1 的意思為 worker 同
一時間只會接收一個 task
new_task.js(producer)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Producer: sends the command-line text (or a default) to the durable
// 'task_queue' as a persistent message.
amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const queueName = 'task_queue';
    const cliText = process.argv.slice(2).join(' ');
    const task = cliText || "Hello World!";

    ch.assertQueue(queueName, { durable: true });
    ch.sendToQueue(queueName, Buffer.from(task), { persistent: true });
    console.log(" [x] Sent '%s'", task);
  });

  // Close the connection and exit 500 ms after the connection was opened.
  setTimeout(() => {
    conn.close();
    process.exit(0);
  }, 500);
});
worker.js(consumers)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Worker: consumes tasks from the durable 'task_queue' one at a time and
// acknowledges each task once its simulated work has finished.
amqp.connect('amqp://localhost', function(error0, connection) {
  // Fail fast on a connection error instead of dereferencing an undefined
  // connection below (same handling as the other scripts).
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'task_queue';

    channel.assertQueue(queue, {
      durable: true
    });
    // Receive only one task at a time.
    channel.prefetch(1);
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
    channel.consume(queue, function(msg) {
      // Each '.' in the message body adds one second of simulated work.
      var secs = msg.content.toString().split('.').length - 1;

      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        // Acknowledge only after the work is done.
        channel.ack(msg);
      }, secs * 1000);
    }, {
      // Manual acknowledgment mode.
      noAck: false
    });
  });
});
要實作一個 log 記錄器,在寫入硬碟的同時(worker1),並輸出至螢幕(worker2)
這邊在架構上引入一個新角色 - Exchanges,producer 透過 Exchanges
分派到你預期的 queue 中,可以使其更加彈性,缺點則是當單一 Exchanges
故障,整個 service 就會停擺,Exchanges 提供 direct, topic,
headers, fanout 不同模式,該範例以 fanout 為主
對於一些需求上,可能不需要明確的建立 queue(只需要暫存的)
// Declare a queue with an empty name, for cases where only a temporary queue is needed.
channel.assertQueue('',{
// when exclusive is true, only one connection may use the queue, and the queue is deleted when that connection ends
exclusive:true
});
建立完 Exchange 和 Queue 後,需要綁定兩者之間的關係
emit_log.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Producer: publishes the command-line text (or a default) to the 'logs'
// fanout exchange with an empty routing key.
amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const exchangeName = 'logs';
    const cliText = process.argv.slice(2).join(' ');
    const logLine = cliText || 'Hello World!';

    ch.assertExchange(exchangeName, 'fanout', { durable: false });
    ch.publish(exchangeName, '', Buffer.from(logLine));
    console.log(" [x] Sent %s", logLine);
  });

  // Close the connection and exit 500 ms after the connection was opened.
  setTimeout(() => {
    conn.close();
    process.exit(0);
  }, 500);
});
receive_logs.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Consumer: binds an exclusive, unnamed queue to the 'logs' fanout exchange
// and prints every message delivered to it.
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'logs';

    channel.assertExchange(exchange, 'fanout', {
      durable: false
    });

    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
      // channel.bindQueue(queue, source, pattern, [args, [function(err, ok) {...}]])
      channel.bindQueue(q.queue, exchange, '');

      channel.consume(q.queue, function(msg) {
        // amqplib invokes the callback with null when the consumer is
        // cancelled by the broker, so check msg itself before reading content.
        if (msg && msg.content) {
          console.log(" [x] %s", msg.content.toString());
        }
      }, {
        noAck: true
      });
    });
  });
});
./receive_logs.js > logs_from_rabbit.log
./receive_logs.js
./emit_log.js
// Bind the queue q.queue to the exchange, passing '' as the pattern argument.
channel.bindQueue(q.queue, exchange, '');
bindQueue 可以簡單理解成該 queue 只對什麼 exchanges 訊息有興趣
message(routing key) -> direct + binding key
emit_log_direct.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Producer: publishes a message to the 'direct_logs' direct exchange, using
// the first CLI argument as the severity routing key (default 'info') and the
// remaining arguments as the message text (default 'Hello World!').
amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const exchangeName = 'direct_logs';
    const cliArgs = process.argv.slice(2);
    const [firstArg, ...textParts] = cliArgs;
    const text = textParts.join(' ') || 'Hello World!';

    let routingKey = 'info';
    if (cliArgs.length > 0) {
      routingKey = firstArg;
    }

    ch.assertExchange(exchangeName, 'direct', { durable: false });
    ch.publish(exchangeName, routingKey, Buffer.from(text));
    console.log(" [x] Sent %s: '%s'", routingKey, text);
  });

  // Close the connection and exit 500 ms after the connection was opened.
  setTimeout(() => {
    conn.close();
    process.exit(0);
  }, 500);
});
receive_logs_direct.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Consumer: binds an exclusive, unnamed queue to the 'direct_logs' exchange
// once per severity given on the command line and prints what it receives.
var args = process.argv.slice(2);

// At least one severity is required; otherwise print usage and exit with 1.
if (args.length === 0) {
  console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
  process.exit(1);
}

amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const exchangeName = 'direct_logs';
    ch.assertExchange(exchangeName, 'direct', { durable: false });

    ch.assertQueue('', { exclusive: true }, (queueError, declared) => {
      if (queueError) {
        throw queueError;
      }

      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      // One binding per requested severity.
      for (const severity of args) {
        ch.bindQueue(declared.queue, exchangeName, severity);
      }

      const printLog = (delivery) => {
        const routingKey = delivery.fields.routingKey;
        const body = delivery.content.toString();
        console.log(" [x] %s: '%s'", routingKey, body);
      };

      ch.consume(declared.queue, printLog, { noAck: true });
    });
  });
});
./receive_logs_direct.js warning error > logs_from_rabbit.log
./receive_logs_direct.js info warning error
./emit_log_direct.js error "Run. Run. Or it will explode."
又回到我們的 log 服務上,log 不只有各種 severity level 的差別 (info/warn/crit…) 也會有其他像 facility (auth/cron/kern…).
設定廣播的 topic 都用 `.` 隔開,ex: "stock.usd.nyse", "nyse.vmw"
, "quick.orange.rabbit"(最多可到 255 bytes)
binding 的方式也跟 direct 很像,選定要訂閱的 topic 就好
- `*` 可代替一個單字
- `#` 可代替 0 或多個單字
ex: "*.orange.*", "*.*.rabbit", "lazy.#"
範例
Queue1: "*.orange.*"
Queue2: "*.*.rabbit", "lazy.#"
routing key: "quick.orange.rabbit"
※ 如果送出的 lazy.orange.male.rabbit 這種長度跟 "*.orange.*", "*.*.rabbit" 的不合,不過跟 "lazy.#" 有 match,所以只會傳到 Queue2
emit_log_topic.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Producer: publishes a message to the 'topic_logs' topic exchange, using the
// first CLI argument as the routing key (default 'anonymous.info') and the
// remaining arguments as the message text (default 'Hello World!').
amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const exchangeName = 'topic_logs';
    const cliArgs = process.argv.slice(2);
    const [firstArg, ...textParts] = cliArgs;

    let routingKey = 'anonymous.info';
    if (cliArgs.length > 0) {
      routingKey = firstArg;
    }
    const text = textParts.join(' ') || 'Hello World!';

    ch.assertExchange(exchangeName, 'topic', { durable: false });
    ch.publish(exchangeName, routingKey, Buffer.from(text));
    console.log(" [x] Sent %s:'%s'", routingKey, text);
  });

  // Close the connection and exit 500 ms after the connection was opened.
  setTimeout(() => {
    conn.close();
    process.exit(0);
  }, 500);
});
receive_logs_topic.js:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// Consumer: binds an exclusive, unnamed queue to the 'topic_logs' exchange
// once per binding key given on the command line and prints what it receives.
var args = process.argv.slice(2);

// At least one binding key is required; otherwise print usage and exit with 1.
if (args.length === 0) {
  console.log("Usage: receive_logs_topic.js <facility>.<severity>");
  process.exit(1);
}

amqp.connect('amqp://localhost', (connectError, conn) => {
  if (connectError) {
    throw connectError;
  }

  conn.createChannel((channelError, ch) => {
    if (channelError) {
      throw channelError;
    }

    const exchangeName = 'topic_logs';
    ch.assertExchange(exchangeName, 'topic', { durable: false });

    ch.assertQueue('', { exclusive: true }, (queueError, declared) => {
      if (queueError) {
        throw queueError;
      }

      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      // One binding per requested key.
      for (const bindingKey of args) {
        ch.bindQueue(declared.queue, exchangeName, bindingKey);
      }

      const printLog = (delivery) => {
        const routingKey = delivery.fields.routingKey;
        const body = delivery.content.toString();
        console.log(" [x] %s:'%s'", routingKey, body);
      };

      ch.consume(declared.queue, printLog, { noAck: true });
    });
  });
});
// Declare an exclusive queue with an empty name to act as the callback queue.
channel.assertQueue('', {
  exclusive: true
});

// Send the request '10' to 'rpc_queue', passing the callback queue in replyTo.
// queue_name is not defined in this snippet; it stands for the name of the
// callback queue declared above.
channel.sendToQueue('rpc_queue', Buffer.from('10'), {
  replyTo: queue_name
});

// ... then code to read a response message from the callback queue ...
AMQP 0-9-1 protocol predefines a set of 14 properties
* persistent: tutorial 2 有提到,可以確保 restart 後資料還在
* content_type: mime-type,ex: `application/json`.
* reply_to: callback queue 的名字
* correlation_id: 識別 Useful to correlate RPC responses with requests.
如果每次的 RPC callback 都建立一個 callback queue 這樣沒什麼效率,所以好的方法是只建立一個 callback 的 queue,不過為了分辨這次 callback 是哪一次哪一個 producer 傳的,所以需要一個 Correlation Id 來做識別
rpc_client.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// RPC client: sends one number to 'rpc_queue' and waits on an exclusive
// callback queue for the reply carrying the matching correlationId.
var args = process.argv.slice(2);

// Parse up front with an explicit radix so input such as "0x10" is not read
// as hexadecimal.
var num = Number.parseInt(args[0], 10);

// Reject missing, non-numeric and negative input before connecting: the
// recursive fibonacci on the server side never terminates for such values.
if (args.length === 0 || Number.isNaN(num) || num < 0) {
  console.log("Usage: rpc_client.js num");
  process.exit(1);
}

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      var correlationId = generateUuid();

      console.log(' [x] Requesting fib(%d)', num);

      // Start consuming replies before the request is sent.
      channel.consume(q.queue, function(msg) {
        // Ignore replies that do not belong to this request.
        if (msg.properties.correlationId === correlationId) {
          console.log(' [.] Got %s', msg.content.toString());
          setTimeout(function() {
            connection.close();
            process.exit(0);
          }, 500);
        }
      }, {
        noAck: true
      });

      channel.sendToQueue('rpc_queue',
        Buffer.from(num.toString()), {
          correlationId: correlationId,
          replyTo: q.queue
        });
    });
  });
});
/**
 * Builds an identifier by concatenating the string forms of three
 * Math.random() values.
 *
 * @returns {string} The concatenated identifier.
 */
function generateUuid() {
  const parts = [];
  for (let i = 0; i < 3; i += 1) {
    parts.push(Math.random().toString());
  }
  return parts.join('');
}
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
// RPC server: consumes requests from 'rpc_queue', computes fibonacci(n) and
// sends the result to the queue named in the request's replyTo property.
amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'rpc_queue';

    channel.assertQueue(queue, {
      durable: false
    });
    // Handle one request at a time.
    channel.prefetch(1);
    console.log(' [x] Awaiting RPC requests');
    channel.consume(queue, function reply(msg) {
      // Explicit radix so a body such as "0x10" is not read as hexadecimal.
      var n = Number.parseInt(msg.content.toString(), 10);

      // A non-numeric or negative n would make the recursive fibonacci run
      // until the stack overflows; the message would stay unacknowledged and
      // be delivered again. Discard such requests without requeueing.
      if (Number.isNaN(n) || n < 0) {
        console.log(" [.] rejected invalid request '%s'", msg.content.toString());
        channel.reject(msg, false);
        return;
      }

      console.log(" [.] fib(%d)", n);

      var r = fibonacci(n);

      // Echo the correlationId back with the reply.
      channel.sendToQueue(msg.properties.replyTo,
        Buffer.from(r.toString()), {
          correlationId: msg.properties.correlationId
        });

      channel.ack(msg);
    });
  });
});
/**
 * Returns the n-th Fibonacci number (fibonacci(0) === 0, fibonacci(1) === 1).
 *
 * Computed iteratively in n steps instead of by double recursion, so larger
 * n stay fast and invalid input cannot recurse without end.
 *
 * @param {number} n - Non-negative integer index.
 * @returns {number} The n-th Fibonacci number.
 * @throws {RangeError} If n is not a non-negative integer.
 */
function fibonacci(n) {
  if (!Number.isInteger(n) || n < 0) {
    throw new RangeError('fibonacci(n) requires a non-negative integer, got ' + n);
  }

  let previous = 0;
  let current = 1;
  for (let step = 0; step < n; step += 1) {
    const next = previous + current;
    previous = current;
    current = next;
  }
  return previous;
}