# Flask實作_消息隊列_RabbitMQ_03_發佈與訂閱
###### tags: `flask` `quene` `RabbitMQ`
前面兩個練習主要是一個訊息派送給一個Consumer,這次要練習的是一個訊息同時傳送給兩個Consumer,也會將在第一個練習中沒有解釋清楚的`exchange`在這邊說明清楚。
## 練習
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P -> Q [label=HelloWorld];
Q -> C1_log [label=HelloWorld];
Q -> C2_print [label=HelloWorld];
}
```
在實作之前,先瞭解完整的RabbitMQ模式,在第一個練習中我們介紹了RabbitMQ的三個角色:
* P: Producing,即發送訊息的人員
* Q: Queue,即隊列,訊息的載體
* C: Consumer,即接收訊息的人員
『P傳送訊息給Q,而Q派送訊息給C』
但事實上P(Producing)本身並不直接傳訊息給Q,甚至不知道訊息是否已傳送給Q(隊列),它(P)必需透過exchange(交換機_X)來轉發訊息,透過定義exchange來決定P的訊息要轉發、還是同步發送或是丟掉。(想像一下我們的網路也是透過HUB、SWITCH收發封包)。
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P -> X ;
X -> Q1 ;
X -> Q2 ;
}
```
主要四種模式:
* direct
* topic
* headers
* fanout
* 將所有訊息廣播(發送)到所有它知道的Queue
* 像hub
* 以此為範例
## 練習
[官方連結](https://www.rabbitmq.com/tutorials/tutorial-three-python.html)
### 建立exchange
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P -> X_logs ;
}
```
```python=
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
```
第1行:建立一個exchange,命名為logs
第2行:定義exchange模式為fanout
```shell
rabbitmqctl list_exchanges
```
第1行:取得所有exchange列表,amq開頭的是預設的
### 產生Temporary queues(臨時隊列)
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P -> X_logs ;
X_logs ;
"Temp_Q" ;
}
```
```python=
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
```
上面是我們第一次練習的時候所建立的,當時的exchange設置為空格,而空格就是匿名設置,只要能夠對應到`routing_key`就可以順利派送出去,配合這次的練習,我們會設置為`exchange='logs'`
```python=
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
```
第1行:設置exchange為稍早建立的logs
第2行:這種設置方式為Temporary queues(臨時隊列)
臨時隊列的部份會由系統幫我們產生一個臨時的隊列名稱,也因此在宣告的時候要空格
```python=
result = channel.queue_declare()
```
這時候可以利用指令來查詢產生的臨時隊列名稱,它可能是`amq.gen-JzTY20BRgKO-HjmUJj0wLg`
```python=
result.method.queue
```
接著,在與Consumer斷開連結的時候就應該刪除這個臨時隊列,利用參數`exclusive=True`可以達到這個需求。
```python=
result = channel.queue_declare(exclusive=True)
```
### Binding
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P -> X_logs ;
X_logs -> "Temp_Q" [label="binding"];
}
```
目前為止我們有Exchange(logs)跟匿名隊列(Queue),現在要讓Exchange知道訊息要給那些Queue,它們兩個之間的關聯稱為Binding。
```python=
channel.queue_bind(exchange = 'logs',
queue = result.method.queue)
```
第1行:exchange為我們所設置的lgos
第2行:利用`result.method.queue`取得匿名隊列名稱
現在,你可以利用指令來查詢綁定名單
```shell
rabbitmqctl list_bindings
```
### 程式碼整合
emit_log.py
```python=
import pika
import sys
# 建立連結
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 實作channel
channel = connection.channel()
# 設置exchange(交換機)_名稱logs、類型fanout
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 訊息接收
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# 訊息發送,注意到routing_key為匿名隊列,留白
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % (message,))
# 關閉連結
connection.close()
```
receive_logs.py
```python=
import pika
# 建立連結
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
# 實作channel
channel = connection.channel()
# 設置exchange(交換機)_名稱logs、類型fanout
channel.exchange_declare(exchange='logs',
exchange_type ='fanout')
# 關閉之後刪除臨時隊列
result = channel.queue_declare(exclusive=True)
# 取得臨時隊列名稱
queue_name = result.method.queue
# 綁定exchange與queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
# 設置call back
def callback(ch, method, properties, body):
print(" [x] %r" % (body,))
# 接收訊息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
# 開始接收
channel.start_consuming()
```