# Flask實作_消息隊列_RabbitMQ_02_工作隊列
###### tags: `flask` `quene` `RabbitMQ`
在hello_world練習中我們初步了解RabbitMQ的工作原理,現在要試著加入更多變化來深入理解整個隊列與工作派送。
## 練習
### 工作派送
[官方連結](https://www.rabbitmq.com/tutorials/tutorial-two-python.html)
這次案例會試著利用`time.sleep`模擬工作繁忙程度,並產生多個『C』(worker)來觀察Task Quene的情況。
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P -> Q ;
Q -> C1 ;
Q -> C2 ;
}
```
send.py調整為new_task.py
```python=
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 設置參數接收外部呼叫程式時傳入的字串
message = ''.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Sent % (message,)")
connection.close()
```
第10行:設置參數接收外部傳入的字串
receive.py調整為worker.py
```python=
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % (body, ))
time.sleep(body.count(b'.'))
print(" [x] Done")
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
第11行:以『.』做為停頓幾秒的依據,模擬該工作忙碌,不要忘記b
現在可以開啟兩個以上的worker.py,這兩個視窗會開始等待消息,也跟著執行new_task.py,並帶參數字串『.』來讓程序等待,以觀察狀況
```shell=
python new_task.py ....
python new_task.py .....
python new_task.py ......
python new_task.py .......
```
這時候會看到worker.py其中之一會先收到一個訊息,第二個訊息送出的時候另一個不在繁忙的會收到,然後誰先處理完手上的訊息就會再接到下一個待處理訊息,這種方式稱為『round-robin』
註:如果有編碼上的問題就在抬頭加入`# -*- coding: utf-8 -*- `
### 訊息確認
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
Q -> C[label=byebye] ;
}
```
訊息從Quene離開到Consumer之後就從記憶體消失,所以只要執行任務中有執行到一半中斷的話,那就代表該任務也遺失了,Quene並不會主動的將它轉派給其它Consumer,但這並不是我們想要的,我們希望當執行一半中斷的話,該任務主動轉派其它Consumer。
這可以透過RabbitMQ中的機制『acknowledgments』(ack)來預防,Consumer在執行一個任務之後就會跟RabbitMQ說我好了,接著RabbitMQ就會釋放並刪除這任務,一但Consumer掛了,那RabbitMQ就當你沒完成並重新派送給另一個Consumer接續任務。
另外需注意到,任務的處理沒有Timeout的概念,這是避免RabbitMQ誤會執行時間較長的任務是中斷而重新派送給予其它Consumer。
之前我們在`worker.py`中設置的`no_ack=True`參數,就是關閉回應,拿掉它就可以順利的開啟這個回應機制。
調整之後故意設置一個十秒的工作,在執行一半的時候關閉,看看系統是否順利的轉發另一個Consumer,如果你有確實的測試的話,確實的會發現到任務被順利的轉派了。
### 訊息堆積
要注意到,訊息本身是暫存在記憶體中,一但訊息一直沒有被順利的派送出去的話就會一直堆積不會消失,如果需要確認的話可以利用指令來查詢。
```shell=
rabbitmqctl list_queues name messages_ready messages_unacknowledged
```
註:如果環境變數沒有設置也可以直接至指令資料夾(RabbitMQ安裝資料夾\sbin)執行指令
### 訊息持久化
一但RabbitMQ異常中斷,所有的訊息也都會跟著消失,為了確保所有的訊息的保存,我們必需在建立隊列的時候加入參數:
```python=
channel.queue_declare(queue='hello', durable=True)
```
已經產生過的queue無法重新設置,需設置新的queue並加入參數`durable=True`,也注意到這個調整是相對的,Producing與Consumer都需要設置。
調整之後如下:
new_task.py
```python=
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
# 設置參數接收外部呼叫程式時傳入的字串
message = ''.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2
))
print(" [x] Sent % (message,)")
connection.close()
```
第13行:`channel.basic_publish`也要修正
worker.py
```python=
# -*- coding: utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % (body, ))
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
第14行:callback中加入ack,pass訊息給queue,不加的情況下完成的工作會重覆的派送
第16行:取消`no_ack=True`
註:可以自己測試下拿掉`ch.basic_ack`,會發現完成的工作在加入新的worker之後會重新派送
調整之後並不能100%的保證完美,但至少對要求不高的情況已經可以滿足了。
### 公平調度
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
P1->Q ;
P2->Q ;
P3->Q ;
P4->Q ;
P5->Q ;
P6->Q ;
Q -> C1[label="P1,P3,P5"] ;
Q -> C2[label="P2,P4,P6"] ;
}
```
目前為止我們知道,任務的派送是以『輪播』的方式來指派,如上流程圖,如果你一口氣有六個Producing,Queue會一口氣將這六個任務通通送出給你所有的Consumer。
但是這樣子的模式並無法讓閒置資源馬上補上完成下一個任務,我們希望的是,當一個Consumer完成任務之後就接下一個,而不是大家分一分,忙的還是很忙,閒的變的更閒。
```python=
channel.basic_qos(prefetch_count=1)
```
在建立channel的時候利用`basic_qos`宣告,`prefetch_count=1`代表每一個Consumer每次只會執行一個工作,在ack給Queue之後才會再將下一任務指派給閒置的Consumer。
調整之後的worker.py如下:
```python=
# -*- coding: utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % (body, ))
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
第17行:加入`basic_qos`設置