本文章資訊來源為尚硅谷RabbitMQ教學 https://www.youtube.com/playlist?list=PLmOn9nNkQxJGqsOsBEs0MPfazE4mI1QIB ## 為什麼要使用MQ? ## 1. **流量消峰** 若系統處理效能為 1000 Request/s, 但在流量的極端情況 (這裡指的是非常態的瞬時暴增流量,Ex: 秒殺搶購活動) 可能會有每秒大於1000個Request的時候, 這時多出來的Request伺服器沒辦法處理, 通常情況下只能透過限流或者創建多實例LoadBalance來解決, 但如果使用了MQ,就能將Request排入MQ做緩衝, 把一秒內下單的訂單分散成一段時間來處理, 雖然可能用戶在下單一段時間後才能確認下單成功, 但總比不能下單的體驗要好。 ## 2. **應用解偶** 假設有個訂單系統,業務邏輯如下圖: ![](https://hackmd.io/_uploads/BkR6YxiOh.png) (其中每個系統為各自獨立的server) 訂單系統成功下單後,會分別透過API呼叫這三個子系統, 這時若有任何一個的子系統發生故障,都會造成下單異常, 當訂單系統的API呼叫都改成透過MQ來呼叫後, 訂單系統若確認發布消息給MQ,即視為訂單完成, 因為後續若子系統發生故障時,這個消息仍存在於MQ中, 待系統修復後,MQ可以繼續進行發送,使得子系統繼續處理訂單信息。 ## 3. **異步處理** 有些服務間的調用是非同步的, 假設 A服務調用B服務時,B服務需要很長時間去執行, 但是A需要知道B什麼時候可以執行完, 以前有兩種方式解決: 1. A一段時間後去輪詢呼叫B的API查詢 2. A提供一個Callback api,讓B執行完之後可以呼叫A 這兩種方式都不是很優雅, 若使用MQ可以很方便的解決這個問題, A調用B服務後,只需要監聽B處理完成的消息, 當B處理完成後,會透過MQ發送一條消息給A, 這樣A服務就能即時的知道非同步處理成功的消息。 ## **RabbitMQ** ### 各名詞概念及介紹 1. **生產者(Producer)** 產生數據發送消息的程式就是生產者。 2. **消費者(Consumer)** 接收隊列中數據的為消費者,通常是一個等待接收消息的程序。 3. **隊列(Queue)** MQ裡面使用的一種數據結構,存放消息的地方,本質是一種消息緩衝區,生產者可以將消息發送到一個隊列,消費者可以嘗試從一個隊列中接收數據。 4. **交換機(Exchange)** 接收Producer的消息,並將其消息推送到Queue中,交換機負責將消息推送到**指定**隊列、或者推送到多個隊列中,或者將消息丟棄。 5. **Broker** 接收和分發消息的應用,RabbitMQ Server就是MessageBroker ![](https://hackmd.io/_uploads/SkgcP-iuh.png) ### 六大模式 1. Hello World ![](https://hackmd.io/_uploads/ryiDTfTF3.png) 從生產者\(P\)發送消息給Queue,並由消費者\(C\)處理該隊列。 2. Work Queues ![](https://hackmd.io/_uploads/rJHOTzTK2.png) 當生產者大量發消息,從而導致消費者無法即時處理消息時,就可以將消費者的數量增加,讓消費者用輪詢的方式分擔處理消息的流量。 3. Publish/Subscribe ![](https://hackmd.io/_uploads/r1p_afpY3.png) 4. Routing ![](https://hackmd.io/_uploads/r1CYaMpth.png) 5. Topics ![](https://hackmd.io/_uploads/SkdqpfTKn.png) 6. RPC ![](https://hackmd.io/_uploads/SkIsTzTY3.png) ## 消息應答機制 消費者完成一個任務可能需要一段時間,如果消費者處理一個長時間任務,並且在還未處理完成時,發生**單點故障**,Queue一但向消費者傳送了一條消息,便會刪除該消息,這時將會造成消息丟失。 為了保證消息在**發送過程中不丟失**,RabbitMQ引入了消息應答機制,在Consumer收到消息,並且**處理消息後**,告訴MQ已經處理完畢,MQ才會將該消息刪除。 ### 自動應答 在這種模式下,**消息發送給消費者後**可以立即被認為已經**傳送成功**,這種模式需要在**高吞吐量和數據安全性方面做權衡**,因為這種模式如果**消息在接收到之前或者尚未處理完**,消費者那邊出現故障導致連接關閉,則**消息就會丟失**,這種模式僅適合用在消費者可以高效處理消息不會造成消息積壓的情況下使用。 ### 手動應答的幾種方法 1. Channel.basicAck(用於肯定確認): 告知MQ該消息已經成功被處理,可以將其丟棄了。 2. Channel.basicNack(用於否定確認) 3. Channel.basicReject(用於否定確認): 與Nack相比,Reject可以批量應答 #### 批量應答(multiple) ```java channel.basicAck(deliveryTag, ${multiple}) ``` 若multiple為true,代表會批量應答channel上**所有未應答的消息**。 ### 消息自動重新入隊 若在處理消息時,還未回傳Ack就發生失去連接,此時MQ會將該消息轉發給其他消費者,由其他消費者處理,進而達到消息不丟失。 ### 發送消息範例 ```java /** * 生成一個隊列 * 1.隊列名稱 * 2.隊列裡面的消息是否持久化(默認存在內存中) * 3.該隊列是否只供一個消費者進行消費,是否進行消息共享 * 4.是否自動刪除,當最後一個消費者斷開連接以後,是否自動刪除 * 5.其他參數(死信消息、延遲消息之類設定) */ channel.queueDeclare(QUEUE_NAME,isDurable,isExclusive,isAutoDelete,null); /** * 發送一個消息 * 1.發送到指定的Exchange * 2.路由的key值 (例如queue名稱) * 3.其他參數信息 * 4.發送內容 */ channel.basicPublish("", QUEUE_NAME, null, new String("hello world").getBytes()); ```