celery
celery 5.2
python
官方連結_Canvas: Designing Work-flows
New in version 2.0.
你剛在calling的教學指南中學會如何使用任務延遲的方法來調用一個任務,而這通常就是你所需要的,不過有時候你也許希望將任務調用的簽章(signature)傳給另一個process,或做為另一個函數的參數。
signature()以某種方式來它包裝單一任務(single task)調用的參數、關鍵參數(keyword arguments)與執行選項,以便可以將之傳遞給函數,甚至可以序列化並跨線發送。
這個任務帶兩個引數數目的簽章(兩個參數):(2, 2)
,並設置countdown=10
。
signature
建立一個簽章:支援delay
、apply_async
等Calling API
,包括直接呼叫(__call__)
delay
的話,則是我們喜歡的快捷方式,用於帶有星型參數(star arguments)的apply_async
apply_async
與app.Task.apply_async()
帶有相同的參數:s()
的話你不能定義選項,但鏈結set
可以處理這問題:使用簽章(signature),你就可以在worker執行任務:
或者你可以在當前的process中直接調用:
指定額外的參數(args)、關鍵參數(kwargs)或選項(options)給apply_async/delay
將建立部份(partial,應該是指partial function):
你也可以克隆(clone)簽章(signature)來建立一個衍生類別:
New in version 3.0.
Partials就是說,你希望可以跟callbacks一起去搭配使用,任何鏈結的任務或chord callbacks都可以與父任務的結果一起應用。有時候你希望指定一個不能帶有額外參數的callback,這種情況下你可以將簽章(signature)設置為不可變的:
.si()
也可以用來建立不可變的簽章(signature):
當簽章(signature)不可變的時候,你就只能設置執行項目,因此,這種時候你就不可能用partial args/kwargs來呼叫簽章(signature)。
Note:
這個教程中,有時候我會用~
來代表簽章(signature)。你可能不應該在你的程式碼中這麼用,但那在Python shell中實驗的時候是一個很便利的快捷方式。
New in version 3.0.
使用apply_async
的參數link
,你可以將callbacks加到任何的任務之中:
只有在任務成功地退出的時候才會執行callback,並且以父任務的回傳值做為參數來使用。
正如之前提過的,你加到簽章(signature)的任何參數,都將被預置到簽章(signature)本身的指定參數之前!
如果你有簽章(signature):
那sig.delay(result)
就會變成:
現在,讓我們呼叫加法的那個任務add
,並使用帶有部份參數(partial arguments)的callback:
如預期那般,首先會啟動一個任務來計算2+2
,然後再另一個任務來計算4+8
New in version 3.0.
Overview
map
,但是會建立一個臨時任務(temporary task),其參數列表會應用於任務中。舉例來說,task.map([1, 2])
- 結果就是調用單個任務,將參數按序應用於任務函數,因此結果是:
map
完全一樣,就差個星號參數(*args
)。舉例來說,add.starmap([(2, 2), (4, 4)])
,結果就是調用單個任務:
基元(primitives)本身也是簽章(signature)物件,因此他們可以以各種方式結合來組成複雜的工作流程。
這裡有一些範例:
Simple chain
這是一個簡單的鏈結範例,第一個任務執行傳遞它的回傳值給鏈結中下一個任務,以此類推。
也可以利用管道(pipes)來寫:
Immutable signatures
簽章(signature)可以是部份函數(partial),因此參數可以加入到現在參數中,但你可能不總是希望這麼做,舉例來說,如果你不希望使用鏈結(chain)中上一個任務的回傳結果。
這種情況下,你可以讓簽章(signature)不可變(immutable),那參數就不會被改變:
也可以有另一種快捷方式可以使用,.si()
,而這也是建立簽章(signature)的首選方式:
現在你可以建立一連串互不相關的任務:
Simple group
你可以很輕易的建立一組平行執行任務:
Simple chord
chord primitive讓我們可以增加一個callback,當group中的所有任務執行完畢之後可以調用。這對簡易的平行計算問題(embarrassingly parallel)來說通常是需要的:
上面的範例建立10個平行啟動的任務,當所有任務完成之後,回傳值會合併為一個list,然後送到xsum task
。
chord的主體(body)本身可以是不變的(immutable),這樣group的回傳值(意指上面範例合併為list的回傳)就不會傳遞給callback:
注意到上面使用.si()
;這會產生一個不可變的簽章(signature),意思是你給它任何的參數(包含上一個任務的回傳值)都會被忽略。
Blow your mind by combining
chains也可以平行化:
這意味著你可以結合chains:
將一個group與另一個任務鏈結在一起會自動升級為一個chord:
groups與chords也接收部份參數(partial arguments),因此在chain中,前一個任務的回傳值會被發送到group中的所有任務:
如果你不想要發送參數到group,你可以讓簽章(signature)在group中不可變。
New in version 3.0.
任務可以被連結在一起:被連結的任務會在前一個任務成功回傳的時候被調用:
被連結的任務將帶有它的父任務的結果做為第一個參數。在上面的情況中,結果是64,它是因為mul(4, 16)
所導致。
任務的結果將跟踪原始任務調用的任何子任務,這可以從結果實例(result instance)存取:
這個結果實例(result instance)還有一個方法colletc(),它將結果視為一個圖(graph),讓你可以迭代這個結果:
預設情況下,如果圖(graph)沒有完全形成(其中一個任務尚未完成),那colletc()會拋出異常IncompleteStream,但是你也可以得到一個中間的表示:
你可以依你所需來連結任意數量的任務,而且簽章(signature)也是可以拿來連結的:
你也可以使用方法on_error
來增加一個error callbacks:
當簽章(signature)被執行的時候將導致下面.apply_async
調用:
worker實際上不會將errback做為一個任務來調用,而是直接呼叫這個errback function,這樣,原始的請求,異常與回溯物件就可以傳遞給它。
下面給出一個errback範例:
為了更簡單的將任務連結在一起,有一個特殊的簽章(signature)稱為chain,它讓你可以將任務鏈結在一起:
你去呼叫這個chain的話,它就會在當下的process中執行裡面的任務,然後回傳chain中最後一個任務的結果:
它還設有屬性parent
,因此你可以沿著chain來取得中間任務的結果:
當然,你也可以用符號|
(pipe)來操作鏈結:
此外,你可以將結果圖(result graph)視為DependencyGraph:
甚至還可以將這些圖轉換為dot
格式:
然後建立照片:
New in version 3.0.
group可以用來平行執行多個任務。
函數group會帶著簽章的清單(a list of signatures):
如果你呼叫該群組,任務會在當前的process一個接一個執行,並且回傳GroupResult,這可以用來追蹤結果,或是告知那些任務已經完成,等等。
group也支援迭代:
group是一種簽章(signature)物件,因此它可以跟其它簽章(signature)結合應用。
groups也可以跟callback、errback兩種簽章(signature)結合使用,不過阿,因為groups並不是真正的任務,它只是把連結的任務向下傳遞它所封裝的簽章(signature),所以有時候得到的結果會讓你嚇一跳。這意味著group的回傳值並不會像chain一樣傳給連結的回呼的簽章(callback signature)。如下範例,它那樣使用add(a, b)
是錯的,因為連結的add.s()
並不會如預期一般的接收group最終的結果。
注意到,前面兩個任務最終還是會回傳結果,只是啊,那個回呼的簽章(callback signature)會在背景執行,然後拋出異常,因為它並不會接收到預期的那兩個參數。
group errbacks也被傳遞給封裝的簽章(signature),這樣的作法就可以達到一次的連結,群組中多個任務的失敗都可以調用到那個errback。如下範例,群組中那幾個會拋出異常的任務fail()
都會呼叫一次簽章(signature)log_error()
考慮到這一點,我們通常建議用那些能夠接受重覆呼叫像是冪等(idempotent)或是計數類型的任務來做為errbacks。
某些支援後端實現(backend implementations)的的chord class可以更好的處理這類問題。
群組任務也會回傳一個特別的結果,就像是普通任務的結果那樣,只是說,它是整個群組任務的結果:
GroupResult帶有一個AsyncReslut list的清單,對它們的操作就像是對單一任務的操作那樣。
它支援下列操作:
New in version 2.3.
Note:
在chord中使用的任務是不能忽略結果的。如果你的chord中的任一個任務的result backend是disabled的,那你應該閱讀"Important Notes"。另外要注意的是,chords目前並不支援RPC類型的result backend。
chord是一種只會在group內所有的任務都執行完成之後才會執行的任務。
讓我們來計算一下總計的表達式,1+1+2+2+3+3....n+n
一路加到100。
首先,你需要兩個任務,add()
與tsum()
(sum()已經是一個標準函數):
現在你可以用chord來平行計算每一個加法步驟,然後取得加總的數值:
這很明顯的是非常刻意的範例,消息傳遞和同步的成本開銷,使其速度比對應的Python慢得多:
同步的成本所費不貲,你應該盡可能的避免使用chord。不過,chord是工具箱中非常強大的基元(primitive),因為同步是許多平行演算法的必要步驟。
讓我們拆解chord的表達式:
記得,只有在header內所有任務都回傳之後才會執行callback。header內的每一個步驟都被視為任務來執行,記得,是平行執行(in parallel),所以可能會在不同的節點上執行。然後再利用header內的每一個任務的回傳值來執行callback。chord()
回傳的task id
就是callback的id,因此你可以等待它完成,並且得到最終的回傳值。(但是記得,never have a task wait for other tasks)
當一個任務拋出異常的時候會發生什麼事?
chord的callback結果會轉換為失敗狀態,錯誤被設置為異常ChordError:
也許追溯會有所不同,這取決於你使用的result backend,不過你可以看到異常的說明包含失敗的task id
與原始異常的字串表示。你還可以在result.traceback
發現原始追溯。
注意,其餘的任務還是會繼續執行,因此即使中間的任務執行失敗,第三個任務,task(add.s(8, 8))
依然會繼續執行。還有,ChordError只會顯示第一個失敗的任務(及時):它並不會按著header group
的順序。
如果要在chord執行失敗的時候執行一個動作,你可以在chord的callback加入errback:
chords也許會有callback與errback兩種簽章(signatures)連結著一起用,某種角度來說,這也解決了將簽章(signatures)連結到group的一些問題。當你這麼做的時候,你會把所提供的簽章(signature)連結到chord的主體(body),你可以預期的到,在主體(body)完成之後可以一個漂亮的轉身的呼叫一次這個callbacks,或者是當chord的header或body失敗的必的調用一次errbacks。
在chord中使用的任務是不能忽略它們的結果。實務上,這意味著為了使用chord你必需啟用result_backend
。此外,如果你配置中的task_ignore_result
設置為True,那就必需確保chord中的每一個任務都設置ignore_result=False
。這適用於Task subclasses
與decorated tasks
。
範例-task subcalsses:
範例-decorated tasks:
預設情況下,同步的步驟是透過每一秒鐘讓重複任務(recurring task)輪詢一次來完成的,並在準備就緒時調用簽章(signature)來實現。
實作範例:
除了Redis與Memcached之外的result backend都使用這個方式:它們在header中的每一個任務之後增加一個計數器,然後在計數器超過集合中的任務數量的時候執行callback。
Redis與Memcached實現了更好的解決方案,但在其它的backends上並不好實作。
Note:
Chord不支援Redis-2.2之前的版本;你必需更新redis-server至少2.2之後版本才能使用。
Note:
如果你正在使用chord,且搭配的result backend是Redis的話,並且你還覆寫Task.after_return()
這個方法,那你應該確認有調用父類方法(用super()
),否則chord的callback不會被執行。
map
與starmap
是內建的任務類型,它會對序列中的每一個元素調用所提供的任務。
他們與group的不同在於:
使用map的範例如下:
與下面任務的執行結果一樣:
使用starmap:
與下面任務的執行結果一樣:
map與starmap都是簽章(signature)物件,因此他們可以結合其它的簽章(signature)一起使用,舉例來說,在十秒之後調用starmap:
chunking讓你將可迭代的工作分割,因此如果你有一百萬個物件,你可以建立十個各自帶有十萬個物件的任務。
有些人可能會擔心你這樣切任務會導致平行性降低,但是這對忙碌的集群來說是很少見的,實際上,因為你避免掉了訊息派送的開銷,它可能會因此提高效能。
你可以使用app.Task.chunks()建立一個chunks signature:
就跟group一樣,當你呼叫chunk的時候,chunks派送訊息的動作會在當前的process發生:
呼叫.apply_async
會建立一個專屬的任務,以便在worker中執行各別的任務:
你也可以將chunks轉為group:
使用group.skew
,每一個任務的倒數以一為單位增加:
這意味著第一個任務會倒數一秒,第二個會倒數兩秒,以此類推。
20190725_依據4.3版本說明調整
20220105_依據5.2版本說明調整