# Celery_Tasks
###### tags: `celery` `celery 5.2` `python`
[官方連結_Tasks](https://docs.celeryproject.org/en/stable/userguide/tasks.html)
## Tasks
`Tasks`是Celery應用程式的構件(building block)。
`task`是一個類別,可以被外部任何可調用對象建立。它扮演兩種角色,它定義了當`task`被呼叫時(派送訊息)發生的事情,以及當worker收到該訊息時會發生什麼。
每個`task`都會有一個唯一的名稱,並且在訊息中(messages)被參照引用,如此worker便可以找到正確的函數並執行。
`task message`(任務訊息)會一直到得到worker的回應([acknowledged](https://docs.celeryproject.org/en/stable/glossary.html#term-acknowledged)))之後才從隊列中移除。一個worker可以提前預訂很多任務訊息,即使因為停電或其原因導致它被關閉了,這些任務訊息依然可以被轉派至其它worker身上。
理想情況下`task functions`(任務函數)應該是鑑一性(冪等)<sub>(多次執行結果依然不變)</sub>([idempotent](https://docs.celeryproject.org/en/stable/glossary.html#term-idempotent))的,意思是指即使使用相同的參數多次呼叫,該函數也不會導致意外的影響。由於worker並不能判斷任務是否為鑑一性,預設情況下是在任務被執行之前先提前回應訊息,避免執行中的任務重覆執行。
如果你的`task`是鑑一性(idempotent),你可以設置[acks_late](https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.acks_late)這個選項讓worker在任務回傳之後確認消息。參考[FAQ:Should I use retry or acks_late](https://docs.celeryproject.org/en/stable/faq.html#faq-acks-late-vs-retry)
注意到,假如執行任務的子程序被終止(或呼叫[sys.exit()](https://docs.python.org/dev/library/sys.html#sys.exit),或通過信號(signal)),那麼worker將會確認訊息,即使你已經設置`acks_late=enable`,這種行為的目的是:
1. 我們並不想重新執行任務,這會強制kernel發送SIGSEGV(segmentation fault,區段錯誤)或類似信號給process
2. 我們假設系統管理員是特地刪除該任務,當然就不希望它自動重新啟動
3. 分配過多的記憶體的任務有觸發kernel OOM-Killer(Out Of Memory killer)的危險,同樣可能再次的發生
4. 當重新傳送的任務始終失敗的話,那就可能會導致高頻訊息息環路(high-frequency message loop)關閉系統
這種情況下如果你真的希望重新派送任務,那你應該考慮設置[task_reject_on_worker_lost](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_reject_on_worker_lost)。
:::danger
Warning:
如果一個任務無限期地阻塞,最終也許會讓worker無法執行其它工作。
如果你的任務做的是I/O類的工作,那麼請確保你對這些操作是有加入`timeout`的,例如,使用`requests`這個library,在對網頁發出請求的時候加入`timeout`:
```python=
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
```
[Time limits](https://docs.celeryproject.org/en/stable/userguide/workers.html#worker-time-limits)對於確認所有的任務及時回傳(return)是非常方便的,不過`time limit`這個事件會實際的強制刪除process,因此只能用它們來檢測尚未使用手動超時的情況。
之前版本的話,預設的`perfork pool scheduler`對執行時間長的任務並不是那麼友善,因此,如果你有任務的執行時間是幾分鐘/幾小時話,那就建議你啟用celery worker的[-Ofair](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-worker-O)。不過,在4.0版之後[-Ofair](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-worker-O)已經是預設的排程策略(scheduling strategy)。更多資訊可參考[Prefetch Limits](https://docs.celeryproject.org/en/stable/userguide/optimizing.html#optimizing-prefetch-limit),關於把執行時間長、短的任務路由到專用worker的最佳效能請參考[Automatic routing](https://docs.celeryproject.org/en/stable/userguide/routing.html#routing-automatic)。
如果你的worker停擺,在提交問題之前請調查正在執行的任務狀況,最有可能引起停擺的原因是因為一個或多個任務因為網路操作問題而停擺。
:::
### Basic
透過裝飾器[app.task()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.task),你可以很輕易的從任何可呼叫的函數建立任務:
```python
from .models import User
@app.task
def create_user(username, password):
User.objects.create(username=username, password=password)
```
`app.task()`有很多的[`option`](http://docs.celeryproject.org/en/master/userguide/tasks.html#task-options)可以設置,這些選項可以做為參數設置在裝飾器上:
```python
@app.task(serializer='json')
def create_user(username, password):
User.objects.create(username=username, password=password)
```
:::info
**How do I import the task decorator? And what’s "app"?**
這個裝飾器在你建立的Celery application instance也是可以用的,如果你不知道我在說什麼,那就去看一下[First Steps with Celery](https://hackmd.io/@shaoeChen/BJ2tEIjtt)。
如果你正在使用Django,那我就沒有翻譯了,因為我用Flask。
:::
:::info
**Multiple decorators**
當你有多個裝飾器要跟`app.task()`這個裝飾器一起使用的時候,你必須要確認裝飾器`app.task()`是最後一個被執行的(這很怪,在Python這意味著它必須是列表的第一個,這句是官方文件說的,不是我說的):
```python
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
```
:::
:::info
**How do I import the task decorator? And what's "app"?**
裝飾器`task()`在Celery應用程式實例是可用的,如果您不知道這是什麼,請參考[`First Steps with Celery`](http://docs.celeryproject.org/en/master/getting-started/first-steps-with-celery.html#first-steps)
如果你正使用Django(見[`First Steps with Django`](http://docs.celeryproject.org/en/master/django/first-steps-with-django.html#django-first-steps)),或者你是某一個套件的作者,那你可能想用裝飾器[`shared_task()`]
```python
from celery import shared_task
@shared_task
def add(x, y):
return x + y
```
:::
#### Bound task
綁定任務意味著任務的第一個參數始終會是任務實務(task instance),也就是self,就跟Python綁定方法(method)一樣:
```python=
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
```
重測任務(使用[app.Task.retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry))、訪問關於當前任務請求的信息,以及添加到自定義任務基類的任何附加功能,都需要綁定`task`。
#### Task inheritance
裝飾器`task`中的參數`base`所指的就是指定`task`的基類(base class):
```python
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask)
def add(x, y):
raise KeyError()
```
### Names
每一個任務都有一個獨一無二的名稱。
假如沒有明確的指定,那裝飾器`task`會自己產生一個給你,這個由`task`所產生的名稱會基於1)這定義這個`task`的模組與2)這個`task`所裝飾的function名稱。
下面範例為顯示的指定任務名稱:
```python
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
... return x + y
>>> add.name
'sum-of-two-numbers'
```
最好的方式就是使用模組名稱做為命名空間,這可以避免任務名稱的重覆:
```python
>>> @app.task(name='tasks.add')
>>> def add(x, y):
... return x + y
```
你可以通過訪問它的屬性`name`來得到`task`的名稱:
```python
>>> add.name
'tasks.add'
```
上面的我們指定的名稱`tasks.add`方式就跟裝飾器自己產生的是一樣的模式,假設Python文件名稱為`tasks.py`:
```pyton
@app.task
def add(x, y):
return x + y
```
```shell
>>> from tasks import add
>>> add.name
'tasks.add'
```
#### Changing the automatic naming behavior
Version 4.0新增功能
部份情況下,預設的自動命名並不適合,考量不同模組中有很多的任務:
```
project/
/__init__.py
/celery.py
/moduleA/
/__init__.py
/tasks.py
/moduleB/
/__init__.py
/tasks.py
```
使用預設的自動命名,每一個任務都會有一個自動生成的名稱,像是`moduleA.tasks.taskA`, `moduleB.tasks.test`,等等。也許你希望將所有`task`名稱內的`tasks`排除掉。如剛剛所說,你是可以幫所有的任務顯示的指定任務名稱的,或者你可以覆寫[app.gen_task_name()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.gen_task_name)來改變自動生成名稱的行為。我們繼續用`celery.py`這個範例:
```python=
from celery import Celery
class MyCelery(Celery):
def gen_task_name(self, name, module):
if module.endswith('.tasks'):
module = module[:-6]
return super().gen_task_name(name, module)
app = MyCelery('main')
```
現在每個任務名稱看起來應該就是`moduleA.taskA`,`moduleA.taskB`。
:::warning
請確認你的[app.gen_task_name()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.gen_task_name)是一個純函數:意思是,相同的輸入總是會有相同的輸出。
:::
### Task Request
[app.Task.request](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.request)包含當前正在執行的任務的信息及狀態。
這個`request`定義了以下屬性:
* id:執行中任務的id,唯一值
* group:如果任務是屬於某個群組的話,那這個group就是任務群組([group](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-group))的id,唯一值
* chord:任務所屬的`chord`的id,唯一值(if the task is part of the header)
* correlation_id:用於重複資料刪除等內容的自定義ID
* args:Positional arguments
* kwargs:Keyword arguments
* origin:發送該任務的主機名稱(host name)
* retries:記錄當前任務已經被重覆執行次數。整數,從0開始計數
* is_eager:如果任務要直接在Client端執行而不是由worker執行的話,就設為`True`
* eta:任務的原始預計到達時間(ETA,estimated time of arrival)。UTC時間記錄(取決於[enable_utc](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-enable_utc)的設置)
* expires:任務的原始到期(失效)時間,UTC時間記錄。(取決於[enable_utc](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-enable_utc)的設置)
* hostname:執行任務的那個worker的節點名稱
* delivery_info:附加傳遞的信息。這是一個包含用來交付此任務的的`exchange`與`routing key`的映射資訊。例如[app.Task.retry](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry)是用於將任務重新派送到相同的目地隊列,這個dict中密鑰的可用性取決於所使用的broker
* reply-to:發送回覆的隊列名稱(例如,與RPC result backend一起使用)
* called_directly:如果這個任務不是由worker所執行處理,那flag就設置為true
* timelimit:該任務的時間限制,`tuple(soft, hard)`
* callbacks:如果任務成功回傳,後續要調用的簽章清單(list of signatures)
* errback:如果任務執行失敗,後續要調用的簽章清單(list of signatures)
* utc:如果調用的function已設置UTC enable,則設置為True。[enable_utc](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-enable_utc)
**New in version 3.1.**
* headers:與該任務訊息一起派送的訊息表頭(message headers)的映射(也能是`None`)
* reply_to:這跟上面的不一樣,要注意一下,要向那邊派送回覆(隊列名稱)
* correlation_id: 通常與任務的id相同,通常在amqp中用來跟蹤回覆的內容
**New in version 4.0.**
* root_id:這個任務所屬工作流(workflow)中第一個任務的id,唯一值(如果有在某一個工作流的話)
* parent_id:調用這個任務(兒子)的任務id(爸爸),唯一值,也就是爸爸的id
* chain:形成鏈結(chain)的那些任務的反向列表。這個列表(list)的最後一個項目就是當前任務成功之後的下一個任務。如果用任務協議(task protocol)的第一個版本,那鏈結任務(chain tasks)就會在`request.callbacks`中
**New in version 5.2.**
* properties:跟這個任務訊息一起接收到的訊息屬性的映射(可能是`None`或是`{}`)
* replaced_task_nesting:任務被替換的次數(如果有的話,可能為0)
#### Example
在上下文中訪問任務信息的範例:
```python
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
```
參數`bind`意味著這函數將會是一個綁定方法(bound method),因此你可以觀察這個任務類型實例的屬性與方法。
### Logging
worker會自動幫你設置日誌記錄,或是你也可以自己手動配置。
celery中有一個特別的logger可以用,名稱為`celery.task`,你可以繼承這個logger來自動取得任務的名稱與id做為日誌的一部份。
最好做法是在模組頂部為所有任務建立一個通用的日誌記錄器(common logger):
```python=
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
```
Celery使用Python的標準庫logger,相關文件可以參考[這裡](https://docs.python.org/dev/library/logging.html#module-logging)
你也可以用[print()](https://docs.python.org/dev/library/functions.html#print),因為寫入標準輸出/錯誤資訊的任何資料都會被重新導向日誌系統(你可以禁用,參考[worker_redirect_stdouts](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-worker_redirect_stdouts))
:::info
Note:
如果你是在你的任務或任務模組中的某個地方建立logger instance,那worker是不會更新重定向的(redirection)。
如果你想要將`sys.stdout`和`sys.stderr`重新定向到自定義的logger的話,那你就要手動啟用它,舉例來說:
```python
import sys
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
old_outs = sys.stdout, sys.stderr
rlevel = self.app.conf.worker_redirect_stdouts_level
try:
self.app.log.redirect_stdouts_to_logger(logger, rlevel)
print('Adding {0} + {1}'.format(x, y))
return x + y
finally:
sys.stdout, sys.stderr = old_outs
```
:::
:::info
Note:
如果有一個你所需要的特定的Celery logger沒有發送出日誌記錄,你應該確認logger是否有被正常的派送。下面的範例中,`celery.app.trace`有被啟用,因此`succeeded in`日誌可以被發送的。
```python
import celery
import logging
@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
logger = logging.getLogger('celery')
logger.propagate = True
logger = logging.getLogger('celery.app.trace')
logger.propagate = True
```
:::
:::info
Note:
如果你想要完全的禁用Celery logging的配置,那你可以用[setup_logging](https://docs.celeryproject.org/en/stable/userguide/signals.html#std-signal-setup_logging)這個signal:
```python
import celery
@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
pass
```
:::
#### Argument checking
*New in version 4.0.*
在你呼叫任務的時候,Celery會驗證傳遞過來的參數,就跟Python呼叫一般函數一樣:
```python=
>>> @app.task
... def add(x, y):
... return x + y
# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
```
你可以透過設置`task`的屬性[typing](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.typing)來關閉這種參數確認的機制:
```python
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
# Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
```
透過`typing=False`的設罝,在調用任務的時候就不會做任何的驗證
#### Hiding sensitive information in arguments
*New in version 4.0.*
使用[task_protocol](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_protocol)為2或更高版本(自4.0起為預設值)時,你可以使用`argsrepr`和`kwargsrepr`來調用參數,以此覆蓋掉日誌跟監視事件中`positional arguments`和`keyword arguments`的表示方式:
```python=
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
>>> charge.s(account, card='1234 5678 1234 5678').set(
... kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()
```
:::warning
Warning:
任何能夠直接從broker上看到任務訊息的方式,或是能夠以其它方式攔截任務訊息的人都還是可以看的到敏感信息。
基於這個原因,你應該盡可能的對敏感信息加密,或者這個範例中,信用卡號碼的實際數字可以被以加密的方式保存在你可以檢索和解密的安全地方。
:::
### Retrying
[app.Task.retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry)可以用在重新執行任務上,例如在可恢復錯誤的情況下。
當你呼叫`retry`的時候,它將會送一條新的訊息並使用相同的`task-id`,並且確保將訊息送到與原始任務相同的隊列上。
當任務被重啟的時候,它也會被記錄為任務狀態,因此可以使用result instance來追蹤狀態(參考[States](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-states))
下面給出一個使用`retry`的範例:
```python=
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
```
:::info
Note:
呼叫[app.Task.retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry)將會拋出例外,所有在`retry`之後的程式都不會被執行。這是[Retry](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.Retry)的例外,它不是為了處理異常拋出的例外,而是為了作為semi-predicate來向worker表明要重啟任務,以便在啟用result backend情況下可以存儲正確的狀態。
這是正常的操作,除非將`retry`的參數`throw`設置為`False`,不然它就是會一直發生。
:::
裝飾器`task`的參數`bind=True`會綁定任務,並授權訪問本身(self,即task instance)。
參數`exc`用來傳遞使用日誌及保存任務結果時所產生的的例外信息。例外和回溯都將在任務狀態下可用(如果你有啟用result backend)
如果任務設置了`max_retries`,在超過最大重試次數的時候會重新引發當前異常,下面幾種狀況不會拋出例外:
* 未提供`exc`參數
* 這種情況下會拋出[MaxRetriesExceededError](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.MaxRetriesExceededError)
* 目前沒有例外
* 如果沒有原始異常(original exception)可以被重新拋出,那就拋出參數`exc`設置的異常
```python
self.retry(exc=Twitter.LoginError())
```
#### Using a custom retry delay
當任務要被重新測試的時候,它可以在重新之前等待一段給定的時間之後再執行,預設的延遲時間設置由[defaul_retry_delay](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.default_retry_delay)定義。預設值為3分鐘。注意,單位是秒(int或是float)。
你也可以提供[retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry)函數一個參數`countdown`來覆蓋掉上面的設置。
下面範例雖然設置了30分鐘,但是下面參數`countdown`設置了60秒,因此會在60秒之後重啟:
```python=
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):
try:
something_raising()
except Exception as exc:
# overrides the default delay to retry after 1 minute
raise self.retry(exc=exc, countdown=60)
```
#### Automatic retry for known exceptions
*New in version 4.0.*
有時候你只是想在引發特定異常的時候再來重新啟動任務。
幸運的是,你可以在裝飾器[app.task()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.task)中利用參數`autoretry_for`告訴Celery什麼情況下要重新測試任務:
```python
from twitter.exceptions import FailWhaleError
@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
return twitter.refresh_timeline(user)
```
如果你想在[retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry)內部調用中指定自定義參數,可以在裝飾器[app.task()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.task)中利用參數`retry_kwargs`設置
```python
@app.task(autoretry_for=(FailWhaleError,),
retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)
```
這提供了一個替代方案來手動處理例外異常,上述作法跟你用`try...except`一樣:
```python
@app.task
def refresh_timeline(user):
try:
twitter.refresh_timeline(user)
except FailWhaleError as exc:
raise div.retry(exc=exc, max_retries=5)
```
如果你只要有例外異常就要重試一次,那很簡單,只需要這麼做:
```python
@app.task(autoretry_for=(Exception,))
def x():
...
```
*New in version 4.2.*
`exponential backoff`:指數輪詢<sub>(微軟翻譯)</sub>,國家教育研究院譯為二進指數倒回
如果你的任務是取決於其它的服務之上,像是發出一個request給某一個API之類的,那使用`exponential backoff(二進指數倒回)`來避免對服務造成衝擊是一個不錯的方法。很幸運的,Celery的自動重試的支援讓這件事變的簡單。只需要指定參數[retry_backoff](https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.retry_backoff),像這樣:
```python
from requests.exceptions import RequestException
@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
...
```
預設情況下,二進指數倒回(exponential backoff)會引入隨機抖動([jitter](https://en.wikipedia.org/wiki/Jitter))來避免所有任務在同一時間執行。它將最大後退延遲限制為十分鐘。這些設置都可以利用下列參數來調控。
*New in version 4.4*
你也可以在`class-based`的任務中設置`autoretry_for, max_retries, retry_backoff, retry_backoff_max,retry_jitter`,等選項:
```python=
class BaseTaskWithRetry(Task):
autoretry_for = (TypeError,)
max_retries = 5
retry_backoff = True
retry_backoff_max = 700
retry_jitter = False
```
* Task.autoretry_for
* `list/tuple`的異常(例外)類別。如果在任務執行期間有任何清單內的異常被拋出,任務將自動重試。預設情況下不會有任何的異常會被拋出
* Task.max_retries
* 數值。放棄任務之前的最大重新測試次數。如果給個`None`就代表永遠不放棄,會一直測下去。預設情況下該設置為3
* Task.retry_backoff
* `boolean`或數值。如果這個選項設置為True,那自動重試將按二進指數倒回([exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff))規則延遲。第一次重試會延遲1秒,第二次重試就延遲2秒,第三次變延遲4秒,第四次就是延遲8秒,依此類推。(然而,如果啟用[retry_jitter](https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.retry_jitter),那就會修改此延遲數值。)。如果這個選項設置為數值,那就把它當做延遲因子。舉例來說,如果這個選項設置為3,那第一次重試就是延遲3秒,第二次就是延遲6秒,第三次就是延遲12秒,第四次就是延遲24秒,等等。預設情況下,這個選項是設置為False,並且不會延遲自動重試。
* Task.retry_backoff_max
* 數值。如果啟用`retry_backoff`,這個選項將以秒為單位來設置任務自動重試之間的最大延遲。該選項預設為600,相當於10分鐘。
* Task.retry_jitter
* `boolean`,[Jitter](https://en.wikipedia.org/wiki/Jitter)用來將隨機性引入二進指數倒回,以避免隊列中的所有任務同時執行。如果這個選項設置為True,那[retry_backoff](https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.retry_backoff)計算的延遲數值將被視為最大值,實際的延遲數值將是一個介於0與最大值的亂數。該選項預設為True
### List of Options
裝飾器`task()`可以有很多的選項來改變任務的行為,舉例來說,你可以使用`rate_limit`來設置任務的速度限制。
任何傳遞給裝飾器`task()`的`keyword argument`實際上都將設置為生成的`resulting task class`的屬性,這是內置屬性的清單。
#### General
* Task.name
* 任務註冊的名稱
* 你可手動設置也可以使用模組與類別名稱自動生成
* 參閱[Names](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-names)
* Task.request
* 如果任務正在執行中,那這邊包含的信息就會是寫於當前的請求(current request)。使用線程區域儲存區(Thread Local Storage,TLS)
* 參閱[Task Request](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-request-info)
* Task.max_retires
* 只有在任務呼叫`self.retry`或裝飾器`task()`使用參數[autoretry_for](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-autoretry)的時候適用
* 放棄任務前最大重新測試次數。如果重測次數大於這個數值,那就會拋出[MaxRetriesExceededError](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.MaxRetriesExceededError)
:::info
Note:
你必須手動調用[retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry),因為它不會在異常發生時自動重試
:::
* 預設值為3。如果設置為`None`,那任務就會一直試到人生成功為止
* Task.throws
* 預期錯誤並且不該被視為真正的錯誤類別
* 清單中的錯誤將被result backend記錄為失敗,但worker不會將該事件記錄為錯誤,而且不包含任何的回溯
```python
@task(throws=(KeyError, HttpNotFound)):
def get_foo():
something()
```
* Error types:
* 預期錯誤(存在`Task.throes`)
* logger層級為INFO,排除回溯(traceback)
* 非預期錯誤
* logger層級為ERROR,包含回溯(traceback)
* Task.default_retry_delay
* 重啟任務之前的延遲時間,單位為秒(`int, float`)。預設為延遲三分鐘。
* Task.rate_limit
* 設置該任務類型的速限(限制在時間範圍內可以執行該任數的數量),當速率限制生效的時候,任務依然會完成,但它可能需要花費一點時間才會開始執行。
* 如果是`None`那就代表沒有限制。如果設置`int`或是`float`則視為每秒任務數("tasks per second")
* 限制的部份可以透過在數值中附加`/s`, `/m`, `/h`來定義每秒、分、時,任務會在指定時間範圍內平均分配執行。
* 舉例說明:`100/m`,代表每分鐘100個任務。這將在同一個worker instance上啟動兩個任務之間最少600ms的延遲。
* 預設值為[task_default_rate_limit](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_default_rate_limit)的設置:如果沒有指定,那就預設為disabled任務的速限
* 這是每個worker instance的限制,而不是全域的限制,若要全域限制就需要將限制做在隊列上(如果你想限制每秒最大請求數之類的)
* Task.time_limit
* 針對該任務的時間上限(hard time),以秒為單位,若未設置則使用worker的預設值
* Task.soft_time_limit
* 針對該任務的時間下限(soft time),若未設置則使用worker的預設值
* Task.ignore_result
* 不保存任務的狀態。這代表你無法利用[AsyncResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult)來確認任務是否就緒或取得其回傳值
* Task.store_errors_even_if_ignored
* 如果設置為True,即使任務的組態設置忽略結果也會保存錯誤
* Task.serializer
* 字串,標記預設要使用的序列化方法。預設取[task_serializer](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer)的設置。可以使用`pickle`、`json`、`yaml`或自定義於`kombu.serialization.registry`的序列化方法
* 更多資訊請參考[Serializers](https://docs.celeryproject.org/en/stable/userguide/calling.html#calling-serializers)
* Task.compression
* 字串,標記預設要使用的壓縮方法
* 預設使用[task_compression](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_compression)的設置。可以使用`gzip`、`bzip2`或任何自定義於[kombu.compression](https://docs.celeryproject.org/projects/kombu/en/master/reference/kombu.compression.html#module-kombu.compression)的壓縮方法
* 更多請參考[`Compression`](https://docs.celeryproject.org/en/stable/userguide/calling.html#calling-compression)
* Task.backend
* 用於保存此任務的結果的backend。`celery.backends`其中一個後端類別的實例。預設為`app.backend`,由[result_backend](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_backend)所定義
* Task.acks_late
* 如果設置為True,那該任務在執行之後將會被確認,而不僅僅是執行之前(原文中寫著default behavior,不確定指只在執行之前被確認,還是執行之後會再被確認)。
* 注意:這意味著任務在執行過程中如果掛了,那可能會被多次執行(因為在執行之後才會被確認),因此,請確認你的任務是冪等(idempotent)的。
* 全域設置值是可以被[ask_acks_late](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_acks_late)的設置覆蓋掉的
* Task.track_started
* 如果設置為True,那麼,當worker在執行該任務的時候,該任務就會報告其狀態為"started"。預設置為False,因為預設情況下是不會回報到這麼細粒。任務的狀況要嘛掛著(pending)、要嘛等待(waiting)、要嘛就是重試(retried)。如果你有一個任務的執行時間很長,你又需要它告訴你它正在幹嘛的時候,那"started"這個狀態就可能不錯用
* 執行該任務的worker的host name與process id在狀態的meta-data中是可用的
* 全域設置值是可以被[`task_track_started`](http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-task_track_started)的設置覆蓋掉的
### States
Celery可以持續的追蹤當前任務的狀態。狀態包含任務成功的結果以及失敗的異常、回溯信息。
有很多種的result backend可以選擇,每一種都有不同的優缺點(參考[Result Backends](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-result-backends))。
任務在其生命週期期間會轉換很多種狀態,而且每一種狀態都可以任意的附加元數據(meta-data)。當任務進入新狀態的時候,就會把舊的狀態給忘掉,但某些轉換是可以被推導出來的,像任務如果狀態如果是FAILED,那就代表它可能曾經在某個時間點是[STARTED](https://docs.celeryproject.org/en/stable/userguide/tasks.html#std-state-STARTED)。
還有一些狀態集合,像是FAILURE_STATES與[READY_STATES](https://docs.celeryproject.org/en/stable/reference/celery.states.html#std-state-READY_STATES)。
用戶端可以利用這些集合來決定是否要重新拋出異常[PROPAGATE_STATES](https://docs.celeryproject.org/en/stable/reference/celery.states.html#std-state-PROPAGATE_STATES)或是緩存(cache)狀態(如果任務已經就緒就可以緩存)
當然你也可以自定義狀態[Custom states](https://docs.celeryproject.org/en/stable/userguide/tasks.html#custom-states)
#### Result Backends
如果你想持續追蹤任務或需要它們的回傳值,那麼Celery必需在某處保存或發送狀態以便後續可以檢索它們。有幾個內置result backend可以選擇:SQLAlchemy/Django ORM,Memcached,RabbitMQ/QPid(rpc)以及Redis,或自定義。
沒有那一個backend可以適用任何一種狀況。你應該要瞭解每一個backend優缺點之後選擇一個最符合你需求的。
:::warning
Warning:
Backend使用資源來保存以及傳輸結果。為了確保資源的釋放,最終你必須在呼叫任務之後的每一個[AsyncResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult)instance上調用[get()](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult.get)或[forget()](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult.forget)
:::
:::info
See also:
[Task result backend settings](https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-result-backend)]
:::
#### RPC Result Backend (RabbitMQ/QPid)
RPC result backend (rpc://)是一種特殊的Backend,它並沒有實際的保存狀態,而是將狀態做為訊息派送。這是一個很重要的差異,這代表其結果只能被檢索一次,而且只能被發起任務的client檢索。兩個不同的processes不能等待相同的結果。
即使有這些限制,如果你需要及時接收狀態的變化的話,它依然是一個非常好的選擇。使用訊息傳遞代表client不需要一直詢問目前狀態。
預設情況下訊息是非持久性的,所以只要broker重新啟動,所有的結果都會消失。你可以利用[result_persistent](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_persistent)的設置,讓Backend發送持久性訊息。
#### Database Result Backend
把狀態保存在資料庫對許多人來說都是比較方便的,特別是對於那些已經有在使用資料庫的網頁應用程式來說更是方便,但它依然有一些限制。
1. 一直對資料庫查詢狀態所付出的成本不小,因此應該增加查詢的間隔,如`result.get()`
2. 部份資料庫使用預設的交易隔離層級並不適合polling table
在MySQL中的預設transaction isolation level(交易隔離層級)是`REPEATABLE-READ`<sub>(讀取中資料會被鎖定,確保同一筆交易中的讀取資料必須相同)</sub>,這意味著在提交當前事務之前,事務不會看到其他事務所做的更改
建議調整為`READ-COMMITTED`
#### Built-in States
* PENDING
* 任務等待處理或未知狀態。任何未知的任務ID都是待處理狀態
* STARTED
* 任務開始。預設是不回報任務開始,需從[app.Task.track_started](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.track_started)設置。
* meta-data:執行該任務的worker的`pid`與`hostname`
* SUCCESS
* 任務已經被成功執行
* meta-data:`result`包含該任務的回傳值
* propagates:Yes
* ready:Yes
* FAILURE
* 任務執行結果失敗
* meta-data: 結果包含了異常,回溯包含引發異常的堆棧訊息
* propagates: Yes
* RETRY
* 任務正在重新啟動
* meta-data: 結果包含了引起重新啟動的異常,回溯包含引發異常的堆棧訊息
* propagates: No
* REVOKED
* 任務撤銷
* propagates: Yes
#### Custom states
你可以很輕易的自定義狀態,你需要的就只是一個『唯一的狀態名稱』。狀態的名稱通常使用大寫字串。作為範例,你可以看一下`abortable tasks`,其自定義中止任務為`ABORTED`
使用[update_state()`(https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.update_state)來更新任務狀態:
```python
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
```
上面範例,設置了一個名為`PROGRESS`的狀態,利用將目前以及總數做為一個元資料的部份,告知任務知道此狀態的應用程式當前正執行任務中進度,這可以被用來建立進度條。(flask_web作者有demo)
#### Creating pickleable exceptions
一個Python鮮為人知的事情是,異常必須符合一些簡單的規則,這樣就支援被`pickle`序列化。
當你用`Pickle`來做為序列化工具的時候,如果你的任務引發無法序列化的異常的話,那就會無法正常的作業。
為了確保你的例外(異常)是可序列化的(pickleable),異常必須提供它在`.args`屬性中實例化的原始參數。確保這一點最簡單的作法就是讓異常呼叫`Exception.__init__`。
見下面幾個範例:
```python
# OK:
class HttpError(Exception):
pass
# BAD:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
# OK:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
Exception.__init__(self, status_code) # <-- REQUIRED
```
規則是:對任何支援自定義的參數`*args`的異常,你必須使用`Exception.__init__(self, *args)`
對於`keyword arguments`並沒有特別的支援,如果你希望在反序列化的時候保留`keyword arguments`,那你就必需將它們當做一般參數來使用:
```python
class HttpError(Exception):
def __init__(self, status_code, headers=None, body=None):
self.status_code = status_code
self.headers = headers
self.body = body
super(HttpError, self).__init__(status_code, headers, body)
```
### Semipredicates
worker會把任務包裝在一個`tracing function`,以此記錄任務的最終狀態。有許多異常(exceptions)可以被用來向該函數發出信號,以此更改任務回傳的處理方式。
#### Ignore
任務也許會拋出[Igonre](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.Ignore)來強制worker放棄該任務。意思就是說該任務不會有狀態會被記錄,但訊息依然會被確認(也就是訊息會被從隊列中移除)
如果你想要實現自定義一個類似撤銷的功能,或是手動保存任務結果,你就可以使用它。
範例,把撤銷的任務保存在Redis set中:
```python
from celery.exceptions import Ignore
@app.task(bind=True)
def some_task(self):
if redis.ismember('tasks.revoked', self.request.id):
raise Ignore()
```
範例,手動保存結果:
```python
from celery import states
from celery.exceptions import Ignore
@app.task(bind=True)
def get_tweets(self, user):
timeline = twitter.get_timeline(user)
if not self.request.called_directly:
self.update_state(state=states.SUCCESS, meta=timeline)
raise Ignore()
```
#### Reject
任務也許會用`AMQPs basic_reject`來拋出[Reject](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.Reject),以此拒絕任務訊息。除非啟用[Task.acks_late](https://docs.celeryproject.org/en/stable/userguide/tasks.html#Task.acks_late),否則不會任何影響。
拒絕訊息(rejecting message)跟確認訊息(acking message)有相同的效果,但是一些brokers也許可以實現一些可用的額外功能。舉例來說,RabbitMQ支援[Dead Letter Exchanges](https://www.rabbitmq.com/dlx.html),其隊列(queue)設置之後,那些被拒絕訊息可以就傳遞到這裡。
Reject也可以用來把訊息重新寫入隊列,但要特別小心,一不注意就會造成無限循環。
範例,當任務導致記憶體不足的時候使用Reject:
```python=
import errno
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def render_scene(self, path):
file = get_file(path)
try:
renderer.render_scene(file)
# if the file is too big to fit in memory
# we reject it so that it's redelivered to the dead letter exchange
# and we can manually inspect the situation.
except MemoryError as exc:
raise Reject(exc, requeue=False)
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise Reject(exc, requeue=False)
# For any other error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)
```
範例,重新寫入隊列:
```python=
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def requeues(self):
if not self.request.delivery_info['redelivered']:
raise Reject('no reason', requeue=True)
print('received two times')
```
更多請參閱broker中`basic_reject`文件說明
#### Retry
[Retry](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.Retry)是由`Task.retry`拋出,以此告知worker當前任務將重新啟動。
### Custom task classes
所有的任務都繼承自[app.Task](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task)這個類別。而[run()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.run)這個方法(method)則變成是任務的主體(task body)。
如下範例:
```python
@app.task
def add(x, y):
return x + y
```
原理大致如下:
```python
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
```
#### Instantiation
任務並不是針對每個請求(request)yua 實例化, 而是註冊在任務註冊表作為全域的實例(global instance)。
這意思是指,`__init__`構造函數在每個process只會呼叫一次,並且任務類別在語義上更接近`Actor`。
假設你有一個任務,
```python
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
```
並且你會將每個請求路由到同一個process,然後它將會在請求之間保持狀態。
這對緩存(cache)資源也是有幫助的,舉例來說,任務基類緩存一個資料庫的連接:
```python
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
```
#### Per task usage
可以像下面這樣將上面說的加到每個任務中:
```python
@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)
```
任務`process_rows`的屬性`db`在每一個process中都會是相同的。
#### App-wide usage
當你在實例外應用程式的時候,你還可以透過傳遞參數`task_cls`在整個Celery應用程式中使用自定義類別。這個參數應該是一個字串,給定任務類別或類別本身的Python路徑。
```python
from celery import Celery
app = Celery('tasks', task_cls='your.module.path:DatabaseTask')
```
這將使所有以裝飾器語句宣告的任務都使用你的`DatabaseTask`類別,並且都將擁有你自定義的`db`屬性。
預設值是Celery提供的類別,`celery.app.task:Task`
#### Handlers
* `before_start(self, task_id, args, kwargs)`
* 由worker在任務開始執行之前所執行
* *New in version 5.2.*
* Parameters
* task_id:預計要執行的task id,唯一值
* args:預計要執行的任務的原始參數
* kwargs:預計要執行的任務的關鍵字參數(keyword arguments)
* 忽略此處理程序的回傳值
* `after_return(self, status, retval, task_id, args, kwargs, einfo)`
* 任務回傳後呼叫的處理程序
* Parameters
* status:當前任務狀態
* retvalv:任務的回傳值或是異常
* task_id:task id,唯一值
* args:該回傳任務的原始參數
* kwargs:該回傳任務的關鍵字參數(keyword arguments)
* Keyword Arguments
* einfo:ExceptionInfo instance,包含了回溯(如果有的話)
* 忽略此處理程序的回傳值
* `on_failure(self, exc, task_id, args, kwargs, einfo)`
* 當任務失敗的時候,由worker執行此程序
* Parameters
* exc:由任務所引發的例外
* task_id:那個失敗任務的task id,唯一值
* args:那個失敗任務的原始參數
* kwargs:那個失敗任務的關鍵字參數(keyword arguments)
* Keyword Arguments
* einfo:ExceptionInfo instance,包含了回溯
* 忽略此處理程序的回傳值
* `on_retry(self, exc, task_id, args, kwargs, einfo)`
* 當任務要重啟的時候,由worker執行此程序
* Parameters:
* exc:發送到[retry()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.retry)的例外
* task_id:那個重新測試的任務的task id,唯一值
* args:那個重新測試的任務的原始參數
* kwargs:那個重新測試的任務的關鍵字參數(keyword arguments)
* Keyword Arguments:
* einfo: ExceptionInfo instance,包含了回溯
* 忽略此處理程序的回傳值
* `on_success(self, retval, task_id, args, kwargs)`
* 如果任務執行成功,那就由worker執行此程序
* Parameters:
* retval:任務的回傳值
* task_id:該執行成功的任務的task id,唯一值
* args:該執行成功的任務的原始參數
* kwargs:該執行成功的任務的關鍵字參數(keyword arguments)
#### Requests and custom requests
在收到執行任務的訊息之後,worker會建立一個request來表示這種的需求(demand)。
自定義的任務類別可以通過改變屬性[celery.app.task.Task.Request](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.Request)來覆寫`request class`。除了可以指定自定義的`request class`之外,還可以完全限定名稱。
`request`有很多責任(responsibilities)。自定義的`request class`應該包含所有它們負責實際執行與追蹤的任務,我們強烈的建議直接繼承[celery.worker.request.Request](https://docs.celeryproject.org/en/stable/reference/celery.worker.request.html#celery.worker.request.Request)
當使用[pre-forking worker](https://docs.celeryproject.org/en/stable/userguide/workers.html#worker-concurrency)的時候,[on_timeout()](https://docs.celeryproject.org/en/stable/reference/celery.worker.request.html#celery.worker.request.Request.on_timeout)與[on_failure()](https://docs.celeryproject.org/en/stable/reference/celery.worker.request.html#celery.worker.request.Request.on_failure)兩個方法(methods)就會在主要的worker proces中執行,應用程式可以利用這類工具來檢測[celery.app.task.Task.on_failure()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.on_failure)沒有檢測到的異常。
下面範例自定義`request`來偵測以及記錄hard time limits以及其它的失敗:
```python
import logging
from celery import Task
from celery.worker.request import Request
logger = logging.getLogger('my.package')
class MyRequest(Request):
'A minimal custom request to log failures and hard time limits.'
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super().on_failure(
exc_info,
send_failed_event=send_failed_event,
return_ok=return_ok
)
logger.warning(
'Failure detected for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
@app.task(base=MyTask)
def some_longrunning_task():
# use your imagination
```
### How it works
這邊是技術細節,並不一定要知道,但你可能會有興趣瞭解。
所有定義好的任務都會被條列到註冊表中。註冊表包含了任務名稱以及其任務類別。你可以自己看一下這個註冊表:
```python
>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
<@task: celery.chord_unlock>,
'celery.backend_cleanup':
<@task: celery.backend_cleanup>,
'celery.chord':
<@task: celery.chord>}
```
這是寫入到Celery的任務清單。注意到,只有在導入定義任務的模組時才會註冊任務。
預設會載入[imports](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-imports)內的模組清單
裝飾器[app.task()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.task)負責在應用程式任務註冊表中註冊你的任務。
當任務被發送的時候並不會真的把函數的程式碼送出去,而是只有要被執行的那個任務的名稱被派送出去而以。當worker收到訊息的時候,它就會從任務註冊表中去找尋相對應的任務出來執行。
這意昧著你的worker與Client始終需要同步更新軟體。這是一個缺點,但是替代方案是尚未被解決的技術問題。
### Tips and Best Practices
#### Ignore results you don't want
如果你並不關心任務的結果,那就記得設置選項[ignore_result](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.ignore_result),因為保存結果是需要浪費時間跟資源的。
```python=
@app.task(ignore_result=True)
def mytask():
something()
```
你也可以用[task_ignore_result](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_ignore_result)來做全域設置。
當呼叫`apply_async`或`delay`的時候可以透過傳遞參數`ignore_result`(boolean),在每次執行的基礎上啟用或禁止`Results`。
```python=
@app.task
def mytask(x, y):
return x + y
# No result will be stored
result = mytask.apply_async(1, 2, ignore_result=True)
print result.get() # -> None
# Result will be stored
result = mytask.apply_async(1, 2, ignore_result=False)
print result.get() # -> 3
```
預設情況下,如果你有設置result backend的話,那就不會放棄保存結果的(`ignore_result=False`)
選項優先順序如下:
1. 全域[task_ignore_result](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_ignore_result)
2. 參數選項[ignore_result](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.ignore_result)
3. 每次執行任務設置`ignore_result`
#### More optimization tips
更多優化技巧請參閱[Opimizing Guide](https://docs.celeryproject.org/en/stable/userguide/optimizing.html#guide-optimizing)
#### Avoid launching synchronous subtasks
讓一個任務等待另一個任務的結果是非常沒有營養的,甚至也可能會因為worker pool耗盡而造成死結。
把你的任務設計成異步(asynchornous),像是使用callback:
這是一個不好的範例:
```python=
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
```
這是一個好的範例:
```python=
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
```
好的範例中利用`chain`搭配[signature()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.signature)的方式來鏈結任務,這部份可以參閱[Canvas: Designing Work-flows](https://docs.celeryproject.org/en/stable/getting-started/next-steps.html#designing-workflows)。
預設情況下Celery並不允許你在任務中同步執行子任務,但在極少數情況下也許需要這麼做。但是**不建議啟用同步執行子任務**。
```python
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get(disable_sync_subtasks=False)
info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
```
### Performance and Strategies¶
#### Granularity(間隔、頻率、粒度)
任務的粒度是每個子任務所需的計算量。一般來說,最好的方式是將問題分解為很多小任務,而不是去跑一個執行時間長的任務。
切割成較小的任候之後,你可以同步處理更多任務,而且任務不會因為執行時間過長去影響worker處理其它待執行的任務。
然而,執行任務確實會產生開銷。訊息需要被派送,資料也許不在本地端…等等。因此,如果任務的細粒度過細,所增加的開銷可能會超過切割任務所帶來的好處。
:::info
See also:
[Art of Concurrency](http://oreilly.com/catalog/9780596521547)
:::
#### Data locality
處理任務的worker要盡可能的離資料源近一點。最好的方式是在記憶體中存有複本,最糟的就是在另一個地方傳輸過來。
假如資料源很遠,你可以試著在該位置執行另一個本地端的worker,假如不可能這麼做,緩存常用資料,或預先讀入將被使用的資料。
在worker之間共享資料最簡單的方式就是使用分佈式緩存系統(distributed cache system),像[memcached](http://memcached.org/)
:::info
See also:
[Distributed Computing Economics](https://www.microsoft.com/en-us/research/publication/distributed-computing-economics/?from=http%3A%2F%2Fresearch.microsoft.com%2Fpubs%2F70001%2Ftr-2003-24.pdf)
:::
#### State
由於Celery是一種分佈式系統架構,你無法知道過程,或任務將被那一個worker執行。你甚至不知道任務是否會被及時處理。
俗話說『asserting the worlds is responsibility of the task』,這意味著,從任務被請求之後,世界觀已經發生變化,因此任務有責任要去確認世界現在是什麼樣子的;假如你有一個重新索引搜索引擎的任務,而且搜索引擎應該最多每5分鐘重新索引,那這個任務就有責任去對這一點做斷言,而不是去呼叫這任務的有這個責任。
另一個問題是Django的model object。它們不應該被當參數傳遞給任務。在任務執行的時候從資料庫重新取得最新資料是最好的方式,因為使用舊資料可能會導致race conditions(競賽條件)。
想像一下下面的場景,你有一篇文章跟自動展開縮寫的任務:
```python
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('MyCorp', 'My Corporation')
article.save()
```
首先,作者建立一篇文章,然後保存它,接著作者按下啟動縮寫任務的按鈕:
```python
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
```
現在隊列非常忙碌,任務不再是2分鐘就可以完成。同時,另一位作者也對相同的文章做了編輯,最終當任務完成的時候,文章內容會是舊版的內容,因為任務內的參數是舊文章資料。
修正這個競賽條件是很簡單的,只要用文章id在任務中重新取得文章內容:
```python
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
article.body.replace('MyCorp', 'My Corporation')
article.save()
```
```python
>>> expand_abbreviations.delay(article_id)
```
這種方式或許可以有更好的性能,因為發送大量的訊息所耗成本不低。
#### Database transactions
我們來看另一個案例:
```python
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
return HttpResponseRedirect('/articles/')
```
上面是在資料庫內建立文章對象的Django view,它會傳遞primary key給一個任務。它使用了裝飾器`transaction.atomic`,它將在view回傳的時候commit掉這個transaction,或是因為拋出異常而roll back。
如果任務跑的比commit掉這個transaction還要快的話,會有race condition的狀況;因為資料庫物件是還不存在的。
解決方案是在`on_commit`的時候做callback來發起Celery的任務,以確保是在交易成功之後才發起任務的。
```python
from django.db.transaction import on_commit
def create_article(request):
article = Article.objects.create()
on_commit(lambda: expand_abbreviations.delay(article.pk))
```
### Example
我們來建立一個實際的範例吧:Blog,留言貼文需要過濾垃圾郵件。當一個留言建立的時候,垃圾郵件過濾器在背景執行,使用者不需要等待它完成。
我有一個Django Blog application 允許在文章上留言。我將描述這個應用程式的Models/Views與任務。
#### blog/models.py
留言的model看起來是這樣:
```python
from django.db import models
from django.utils.translation import ugettext_lazy as _
class Comment(models.Model):
name = models.CharField(_('name'), max_length=64)
email_address = models.EmailField(_('email address'))
homepage = models.URLField(_('home page'),
blank=True, verify_exists=False)
comment = models.TextField(_('comment'))
pub_date = models.DateTimeField(_('Published date'),
editable=False, auto_add_now=True)
is_spam = models.BooleanField(_('spam?'),
default=False, editable=False)
class Meta:
verbose_name = _('comment')
verbose_name_plural = _('comments')
```
在發表留言的view中,我首先將留言寫入資料庫,然後再於背景啟動垃圾郵件過濾器。
#### blog/views.py
```python
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response
from blog import tasks
from blog.models import Comment
class CommentForm(forms.ModelForm):
class Meta:
model = Comment
def add_comment(request, slug, template_name='comments/create.html'):
post = get_object_or_404(Entry, slug=slug)
remote_addr = request.META.get('REMOTE_ADDR')
if request.method == 'post':
form = CommentForm(request.POST, request.FILES)
if form.is_valid():
comment = form.save()
# Check spam asynchronously.
tasks.spam_filter.delay(comment_id=comment.id,
remote_addr=remote_addr)
return HttpResponseRedirect(post.get_absolute_url())
else:
form = CommentForm()
context = RequestContext(request, {'form': form})
return render_to_response(template_name, context_instance=context)
```
這邊,為了過濾垃圾留言,我使用[Akismet](http://akismet.com/faq/),這個服務用於Wordpress。Akismet個人使用免費,但是商業使用就要收一點點費用,註冊之後就可以得到API key。
#### blog/tasks.py
```python
from celery import Celery
from akismet import Akismet
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site
from blog.models import Comment
app = Celery(broker='amqp://')
@app.task
def spam_filter(comment_id, remote_addr=None):
logger = spam_filter.get_logger()
logger.info('Running spam filter for comment %s', comment_id)
comment = Comment.objects.get(pk=comment_id)
current_domain = Site.objects.get_current().domain
akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
if not akismet.verify_key():
raise ImproperlyConfigured('Invalid AKISMET_KEY')
is_spam = akismet.comment_check(user_ip=remote_addr,
comment_content=comment.comment,
comment_author=comment.name,
comment_author_email=comment.email_address)
if is_spam:
comment.is_spam = True
comment.save()
return is_spam
```
## History
20190720_依據4.3版本說明調整
20211207_依據5.2版本說明調整