Try   HackMD

RabbitMQ 筆記

tags: 技術備忘錄 RabbitMQ

參照RabbitMQ實戰指南 , 官方API

安裝篇->RabbitMQ 快速安裝筆記

進階篇->RabbitMQ 叢集搭建



底下開始為RabbitMQ預設走AMQP的情況
MQTT請往最下

大概的層級概念

Virtual Hosts可以複數的User跟Queue

User跟Queue都是建立在Virtual Hosts底下

發送







hierarchy



User

User



Virtual_Hosts

Virtual_Hosts



User->Virtual_Hosts





Virtual_Hosts->User





Exchange

Exchange



Virtual_Hosts->Exchange





Routing_Key

Routing_Key



Exchange->Routing_Key





Queue

Queue



Exchange->Queue





Routing_Key->Exchange





Routing_Key->Queue





接收







hierarchy



User

User



Virtual_Hosts

Virtual_Hosts



User->Virtual_Hosts





Virtual_Hosts->User





Queue

Queue



Virtual_Hosts->Queue





交換器 Exchange

每當一個Virtual Hosts建立就會產生一組預設的Exchange

預設產生的Exchange們

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

路由鍵

  1. Bindings (BindingKey)
    • 對應建立在Exchange下,實際上就是Server端Exchange下的RoutingKey
  2. RoutingKey
    • 為Client端帶的RoutingKey
    • 對應BindingKey
    • 以" . "分割字串

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

完成效果類似這樣
在"amq.direct"(Exchange)下,
建立Bindings將Key指為"OAO.key",
並將資料倒給"OAO"(Queue) > (可以給Queue or Exchange)

To Exchange的樣式

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

交換器類型(Exchange type)

  1. fanout

    • 將資料倒給所有綁定在此Exchange的Queue
      • 如果沒有綁定Queue會自動丟棄進來的資料
  2. direct

    • 依照Routing Key將資料倒給對應的Queue
      • 如果沒有對應的Routing Key會自動丟棄進來的資料
  3. topic

    • 同direct,差別: topic支援 # 或 * 的模糊搜尋
      • # : 多字串
      • * : 單一字串

    模糊搜尋,必須設定在BindingKey上(Server端)
    類似這樣↓

    Image Not Showing Possible Reasons
    • The image file may be corrupted
    • The server hosting the image is unavailable
    • The image path is incorrect
    • The image format is not supported
    Learn More →

  4. headers

    • 根據資料的header做導向
    • 不實用,不要理他

預設情況

在沒有指定的Exchange的情形下所有的資料都會進入預設的Exchange

如圖,為預設的Exchange

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

可以從圖看到這個Exchange的type為direct
也就是說進來的資料必須要有Routing Key做對應
而預設Exchange的Routing Key就是直接對應Queue Name

Exchange default : Routing Key <> Queue Name

Queue

進入Queue的資料在有人收走以前是不會消失的

設定參數

建立新Queue可以設定的參數的定義

Durability

  • Durable : 系統關機後會保存
  • Transient : 系統關機後Queue會消失

Auto delete

  • No : Queue不會自我刪除
  • Yes : Queue在最後一個連線到自己的連線斷了之後會自我刪除

備註: 當Queue建立時,假設Auto delete設為Yes,那在第一個人與這隻Queue建立連線之前,Queue的Auto delete機制都不會觸發

另有名為 Quorum Queues 的特別 Queue,參照RabbitMQ 叢集搭建 - Quorum Queues

實作

相關套件

RabbitMQ.Client

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

題外話,網路上有人說這套比官方的那套好用

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

可是我拉這套怎樣都連不進Server,莫名其妙

共通部分

建立連線的基本資料,發送跟接收都會用到

public class RabbitMqFactory
{
    public static ConnectionFactory Create()
    {
        return new ConnectionFactory()
        {
            HostName = "192.168.11.142",
            UserName = "dpc",
            Password = "123",
            VirtualHost = "DPC"
        };
    }
}

接收

var factory = RabbitMqFactory.Create();
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);
        };
        //底下代表同時監聽OAO,OAO2,OAO3
        channel.BasicConsume(queue: "OAO",
            autoAck: true,
            consumer: consumer);
        channel.BasicConsume(queue: "OAO2",
            autoAck: true,
            consumer: consumer);
        channel.BasicConsume(queue: "OAO3",
            autoAck: true,
            consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

發送

var factory = RabbitMqFactory.Create();
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        string message = "Hello World!";
        var body = Encoding.UTF8.GetBytes(message);
                        
        channel.BasicPublish(exchange: "",
                             routingKey: "OAO",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" [x] Sent {0}", message);
    }
}

建立Exchange

channel.ExchangeDeclare("testOAO",ExchangeType.Direct);

建立Queue

channel.QueueDeclare(queue: "OAO",
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

查詢卡在Queue上的資料

BasicGetResult result = channel.BasicGet("QueueName", false);
uint count = result != null ? result.MessageCount : 0;

https://stackoverflow.com/questions/1038318/check-rabbitmq-queue-size-from-client


For MQTT

RabbitMQ 預設是沒有MQTT的情況
要走MQTT要開對應的功能

連線狀況

在用MQTT協定連RabbitMQ時
Client的連線設定會長的不一樣

MQTT Client可以帶的值有

  • User Name
  • Password
  • Topic (?)

MQTT協定下預設走 "/" 的 Virtual Hosts

因為預設走 "/"
所以如果 User 沒有辦法操作 "/" 的 VH
連線會直接不過

每個進來的連線會自動開對應那個連線的Queue

並且所有資料都走預設的Exchange ==> "amq.topic"

上面說 Client 帶的 Topic 會直接變成 RabbitMQ 的 Routing key

然後上面的C#接收方式只能直接對接Queue
所以在接收端的實作上可能會需要依照Routing key
在接受後再做一次判斷 (??)

其他就都一樣 ((不確定,有狀況再補