---
# System prepended metadata

title: '[練習範例]RabbitMQ 消息列隊練習'
tags: [RabbitMQ]

---


# [練習範例]RabbitMQ 消息列隊練習

## 面相消息的架構

消息機制是不同應用程序之間或同一個應用程序內的不同組件之間的痛信方法。
當一個應用程序或一個組件將消息發送到指定的消息目的之後，該消息可以被一個或多個組件讀取並處理。

對於面相消息的應用架構來說，消息生產者與消息消費者之間是完全隔離。
消息生產者只負責將消息發送到消息目的，至於該消息的處理細節則是消息消費者應該關心的。
同樣的消息消費者也只面相消息目的，消息消費者從消息目的讀取並處理消息。

從而達到徹底解偶，互相只需要了解交換的消息格是即可。
![](https://i.imgur.com/tDUJx4l.png)

這裡我們舉例一個訂單子系統與庫存子系統
我們起初設定該系統為分散式是系統，訂單子系統與庫存子系統是獨立存在。
當用戶向定單子系統發出訂單，訂單子系統就需要通知庫存子系統。
如果讓訂單子系統直接調用庫存子系統對外暴露的接口，可能就會產生如下問題
* 如果庫存子系統剛好當機，庫存查詢就會失敗，從而導致訂單創建失敗。
* 訂單子系統與庫存子系統耦合高，違反分散式的設計初衷。

而若在兩個系統中間加入了消息機制
* 解除兩個子系統的耦合，彼此不需要依賴彼此狀況。
* 訂單資訊傳入消息列隊中，而庫存系統就監聽消息列隊是否有狀況即可。

消息列隊的技術也經常用於高併發的流量銷鋒場景中，可以用來控制參加活動的人數、緩解瞬間高流量時的系統壓力。
如當瞬間高流量到來時，服務器雖然接收所有請求，但並不處理這些請求，紙是將它存入消息列隊中，而當被加入消息列隊的消息數量達到我們設定的上限時，就直接回傳到錯誤題示的頁面，從而降低服務器的壓力。

另外面相消息的架構還有以下優勢
* 消息採用非同步處理機制，可以避免客戶端等待
* 消息服務器可以持久的保存消息，因而提高了系統得可靠性。
* 一條消息可以發給多個接收者，這跟傳統的方法調用有很大的不同，大大提高效率

## 整合高級消息列隊協議(Advanced Message Queuing Protocel, AMQP)
高級消息列隊協議是一種與平台無關的、線路級的消息中間協議。AMQP並不屬於JMS(Java Message Service)。
AMQP與JMS的差別
* JMS定義了消息中間件的規範，從而實現對消息統一，AMQP則通過制定協議來統一數據交互的格式。
* JMS限制了必須使用Java語言，AMQP紙製訂協議，部規定時限語言和實現方式，因此是跨語言的。
* JMS只制定了兩種消息模式，AMQP的消息模型更加靈活。
RabbitMQ就是典型的AMQP產品，他是用Erlang語言開發的。從靈活角度、性能角度都比JMS的ActiveMQ更好。

### 安裝RabbitMQ
這裡我們直接使用docker來安裝，更快速。
1. docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
2. 服務啟動後在網址輸入[http://localhost:15672/](http://localhost:15672/)，即可登入RabbitMQ管理介面
    ![](https://i.imgur.com/wd6JCTB.png)
3. 預設帳密：guest , guest即可登入進到overview頁面
    ![](https://i.imgur.com/Q9AViCs.png)
* 點擊Admin的頁籤，這邊可以看到使用者清單以及增加使用者，Tags的部分可以用來標註該使用者是哪一類。
    ![](https://i.imgur.com/VTzNawE.png)
* 點擊使用者名稱可以進入該使用者的權限設定，包含以下4種
    1. Permissions:為root用戶設置針堆選定的虛擬主機的權限。
    2. Topic permissions:為root用戶設置針堆選定的虛擬主機、選定的Exchange的權限。
    3. Update this user
    4. Delete this user
    ![](https://i.imgur.com/7FIav2h.png)
    以上操作也可以藉由rabbitmqctl.bat輸入指令來完全，這邊不介紹了，因為我菜鳥。
* 點擊Exchange頁籤，我好懶得寫啊!!!總之這邊可以查看、新增、刪除這些Exchanges(P580)
    ![](https://i.imgur.com/ntxch2a.png)

    
### RabbitMQ的工作機制
* connection:代表客戶(包括消息生產者與消息消費者)端與RabbitMQ之間的連結。
* channel: channel位於連接內部，負責實際通訊。
* exchange:充當消息交換機的組件。
* queue:消息列隊。
![](https://i.imgur.com/BkfoJof.png)
客戶端與RabbitMQ之間的實際通信使用的是chennel，這是因為RabbitMQ採用了類似於Java NIO的坐是使用C法，避免為應用程序中的每個現成都建立單獨的連結，而是使用Chennel來重複使用連結，這樣不僅可以降低性能開銷，也便於管理。
應用程序的每個線程都能持有自己應對的chennel，因此重複使用了連結，同時RabbitMQ可以確保每個現成的私密性，就像各自擁有獨立的連結一樣。當chennel的數據流量不是很大時，重複使用單一的連結可以有效地節省連結資源。當數據流量很大時，多個chennel共用一個連結就會產生性能瓶頸，此時可以考慮建立多個連結，把chennel平攤到所有連結中。
消息生產者發送消息時，只需要定義兩個關鍵消息:
* Exchange:將該消息發送到哪個Exchange。
* Routing key:消息的路由Key。
Exchange可以分為幾類:
* fanout:廣播類型，會將此消息廣播到所有與他綁訂的消息列隊中。分發消息時不看路由key
* direct:會依據路由key來發送消息。
* topic:在比對路由key時支持通配符(??
* headers:少用，這是更具heafer中的訊息進行路由分配。

### 使用默認Exchange支持P2P(Point-to-Point)模型
RabbitMQ會自動為每個虛擬主機創建七個Exchange，而默認的Exchange的類型是direct(type是direct)，並且是持久畫保存的Exchage。
* P2P模型:
  每個消息都被發送到一個特定的隊列，接收者從隊列中獲取消息。隊列保留著消息，直到它們被消費或超時。
    每個消息只有一個消費者(Consumer)（即一旦被消費，消息就不再在消息隊列中）
    發送者和接收者之間在時間上沒有依賴性，也就是說當發送者發送了消息之後，不管接收者有沒有正在運行，它不會影響到消息被發送到隊列。
    接收者在成功接收消息之後需向隊列應答成功
    如果你希望發送的每個消息都應該被成功處理的話，那麽你需要P2P模型。
1. 創建新的spring boot 專案 [Spring initializr](https://start.spring.io/)
2. 在pom.xml裡面引入
    ```
    <dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
    <dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-rabbit-test</artifactId>
		<scope>test</scope>
    </dependency>
    ```
    刷新pom.xml確保有正確引入。
    引入後即代表依賴了 RabbitMQ Java Client，以下一般狀況的步驟如下:
    1. 創建ConnectionDacotry，設置接信息，再通過ConnectionFactory獲取Connection
    2. 通過Connection獲取Channel
    3. 根據需要調用Channel的queueDeclare()方法聲明消息列隊，如果聲明的列隊已經存在，該方法會直接獲取已有的列隊，如果不存在，該方法將會創建新的列隊。
    4. 調用Channel的basicConsume()方法開始處裡消息，在調用該方法時需要傳入一個Consumer參數，該參數相當於JMS中的消息監聽器。
3. 創建檔案 ConnectionUtil.java
    ```
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {
        // ConnectionFactory: a factory to create connections
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // if the virtual host is not set, the default virtual host is "/"
        factory.setVirtualHost("/");
        return factory.newConnection();
    }
}

    ```
5. 創建檔案 P2PConsumer.java 作為消費者
    ```
    package com.example.rabbitMQPractice.basicModel.message;

    import com.example.rabbitMQPractice.connectUtil.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    // P2P 消息模型
    // 一個生產者對應一個消費者

    public class P2PProducer {

    public static void main(String[] args) {

        // use auto closeable to close the connection automatically to manage Connection, Channel, and other resources
        try {
            // create RabbitMQ connection
            Connection connection = ConnectionUtil.getConnection();
            // create a channel
            Channel channel = connection.createChannel();
            // declare a queue, if the queue does not exist, create it
            channel.queueDeclare("firstQueue", true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
            // send a message
            for (int i = 0; i < 10; i++) {
                String message = "Hello RabbitMQ! " + i;
                channel.basicPublish("", "firstQueue", null, message.getBytes());
                System.out.println("Sent: " + message);
            }
            } catch (Exception e) {
                e.printStackTrace();

            }

       }
    }
    ```
6. 運行上面的P2PConsumer.java，由於該程序的最後位關閉Channel和Connection，因此他將一直與RabbitMQ保持連結，除非強制退出該程序。而此時打開我們的RabbitMQ的預設頁面[http://localhost:15672/](http://localhost:15672/)，點擊Queues的頁簽，會發現多了我們的firstQueue
    ![](https://i.imgur.com/1qU7a0e.png)
    而這個消息列隊我們在程式碼中把他設定為自動刪除，所以當我們沒有使用他時(沒有Exchange向列隊發送消息、沒有消息消費者監聽該列隊、該列隊中的消息數量為0)，就會自動被清除。可以強制停止剛剛啟動的程式碼，就會發現這個消息列隊被清除。


7. 接下來我們要來寫消息生產者的程式碼。大至的步驟如下:
    1. 創建ConnectionFactory，設置連結信息，在通過ConnectionFactory獲取Connection。
    2. 通過Connection獲取Channel。
    3. 根據需要調用exchange()、queueDeclate()方法聲明Exchange和消息列隊，並完成列隊與Exchange的綁定。類似的，如果聲明的Exchange不存在，就會創建新的，否則直接使用已有的Exchange。
    4. 調用Channel的basicConsume()方法開始處理消息在調用開方法時，需要傳入一個Consumer參數，該參數就是消息監聽器。
    P2PProducer.java程式碼如下
    ```
    import com.example.rabbitMQPractice.connectUtil.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    // P2P 消息模型
    // 一個生產者對應一個消費者

    public class P2PProducer {

    public static void main(String[] args) {

        // use auto closeable to close the connection automatically to manage Connection, Channel, and other resources
        try {
            // create RabbitMQ connection
            Connection connection = ConnectionUtil.getConnection();
            // create a channel
            Channel channel = connection.createChannel();
            // declare a queue, if the queue does not exist, create it
            channel.queueDeclare("firstQueue", true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
            // send a message
            for (int i = 0; i < 10; i++) {
                String message = "Hello RabbitMQ! " + i;
                channel.basicPublish("", "firstQueue", null, message.getBytes());
                System.out.println("Sent: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();

        }

    }
    }
    ```
8. 先啟動P2PConsume，再啟動P2PProducer，在P2PProducer的console中我們可以發出去的所有消息
    ![](https://i.imgur.com/mDT0UQA.png)
    再看看P2PConsume的console可以看到我們接收到的消息
    ![](https://i.imgur.com/dAH7Ztt.png)
    basicPublish()這個方法向默認的Exchange發送消息，在發送消息時指定了P2PConsumer中的QueueName做為路由Key，這意味著該消息將會被分發給與路由Key同名的消息列隊。

### 工作列對(Work Queue)
RabbitMQ可以讓多個消費者競爭消費同一個消息列隊，這種方式我們稱為工作列隊
![](https://i.imgur.com/LnUYOeR.png)

這樣的工作模式可以達到"能者多勞"的效果，提高效率，讓消耗比較少時間就可以完成任務的消費者多消費一些。
但誰知道哪一個消費者消費的比較快呢?
其實我們沒有辦法得知，但Channel提供了一個basicQos(1)的方法，表示每個消息消費者在同一時間內最多只能得到一條消息，換言之，在消息列隊收到該消息消費者的確認之前，消息列隊不會將新的消息分發給該消息消費者，而是將消息分給其他處於空閒狀態(已返回確認)的消息消費者。

以下我們新增兩個消息消費者，其他別只在消費時增加thread.sleep(1000)，來觀察是不是能者真的可以多勞
1. Consumer1
```
import com.example.rabbitMQPractice.connectUtil.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {

    public final static String Queue_Name = "firstQueue";

    public static void main(String[] args) throws IOException, TimeoutException {

        // create a connection between the consumer and the rabbitMQ server
        Connection factory = ConnectionUtil.getConnection();
        // create a chennel
        Channel channel = factory.createChannel();
        // declare a queue, if the queue does not exist, create it
        channel.queueDeclare(Queue_Name, true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
        // limit the number of messages that the consumer can receive at a time
        // basicQos 可以設定一次接收多少個消息，如果設定為1，則表示一次只能接收一個消息，接收到消息後，必須回復ack，才能接收下一個消息，
        // 藉此讓消息可以平均分配給多個消費者，並確保善用效能。
        channel.basicQos(1);
        // create a consumer
        Consumer consumer = new DefaultConsumer(channel) {
            // when a message is received, the following method will be called
            // Envelope: the message envelope, which contains the message ID, routing key, exchange name, and message delivery mode
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    // simulate the time-consuming process
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + envelope.getDeliveryTag() + "," + message);
                // send a message to the server to confirm that the message has been received
                channel.basicAck(envelope.getDeliveryTag(), false/*是否批量確認*/);
            }
        };
        // set the consumer to the channel
        channel.basicConsume(Queue_Name, false/*是否自動確認*/, consumer);
    }
}

```
2. Consumer2
```
import com.example.rabbitMQPractice.connectUtil.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {

    public final static String Queue_Name = "firstQueue";

    public static void main(String[] args) throws IOException, TimeoutException {

        // create a connection between the consumer and the rabbitMQ server
        Connection factory = ConnectionUtil.getConnection();
        // create a chennel
        Channel channel = factory.createChannel();
        // declare a queue, if the queue does not exist, create it
        channel.queueDeclare(Queue_Name, true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
        // limit the number of messages that the consumer can receive at a time
        // basicQos 可以設定一次接收多少個消息，如果設定為1，則表示一次只能接收一個消息，接收到消息後，必須回復ack，才能接收下一個消息，
        // 藉此讓消息可以平均分配給多個消費者，並確保善用效能。
        channel.basicQos(1);
        // create a consumer
        Consumer consumer = new DefaultConsumer(channel) {
            // when a message is received, the following method will be called
            // Envelope: the message envelope, which contains the message ID, routing key, exchange name, and message delivery mode
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
//                try {
//                    // simulate the time-consuming process
//                    Thread.sleep(1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + envelope.getDeliveryTag() + "," + message);
                // send a message to the server to confirm that the message has been received
                channel.basicAck(envelope.getDeliveryTag(), false/*是否批量確認*/);
            }
        };
        // set the consumer to the channel
        channel.basicConsume(Queue_Name, false/*是否自動確認*/, consumer);
    }
}

```
3. 接者我們先把這兩個消費者啟動
4. 啟動我們的P2PProducer
5. 我們先觀察P2PProducer的coneole
    送出了10條消息
    ![](https://i.imgur.com/u6uhsWO.png)
6. 接著觀察沒有thread.sleep(1000)的Consumer2的console
    發現只有9條消息
    ![](https://i.imgur.com/mvZsdH0.png)
7. 再觀察有thread.sleep(1000)的Consumer1的console
    發現這裡有1條消息
    ![](https://i.imgur.com/kJYPlQt.png)
8. 所以我們得到結論，Consumer1做事很慢，所以他只消費了1條，剩下的9條就都被Consumer2消費完了


### 使用fanout實現Pub-Sub消息模型
fanout類型的Exchange不會判斷消息的路由key，該Exchange直接將消息分發給綁定到他的所有列隊。
![](https://i.imgur.com/eEJryei.png)

那我們開始弄髒手吧!

1. 產生MyProducer.java
```
import com.example.rabbitMQPractice.connectUtil.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 使用fanout實現消息的發布pub/訂閱sub模型
// fanout: 將消息發送到所有綁定的隊列中

public class MyProducer {

    public final static String EXCHANGE_NAME = "fkjava.fanout";
    public final static String ROUTING_KEY = "test1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // use autoCloseable to close the connection and channel automatically
        try (
                // create a connection between the producer and the rabbitMQ server
                Connection connection = ConnectionUtil.getConnection();
                // create a channel
                Channel channel = connection.createChannel()
        ) {
            // declare an exchange, if the exchange does not exist, create it, set the exchange type to fanout
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true/*是否持久化*/, false/*是否自動刪除*/, null/*其他參數*/);
            // declare an exchange, if the exchange does not exist, create it
            channel.queueDeclare(Consumer1.Queue_Name, true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
            // queueBind: bind a queue to an exchange
            channel.queueBind(Consumer1.Queue_Name, EXCHANGE_NAME, ROUTING_KEY, null);
            // declare an exchange, if the exchange does not exist, create it
            channel.queueDeclare(Consumer2.Queue_Name, true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
            // queueBind: bind a queue to an exchange
            channel.queueBind(Consumer2.Queue_Name, EXCHANGE_NAME, ROUTING_KEY, null);
            // send a message
            for (int i = 0; i < 10; i++) {
                String message = "Hello RabbitMQ! " + i;
                // basicPublish: publish a message to the specify exchange 向指定的交換器發送消息
                // routingKey: the routing key of the message
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
                System.out.println("Sent: " + message);
            }
        }
    }
}

```
2. 寫一個Consumer1.java，一個Consumer2.java，其他別只在Queue_Name一個是queue1，一個是queue2，其他都一樣，以下就只寫一個。
```
import com.example.rabbitMQPractice.connectUtil.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {

    final static String Queue_Name = "queue2";

    public static void main(String[] args) throws IOException, TimeoutException {

        // create the connection between the consumer and the rabbitMQ server
        Connection connection = ConnectionUtil.getConnection();
        // create a channel
        Channel channel = connection.createChannel();
        // declare a queue, if the queue does not exist, create it
        channel.queueDeclare(Queue_Name, true/*是否持久化*/, false/*是否獨享*/, true/*是否自動刪除*/, null/*其他參數*/);
        // create a consumer
        Consumer consumer = new DefaultConsumer(channel) {
            // when a message is received, the following method will be called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received: " + message);
            }
        };
        // get the message from the queue
        channel.basicConsume(Queue_Name, true/*是否自動確認*/, consumer);

    }

}

```
3. 先啟動兩個Consumer，再啟動MyProducer。
4. 觀察console，可以發現所有消費者都收到完整的10條消息
    MyProducer.java
    ![](https://i.imgur.com/lIlGxXE.png)
    Consumer1.java
    ![](https://i.imgur.com/3l7yBsh.png)
    Consumer2.java
    ![](https://i.imgur.com/xgOxsPu.png)



### 使用Direct實現消息路由
待續.........
### 使用topic實現通佩符路由
待續.........
### RPC通信模型
待續.........
## Spring boot 的 RabbitMQ支持
待續.........
 
 
---
程式碼:https://github.com/mister33221/spring_boot_rabbitMQ_practice.git

###### tags: `RabbitMQ`