# Celery_Canvas: Designing Work-flows
###### tags: `celery` `celery 5.2` `python`
[官方連結_Canvas: Designing Work-flows](https://docs.celeryproject.org/en/stable/userguide/canvas.html)
## Canvas: Designing Work-flows
### Signatures
*New in version 2.0.*
你剛在[calling](https://docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling)的教學指南中學會如何使用任務延遲的方法來調用一個任務,而這通常就是你所需要的,不過有時候你也許希望將任務調用的簽章(signature)傳給另一個process,或做為另一個函數的參數。
[signature()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.signature)以某種方式來它包裝單一任務(single task)調用的參數、關鍵參數(keyword arguments)與執行選項,以便可以將之傳遞給函數,甚至可以序列化並跨線發送。
* 你可以幫範例中那個加法任務使用它的名稱來建立一個簽章(signature):
```python
>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
```
這個任務帶兩個引數數目的簽章(兩個參數):`(2, 2)`,並設置`countdown=10`。
* 或者你可以用任務的方法`signature`建立一個簽章:
```python
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
```
* 還有一種使用星型參數(star arguments)的快捷方式:
```python
>>> add.s(2, 2)
tasks.add(2, 2)
```
* 也支援關鍵參數(keyword arguments):
```python
>>> add.s(2, 2, debug=True)
tasks.add(2, 2, debug=True)
```
* 從任何的簽章實例(signature instance),你可以檢查不同的欄位:
```python
>>> s = add.signature((2, 2), {'debug': True}, countdown=10)
>>> s.args
(2, 2)
>>> s.kwargs
{'debug': True}
>>> s.options
{'countdown': 10}
```
* 支援`delay`、`apply_async`等`Calling API`,包括直接呼叫`(__call__)`
* 調用簽章將會在當前process中內聯執行該任務
```python
>>> add(2, 2)
4
>>> add.s(2, 2)()
4
```
* `delay`的話,則是我們喜歡的快捷方式,用於帶有星型參數(star arguments)的`apply_async`
```python
>>> result = add.delay(2, 2)
>>> result.get()
4
```
* `apply_async`與[`app.Task.apply_async()`](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async)帶有相同的參數:
```python
>>> add.apply_async(args, kwargs, **options)
>>> add.signature(args, kwargs, **options).apply_async()
>>> add.apply_async((2, 2), countdown=1)
>>> add.signature((2, 2), countdown=1).apply_async()
```
* 用`s()`的話你不能定義選項,但鏈結`set`可以處理這問題:
```python
>>> add.s(2, 2).set(countdown=1)
proj.tasks.add(2, 2)
```
#### Partials
使用簽章(signature),你就可以在worker執行任務:
```python
>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)
```
或者你可以在當前的process中直接調用:
```python
>>> add.s(2, 2)()
4
```
指定額外的參數(args)、關鍵參數(kwargs)或選項(options)給`apply_async/delay`將建立部份(partial,應該是指partial function):
* 任何加入的參數都會被加到簽章(signature)的args前面:
```python
>>> partial = add.s(2) # incomplete signature
>>> partial.delay(4) # 4 + 2
>>> partial.apply_async((4,)) # same
```
* 任何加入的關鍵參數都會被合併到簽章(signature)的kwargs,後面定義的為主:
```python
>>> s = add.s(2, 2)
>>> s.delay(debug=True) # -> add(2, 2, debug=True)
>>> s.apply_async(kwargs={'debug': True}) # same
```
* 任何加入的選項都會被合併到簽章(signature)的options,後面定義的為主:
```python
>>> s = add.signature((2, 2), countdown=10)
>>> s.apply_async(countdown=1) # countdown is now 1
```
你也可以克隆(clone)簽章(signature)來建立一個衍生類別:
```python
>>> s = add.s(2)
proj.tasks.add(2)
>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)
```
#### Immutability
*New in version 3.0.*
Partials就是說,你希望可以跟callbacks一起去搭配使用,任何鏈結的任務或chord callbacks都可以與父任務的結果一起應用。有時候你希望指定一個不能帶有額外參數的callback,這種情況下你可以將簽章(signature)設置為不可變的:
```python
>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
```
`.si()`也可以用來建立不可變的簽章(signature):
```python
>>> add.apply_async((2, 2), link=reset_buffers.si())
```
當簽章(signature)不可變的時候,你就只能設置執行項目,因此,這種時候你就不可能用partial args/kwargs來呼叫簽章(signature)。
:::info
**Note:**
這個教程中,有時候我會用`~`來代表簽章(signature)。你可能不應該在你的程式碼中這麼用,但那在Python shell中實驗的時候是一個很便利的快捷方式。
```python
>>> ~sig
>>> # is the same as
>>> sig.delay().get()
```
:::
#### Callbacks
*New in version 3.0.*
使用`apply_async`的參數`link`,你可以將callbacks加到任何的任務之中:
```python
add.apply_async((2, 2), link=other_task.s())
```
只有在任務成功地退出的時候才會執行callback,並且以父任務的回傳值做為參數來使用。
正如之前提過的,你加到簽章(signature)的任何參數,都將被預置到簽章(signature)本身的指定參數之前!
如果你有簽章(signature):
```python
>>> sig = add.s(10)
```
那`sig.delay(result)`就會變成:
```python
>>> add.apply_async(args=(result, 10))
```
現在,讓我們呼叫加法的那個任務`add`,並使用帶有部份參數(partial arguments)的callback:
```python
>>> add.apply_async((2, 2), link=add.s(8))
```
如預期那般,首先會啟動一個任務來計算`2+2`,然後再另一個任務來計算`4+8`
### The [Primitives](http://terms.naer.edu.tw/detail/17040529/)
*New in version 3.0.*
:::info
**Overview**
* group
group primitive是一種帶有要平行執行任務清單的簽章(signature)
* chain
chain primitive讓我們將簽章(signature)鏈結在一起,以便一個一個呼叫,本質上形成一系列的callbacks。
* chord
chord類似於group,只是多帶了callback。一個chord包含一個標題群組(header group)與主體(body),其中主體(body)就是所有在標題內的任務完成之後要執行的任務。
* map
map primitive的原理類似內置函數`map`,但是會建立一個臨時任務(temporary task),其參數列表會應用於任務中。舉例來說,`task.map([1, 2])` - 結果就是調用單個任務,將參數按序應用於任務函數,因此結果是:
```python
res = [task(1), task(2)]
```
* starmap
原理跟上面的`map`完全一樣,就差個星號參數(`*args`)。舉例來說,`add.starmap([(2, 2), (4, 4)])`,結果就是調用單個任務:
```python
res = [add(2, 2), add(4, 4)]
```
* chunks
chunking會將一串很長的參數清單拆成多個部份,如範例操作:
```python
>>> items = zip(range(1000), range(1000)) # 1000 items
>>> add.chunks(items, 10)
```
範例操作會將清單分成10個chunks,從而產生100個任務(每個任務按順序處理10個項目)
:::
基元(primitives)本身也是簽章(signature)物件,因此他們可以以各種方式結合來組成複雜的工作流程。
這裡有一些範例:
* Simple chain
這是一個簡單的鏈結範例,第一個任務執行傳遞它的回傳值給鏈結中下一個任務,以此類推。
```python
>>> from celery import chain
>>> # 2 + 2 + 4 + 8
>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
>>> res.get()
16
```
也可以利用管道(pipes)來寫:
```python
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
16
```
* Immutable signatures
簽章(signature)可以是部份函數(partial),因此參數可以加入到現在參數中,但你可能不總是希望這麼做,舉例來說,如果你不希望使用鏈結(chain)中上一個任務的回傳結果。
這種情況下,你可以讓簽章(signature)不可變(immutable),那參數就不會被改變:
```python
>>> add.signature((2, 2), immutable=True)
```
也可以有另一種快捷方式可以使用,`.si()`,而這也是建立簽章(signature)的首選方式:
```python
>>> add.si(2, 2)
```
現在你可以建立一連串互不相關的任務:
```python
>>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
>>> res.get()
16
>>> res.parent.get()
8
>>> res.parent.parent.get()
4
```
* Simple group
你可以很輕易的建立一組平行執行任務:
```python
>>> from celery import group
>>> res = group(add.s(i, i) for i in range(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
* Simple chord
chord primitive讓我們可以增加一個callback,當group中的所有任務執行完畢之後可以調用。這對簡易的平行計算問題(embarrassingly parallel)來說通常是需要的:
```python
>>> from celery import chord
>>> res = chord((add.s(i, i) for i in range(10)), xsum.s())()
>>> res.get()
90
```
上面的範例建立10個平行啟動的任務,當所有任務完成之後,回傳值會合併為一個list,然後送到`xsum task`。
chord的主體(body)本身可以是不變的(immutable),這樣group的回傳值<sub>(意指上面範例合併為list的回傳)</sub>就不會傳遞給callback:
```python
>>> chord((import_contact.s(c) for c in contacts),
... notify_complete.si(import_id)).apply_async()
```
注意到上面使用`.si()`;這會產生一個不可變的簽章(signature),意思是你給它任何的參數(包含上一個任務的回傳值)都會被忽略。
* Blow your mind by combining
chains也可以平行化:
```python
>>> c1 = (add.s(4) | mul.s(8))
# (16 + 4) * 8
>>> res = c1(16)
>>> res.get()
160
```
這意味著你可以結合chains:
```python
# ((4 + 16) * 2 + 4) * 8
>>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8)))
>>> res = c2()
>>> res.get()
352
```
將一個group與另一個任務鏈結在一起會自動升級為一個chord:
```python
>>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())
>>> res = c3()
>>> res.get()
90
```
groups與chords也接收部份參數(partial arguments),因此在chain中,前一個任務的回傳值會被發送到group中的所有任務:
```python
>>> new_user_workflow = (create_user.s() | group(
... import_contacts.s(),
... send_welcome_email.s()))
... new_user_workflow.delay(username='artv',
... first='Art',
... last='Vandelay',
... email='art@vandelay.com')
```
如果你不想要發送參數到group,你可以讓簽章(signature)在group中不可變。
```python
>>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))()
>>> res.get()
<GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [
bc01831b-9486-4e51-b046-480d7c9b78de,
2650a1b8-32bf-4771-a645-b0a35dcc791b,
dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf,
59f92e0a-23ea-41ce-9fad-8645a0e7759c,
26e1e707-eccf-4bf4-bbd8-1e1729c3cce3,
2d10a5f4-37f0-41b2-96ac-a973b1df024d,
e13d3bdb-7ae3-4101-81a4-6f17ee21df2d,
104b2be0-7b75-44eb-ac8e-f9220bdfa140,
c5c551a5-0386-4973-aa37-b65cbeb2624b,
83f72d71-4b71-428e-b604-6f16599a9f37]>
>>> res.parent.get()
8
```
#### Chains
*New in version 3.0.*
任務可以被連結在一起:被連結的任務會在前一個任務成功回傳的時候被調用:
```python
>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
64
```
被連結的任務將帶有它的父任務的結果做為第一個參數。在上面的情況中,結果是64,它是因為`mul(4, 16)`所導致。
任務的結果將跟踪原始任務調用的任何子任務,這可以從結果實例(result instance)存取:
```python
>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]
>>> res.children[0].get()
64
```
這個結果實例(result instance)還有一個方法[colletc()](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult.collect),它將結果視為一個圖(graph),讓你可以迭代這個結果:
```python
>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
(<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
```
預設情況下,如果圖(graph)沒有完全形成(其中一個任務尚未完成),那[colletc()](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult.collect)會拋出異常[IncompleteStream](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.IncompleteStream),但是你也可以得到一個中間的表示:
```python
>>> for result, value in res.collect(intermediate=True)):
....
```
你可以依你所需來連結任意數量的任務,而且簽章(signature)也是可以拿來連結的:
```python
>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())
```
你也可以使用方法`on_error`來增加一個error callbacks:
```python
>>> add.s(2, 2).on_error(log_error.s()).delay()
```
當簽章(signature)被執行的時候將導致下面`.apply_async`調用:
```python
>>> add.apply_async((2, 2), link_error=log_error.s())
```
worker實際上不會將errback做為一個任務來調用,而是直接呼叫這個errback function,這樣,原始的請求,異常與回溯物件就可以傳遞給它。
下面給出一個errback範例:
```python
from __future__ import print_function
import os
from proj.celery import app
@app.task
def log_error(request, exc, traceback):
with open(os.path.join('/var/errors', request.id), 'a') as fh:
print('--\n\n{0} {1} {2}'.format(
task_id, exc, traceback), file=fh)
```
為了更簡單的將任務連結在一起,有一個特殊的簽章(signature)稱為chain,它讓你可以將任務鏈結在一起:
```python
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)
```
你去呼叫這個chain的話,它就會在當下的process中執行裡面的任務,然後回傳chain中最後一個任務的結果:
```python
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640
```
它還設有屬性`parent`,因此你可以沿著chain來取得中間任務的結果:
```python
>>> res.parent.get()
64
>>> res.parent.parent.get()
8
>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>
```
當然,你也可以用符號`|`(pipe)來操作鏈結:
```python
>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
```
##### Graphs
此外,你可以將結果圖(result graph)視為[DependencyGraph](https://docs.celeryproject.org/en/stable/internals/reference/celery.utils.graph.html#celery.utils.graph.DependencyGraph):
```python
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
```
甚至還可以將這些圖轉換為`dot`格式:
```python
>>> with open('graph.dot', 'w') as fh:
... res.parent.parent.graph.to_dot(fh)
```
然後建立照片:
```shell
$ dot -Tpng graph.dot -o graph.png
```
#### Groups
*New in version 3.0.*
group可以用來平行執行多個任務。
函數[group](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.group)會帶著簽章的清單(a list of signatures):
```python
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))
```
如果你呼叫該群組,任務會在當前的process一個接一個執行,並且回傳[GroupResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.GroupResult),這可以用來追蹤結果,或是告知那些任務已經完成,等等。
```python
>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]
```
group也支援迭代:
```python
>>> group(add.s(i, i) for i in range(100))()
```
group是一種簽章(signature)物件,因此它可以跟其它簽章(signature)結合應用。
##### Group Callbacks and Error Handling
groups也可以跟callback、errback兩種簽章(signature)結合使用,不過阿,因為groups並不是真正的任務,它只是把連結的任務向下傳遞它所封裝的簽章(signature),所以有時候得到的結果會讓你嚇一跳。這意味著group的回傳值並不會像chain一樣傳給連結的回呼的簽章(callback signature)。如下範例,它那樣使用`add(a, b)`是錯的,因為連結的`add.s()`並不會如預期一般的接收group最終的結果。
```python
>>> g = group(add.s(2, 2), add.s(4, 4))
>>> g.link(add.s())
>>> res = g()
[4, 8]
```
注意到,前面兩個任務最終還是會回傳結果,只是啊,那個回呼的簽章(callback signature)會在背景執行,然後拋出異常,因為它並不會接收到預期的那兩個參數。
group errbacks也被傳遞給封裝的簽章(signature),這樣的作法就可以達到一次的連結,群組中多個任務的失敗都可以調用到那個errback。如下範例,群組中那幾個會拋出異常的任務`fail()`都會呼叫一次簽章(signature)`log_error()`
```python
>>> g = group(fail.s(), fail.s())
>>> g.link_error(log_error.s())
>>> res = g()
```
考慮到這一點,我們通常建議用那些能夠接受重覆呼叫像是冪等(idempotent)或是計數類型的任務來做為errbacks。
某些支援後端實現(backend implementations)的的chord class可以更好的處理這類問題。
##### Group Results
群組任務也會回傳一個特別的結果,就像是普通任務的結果那樣,只是說,它是整個群組任務的結果:
```python
>>> from celery import group
>>> from tasks import add
>>> job = group([
... add.s(2, 2),
... add.s(4, 4),
... add.s(8, 8),
... add.s(16, 16),
... add.s(32, 32),
... ])
>>> result = job.apply_async()
>>> result.ready() # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]
```
[GroupResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.GroupResult)帶有一個[AsyncReslut list](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult)的清單,對它們的操作就像是對單一任務的操作那樣。
它支援下列操作:
* successful()
如果所有的子任務都成功的完成則回傳True
* failed()
如果任一個子任務失敗則回傳True
* waiting()
如果任一個子任務未準備就緒則回傳True
* ready()
如果所有子任務都準備就緒則回傳True
* completed_count()
回傳子任務已完成數量
* revoke()
取消所有子任務
* join()
收集所有子任務的結果,並按調用它們的相同順序回傳它們(list格式)
#### Chords
*New in version 2.3.*
:::info
**Note:**
在chord中使用的任務是不能忽略結果的。如果你的chord中的任一個任務的result backend是disabled的,那你應該閱讀["Important Notes"](https://docs.celeryproject.org/en/stable/userguide/canvas.html#chord-important-notes)。另外要注意的是,chords目前並不支援RPC類型的result backend。
:::
chord是一種只會在group內所有的任務都執行完成之後才會執行的任務。
讓我們來計算一下總計的表達式,`1+1+2+2+3+3....n+n`一路加到100。
首先,你需要兩個任務,`add()`與`tsum()`([sum()](https://docs.python.org/dev/library/functions.html#sum)已經是一個標準函數):
```python
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
```
現在你可以用chord來平行計算每一個加法步驟,然後取得加總的數值:
```python
>>> from celery import chord
>>> from tasks import add, tsum
>>> chord(add.s(i, i)
... for i in range(100))(tsum.s()).get()
9900
```
這很明顯的是非常刻意的範例,消息傳遞和同步的成本開銷,使其速度比對應的Python慢得多:
```python
>>> sum(i + i for i in range(100))
```
同步的成本所費不貲,你應該盡可能的避免使用chord。不過,chord是工具箱中非常強大的基元(primitive),因為同步是許多平行演算法的必要步驟。
讓我們拆解chord的表達式:
```python
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
```
記得,只有在header內所有任務都回傳之後才會執行callback。header內的每一個步驟都被視為任務來執行,記得,是平行執行(in parallel),所以可能會在不同的節點上執行。然後再利用header內的每一個任務的回傳值來執行callback。`chord()`回傳的`task id`就是callback的id,因此你可以等待它完成,並且得到最終的回傳值。(但是記得,[never have a task wait for other tasks](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-synchronous-subtasks))
##### Error handling
當一個任務拋出異常的時候會發生什麼事?
chord的callback結果會轉換為失敗狀態,錯誤被設置為異常[ChordError](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.ChordError):
```python
>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
```
```shell
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "*/celery/result.py", line 120, in get
interval=interval)
File "*/celery/backends/amqp.py", line 150, in wait_for
raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
raised ValueError('something something',)
```
也許追溯會有所不同,這取決於你使用的result backend,不過你可以看到異常的說明包含失敗的`task id`與原始異常的字串表示。你還可以在`result.traceback`發現原始追溯。
注意,其餘的任務還是會繼續執行,因此即使中間的任務執行失敗,第三個任務,`task(add.s(8, 8))`依然會繼續執行。還有,[ChordError](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.ChordError)只會顯示第一個失敗的任務(及時):它並不會按著`header group`的順序。
如果要在chord執行失敗的時候執行一個動作,你可以在chord的callback加入errback:
```python
@app.task
def on_chord_error(request, exc, traceback):
print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
```
```python
>>> c = (group(add.s(i, i) for i in range(10)) |
... xsum.s().on_error(on_chord_error.s())).delay()
```
chords也許會有callback與errback兩種簽章(signatures)連結著一起用,某種角度來說,這也解決了將簽章(signatures)連結到group的一些問題。當你這麼做的時候,你會把所提供的簽章(signature)連結到chord的主體(body),你可以預期的到,在主體(body)完成之後可以一個漂亮的轉身的呼叫一次這個callbacks,或者是當chord的header或body失敗的必的調用一次errbacks。
##### Important Notes
在chord中使用的任務是不能忽略它們的結果。實務上,這意味著為了使用chord你必需啟用`result_backend`。此外,如果你配置中的`task_ignore_result`設置為True,那就必需確保chord中的每一個任務都設置`ignore_result=False`。這適用於`Task subclasses`與`decorated tasks`。
範例-task subcalsses:
```python
class MyTask(Task):
ignore_result = False
```
範例-decorated tasks:
```python
@app.task(ignore_result=False)
def another_task(project):
do_something()
```
預設情況下,同步的步驟是透過每一秒鐘讓重複任務(recurring task)輪詢一次來完成的,並在準備就緒時調用簽章(signature)來實現。
實作範例:
```python
from celery import maybe_signature
@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
if group.ready():
return maybe_signature(callback).delay(group.join())
raise self.retry(countdown=interval, max_retries=max_retries)
```
除了Redis與Memcached之外的result backend都使用這個方式:它們在header中的每一個任務之後增加一個計數器,然後在計數器超過集合中的任務數量的時候執行callback。
Redis與Memcached實現了更好的解決方案,但在其它的backends上並不好實作。
:::info
**Note:**
Chord不支援Redis-2.2之前的版本;你必需更新redis-server至少2.2之後版本才能使用。
:::
:::info
**Note:**
如果你正在使用chord,且搭配的result backend是Redis的話,並且你還覆寫`Task.after_return()`這個方法,那你應該確認有調用父類方法(用`super()`),否則chord的callback不會被執行。
```python
def after_return(self, *args, **kwargs):
do_something()
super(MyTask, self).after_return(*args, **kwargs)
```
:::
#### Map & Starmap
`map`與`starmap`是內建的任務類型,它會對序列中的每一個元素調用所提供的任務。
他們與[group](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.group)的不同在於:
* 只會派送一個任務訊息
* 按序執行
使用map的範例如下:
```python
>>> from proj.tasks import add
>>> ~xsum.map([range(10), range(100)])
[45, 4950]
```
與下面任務的執行結果一樣:
```python
@app.task
def temp():
return [xsum(range(10)), xsum(range(100))]
```
使用starmap:
```python
>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
與下面任務的執行結果一樣:
```python
@app.task
def temp():
return [add(i, i) for i in range(10)]
```
map與starmap都是簽章(signature)物件,因此他們可以結合其它的簽章(signature)一起使用,舉例來說,在十秒之後調用starmap:
```python
>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
```
#### Chunks
chunking讓你將可迭代的工作分割,因此如果你有一百萬個物件,你可以建立十個各自帶有十萬個物件的任務。
有些人可能會擔心你這樣切任務會導致平行性降低,但是這對忙碌的集群來說是很少見的,實際上,因為你避免掉了訊息派送的開銷,它可能會因此提高效能。
你可以使用[app.Task.chunks()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.chunks)建立一個chunks signature:
```python
>>> add.chunks(zip(range(100), range(100)), 10)
```
就跟group一樣,當你呼叫chunk的時候,chunks派送訊息的動作會在當前的process發生:
```python
>>> from proj.tasks import add
>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
[60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
[80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
[100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
[120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
[140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
[160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
[180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
```
呼叫`.apply_async`會建立一個專屬的任務,以便在worker中執行各別的任務:
```python
>>> add.chunks(zip(range(100), range(100)), 10).apply_async()
```
你也可以將chunks轉為group:
```python
>>> add.chunks(zip(range(100), range(100)), 10).apply_async()
```
使用`group.skew`,每一個任務的倒數以一為單位增加:
```python
>>> group.skew(start=1, stop=10)()
```
這意味著第一個任務會倒數一秒,第二個會倒數兩秒,以此類推。
## History
20190725_依據4.3版本說明調整
20220105_依據5.2版本說明調整