# Flask實作_消息隊列_RabbitMQ_03_發佈與訂閱 ###### tags: `flask` `quene` `RabbitMQ` 前面兩個練習主要是一個訊息派送給一個Consumer,這次要練習的是一個訊息同時傳送給兩個Consumer,也會將在第一個練習中沒有解釋清楚的`exchange`在這邊說明清楚。 ## 練習 ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P -> Q [label=HelloWorld]; Q -> C1_log [label=HelloWorld]; Q -> C2_print [label=HelloWorld]; } ``` 在實作之前,先瞭解完整的RabbitMQ模式,在第一個練習中我們介紹了RabbitMQ的三個角色: * P: Producing,即發送訊息的人員 * Q: Queue,即隊列,訊息的載體 * C: Consumer,即接收訊息的人員 『P傳送訊息給Q,而Q派送訊息給C』 但事實上P(Producing)本身並不直接傳訊息給Q,甚至不知道訊息是否已傳送給Q(隊列),它(P)必需透過exchange(交換機_X)來轉發訊息,透過定義exchange來決定P的訊息要轉發、還是同步發送或是丟掉。(想像一下我們的網路也是透過HUB、SWITCH收發封包)。 ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P -> X ; X -> Q1 ; X -> Q2 ; } ``` 主要四種模式: * direct * topic * headers * fanout * 將所有訊息廣播(發送)到所有它知道的Queue * 像hub * 以此為範例 ## 練習 [官方連結](https://www.rabbitmq.com/tutorials/tutorial-three-python.html) ### 建立exchange ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P -> X_logs ; } ``` ```python= channel.exchange_declare(exchange='logs', exchange_type='fanout') ``` 第1行:建立一個exchange,命名為logs 第2行:定義exchange模式為fanout ```shell rabbitmqctl list_exchanges ``` 第1行:取得所有exchange列表,amq開頭的是預設的 ### 產生Temporary queues(臨時隊列) ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P -> X_logs ; X_logs ; "Temp_Q" ; } ``` ```python= channel.basic_publish(exchange='', routing_key='hello', body=message) ``` 上面是我們第一次練習的時候所建立的,當時的exchange設置為空格,而空格就是匿名設置,只要能夠對應到`routing_key`就可以順利派送出去,配合這次的練習,我們會設置為`exchange='logs'` ```python= channel.basic_publish(exchange='logs', routing_key='', body=message) ``` 第1行:設置exchange為稍早建立的logs 第2行:這種設置方式為Temporary queues(臨時隊列) 臨時隊列的部份會由系統幫我們產生一個臨時的隊列名稱,也因此在宣告的時候要空格 ```python= result = channel.queue_declare() ``` 這時候可以利用指令來查詢產生的臨時隊列名稱,它可能是`amq.gen-JzTY20BRgKO-HjmUJj0wLg` ```python= result.method.queue ``` 接著,在與Consumer斷開連結的時候就應該刪除這個臨時隊列,利用參數`exclusive=True`可以達到這個需求。 ```python= result = channel.queue_declare(exclusive=True) ``` ### Binding ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P -> X_logs ; X_logs -> "Temp_Q" [label="binding"]; } ``` 目前為止我們有Exchange(logs)跟匿名隊列(Queue),現在要讓Exchange知道訊息要給那些Queue,它們兩個之間的關聯稱為Binding。 ```python= channel.queue_bind(exchange = 'logs', queue = result.method.queue) ``` 第1行:exchange為我們所設置的lgos 第2行:利用`result.method.queue`取得匿名隊列名稱 現在,你可以利用指令來查詢綁定名單 ```shell rabbitmqctl list_bindings ``` ### 程式碼整合 emit_log.py ```python= import pika import sys # 建立連結 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 實作channel channel = connection.channel() # 設置exchange(交換機)_名稱logs、類型fanout channel.exchange_declare(exchange='logs', exchange_type='fanout') # 訊息接收 message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 訊息發送,注意到routing_key為匿名隊列,留白 channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % (message,)) # 關閉連結 connection.close() ``` receive_logs.py ```python= import pika # 建立連結 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) # 實作channel channel = connection.channel() # 設置exchange(交換機)_名稱logs、類型fanout channel.exchange_declare(exchange='logs', exchange_type ='fanout') # 關閉之後刪除臨時隊列 result = channel.queue_declare(exclusive=True) # 取得臨時隊列名稱 queue_name = result.method.queue # 綁定exchange與queue channel.queue_bind(exchange='logs', queue=queue_name) print('[*] Waiting for logs. To exit press CTRL+C') # 設置call back def callback(ch, method, properties, body): print(" [x] %r" % (body,)) # 接收訊息 channel.basic_consume(callback, queue=queue_name, no_ack=True) # 開始接收 channel.start_consuming() ```