# Flask實作_消息隊列_RabbitMQ_02_工作隊列 ###### tags: `flask` `quene` `RabbitMQ` 在hello_world練習中我們初步了解RabbitMQ的工作原理,現在要試著加入更多變化來深入理解整個隊列與工作派送。 ## 練習 ### 工作派送 [官方連結](https://www.rabbitmq.com/tutorials/tutorial-two-python.html) 這次案例會試著利用`time.sleep`模擬工作繁忙程度,並產生多個『C』(worker)來觀察Task Quene的情況。 ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P -> Q ; Q -> C1 ; Q -> C2 ; } ``` send.py調整為new_task.py ```python= import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') # 設置參數接收外部呼叫程式時傳入的字串 message = ''.join(sys.argv[1:]) or 'Hello World!' channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent % (message,)") connection.close() ``` 第10行:設置參數接收外部傳入的字串 receive.py調整為worker.py ```python= import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % (body, )) time.sleep(body.count(b'.')) print(" [x] Done") channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ``` 第11行:以『.』做為停頓幾秒的依據,模擬該工作忙碌,不要忘記b 現在可以開啟兩個以上的worker.py,這兩個視窗會開始等待消息,也跟著執行new_task.py,並帶參數字串『.』來讓程序等待,以觀察狀況 ```shell= python new_task.py .... python new_task.py ..... python new_task.py ...... python new_task.py ....... ``` 這時候會看到worker.py其中之一會先收到一個訊息,第二個訊息送出的時候另一個不在繁忙的會收到,然後誰先處理完手上的訊息就會再接到下一個待處理訊息,這種方式稱為『round-robin』 註:如果有編碼上的問題就在抬頭加入`# -*- coding: utf-8 -*- ` ### 訊息確認 ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right Q -> C[label=byebye] ; } ``` 訊息從Quene離開到Consumer之後就從記憶體消失,所以只要執行任務中有執行到一半中斷的話,那就代表該任務也遺失了,Quene並不會主動的將它轉派給其它Consumer,但這並不是我們想要的,我們希望當執行一半中斷的話,該任務主動轉派其它Consumer。 這可以透過RabbitMQ中的機制『acknowledgments』(ack)來預防,Consumer在執行一個任務之後就會跟RabbitMQ說我好了,接著RabbitMQ就會釋放並刪除這任務,一但Consumer掛了,那RabbitMQ就當你沒完成並重新派送給另一個Consumer接續任務。 另外需注意到,任務的處理沒有Timeout的概念,這是避免RabbitMQ誤會執行時間較長的任務是中斷而重新派送給予其它Consumer。 之前我們在`worker.py`中設置的`no_ack=True`參數,就是關閉回應,拿掉它就可以順利的開啟這個回應機制。 調整之後故意設置一個十秒的工作,在執行一半的時候關閉,看看系統是否順利的轉發另一個Consumer,如果你有確實的測試的話,確實的會發現到任務被順利的轉派了。 ### 訊息堆積 要注意到,訊息本身是暫存在記憶體中,一但訊息一直沒有被順利的派送出去的話就會一直堆積不會消失,如果需要確認的話可以利用指令來查詢。 ```shell= rabbitmqctl list_queues name messages_ready messages_unacknowledged ``` 註:如果環境變數沒有設置也可以直接至指令資料夾(RabbitMQ安裝資料夾\sbin)執行指令 ### 訊息持久化 一但RabbitMQ異常中斷,所有的訊息也都會跟著消失,為了確保所有的訊息的保存,我們必需在建立隊列的時候加入參數: ```python= channel.queue_declare(queue='hello', durable=True) ``` 已經產生過的queue無法重新設置,需設置新的queue並加入參數`durable=True`,也注意到這個調整是相對的,Producing與Consumer都需要設置。 調整之後如下: new_task.py ```python= # -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) # 設置參數接收外部呼叫程式時傳入的字串 message = ''.join(sys.argv[1:]) or 'Hello World!' channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2 )) print(" [x] Sent % (message,)") connection.close() ``` 第13行:`channel.basic_publish`也要修正 worker.py ```python= # -*- coding: utf-8 -*- import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % (body, )) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='task_queue') print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ``` 第14行:callback中加入ack,pass訊息給queue,不加的情況下完成的工作會重覆的派送 第16行:取消`no_ack=True` 註:可以自己測試下拿掉`ch.basic_ack`,會發現完成的工作在加入新的worker之後會重新派送 調整之後並不能100%的保證完美,但至少對要求不高的情況已經可以滿足了。 ### 公平調度 ```graphviz digraph graphname { rankdir=LR; //Rank Direction Left to Right P1->Q ; P2->Q ; P3->Q ; P4->Q ; P5->Q ; P6->Q ; Q -> C1[label="P1,P3,P5"] ; Q -> C2[label="P2,P4,P6"] ; } ``` 目前為止我們知道,任務的派送是以『輪播』的方式來指派,如上流程圖,如果你一口氣有六個Producing,Queue會一口氣將這六個任務通通送出給你所有的Consumer。 但是這樣子的模式並無法讓閒置資源馬上補上完成下一個任務,我們希望的是,當一個Consumer完成任務之後就接下一個,而不是大家分一分,忙的還是很忙,閒的變的更閒。 ```python= channel.basic_qos(prefetch_count=1) ``` 在建立channel的時候利用`basic_qos`宣告,`prefetch_count=1`代表每一個Consumer每次只會執行一個工作,在ack給Queue之後才會再將下一任務指派給閒置的Consumer。 調整之後的worker.py如下: ```python= # -*- coding: utf-8 -*- import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % (body, )) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ``` 第17行:加入`basic_qos`設置