# Celery_Next Steps
###### tags: `celery` `celery 5.2` `python`
[Celery_Next Steps](https://docs.celeryproject.org/en/stable/getting-started/next-steps.html)
## Using Celery in your Application
### Our Project
Project layout:
```
proj/__init__.py
/celery.py
/tasks.py
```
#### proj/celery.py
```python=
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='rpc://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
# 設置result的存活時間為3600秒
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
```
在這個模組中我們建立了一個[Celery](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery)的實例(有時候稱為app)。想要在專案中使用這個Celery實例,你就只需要把它import進去就可以。
* 參數`broker`指定要使用的broker的URL
* 更多可參考[Choosing a Broker](https://hackmd.io/@shaoeChen/BJ2tEIjtt#Choosing-a-Broker)
* 參數`backend`指定要使用的result backend
* 這用來追蹤任務狀態與結果(results)。雖然預設情況下result的設置是disable的,不過這邊還是會使用RPF來做為result backend,因為後續我們要來說明檢索結果的作業方式。也許你會想要在你的應用程式中使用不同的backend。不過,不同的backend有不同的優缺點。如果你不需要results,那就最好把result disable掉。當然,之前說過,我們可以利用`@task(ignore_result=True)`來對個別的任務做result是否disable的設置。
* 更多可參考[Keeping Results](https://hackmd.io/@shaoeChen/BJ2tEIjtt#Keeping-Results)
* 參數`include`是worker起動的時候要導入的模組清單。如果有多的任務,那就要在這邊加入任務模組,這樣workers就能夠找到我們的任務
#### proj/tasks.py
```python=
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
```
### Starting the worker
`celery`可以用來啟動worker,只是你需要在proj上的目錄中執行worker:
```shell=
$ celery -A proj worker -l INFO
```
worker啟動的時候你應該會看到下面的執行畫面:
```shell
--------------- celery@halcyon.local v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
```
\- `broker`就是你在celery模組中那個`broker`參數所指定的URL。你也可以在命令行中使用**-b** 來指定不同的broker。
\- `concurrency`是用於設置並行處理任務的prefork worker process的數量。所設置數量的worker都很忙的時候,新的任務就必需要等到舊任務完成之後才能接續著被處理。
預設的數量為執行機器上的CPU的數量(包含核心數)。你可以利用[celery worker -c](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-worker-c)來指定數量。沒有建議的設置,因為最佳數量取決於很多因素,不過如果你的任務主要受限於I/O的話,那可以試著增加數量。實驗證明,數量大到CPU數的兩倍以上不止沒有效,還會降低效能。
包含預設的prefork pool,Celery還支援使用Eventlet、Gevent、與single thread執行(見Concurrency[)
](https://docs.celeryproject.org/en/stable/userguide/concurrency/index.html#concurrency)
\- Events是一個可選項目,它讓Celery為worker中正發生的事情派送監控訊息(事件)。也有監控程式,像是celery events與Flower,即時的Celery監控器,更多可參考[Monitoring and Management guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring)。
-\ Queue是workers會從中取用任務的佇列清單。worker會被告知一次從多個佇列中取用任務,這用於將訊息路由到指定的worker,以做為服務品質、分離關注點、排列優先序的一種手段,更多可參考[Routing Guide](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing)。
你可以在command-line加入參數`--help`來取得完整的清單:
```shell=
$ celery worker --help
```
更多細節可參考[Workers Guide](https://docs.celeryproject.org/en/stable/userguide/workers.html#guide-workers)。
### Stopping the worker
要停止worker就只需要按一下Control-c。更多關於worker支援的信號清單可參考[Workers Guide](https://docs.celeryproject.org/en/stable/userguide/workers.html#guide-workers)
### In the background
在生產環境中,你會想要在背景執行worker,相關細節在[daemonziation tutorial](https://docs.celeryproject.org/en/stable/userguide/daemonizing.html#daemonizing)有說明。
這種常駐程式的腳本(daemonization scripts)使用命令`celery multi`在背景啟動一個或多個workers:
```shell=
$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK
```
你也可以重新啟動它:
```shell=
$ celery multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052
```
或停止它:
```shell=
$ celery multi stop w1 -A proj -l INFO
```
停止的命令(stop)非同步的(asynchronous),因此它並不會等待worker關閉(shutdown)。如果你想要確保在退出之前所有當前正在執行的任務都已完成,那你可能要改用`stopwait`:
```shell=
$ celery multi stopwait w1 -A proj -l INFO
```
:::warning
Note:
`celery multi`並不會保存關於workers的信息,所以你在重新啟動的時候需要使用相同的命令參數。在停止的時候只能使用相同的pidfile與logfile參數。
:::
預設情況下,它會在當前的目錄中建立pid與log files。為了預防多個workers打架,我們鼓勵你把worker放專用目錄下:
```shell=
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
```
使用`multi`這個命令,你可以啟動多個workers,還有一個強大的命令語法可以為不同的worker指定參數,舉例來說:
```shell=
$ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
-Q default -L:4,5 debug
```
更多範例可參考API reference中的[multi](https://docs.celeryproject.org/en/stable/reference/celery.bin.multi.html#module-celery.bin.multi)模組說明。
### About the [--app](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-A) argument
[--app](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-A)這個參數以module.path:attribute的形式來指定要使用的Celery app instance。
不過它也支援捷徑的形式。如果我們只指定一個package的名稱,它會試著去按下面的順序來尋找app instance:
With [--app=proj:](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-A)
1. 名為`proj.app`的屬性名稱,或
2. 名為`proj.celery`的屬性名稱,或
3. 模組`proj`中的任意屬性,其值為Celery application,或
如果上面都沒有找到,那就會去試著找一下名為`proj.celery`的子模組:
4. 名為`proj.celery.app`的屬性名稱,或
5. 名為`proj.celery.celery`的屬性名稱,或
6. 模組`proj.celery`中的任意屬性,其值為Celery application
這個方案模仿了文檔中所使用的實踐,也就是用於包含單一模組的`proj:app`,以及更大型專案的`proj.celery:app`
## Calling Tasks
你可以使用`delay()`來呼叫一個任務:
```python=
>>> from proj.tasks import add
>>> add.delay(2, 2)
```
這種呼叫任務的方式是另一個名為`apply_async()`的star-argument shortcut(星狀參數?):
```python=
>>> add.apply_async((2, 2))
```
採用`apply_async()`你可以指定執行選項(options),像是執行時間、要送達的佇列名稱等等:
```python=
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
```
上面的範例中,任務會被派送到一個名為`lopri`的佇列,然後任務會在派送到worker的10秒後執行。
直接執行任務就只會在當前的process中執行任務,並不會派送訊息:
```python=
>>> add(2, 2)
4
```
`delay(), apply_async()`跟執行`(__call__)`,這三個方法構成Celery調用的接口(calling API),也用於singatures(簽名、簽章)。
更多關於API的部份可參考[Calling User Guide](https://docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling)。
每個任務的調用都會被賦武一個唯一的識別符(UUID),這就是任務的id,task id。
`delay`與`apply_async`都會回傳一個[AsyncResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult) instance,這可以用來追蹤任務的執行狀態。不過你要記得啟用[result backend](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-result-backends)才有機會去保存任務狀態。
預設會disable result backend是因為,我們並沒有適合每一個應用程式的result backend;你要自己考慮每個result backend的優缺點。對於多數的任務來說,去保留它的回傳值也不知道要幹嘛,所以預設disable result backend是好的。也要注意到,result backend並不是用來監控任務並是workers:如果你想監控,那請用專用工具(參考[Monitoring and Management Guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring))
如果你有設置result backend,那你就可以檢索任務的回傳值:
```python=
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
```
你可以利用`id`來找到task id:
```python=
>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114
```
如果任務拋出異常,你也可以檢查異常與回溯,事實上,`result.get()`預設就會傳播所有的錯誤:
```python=
>>> res = add.delay(2, '2')
>>> res.get(timeout=1)
```
```python
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/result.py", line 221, in get
return self.backend.wait_for_pending(
File "celery/backends/asynchronous.py", line 195, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "celery/result.py", line 333, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "celery/result.py", line 326, in throw
self.on_ready.throw(*args, **kwargs)
File "vine/promises.py", line 244, in throw
reraise(type(exc), exc, tb)
File "vine/five.py", line 195, in reraise
raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'
```
如果你並不希望這些錯誤訊息也跟著傳播的話,那你可以利用參數`propagate`來disable:
```python=
>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")
```
這種情況下,它回傳的就是引發異常的實例,所以阿,要檢查這個任務是成功還是失敗,你就要在result instance上用相對應的方法:
```python=
>>> res.failed()
True
>>> res.successful()
False
```
那,它是怎麼知道任務是成功還是失敗呢?這可以透過查詢任務狀態來確認:
```python=
>>> res.state
'FAILURE'
```
同一時間一個任務只會處於一個狀態,不過它是會有很多狀態變化的。常見的任務階段如下:
```
PENDING -> STARTED -> SUCCESS
```
起始狀態是一個特殊的狀態,只有在enable [task_track_started](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_track_started)這個設置或者是你為單一任務設置 `@task(track_started=True)`選項的時候才會記錄。
pending這個狀態實際上並不是狀態的記錄,而是所有未知的task id的預設狀態:你可以從下面範例看的出來:
```python=
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'
```
如果你的任務重新再來一次,那任務階段就會變的更複雜。為了說明這點,我們說,有個任務我們重新跑兩次,那它的階段變化就會是:
```
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
```
更多關於任務狀態的細節,你可以參考[Tasks的States](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-states)說明。
更多呼叫任務的細節說明請見[Calling Guide](https://docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling)。
## Canvas: Designing Work-flows
你剛才已經學會怎麼用`delay`這個方法來呼叫一個任務,這通常就是你需要的。不過有時候你會希望可以把任務調用的簽章(signature)傳遞給另一個process,或是做為參數傳遞給另一個函數,要滿足這些情況,在Celery所使用的就稱之為signatures(簽章)。
signature包裝單一任務調用的參數與執行選項,以此傳遞給函數,甚至可以序列化然後通過網路派送。
你可以用參數`(2,2)`跟`coutdown=10`來幫`add`這個任務加入簽章:
```python=
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
```
另外有一個star arguments的快捷方式:
```python=
>>> add.s(2, 2)
tasks.add(2, 2)
```
### And there’s that calling API again…
signature instances也支援API,這意思就是說,它們也有`delay`與`apply_async`這些方法可以用。
不過這有一點不同,在signature可能已經指定參數簽章(argument signature)。`add`這個任務帶有兩個參數,因此,指定兩個參數的signature就可以購構一個完整的signature:
```python=
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
```
不過,你也可以弄一個不完整的signature,這樣子的方式稱為partials:
```python=
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
```
上面範例看的到,`s2`現在是一個partial signature,還需要另一個參數才會完成,你可以在呼叫這個signature的時候解決這問題:
```python=
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
```
這邊看的到,你把參數`8`加入現在的參數`2`之前形成`add(8, 2)`,完整的signature。關鍵字參數(keyword arguments)也可以在後面再加入;然後把後面加入的跟前面的做合併,有重複的話就後面的為主:
```python=
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False) # debug is now False.
```
一如先前所說,signatures也支援API:這意味著
* `sig.apply_async(args=(), kwargs={}, **options)`
* 使用可選的partial arguments與partial keyword arguments來呼叫signature。也支援partial execution options
* `sig.delay(*args, **kwargs)`
* `apply_async` star argument的版本。任何的參數都會被丟到signature中的參數之前,然後關鍵字參數(keyword argumetns)就會跟現有已存的合併
這看起來就是非常實用,不過你實際上可以用這些來做點什麼呢?為此,我們必需要引入canvas primitives
### The Primitives
:::warning
* [group](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-group)
* [map](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-map)
* [chain](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-chain)
* [startmap](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-map)
* [chord](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-chord)
* [chunks](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-chunks)
:::
這些primitives本身就是一個signature objects,因此它們可以用多種方式來結合複雜的工作流。
:::warning
Note:
這些範例要檢索results,所以如果你想試試,那你就要配置一個result backend。上面的範例已經有配置了(見[Celery](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery))
:::
下面來看幾個範例吧。
#### Gruops
[group](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.group)會平行的呼叫一個任務清單(list of tasks),它會回傳一個特別的result instance給你做為你檢查這個group的result,並且依序檢索其回傳值。
```python=
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
* Partial group
```python=
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
#### Chains
任務可以鏈結在一起,這樣你就可以在一個任務回傳之後再呼叫另一個:
```python=
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
```
或者做一個partial chain:
```python=
>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
```
Chains也可以寫成這樣:
```python=
>>> (add.s(4, 4) | mul.s(8))().get()
64
```
#### Chords
chord是帶有callback的group:
```python=
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90
```
group鏈結到另一個任務的話,就會自動轉為chord:
```python=
>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90
```
因為這些primitives都是signature type,所以它們可以根據需求做任何你想要做的組合,就像是:
```python=
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
```
更多資訊請參考[Canvas user guide](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas)。
## Routing
Celery支援AMQP所提供的所有路由的工具,也支援將訊息派送到named queues的簡單路由。
[task_routes](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_routes)的設置讓你可以按著名稱來路由任務,並且將所有的東西都集中在一個位置:
```python=
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
```
你也可以在執行的時候指定參數`queue`給`apply_async`:
```python=
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
```
然後,你就可以利用指定`celery worker -Q`的選項讓worker從這個佇列中取用任務:
```shell=
$ celery -A proj worker -Q hipri
```
你可以用逗號來指定多個佇列。舉例來說,你可以worker從預設的佇列跟hipri這個來取得任務,另外,預設的佇列名稱為celery,這有它的歷史意義在:
```shell=
$ celery -A proj worker -Q hipri,celery
```
佇列的順序不重要,因為worker會給它們相同的權重。
更多關於路由、AMQP routing的部份請參考[Routing Guide](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing)。
## Remote Control
如果你使用的是RabbitMQ(AMQP)、 Redis、或是Qpid做為broker,那麼你就可以在執行的時候控制、檢查worker。
舉例來說,你可以查看worker當下正在執行那一個任務:
```shell=
$ celery -A proj inspect active
```
這是透過使用廣播訊息(broadcast messaging)來實現的,所以集群中的每一個worker都會收到遠端控制命令。
你當然可以用`--destination`這個選項來指定一個或是多個workers。下面給出以逗號分隔的worker host names的清單:
```shell=
$ celery -A proj inspect active --destination=celery@example.com
```
如果你沒有提供`destination`,那每一個worker就通通會回復這個request。
`celery inspect`這個命令包含不會改變worker中任何事情的命令;它只會回傳關於worker內部發生的事情的信息與統計分析資訊。對於可執行的inspect,你可以執行:
```shell=
$ celery -A proj inspect --help
```
然後,也有一個`celery control`,這就真的會去改變worker:
```shell=
$ celery -A proj control --help
```
舉例來說,你可以強制worker啟用事件訊息(用來監控任務與workers):
```shell=
$ celery -A proj control enable_events
```
當事件(events)被啟用之後,你就可以啟動事件轉儲(event dumper)來看看workers正在做什麼:
```shell=
$ celery -A proj events --dump
```
或者你可以啟動curses interface(游標介面?):
```shell=
$ celery -A proj events
```
看完你想看的東西之後,你可以再次的disable events:
```shell=
$ celery -A proj control disable_events
```
`celery status`還使用遠端控制命令,並顯示集群中在線的workers的清單:
```shell=
$ celery -A proj status
```
關於`celery`命令與監控更多的資訊,你可以參考[Monitoring Guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring)。
## Timezone
內部與訊息中的所有時間、日期都是使用UTC時區。
當worker接收到一個訊息的時候,假設我們設置了countdown(倒數計時器),它會將UTC time轉為local time。如果你希望使用跟系統時區不一樣的時區,那麼你必需要用`timezone`在組態配置上做設置:
```python=
app.conf.timezone = 'Europe/London'
```
## Optimization
預設的組態配置並未對吞吐量做任何的優化。預設情況下,它是比較中間,不多也不少的方式來設置,這是吞吐量與公平調度(fair scheduling)之間的一個折衷。
如果你有很嚴格的公平調度需求,或者你希望可以最佳化吞吐量,那你應該看看[Optimizing Guide](https://docs.celeryproject.org/en/stable/userguide/optimizing.html#guide-optimizing)。
## What to do now?
現在,你應該試著繼續讀讀[User Guide](https://docs.celeryproject.org/en/stable/userguide/index.html#guide)。
如果你願意的話,也可以看一下[API reference](https://docs.celeryproject.org/en/stable/reference/index.html#apiref)。
## History
20211206_依據5.2版本說明調整