# Flask實作_消息隊列_Celery_01_初探
###### tags: `flask` `celery`
[官方連結](http://docs.celeryproject.org/en/latest/index.html#)
在了解Celery之前建議先看過RabbitMQ,或多或少有些許的幫助。
Celery是一個簡單、靈活、的分佈式系統,由python開發,而且它不需要配置文件,在使用的時候利用參數設置即可。
常見使用場景:
1. web中當執行一個時間較久的需求的時候會讓使用者體驗非常不佳,因為整個網頁開在那邊也不知道是死還是活,這時候利用celery將任務派送出去,使用者只需要等待結果。
* 常見如註冊驗證信,點擊之後會告訴你請確認郵件,這時候它的寄送行為已經由別人接手,所以你感受不到也不需要等待信件寄出。
2. 可以安排定時任務
* 如Linux中crontab的排程
在實作之前還需要認識幾個Celery的相關角色:
1. Broker: 代理人
* 普遍使用RabbitMQ
* pika
* librabbitmq
* 也可以選擇Redis
2. Worker: 執行任務的人
* celery支援本地也支援遠端
3. Producer: 派送任務的人
* 跟flask結合的話,flask就是producer
4. Beat: 定期任務
* [官方連結](http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html)
5. Result: 保持結果
* 在任務完成之後,會將結果保存
* SQLAlchemy、Memcached、Redis、RPC(RabbitMQ/AMQP)
* 普遍使用Redis
它們的關係跟RabbitMQ還蠻像的,見下圖:
```graphviz
digraph graphname {
rankdir=LR; //Rank Direction Left to Right
Producer -> Broker ;
Beat -> Broker ;
Broker -> Worker;
Worker ->Result;
}
```
Producer與Beat本身無法直接與Worker溝通,必需透過Broker才能將任務派送給予Worker,這跟RabbitMQ中的Producer無法直接提供訊息給Consumer一樣,必需經過Exchange發送給Queue之後才能轉發給Consumer。
## 練習
一樣的,利用官方文件來初探來做練習,初步了解到Celery是需要的。
### 事前準備
* 確認要使用的Broker
* 依建議使用RabbitMQ
* 相關說明可見RabbitMQ文章
* 安裝celery
* pip install celery
### 實作Celery_設置broker
* tasks.py
```python=
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
```
第3行:實作Celery,第一個參數為當前module名稱,第二個參數是Broker的路徑
第5行:利用裝飾器`app.task`來設置一個執行加法的function
接下來,在目錄底下執行shell
```shell
>>> celery -A tasks worker --loglevel=info
```
參數-A:會尋找tasks這個module,我們實作celery的時候設置是tasks,故後接tasks
如果有順利的話,就可以看的到啟動成功的訊息

如果需要幫助的就可以使用`--help`
```shell=
celery worker --help
celery --help
```
另外開啟一個telnet來測試,確實的得到了回應,回傳了AsyncResult
```shell
>>> from tasks import add
>>> add.delay(4, 4)
<AsyncResult: 97975726-60ee-427a-95a5-424e05e88578>
```
目前為止,我們還沒有設置保存結果,因此看不到結果,往下繼續前進...
可查詢rabbitmq目前的隊列訊息
```shell
rabbitmqctl list_queues
```
註:Windows環境下如果執行`add.delay(4,4)`出現異常訊息,可見參考3說明。
### Celery_result_backend
如果你有安裝redis,可以直接使用redis來做backend,配合官方案例,我們直接使用rpc。
* `tasks.py`
```python=
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost/')
@app.task
def add(x, y):
return x + y
```
現在,我們可以開啟shell來測試了
```python=
>>> from tasks import add
>>> result = add.delay(4,4)
>>> result.ready()
True
>>> result.info
8
>>> result.result
8
>>> result.get()
8
```
第3行:result.ready()用來取得該任務狀態,若完成回傳True,反之為False
更多的AsyncResult說明,請參閱下面官方說明連結
[AsyncResult](http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.ready)
```
Warning
Backends use resources to store and transmit results.
To ensure that resources are released,
you must eventually call get() or forget()
on EVERY AsyncResult instance returned after calling a task.
```
### Configuration
Celery並不需要太多配置,普遍來說預設的配置就夠以使用了,但如果需要的話還是可以自己調適,它的配置方式跟flask很相似。
1. app.conf.update()
2. app.config_from_object('celeryconfig')
`celeryconfig.py`
```python
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
```
如果想確認配置文件是否正常,也可以利用語法來驗證
```shell
python -m celeryconfig
```
[完整配置說明](http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration)
## 參考
1. [flask_web作者_miguelgrinberg](https://blog.miguelgrinberg.com/post/using-celery-with-flask)
* miguelgriberg分享了celery with flask的應用,說明的很清楚,建議閱讀。
2. [skychang_redis on windows](https://skychang.github.io/2017/04/09/Redis-Create_Redis_HA/)
* Redis並不支援Windows,這不令人崩潰,因為Windows團隊有人接手處理。
* [Windows Redis 直接下載頁面](https://github.com/MicrosoftArchive/redis/releases)
3. [孔天逸's Blog](https://blog.csdn.net/qq_30242609/article/details/79047660)
* `Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)`
* Windows上執行Celery4.x的時候會有問題,需要另外安裝套件排除
* `pip install eventlet`
* `celery -A <mymodule> worker -l info -P eventlet`
* 加入參數`-P eventlet`