# Celery_Calling Tasks
###### tags: `celery` `celery 5.2` `python`
[官方連結_Calling Tasks](https://docs.celeryproject.org/en/stable/userguide/calling.html)
## Calling Tasks
### Basics
這份文件說明task instances與[canvas](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas)所使用的"Calling API"。
API定義了一組標準的執行選項以及三種方法:
1. `apply_async(args[, kwargs[, …]])`
* 派送任務訊息
2. `delay(*args, **kwargs)`
* 派送任務訊息的快捷方式,但不支援執行選項的設置
3. `calling (__call__)`
* 應用物件(object)支援Calling API,代表任務並不會被worker執行,而是在當前程序執行(current process)。(沒有訊息被派送)
:::info
**Quick Cheat Sheet**
1. `T.delay(arg, kwarg=value)`
* Star arguments shortcut to `.apply_async. (.delay(*args, **kwargs) calls .apply_async(args, kwargs))`.
2. `T.apply_async((arg,), {'kwarg': value})`
3. `T.apply_async(countdown=10)`
* 現在開始10秒內執行
4. `T.apply_async(eta=now + timedelta(seconds=10))`
* 現在開始10秒內執行,指定使用eta
5. `T.apply_async(countdown=60, expires=120)`
* 現在開始1分鐘內執行,但2分鐘之後到期
6. `T.apply_async(expires=now + timedelta(days=2))`
* 使用[datatime](https://docs.python.org/dev/library/datetime.html#datetime.datetime)設置,2天後到期
:::
#### Example
使用[delay()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.delay)很方便,它看起來就像是使用一個再普通不過的函數:
```python
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
```
使用[apply_async()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async)來代替的話可以這樣寫:
```python
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
```
因此,使用`delay`是很方便沒錯,不過如果你要設置額外的執行選項的話,就必需使用`apply_async`。
這文件的其餘部份會直接進入任務執行選項的細部設置。所有的範例均使用一個名為`add`的任務,函數會回傳兩個參數的加總:
```python
@app.task
def add(x, y):
return x + y
```
:::info
**There’s another way…**
後面你在讀Canvas的時候會有更深的體驗,不過signature(簽章)是用於傳遞任務調用簽章的對象(例如,透過網路派送),而且它們也支援Calling API:
```python
task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()
```
:::
### Linking (callbacks/errbacks)
Celery支援將任務連結在一起,一個跟著一個這樣。這種callback task(回呼任務)會伴隨著父任務的結果做為局部參數(partial argument),如下說明:
```python
add.apply_async((2, 2), link=add.s(16))
```
第一個任務(父任務)的回傳值4將會被送去新任務,並將16加到第一個任務的回傳值4,最後得到(2+2)+16=20。
如果任務拋出異常(errbacks),你也可以觸發回呼函數(callback)。不過worker並不會真正的去呼叫這個errback來做為任務,而是會直接呼叫errback fucntion,這樣就可以把原始的請求(request)、例外(exception)與回溯(trackback)物件傳遞給它。
下面給出`error callback`的範例:
```python
@app.task
def error_handler(request, exc, traceback):
print('Task {0} raised exception: {1!r}\n{2!r}'.format(
request.id, exc, traceback))
```
可以使用`link_error`這個執行選項將之加入任務中:
```python
add.apply_async((2, 2), link_error=error_handler.s())
```
此外,`link`與`link_error`都可以使用`list`來表示:
```python
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
```
以`list`來表示的時候,callbacks與errbacks都會被按序執行,而且所有的回呼函數都會使用父任務的回傳值做為局部參數。
:::info
**What’s s?**
上面用的`add.s`就是一種簽章(signature)。如果你不知道這是什麼的話,後續可以讀讀[canvas guide](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas)。在那邊你還可以學到一些關於[chain](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.chain):一種將任務鏈結起來的簡單方法。
:::
### On message
Celery支援使用`on_message` callback來補捉所有狀態變化。
舉例來說,如果你想要一個長時間執行的任務發送它的任務進度的話,可以這麼做:
```python
@app.task(bind=True)
def hello(self, a, b):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 50})
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 90})
time.sleep(1)
return 'hello world: %i' % (a+b)
```
```python
def on_raw_message(body):
print(body)
a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))
```
會得到下面的結果:
```jsonld
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 50},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 90},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': 'hello world: 10',
'children': [],
'status': 'SUCCESS',
'traceback': None}
hello world: 10
```
### ETA and Countdown
ETA(estimated time of arrival)<sub>(預計到達時間)</sub>允許你可以設置指定的日期與時間,這是任務執行的最早時間。`countdown`是一種以秒為單位設置ETA的快捷方式。
```python
# 下面範例設置3秒後執行
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # this takes at least 3 seconds to return
20
```
任務保證在指定日期與時間之後的某個時間被執行,但不一定在那個確切的時間。當然有很多的原因會造成你無法在時間內執行,像是隊列中還有許多項目正排隊等待,或網路負載延遲。為了確認你的任務能夠被及時處理,你應該監視隊最是否擁塞。使用Munin或類似的工具,接收警示,再採取適合的調整來減輕工作量。參閱[Mnium](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#monitoring-munin)
雖然`countdown`是一個整數,不過`eta`必需要是[datatime](https://docs.python.org/dev/library/datetime.html#datetime.datetime),指定確切的日期與時間(包含毫秒精度以及時區):
```python
>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
```
:::warning
Warning:
當你使用RabbitMQ做為message broker的時候,你指定一個`countdown`超過15分鐘,你可能會遇到worker拋出一個[PreconditionFailed](https://docs.celeryproject.org/projects/amqp/en/latest/reference/amqp.exceptions.html#amqp.exceptions.PreconditionFailed)的異常:
```
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel
```
RabbitMQ在3.8.15之後,其`consumer_timeout`預設為15分鐘。而在3.8.17之後預設為30分鐘。如果consumer沒有確認其交付(delivery)超過timeout value,那就會關閉它的channel,然後拋出一個異常`PRECONDITION_FAILED`。更多細節請參閱[Delivery Acknowledgement Tiemout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout)。
想要解決這個問題,就是要在RabbitMQ的配置文件`rabbitmq.conf`中指定參數`consumer_timeout`的值要大於等於你的countdown value(倒數計時的值)。舉例來說,你可以指定一個很大大大大的值,`consumer_timeout = 31622400000`(毫秒計算來看,這是1年的值)來避免後續有出問題。
:::
### Expiration
參數`exipres`定義了一個可選的到期時間,可以是任務派送之後幾秒,或是使用[`datatime`](https://docs.python.org/dev/library/datetime.html#datetime.datetime)指定日期與時間
```python
>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)
>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
... expires=datetime.now() + timedelta(days=1)
```
當一個worker接收到一個過期任務的時候,它會將該任務標記為[REVOKED](https://docs.celeryproject.org/en/stable/userguide/tasks.html#std-state-REVOKED)[TaskRevokedError](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.TaskRevokedError)
### Message Sending Retry
在連接失敗的時候Celery會自動重新派送訊息,而且你是可以設置這種重新測試的行為,像是多久重新測試,最大重新測試次數,或是通通設置為disable。
要disabled重新測試,你可以設置`retry=False`:
```python
add.apply_async((2, 2), retry=False)
```
:::info
**Related Settings**
* [task_publish_retry](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_publish_retry)
* [task_publish_retry_policy](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_publish_retry_policy)
:::
#### Retry Policy
重新測試的策略是一種著控制重新測試派送任務行為的映射,可以包含下面幾個關鍵:
* max_retries
* 放棄之前的最大重新測試次數,在這情況下會拋出重新測試異常
* 設置為`None`則代表會一直重新測試
* 預設值為3
* interval_start
* 定義重新測試之間的等待秒數(浮點數或整數),預設為0(第一次重測是瞬間的)
* interval_step
* 在每次連續重新測試的時候,這個數值會被加到重新測試延值(點數或整數)
* 預設值為0.2
* interval_max
* 重新測試之間的等待最大秒數(浮點數或整數)
* 預設值為0.2
舉例來說,預設的重新測試策略為:
```python
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
```
重新測試所花的最長時間為0.4秒。預設情況下相對較短,因為如果broker連接中斷,連接失敗可能會導致堆疊效應。舉例來說,許多web server程序等待重新測試,阻止了其它傳入的請求。
### Connection Error Handling
當你派送一個任務,並且該訊息傳輸連接遺失了,或連接無法被啟動,會拋出`OperationalError`異常:
```python
>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 388, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 503, in apply_async
**options
File "celery/app/base.py", line 662, in send_task
amqp.send_task_message(P, name, message, **options)
File "celery/backends/rpc.py", line 275, in on_task_call
maybe_declare(self.binding(producer.channel), retry=True)
File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
channel = self._channel = channel()
File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
self.transport.connect()
File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
self.sock.connect(sa)
kombu.exceptions.OperationalError: [Errno 61] Connection refused
```
假如你啟用重新測試,那就只會在重新測試次數耗盡或立即禁用的時候才會發生。
你也可以這麼處理錯誤:
```python
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception('Sending task raised: %r', exc)
```
### Serializers
資料在clients與workers之間的傳輸是需要被序列化的,因此所有在Celery的訊息都有著`content_type`的表頭(header),它描述著序列化的編碼方式。
預設序列化方式是`JSON`,但是你可以使用[task_serializer](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer)來改變,或是每次任務單獨的設置,甚至可以細到每個訊息的設置。
內建支援json、pickle、YAML、以及msgpack,你可以透過Kombu來自定義序列化方式。
:::info
See also:
[Message Serialization](https://docs.celeryproject.org/projects/kombu/en/master/userguide/serialization.html#guide-serialization)
:::
:::info
**Security**
pickle這個模組允許可以執行任意的函數,請參閱[security guide](https://docs.celeryproject.org/en/stable/userguide/security.html#guide-security)。
Celery自帶一個特殊的序列化方式,可以針對你所派送的訊息做加密簽章的處理。
:::
每個序列化方式都有它的優缺點:
* json
* JSON支援很多程式語言,是Python的標準庫,編碼相當快速,如[simplejson](https://pypi.org/project/simplejson/)。
* JSON的主要缺點就是它限制了你使用下列資料類型(Decimals與dates很明顯的是沒有的)
* strings
* Unicode
* floats
* Boolean
* dictionaries
* lists
* 二進位資料(Binary data)會使用Base64編碼,與所支援的原生二進位類型的編碼格式相比,傳輸數據量增加了34%
* 如果你的資料符合上述限制,而且你有跨語言支援需求,JSON的預設設置也許是你的最好選擇
* 更多信息請參閱[http://json.org](https://www.json.org/json-en.html)
:::info
**Note:**
從Python[官方文件](https://docs.python.org/3.6/library/json.html))資料來看,其JSON的鍵值結終為字串(str)類型。當你把dictionary轉為JSON的時候,所有的`keys`(鍵值)也都會強制轉為字串。所以你如果轉來轉去的把dict轉JSON,再從JSON轉回dict的話,那它可能就不是原本的dict。簡單來說就是你的dict如果有非字串類型的鍵值,那轉回去就不會是原始的dict。
:::
* pickle
* 如果你沒有支援Python以外的語言需求,使用pickle將擁有所有Python內建資料類型的支援(class instance除外),派送binary files時更小的訊息,以及較JSON快一些的處理速度
* 更多信息請參閱[pickle](https://docs.python.org/dev/library/pickle.html#module-pickle)
* yaml
* YAML有著許多與json相同的特性,不過YAML支援更多的資料類型(包含日期、遞歸引用..等)
* Python的標準庫中,YAML執行效率較JSON慢一些些
* 如果你需要更豐富的資料類型以及保持跨語言的兼容性,那YAML會較上述更適合。
* 更多信息請參閱[http://yaml.org](http://yaml.org/)
* msgpack
* msgpack是一個二進位序列化格式,功能面類似於JSON。還有新,剛出的,所以應該將它視為實驗性的會比較好
* 更多信息請參閱[http://msgpack.org/](http://msgpack.org/)
使用的編碼方式可以做為訊息的表頭(message header),這樣worker就知道對任務做怎麼樣的反序列化。如果你使用自定義的序列化方式,這序列化方式必須可以提供worker使用。
下面順序用於決定派送任務的時候所使用的序列化方式:
1. The `serializer execution` option
2. The [`Task.serializer`](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.serializer) attribute
3. The [`task_serializer`](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer) setting
下面說明為單個任務設置自定義序列化函式:
```python
>>> add.apply_async((10, 10), serializer='json')
```
### Compression
Celery可以使用下面內建方案來壓縮訊息:
* brotli
* brotli針對網頁進行了最佳化,尤其是小型文字類型的文件( text documents)。對於提供靜態內容,像是字型與html頁面最有效
* 使用它,安裝Celery的時候記得如下安裝:
```shell
$ pip install celery[brotli]
```
* bzip2
* bzip2可以建立比gzip更小的檔案,但壓縮與解壓縮的速度明顯比gzip還要慢
* 要使用它,請確保你的Python支援執行文件是使用bzip2編譯的
* 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的bzip2版本
```python
>>> import bz2
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'bz2'
```
* gzip
* gzip適用於記憶體空間較小的系統,適合用於記憶體有限的系統。它通常被用來產生副檔名為`.tar.gz`的檔案
* 要使用它,請確保你的Python支援執行文件是使用gzip編譯的
* 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的bzip2版本
```python
>>> import gzip
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'gzip'
```
* lzma
* lzma提供了良好的壓縮比並且有更快的壓縮、解壓縮執行效率,只是這也造成記憶體用量更高
* 要使用它,請確保你的Python支援執行文件是使用lzma編譯的,而且你的Python是3.3或更新的版本
* 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的lzma版本
```python
>>> import lzma
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'lzma'
```
* 你也可以如下安裝:
```shell
$ pip install celery[lzma]
```
* zlib
* zlib是庫形式的Deflate演算法的抽象,它包括對gzip文件格式和API中的輕量級串流格式的支援。它是許多軟體系統的關鍵組成,像Linux kernel、Git VCS..
* 要使用它,請確保你的Python支援執行文件是使用zlib編譯的
* * 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的lzma版本
```python
>>> import zlib
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'zlib'
```
* zstd
* zstd以zlib-level的即時壓縮以及更好的壓縮比為目標。由Huff0與FSE兩個libary所提供的非常快速的entropy stage的支持
* 使用它,安裝Celery的時候記得如下安裝:
```shell
$ pip install celery[zstd]
```
你也可以自定義壓縮方案,並且在[kombu.compression.register](https://docs.celeryproject.org/projects/kombu/en/master/reference/kombu.compression.html#kombu.compression.register)中註冊它。
下面順序用於派送任務的時候決定壓縮的方式:
1. The compression execution option
2. The `Task.compression` attribute
3. The [task_compression](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_compression) attribute
下面範例說明在呼叫任務的時候指定壓縮方式:
```python
>>> add.apply_async((2, 2), compression='zlib')
```
### Connections
你可以通過建立publisher來手動處理連接的部份:
```python
results = []
with add.app.pool.acquire(block=True) as connection:
with add.get_publisher(connection) as publisher:
try:
for args in numbers:
res = add.apply_async((2, 2), publisher=publisher)
results.append(res)
print([res.get() for res in results])
```
雖然說這個特殊的例子用`group`的話可以有更好的表示方式:
```python
>>> from celery import group
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()
>>> res.get()
[4, 8, 16, 32]
```
:::info
**Automatic Pool Support**
從版本2.3之後支援`automatic connection pools`,因此你不需要手動處理連接和發行者以重用連接。
`connection pools`從版本2.5之後預設狀態為啟用。
見[broker_pool_limit](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-broker_pool_limit)說明
:::
### Routing options
Celery可以路由任務到不同的隊列。
簡單的路由(name<->name)可以使用選項`queue`來完成:
```python
add.apply_async(queue='priority.high')
```
然後,你可以利用參數`-Q`來指派worker讓它加入相對應設置的隊列名稱(`celery, priority.high`):
```shell
$ celery -A proj worker -l INFO -Q celery,priority.high
```
:::info
**See also:**
不建議在程式碼程以硬編碼的方式來加入隊列,最好的方法是利用配置路由來設置。([task_routes](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_routes))
更多路由可以參閱[Routing Tasks](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing)
:::
### Results options
你可以利用設置[task_ignore_result](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_ignore_result)或選項(`ignore_result`)來決定是否保存任務執行結果
```python
>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None
>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3
```
如果你想要額外保存任務的元資料(metadata)在result backend,那就記得將[result_extended](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_extended)設置為True。
:::info
**See also:**
更多任務說明請參閱[Tasks](https://docs.celeryproject.org/en/stable/userguide/tasks.html#guide-tasks)
:::
#### Advanced Options
這些選項是提供給應用AMQP完整路由功能的進階用戶,有興趣的話可以閱讀[routind guide](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing)
* excnahge
* 要將訊息派送到的那個交換機的名稱(或[kombu.entity.Exchange](https://docs.celeryproject.org/projects/kombu/en/master/reference/kombu.html#kombu.Exchange))
* routing_key
* routing key
* 用來確定的routing key
* priority
* 0-255的數值,255代表最高優先(RabbitMQ)
* Redis的話則0為最高優化
## History
20190725_依據4.3版本說明調整
20211210_依據5.2版本說明調整