# Celery_Next Steps ###### tags: `celery` `celery 5.2` `python` [Celery_Next Steps](https://docs.celeryproject.org/en/stable/getting-started/next-steps.html) ## Using Celery in your Application ### Our Project Project layout: ``` proj/__init__.py /celery.py /tasks.py ``` #### proj/celery.py ```python= from celery import Celery app = Celery('proj', broker='amqp://', backend='rpc://', include=['proj.tasks']) # Optional configuration, see the application user guide. # 設置result的存活時間為3600秒 app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start() ``` 在這個模組中我們建立了一個[Celery](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery)的實例(有時候稱為app)。想要在專案中使用這個Celery實例,你就只需要把它import進去就可以。 * 參數`broker`指定要使用的broker的URL * 更多可參考[Choosing a Broker](https://hackmd.io/@shaoeChen/BJ2tEIjtt#Choosing-a-Broker) * 參數`backend`指定要使用的result backend * 這用來追蹤任務狀態與結果(results)。雖然預設情況下result的設置是disable的,不過這邊還是會使用RPF來做為result backend,因為後續我們要來說明檢索結果的作業方式。也許你會想要在你的應用程式中使用不同的backend。不過,不同的backend有不同的優缺點。如果你不需要results,那就最好把result disable掉。當然,之前說過,我們可以利用`@task(ignore_result=True)`來對個別的任務做result是否disable的設置。 * 更多可參考[Keeping Results](https://hackmd.io/@shaoeChen/BJ2tEIjtt#Keeping-Results) * 參數`include`是worker起動的時候要導入的模組清單。如果有多的任務,那就要在這邊加入任務模組,這樣workers就能夠找到我們的任務 #### proj/tasks.py ```python= from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers) ``` ### Starting the worker `celery`可以用來啟動worker,只是你需要在proj上的目錄中執行worker: ```shell= $ celery -A proj worker -l INFO ``` worker啟動的時候你應該會看到下面的執行畫面: ```shell --------------- celery@halcyon.local v4.0 (latentcall) --- ***** ----- -- ******* ---- [Configuration] - *** --- * --- . broker: amqp://guest@localhost:5672// - ** ---------- . app: __main__:0x1012d8590 - ** ---------- . concurrency: 8 (processes) - ** ---------- . events: OFF (enable -E to monitor this worker) - ** ---------- - *** --- * --- [Queues] -- ******* ---- . celery: exchange:celery(direct) binding:celery --- ***** ----- [2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started. ``` \- `broker`就是你在celery模組中那個`broker`參數所指定的URL。你也可以在命令行中使用**-b** 來指定不同的broker。 \- `concurrency`是用於設置並行處理任務的prefork worker process的數量。所設置數量的worker都很忙的時候,新的任務就必需要等到舊任務完成之後才能接續著被處理。 預設的數量為執行機器上的CPU的數量(包含核心數)。你可以利用[celery worker -c](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-worker-c)來指定數量。沒有建議的設置,因為最佳數量取決於很多因素,不過如果你的任務主要受限於I/O的話,那可以試著增加數量。實驗證明,數量大到CPU數的兩倍以上不止沒有效,還會降低效能。 包含預設的prefork pool,Celery還支援使用Eventlet、Gevent、與single thread執行(見Concurrency[) ](https://docs.celeryproject.org/en/stable/userguide/concurrency/index.html#concurrency) \- Events是一個可選項目,它讓Celery為worker中正發生的事情派送監控訊息(事件)。也有監控程式,像是celery events與Flower,即時的Celery監控器,更多可參考[Monitoring and Management guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring)。 -\ Queue是workers會從中取用任務的佇列清單。worker會被告知一次從多個佇列中取用任務,這用於將訊息路由到指定的worker,以做為服務品質、分離關注點、排列優先序的一種手段,更多可參考[Routing Guide](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing)。 你可以在command-line加入參數`--help`來取得完整的清單: ```shell= $ celery worker --help ``` 更多細節可參考[Workers Guide](https://docs.celeryproject.org/en/stable/userguide/workers.html#guide-workers)。 ### Stopping the worker 要停止worker就只需要按一下Control-c。更多關於worker支援的信號清單可參考[Workers Guide](https://docs.celeryproject.org/en/stable/userguide/workers.html#guide-workers) ### In the background 在生產環境中,你會想要在背景執行worker,相關細節在[daemonziation tutorial](https://docs.celeryproject.org/en/stable/userguide/daemonizing.html#daemonizing)有說明。 這種常駐程式的腳本(daemonization scripts)使用命令`celery multi`在背景啟動一個或多個workers: ```shell= $ celery multi start w1 -A proj -l INFO celery multi v4.0.0 (latentcall) > Starting nodes... > w1.halcyon.local: OK ``` 你也可以重新啟動它: ```shell= $ celery multi restart w1 -A proj -l INFO celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64024 > Waiting for 1 node..... > w1.halcyon.local: OK > Restarting node w1.halcyon.local: OK celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64052 ``` 或停止它: ```shell= $ celery multi stop w1 -A proj -l INFO ``` 停止的命令(stop)非同步的(asynchronous),因此它並不會等待worker關閉(shutdown)。如果你想要確保在退出之前所有當前正在執行的任務都已完成,那你可能要改用`stopwait`: ```shell= $ celery multi stopwait w1 -A proj -l INFO ``` :::warning Note: `celery multi`並不會保存關於workers的信息,所以你在重新啟動的時候需要使用相同的命令參數。在停止的時候只能使用相同的pidfile與logfile參數。 ::: 預設情況下,它會在當前的目錄中建立pid與log files。為了預防多個workers打架,我們鼓勵你把worker放專用目錄下: ```shell= $ mkdir -p /var/run/celery $ mkdir -p /var/log/celery $ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log ``` 使用`multi`這個命令,你可以啟動多個workers,還有一個強大的命令語法可以為不同的worker指定參數,舉例來說: ```shell= $ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \ -Q default -L:4,5 debug ``` 更多範例可參考API reference中的[multi](https://docs.celeryproject.org/en/stable/reference/celery.bin.multi.html#module-celery.bin.multi)模組說明。 ### About the [--app](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-A) argument [--app](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-A)這個參數以module.path:attribute的形式來指定要使用的Celery app instance。 不過它也支援捷徑的形式。如果我們只指定一個package的名稱,它會試著去按下面的順序來尋找app instance: With [--app=proj:](https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-A) 1. 名為`proj.app`的屬性名稱,或 2. 名為`proj.celery`的屬性名稱,或 3. 模組`proj`中的任意屬性,其值為Celery application,或 如果上面都沒有找到,那就會去試著找一下名為`proj.celery`的子模組: 4. 名為`proj.celery.app`的屬性名稱,或 5. 名為`proj.celery.celery`的屬性名稱,或 6. 模組`proj.celery`中的任意屬性,其值為Celery application 這個方案模仿了文檔中所使用的實踐,也就是用於包含單一模組的`proj:app`,以及更大型專案的`proj.celery:app` ## Calling Tasks 你可以使用`delay()`來呼叫一個任務: ```python= >>> from proj.tasks import add >>> add.delay(2, 2) ``` 這種呼叫任務的方式是另一個名為`apply_async()`的star-argument shortcut(星狀參數?): ```python= >>> add.apply_async((2, 2)) ``` 採用`apply_async()`你可以指定執行選項(options),像是執行時間、要送達的佇列名稱等等: ```python= >>> add.apply_async((2, 2), queue='lopri', countdown=10) ``` 上面的範例中,任務會被派送到一個名為`lopri`的佇列,然後任務會在派送到worker的10秒後執行。 直接執行任務就只會在當前的process中執行任務,並不會派送訊息: ```python= >>> add(2, 2) 4 ``` `delay(), apply_async()`跟執行`(__call__)`,這三個方法構成Celery調用的接口(calling API),也用於singatures(簽名、簽章)。 更多關於API的部份可參考[Calling User Guide](https://docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling)。 每個任務的調用都會被賦武一個唯一的識別符(UUID),這就是任務的id,task id。 `delay`與`apply_async`都會回傳一個[AsyncResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult) instance,這可以用來追蹤任務的執行狀態。不過你要記得啟用[result backend](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-result-backends)才有機會去保存任務狀態。 預設會disable result backend是因為,我們並沒有適合每一個應用程式的result backend;你要自己考慮每個result backend的優缺點。對於多數的任務來說,去保留它的回傳值也不知道要幹嘛,所以預設disable result backend是好的。也要注意到,result backend並不是用來監控任務並是workers:如果你想監控,那請用專用工具(參考[Monitoring and Management Guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring)) 如果你有設置result backend,那你就可以檢索任務的回傳值: ```python= >>> res = add.delay(2, 2) >>> res.get(timeout=1) 4 ``` 你可以利用`id`來找到task id: ```python= >>> res.id d6b3aea2-fb9b-4ebc-8da4-848818db9114 ``` 如果任務拋出異常,你也可以檢查異常與回溯,事實上,`result.get()`預設就會傳播所有的錯誤: ```python= >>> res = add.delay(2, '2') >>> res.get(timeout=1) ``` ```python Traceback (most recent call last): File "<stdin>", line 1, in <module> File "celery/result.py", line 221, in get return self.backend.wait_for_pending( File "celery/backends/asynchronous.py", line 195, in wait_for_pending return result.maybe_throw(callback=callback, propagate=propagate) File "celery/result.py", line 333, in maybe_throw self.throw(value, self._to_remote_traceback(tb)) File "celery/result.py", line 326, in throw self.on_ready.throw(*args, **kwargs) File "vine/promises.py", line 244, in throw reraise(type(exc), exc, tb) File "vine/five.py", line 195, in reraise raise value TypeError: unsupported operand type(s) for +: 'int' and 'str' ``` 如果你並不希望這些錯誤訊息也跟著傳播的話,那你可以利用參數`propagate`來disable: ```python= >>> res.get(propagate=False) TypeError("unsupported operand type(s) for +: 'int' and 'str'") ``` 這種情況下,它回傳的就是引發異常的實例,所以阿,要檢查這個任務是成功還是失敗,你就要在result instance上用相對應的方法: ```python= >>> res.failed() True >>> res.successful() False ``` 那,它是怎麼知道任務是成功還是失敗呢?這可以透過查詢任務狀態來確認: ```python= >>> res.state 'FAILURE' ``` 同一時間一個任務只會處於一個狀態,不過它是會有很多狀態變化的。常見的任務階段如下: ``` PENDING -> STARTED -> SUCCESS ``` 起始狀態是一個特殊的狀態,只有在enable [task_track_started](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_track_started)這個設置或者是你為單一任務設置 `@task(track_started=True)`選項的時候才會記錄。 pending這個狀態實際上並不是狀態的記錄,而是所有未知的task id的預設狀態:你可以從下面範例看的出來: ```python= >>> from proj.celery import app >>> res = app.AsyncResult('this-id-does-not-exist') >>> res.state 'PENDING' ``` 如果你的任務重新再來一次,那任務階段就會變的更複雜。為了說明這點,我們說,有個任務我們重新跑兩次,那它的階段變化就會是: ``` PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS ``` 更多關於任務狀態的細節,你可以參考[Tasks的States](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-states)說明。 更多呼叫任務的細節說明請見[Calling Guide](https://docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling)。 ## Canvas: Designing Work-flows 你剛才已經學會怎麼用`delay`這個方法來呼叫一個任務,這通常就是你需要的。不過有時候你會希望可以把任務調用的簽章(signature)傳遞給另一個process,或是做為參數傳遞給另一個函數,要滿足這些情況,在Celery所使用的就稱之為signatures(簽章)。 signature包裝單一任務調用的參數與執行選項,以此傳遞給函數,甚至可以序列化然後通過網路派送。 你可以用參數`(2,2)`跟`coutdown=10`來幫`add`這個任務加入簽章: ```python= >>> add.signature((2, 2), countdown=10) tasks.add(2, 2) ``` 另外有一個star arguments的快捷方式: ```python= >>> add.s(2, 2) tasks.add(2, 2) ``` ### And there’s that calling API again… signature instances也支援API,這意思就是說,它們也有`delay`與`apply_async`這些方法可以用。 不過這有一點不同,在signature可能已經指定參數簽章(argument signature)。`add`這個任務帶有兩個參數,因此,指定兩個參數的signature就可以購構一個完整的signature: ```python= >>> s1 = add.s(2, 2) >>> res = s1.delay() >>> res.get() 4 ``` 不過,你也可以弄一個不完整的signature,這樣子的方式稱為partials: ```python= # incomplete partial: add(?, 2) >>> s2 = add.s(2) ``` 上面範例看的到,`s2`現在是一個partial signature,還需要另一個參數才會完成,你可以在呼叫這個signature的時候解決這問題: ```python= # resolves the partial: add(8, 2) >>> res = s2.delay(8) >>> res.get() 10 ``` 這邊看的到,你把參數`8`加入現在的參數`2`之前形成`add(8, 2)`,完整的signature。關鍵字參數(keyword arguments)也可以在後面再加入;然後把後面加入的跟前面的做合併,有重複的話就後面的為主: ```python= >>> s3 = add.s(2, 2, debug=True) >>> s3.delay(debug=False) # debug is now False. ``` 一如先前所說,signatures也支援API:這意味著 * `sig.apply_async(args=(), kwargs={}, **options)` * 使用可選的partial arguments與partial keyword arguments來呼叫signature。也支援partial execution options * `sig.delay(*args, **kwargs)` * `apply_async` star argument的版本。任何的參數都會被丟到signature中的參數之前,然後關鍵字參數(keyword argumetns)就會跟現有已存的合併 這看起來就是非常實用,不過你實際上可以用這些來做點什麼呢?為此,我們必需要引入canvas primitives ### The Primitives :::warning * [group](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-group) * [map](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-map) * [chain](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-chain) * [startmap](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-map) * [chord](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-chord) * [chunks](https://docs.celeryproject.org/en/stable/userguide/canvas.html#canvas-chunks) ::: 這些primitives本身就是一個signature objects,因此它們可以用多種方式來結合複雜的工作流。 :::warning Note: 這些範例要檢索results,所以如果你想試試,那你就要配置一個result backend。上面的範例已經有配置了(見[Celery](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery)) ::: 下面來看幾個範例吧。 #### Gruops [group](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.group)會平行的呼叫一個任務清單(list of tasks),它會回傳一個特別的result instance給你做為你檢查這個group的result,並且依序檢索其回傳值。 ```python= >>> from celery import group >>> from proj.tasks import add >>> group(add.s(i, i) for i in range(10))().get() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] ``` * Partial group ```python= >>> g = group(add.s(i) for i in range(10)) >>> g(10).get() [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] ``` #### Chains 任務可以鏈結在一起,這樣你就可以在一個任務回傳之後再呼叫另一個: ```python= >>> from celery import chain >>> from proj.tasks import add, mul # (4 + 4) * 8 >>> chain(add.s(4, 4) | mul.s(8))().get() 64 ``` 或者做一個partial chain: ```python= >>> # (? + 4) * 8 >>> g = chain(add.s(4) | mul.s(8)) >>> g(4).get() 64 ``` Chains也可以寫成這樣: ```python= >>> (add.s(4, 4) | mul.s(8))().get() 64 ``` #### Chords chord是帶有callback的group: ```python= >>> from celery import chord >>> from proj.tasks import add, xsum >>> chord((add.s(i, i) for i in range(10)), xsum.s())().get() 90 ``` group鏈結到另一個任務的話,就會自動轉為chord: ```python= >>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get() 90 ``` 因為這些primitives都是signature type,所以它們可以根據需求做任何你想要做的組合,就像是: ```python= >>> upload_document.s(file) | group(apply_filter.s() for filter in filters) ``` 更多資訊請參考[Canvas user guide](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas)。 ## Routing Celery支援AMQP所提供的所有路由的工具,也支援將訊息派送到named queues的簡單路由。 [task_routes](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_routes)的設置讓你可以按著名稱來路由任務,並且將所有的東西都集中在一個位置: ```python= app.conf.update( task_routes = { 'proj.tasks.add': {'queue': 'hipri'}, }, ) ``` 你也可以在執行的時候指定參數`queue`給`apply_async`: ```python= >>> from proj.tasks import add >>> add.apply_async((2, 2), queue='hipri') ``` 然後,你就可以利用指定`celery worker -Q`的選項讓worker從這個佇列中取用任務: ```shell= $ celery -A proj worker -Q hipri ``` 你可以用逗號來指定多個佇列。舉例來說,你可以worker從預設的佇列跟hipri這個來取得任務,另外,預設的佇列名稱為celery,這有它的歷史意義在: ```shell= $ celery -A proj worker -Q hipri,celery ``` 佇列的順序不重要,因為worker會給它們相同的權重。 更多關於路由、AMQP routing的部份請參考[Routing Guide](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing)。 ## Remote Control 如果你使用的是RabbitMQ(AMQP)、 Redis、或是Qpid做為broker,那麼你就可以在執行的時候控制、檢查worker。 舉例來說,你可以查看worker當下正在執行那一個任務: ```shell= $ celery -A proj inspect active ``` 這是透過使用廣播訊息(broadcast messaging)來實現的,所以集群中的每一個worker都會收到遠端控制命令。 你當然可以用`--destination`這個選項來指定一個或是多個workers。下面給出以逗號分隔的worker host names的清單: ```shell= $ celery -A proj inspect active --destination=celery@example.com ``` 如果你沒有提供`destination`,那每一個worker就通通會回復這個request。 `celery inspect`這個命令包含不會改變worker中任何事情的命令;它只會回傳關於worker內部發生的事情的信息與統計分析資訊。對於可執行的inspect,你可以執行: ```shell= $ celery -A proj inspect --help ``` 然後,也有一個`celery control`,這就真的會去改變worker: ```shell= $ celery -A proj control --help ``` 舉例來說,你可以強制worker啟用事件訊息(用來監控任務與workers): ```shell= $ celery -A proj control enable_events ``` 當事件(events)被啟用之後,你就可以啟動事件轉儲(event dumper)來看看workers正在做什麼: ```shell= $ celery -A proj events --dump ``` 或者你可以啟動curses interface(游標介面?): ```shell= $ celery -A proj events ``` 看完你想看的東西之後,你可以再次的disable events: ```shell= $ celery -A proj control disable_events ``` `celery status`還使用遠端控制命令,並顯示集群中在線的workers的清單: ```shell= $ celery -A proj status ``` 關於`celery`命令與監控更多的資訊,你可以參考[Monitoring Guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring)。 ## Timezone 內部與訊息中的所有時間、日期都是使用UTC時區。 當worker接收到一個訊息的時候,假設我們設置了countdown(倒數計時器),它會將UTC time轉為local time。如果你希望使用跟系統時區不一樣的時區,那麼你必需要用`timezone`在組態配置上做設置: ```python= app.conf.timezone = 'Europe/London' ``` ## Optimization 預設的組態配置並未對吞吐量做任何的優化。預設情況下,它是比較中間,不多也不少的方式來設置,這是吞吐量與公平調度(fair scheduling)之間的一個折衷。 如果你有很嚴格的公平調度需求,或者你希望可以最佳化吞吐量,那你應該看看[Optimizing Guide](https://docs.celeryproject.org/en/stable/userguide/optimizing.html#guide-optimizing)。 ## What to do now? 現在,你應該試著繼續讀讀[User Guide](https://docs.celeryproject.org/en/stable/userguide/index.html#guide)。 如果你願意的話,也可以看一下[API reference](https://docs.celeryproject.org/en/stable/reference/index.html#apiref)。 ## History 20211206_依據5.2版本說明調整