# Celery_Workers Guide ###### tags: `celery` `celery 5.2` `python` [官方連結_Workers Guide](https://docs.celeryproject.org/en/master/userguide/workers.html) ## Workers Guide ### Starting the worker 你可以透過執行下面命令來啟動worker: ```shell $ celery -A proj worker -l info ``` :::info **Daemonizing** 你可以會希望可以使用daemonization tool(常駐程式的工具?)在背景啟動worker。關於使用服務管理器將worker做為常駐程式來改動部份,可以參考[Daemonization](https://docs.celeryproject.org/en/master/userguide/daemonizing.html#daemonizing)。 ::: 關於command-line所有可用選項的完整清單,請參閱[worker](https://docs.celeryproject.org/en/master/reference/celery.bin.worker.html#module-celery.bin.worker),或這麼做: ```shell $ celery worker --help ``` 你可以在同一台機器上面啟動多個workers,但請務必使用參數[`--hostname`](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-n)來為每個worker指定節點名稱: ```shell $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h ``` 參數`hostname`可以擴展下面變數: * `%h`:主機名稱,包含網域名稱 * `%n`:單純主機名稱 * `%d`:單純網域名稱 如果當前的主機名稱是`george.example.com`,這會擴展為: | Variable | Template | Result | |----------|------------|----------------------------| | %h | worker1@%h | worker1@george.example.com | | %n | worker1@%n | worker1@george | | %d | worker1@%d | worker1@example.com | :::info **Note for [supervisor](https://pypi.org/project/supervisor/) users:** 對使用supervisor管理背景執行的使用者,必須使用第二個%來做轉譯:%%h ::: ### Stopping the worker 你應該使用**TERM** signal來完成關閉worker。 當你啟動關閉的時候,worker在實際終止之前會把所有當前執行的任務完成。如果這些任務是重要的,在你做一些激烈操作之前你應該等待它完成,像是送出**KILL** signal。 如果worker在等待一段時間之後還是不會關閉,像是卡在無限迴圈或類似情況的話,你可以使用**KILL** signal來強制終止worker:但是,請注意,當前執行的任務會因此而遺失(除非你的任務有設置[`acks_late`](https://docs.celeryproject.org/en/master/reference/celery.app.task.html#celery.app.task.Task.acks_late))。 此外,由於processes無法覆蓋**KILL** signal, 因此worker無法取得它的children processes;一定要手動執行。下面命令通常可以解決這個問題: ```shell $ pkill -9 -f 'celery worker' ``` 如果你的系統沒有**pkill**,那可以使用較長一點的版本: ```shell $ ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9 ``` version 5.2變更:在Linux systems上,Celery現在已經支援在worker終止之後,發送**KILL** signal給所有的child processes。這可以透過prctl(2)的選項PR_SET_PDEATHSIG來完成。 ### Restarting the worker 如果你想要重新啟動worker,那就應該給它一個**TERM** signal,並啟動一個新的實例。管理worker發展最簡單的方法就是使用`celery multi`: ```shell $ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid $ celery multi restart 1 --pidfile=/var/run/celery/%n.pid ``` 對生產佈署而言,你應該使用init-scripts或是process supervision(進程監控)系統(參考[Daemonization](https://docs.celeryproject.org/en/master/userguide/daemonizing.html#daemonizing)) 除了停止,然後讓worker重新啟動,你也可以使用**HUP** signal來重新啟動worker。要注意,worker將負責重新啟動本身,這很容易出現問題,我們並不建議在生產環境這麼用: ```shell $ kill -HUP $pid ``` :::info **Note:** 只有在worker是背景執行的常駐程式的時候才使用**HUP**來重新啟動worker(它沒有控制終端)。 由於平台的限制,maxOS上禁止使用**UPH**。 ::: ### Process Signals worker的主要過程會覆蓋下面的信號(signals): | TERM | Warm shutdown, wait for tasks to complete. | |------|--------------------------------------------| | QUIT | Cold shutdown, terminate ASAP | | USR1 | Dump traceback for all active threads. | | USR2 | Remote debug, see [celery.contrib.rdb](https://docs.celeryproject.org/en/master/reference/celery.contrib.rdb.html#module-celery.contrib.rdb) | ### Variables in file paths 檔案路徑的參數,像是[--logfile](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-f), [--pidfile](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-pidfile), [--statedb](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-S)可以包含worker要擴展的變數: #### Node name replacements * %p: 完整的節點名稱 * %h: 主機名稱,包含網域名稱 * %n: 單純主機名稱 * %d: 單純網域名稱 * %i: Prefork pool process index or 0 if MainProcess. * %I: Prefork pool process index with separator. 舉例來說,如果當前的主機名稱是`george@foo.example.com`,那它會擴展為: * --logfile=%p.log -> george@foo.example.com.log * --logfile=%h.log -> foo.example.com.log * --logfile=%n.log -> george.log * --logfile=%d.log -> example.com.log #### Prefork pool process index `prefork pool process index specifier`會根據最終需要打開文件的process來擴展為不同的檔案名稱。 這可以用來為每個child process指定一個日誌文件(log file)。 注意到,即使processes退出,或者是使用`autoscale/maxtasksperchild/time limits`,這數值仍然保持在process的界限內。也就是說,這數值是*process index*,並不是process count,也不是pid。 * %i - Pool process index or 0 if MainProcess. * `-n worker1@example.com -c2 -f %n-%i.log`會產生三個log file: * worker1-0.log (main process) * worker1-1.log (pool process 1) * worker1-2.log (pool process 2) * %I - Pool process index with separator. * `n worker1@example.com -c2 -f %n%I.log`會產生三個log file: * worker1.log (main process) * worker1-1.log (pool process 1) * worker1-2.log (pool process 2) ### Concurrency 預設情況下,[多元處理](http://terms.naer.edu.tw/detail/151347/)(multiprocessing)用來處理任務的同步執行,不過你也可以使用[Eventlet](https://docs.celeryproject.org/en/master/userguide/concurrency/eventlet.html#concurrency-eventlet)。worker的processes/threads的數量可以使用參數[--concurrency](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-c)來調整,預設為機器上可用的CPU的數量。 #### Number of processes (multiprocessing/prefork pool): 愈多的pool processes通常愈好,不過還是會有一個截止點,過了這個截止點繼續增加更多的pool processes就只會影響效能。甚至會有一些證據顯示,多個worker instance執行,也許效能會比單個worker還要好。舉例,3個workers,每個worker有10個pool processes。你需要經過測試來找到適合你的worker數量,因為這取決你的應用場景,工作負載,任務執行次數,與其它因素。 ### Remote control *New in version 2.0.* **pool support:** prefork, eventlet, gevent, blocking:solo (see note) **broker support:** amqp, redis :::info **The celery command** celery program是用來從command-line做遠端控制命令。它支援下面列出的所有的命令。更多信息請參閱[Management Command-line Utilities(inspect/control)](https://docs.celeryproject.org/en/master/userguide/monitoring.html#monitoring-control) ::: 你可以使用高優先廣播訊息佇列(high-priority broadcast message queue)對workers做遠端控制。命令可以直接下給所有的workers或清單內的特定workers。 命令也可以有回應。Client可以等待並且收集這些回應。因為沒有一個中央機構可以知道你的cluster內有多少的workers可供應用,所以沒有辦法估計有多少workers可以發送回應,因此Client可以設置timeout - 回應到達的截止時間(以秒為單位)。預設timeout為一秒。如果worker在截止時間前沒有回應,那並不代表它們沒有回應,或許更糟的是它們掛了,或者可能只是因為網路延遲或worker正在處理命令而慢了,所以就看情況調整timeout吧。 除了timeout,Client還可以指定最大等待回應數。如果有指定目標,則將此限制設置為目標主機的數量。 :::info **Note:** solo pool支援遠端控制命令,但是任何執行中的任務都會阻斷任何等待控制的命令,因此如果worker非常繁忙的話,那它的用途是有限的。這種情況下,你應該增加Client等待回應的timeout。 ::: #### The [broadcast()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.broadcast) function 這是用來向worker發送命令的用戶端函數(client function)。部份遠端控制命令也提供背景執行的高階接口[broadcast()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.broadcast),像是[rate_limit](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.rate_limit)與[ping](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.ping)。 發送命令[rate_limit](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-rate_limit)與關鍵參數(keyword arguments): ```python >>> app.control.broadcast('rate_limit', ... arguments={'task_name': 'myapp.mytask', ... 'rate_limit': '200/m'}) ``` 這將非同步的發送命令,而且不需要等待回應。如果要求回應,你必須使用參數`reply`: ```python >>> app.control.broadcast('rate_limit', { ... 'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True) [{'worker1.example.com': 'New rate limit set successfully'}, {'worker2.example.com': 'New rate limit set successfully'}, {'worker3.example.com': 'New rate limit set successfully'}] ``` 使用參數`destination`,你可以指定接收命令的worker list: ```python >>> app.control.broadcast('rate_limit', { ... 'task_name': 'myapp.mytask', ... 'rate_limit': '200/m'}, reply=True, ... destination=['worker1@example.com']) [{'worker1.example.com': 'New rate limit set successfully'}] ``` 當然,使用高階接口來設置速率限制更為方便,不過有些命令只能用[broadcast()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.broadcast)來請求。 ### Commands #### revoke: Revoking tasks **pool support:** all, terminate only supported by prefork **broker support:** `amqp`, `redis` **command:** `celery -A proj control revoke <task_id>` 所有的worker節點都會保留已撤銷的task ids,不論是記憶體內還是寫入磁碟的(見[Persistent revokes](https://docs.celeryproject.org/en/master/userguide/workers.html#worker-persistent-revokes))。 當worker接收到撤銷的請求,它將跳過執行該任務,但是它不會終止已經在執行的任務,除非有設置`terminate`這個選項。 :::info **Note:** 選項`terminate`是當任務卡住的時候管理人員最後的手段。這並不是用來終止任務,而是終止執行該任務的process,而且當終止信號傳到的時候,該process在當下有可能已經開始處理其它任務,基於這個原因,你永遠都不能在程式碼中調用它。 ::: 如果設置`terminate`,那處理該任務的child process將會被終止。預設傳遞訊號是`TERM`,但你可以用參數`signal`來指定。信號號可以是Python標準庫中[signal](https://docs.python.org/dev/library/signal.html#module-signal)模組中定義的任何信號的大寫名稱。 終止任務的同時也會撤銷它。 **Example** ```python >>> result.revoke() >>> AsyncResult(id).revoke() >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed') >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', ... terminate=True) >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', ... terminate=True, signal='SIGKILL') ``` #### Revoking multiple tasks *New in version 3.1.* `revoke`也接受list格式的參數,它將一次撤銷多個任務。 **Example** ```python >>> app.control.revoke([ ... '7993b0aa-1f0b-4780-9af0-c47c0858b3f2', ... 'f565793e-b041-4b2b-9ca4-dca22762a55d', ... 'd9d35e03-2997-42d0-a13e-64a66b88a618', ]) ``` `GroupResult.revoke`在3.1版之後就開始利用這一點了。 #### Persistent revokes 撤銷任務的工作方式是向所有workers發送一個廣播訊息,然後workers會在記憶體中保留撤銷的任務清單。當一個worker啟動的時候,它會將撤銷的任務同步給cluster內的其它workers。 撤銷的任務清單保存在記憶體內,因此如果所有的workers都重新啟動,那記錄清單也會跟著消失。如果你想重新啟動之間保留這個清單,那你要利用參數`-statedb`來指定保存的檔案給celery worker知道: ```shell $ celery -A proj worker -l info --statedb=/var/run/celery/worker.state ``` 或者,如果你使用**celery multi**,你想要為每個worker instance都建立一個檔案,你可以`%n`來格式化擴展當前的節點名稱: ```shell celery multi start 2 -l info --statedb=/var/run/celery/%n.state ``` 參考[Variables in file paths](https://docs.celeryproject.org/en/master/userguide/workers.html#worker-files) 注意,遠端控制命令必須能夠正常作業,撤銷才能作業。這情況下,僅RabbitMQ(amqp)與Redis支援遠端控制命令。 ### Time Limits *New in version 2.0.* pool support: `prefork/gevent (see note below)` :::info **Soft, or hard?** time limit,時間限制的設置有兩個值,也就是soft與hard。soft time limit允許任務在被清掉之前拋出例外,而hard timeout並不允許,而且會強制任務終止。 ::: 單個任務可能會永久的執行,如果你有很多任務在等待永遠不會發生的某些事件,那你的worker會一輩子無法處理新任務。要預防這種劇情發生的最好方法就是啟用時間限制(time limit) 時間限制意指任務在process被終止並且替換之前的可執行最大秒數。你也可以啟用soft time limit,它會拋出例外讓任務可以在hard time limit殺掉它之前來處理它: ```python from myapp import app from celery.exceptions import SoftTimeLimitExceeded @app.task def mytask(): try: do_work() except SoftTimeLimitExceeded: clean_up_in_a_hurry() ``` 時間限制也可以使用[task_time_limit](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_time_limit)/[task_soft_time_limit](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_soft_time_limit)來設置。 :::info **Note:** 時間限制目前在不支援`SIGUSR1 signal`的平台上無法執行。 ::: :::info **Note:** gavent pool無法實作soft time limtes。此外,如果任務阻塞,那它也不會強制執行hard time limit。 ::: #### Changing time limits at run-time *New in version 2.3.* **broker support:** `amqp`, `redis` 有一個遠端控制命令可以讓你同時調整任務的sort、hard time limit,其名稱為`time_limit`。 範例調整`tasks.crawl_the_web`的時間限制,調整soft time limit為一分鐘,hard time limit為兩分鐘: ```shell >>> app.control.time_limit('tasks.crawl_the_web', soft=60, hard=120, reply=True) [{'worker1.example.com': {'ok': 'time limits set successfully'}}] ``` 只有在時間限制改變之後才開始執行的任務才會受影響。 ### Rate Limits #### Changing rate-limits at run-time 下面範例調整`myapp.mytask`的速率限制,每分鐘最多執行200次該類型的任務: ```shell >>> app.control.rate_limit('myapp.mytask', '200/m') ``` 上面並沒有特別指定目標,因此這個調整請求會影響cluster內的所有worker。如果你只想要影響清單內的workers,那你可以使用參數`destination`: ```shell >>> app.control.rate_limit('myapp.mytask', '200/m', ... destination=['celery@worker1.example.com']) ``` :::warning **Warning:** 這並不影響worker啟用的[worker_disable_rate_limits](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_disable_rate_limits)設置 ::: ### Max tasks per child setting *New in version 2.0.* **pool support:** prefork 使用這個選項,你可以配置一個worker在被新的process取代之前的可執行最大任務數量。 如果你有那種無法控制來自C extensions的記憶體漏失問題的話,這非常有用。 這個選項可以用workers的參數[--max-tasks-per-child](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-max-tasks-per-child)設置,或使用[worker_max_tasks_per_child](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_max_tasks_per_child)設置。 ### Max memory per child setting *New in version 4.0.* pool support: prefork 使用這個選項,你可以配置一個worker在被新的process取代之前的可執行最大常駐記憶體用量。 如果你有那種無法控制來自C extensions的記憶體漏失問題的話,這非常有用。 這個選項可以用workers的參數[--max-memory-per-child](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-max-memory-per-child)設置,或使用[worker_max_memory_per_child](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_max_memory_per_child)設置。 ### Autoscaling *New in version 2.2.* **pool support:** `prefork`, `gevent` 組件autoscaler用於依據負載動態調整pool的大小: * 當有工作要做的時候,autoscaler會增加更多的pool processes * 當工作負載降低的時候開始移除processes 它利用選項[--autoscale](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-autoscale)啟用,這需要兩個數值:最大、最小的pool processes設置: ```python --autoscale=AUTOSCALE Enable autoscaling by providing max_concurrency,min_concurrency. Example: --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary). ``` 你也可以透過繼承`Autoscaler`來自定義autoscaler的規則。關於指標的一些想法包括平衡負載或記憶體可用量。你可以用[worker_autoscaler](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_autoscaler)指定自定義的autoscaler。 ### Queues worker instance可以使用任意數量的佇列(queue)。預設情況下,它將使用[task_queues](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_queues)所定義的所有佇列(如果沒有指定,則返回名稱為celery的預設佇列)。 你可以在啟動worker的時候指定佇列,利用comma(,)來區隔選項[-Q](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-Q)後面的佇列清單: ```shell $ celery -A proj worker -l info -Q foo,bar,baz ``` 如果佇列名稱定義在[task_queues](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_queues),那就會使用該配置,但如果它沒在佇列清單中定義,Celery會自動生成一個新的佇列(取決於選項[task_create_missing_queues](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_create_missing_queues))。 你還可以在運行中利用遠端控制命令[add_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-add_consumer)與[cancel_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-cancel_consumer)告訴worker開始與結束使用某一個佇列。 #### Queues: Adding consumers 控制命令[add_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-add_consumer)會告訴一個或多個workers開始使用某一個佇列,這個操作是冪等的(idempotent)。 要告訴cluster中的所有workers開始使用佇列`foo`,你可以使用celery control: ```shell $ celery -A proj control add_consumer foo -> worker1.local: OK started consuming from u'foo' ``` 如果你要特別指定worker,你可以使用參數[--destination](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-control-d): ```shell $ celery -A proj control add_consumer foo -d celery@worker1.local ``` 相同的操作可以使用[app.control.add_consumer()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.add_consumer)動態地完成: ```python >>> app.control.add_consumer('foo', reply=True) [{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}] >>> app.control.add_consumer('foo', reply=True, ... destination=['worker1@example.com']) [{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}] ``` 目前為止,我們只有演示使用自動佇列的範例,如果你需要更多的控制,你還可以指定`exchange`、 `routing_key`、甚至其它選項: ```python >>> app.control.add_consumer( ... queue='baz', ... exchange='ex', ... exchange_type='topic', ... routing_key='media.*', ... options={ ... 'queue_durable': False, ... 'exchange_durable': False, ... }, ... reply=True, ... destination=['w1@example.com', 'w2@example.com']) ``` #### Queues: Canceling consumers 你可以使用控制命令[cancel_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-cancel_consumer)來取消worker使用某一個佇列。 強制cluster中的所有worker取消使用某一個佇列,你可以使用celery contrl: ```shell $ celery -A proj control cancel_consumer foo ``` 參數[--destination](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-control-d)可以用於指定作用於該命令的worker或worker list: ```shell $ celery -A proj control cancel_consumer foo -d celery@worker1.local ``` 你也可以使用[app.control.cancel_consumer()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.cancel_consumer)用寫程式的方式來取消使用佇列: ```python >>> app.control.cancel_consumer('foo', reply=True) [{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}] ``` #### Queues: List of active queues 你可以用控制命令[active_queues](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-active_queues)來取得worker使用的佇列清單: ```shell $ celery -A proj inspect active_queues [...] ``` 如同其它遠端控制命令,它也支援使用參數[--destination](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-inspect-d)指定worker來回覆這個請求: ```shell $ celery -A proj inspect active_queues -d celery@worker1.local [...] ``` 這也可以透過使用[active_queues()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.active_queues)以寫程式的方式完成: ```python >>> app.control.inspect().active_queues() [...] >>> app.control.inspect(['worker1.local']).active_queues() [...] ``` ### Inspecting workers [app.control.inspect](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.inspect)讓你檢核執行中的workers。它其實就是使用遠端控制命令。 你也可以使用celery command來檢查workers,它支援與[app.control](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control)接口相同的命令。 ```python >>> # Inspect all nodes. >>> i = app.control.inspect() >>> # Specify multiple nodes to inspect. >>> i = app.control.inspect(['worker1.example.com', 'worker2.example.com']) >>> # Specify a single node to inspect. >>> i = app.control.inspect('worker1.example.com') ``` #### Dump of registered tasks 你可以用[registered()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.registered)來取得worker所註冊的任務清單: ```shell >>> i.registered() [{'worker1.example.com': ['tasks.add', 'tasks.sleeptask']}] ``` #### Dump of currently executing tasks 你可以用[active()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.active)來取得活動任務清單: ```shell >>> i.active() [{'worker1.example.com': [{'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf', 'args': '(8,)', 'kwargs': '{}'}]}] ``` #### Dump of scheduled (ETA) tasks 你可以用[scheduled()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.scheduled)來取得等待調度的任務清單: ```shell >>> i.scheduled() [{'worker1.example.com': [{'eta': '2010-06-07 09:07:52', 'priority': 0, 'request': { 'name': 'tasks.sleeptask', 'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d', 'args': '[1]', 'kwargs': '{}'}}, {'eta': '2010-06-07 09:07:53', 'priority': 0, 'request': { 'name': 'tasks.sleeptask', 'id': '49661b9a-aa22-4120-94b7-9ee8031d219d', 'args': '[2]', 'kwargs': '{}'}}]}] ``` :::info **Note:** 這是帶有參數`ETA/countdown`的任務,並非指週期性任務(periodic tasks) ::: #### Dump of reserved tasks 保留任務(reserved tasks)是指任務已被接收,但仍然等待執行。 你可以使用[reserved()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.reserved)來取得清單: ```shell >>> i.reserved() [{'worker1.example.com': [{'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf', 'args': '(8,)', 'kwargs': '{}'}]}] ``` #### Statistics 遠端控制命令`inspect stats`(或`status()`)會提供你一個關於worker有效統計的清單(或者不是那麼有幫助): ```shell $ celery -A proj inspect stats ``` 輸出資訊請見[stats()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.stats) ### Additional Commands #### Remote shutdown 這個命令將優雅的遠端關閉worker: ```shell >>> app.control.broadcast('shutdown') # shutdown all workers >>> app.control.broadcast('shutdown', destination='worker1@example.com') ``` #### Ping 這個命令從活動中的workers要求一個回應。Workers的回應字串為`pong`,差不多就這樣。除了你指定自定義timeout,不然它會使用預設的一秒來回應。 ```shell >>> app.control.ping(timeout=0.5) [{'worker1.example.com': 'pong'}, {'worker2.example.com': 'pong'}, {'worker3.example.com': 'pong'}] ``` [ping()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.ping)也支援參數`destination`,因此你可以指定worker來ping: ```shell >>> ping(['worker2.example.com', 'worker3.example.com']) [{'worker2.example.com': 'pong'}, {'worker3.example.com': 'pong'}] ``` #### Enable/disable events 你可以使用`enable_events`、`disable_events`命令來啟動/禁用事件。這對於使用**celery events/celerymon**來臨時監視worker非常有用。 ```shell >>> app.control.enable_events() >>> app.control.disable_events() ``` ### Writing your own remote control commands 有兩種遠端控制的命令類型: * Inspect command 沒有副作用,通常只是回傳在worker中找到的一些值,像是目前註冊的任務清單,活動中的任務清單,等等。 * Control command 執行會有副作用,比方說,你想要增加從中使用的新佇列之類的 遠端控制命令註冊在控制台,而且它們只有一個參數:當前的`ControlDispatch` instance。如果有必要,你可以從那邊進入活動中的[Consumer](https://docs.celeryproject.org/en/master/reference/celery.worker.consumer.html#celery.worker.consumer.Consumer) 這邊有一個增加任務預取計數器的控制命令範例: ```python from celery.worker.control import control_command @control_command( args=[('n', int)], signature='[N=1]', # <- used for help on the command-line. ) def increase_prefetch_count(state, n=1): state.consumer.qos.increment_eventually(n) return {'ok': 'prefetch count incremented'} ``` 確認你增加這段程式碼到由worker所導入的模組中:這可以與定義Celery應用程式的模組相同,也可以將模組加到到[imports](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-imports)設置中。 重新啟動worker以便註冊控制命令,現在你可以使用`celery control`工具來調用你的命令: ```shell $ celery -A proj control increase_prefetch_count 3 ``` 你也可以加入動作到`celery instpec`,下面給出一個讀取當前預取計數器的範例: ```python from celery.worker.control import inspect_command @inspect_command def current_prefetch_count(state): return {'prefetch_count': state.consumer.qos.value} ``` 在重新啟動worker之後你就可以使用`celery inspect`來查詢當前預取計數器的值: ```shell $ celery -A proj inspect current_prefetch_count ``` ## History 20190814_依據4.4版本說明翻譯 20220121_依據5.2版本說明翻譯 ## 暫時保留區 輸出資訊 * broker * 關於broker的訊息 * connect_timeout 以秒為單位,建立新連線的Timeout設置(int/float) * heartbeat 當前的heartbeat數值(client設置) * hostname 遠端broker的節點名稱 * insist No longer used.(似乎已棄用) * login_method 連接broker的登入方式 * port 遠端broker的Port * ssl SSL enabled/disabled * transport transport使用的名稱(e.g., `amqp` or `redis`) * transport_options 傳遞給transport的選項 * uri_prefix 部份transport預期以主機名稱做為URL ```shell redis+socket:///tmp/redis.sock ``` 這個範例中,URI-prefix會是`redis` * userid 用來連接到broker的user id * virtual_host 使用虛擬主機 * clock * Worker logical clock的值。這是一個正整數,並且每次收到統計信息時都應該增加。 * pid * Worker實例的process id(Main process) * pool * Pool-specific section * max-concurrency `processes/threads/green threads`的最大數量 * max-tasks-per-child thread在被回收之前可以執行的最大`tasks`數。 * processes List of PIDs (or thread-id’s) * put-guarded-by-semaphore Internal * timeouts 時間限制的預設值 * writes 針對prefork pool,這說明使用非同步I/O時pool中每個process的寫入分佈。 * prefetch_count * `task consumer`的當前預取計數值 * rusage * 系統使用統計,這欄位可能因為平台不同所有差異。 * From `getrusage(2):` * stime 代表這個process花費在作業系統程式碼上的時間。 * utime 執行用戶指令所花費的時間 * maxrss 這個process所使用的最大常駐(KB) * idrss 資料使用的非共享記憶體量(in kilobytes times ticks of execution) * isrss 用於堆疊空間的非共享記憶體量(in kilobytes times ticks of execution) * ixrss 用於其它processes的共享記憶體量(in kilobytes times ticks of execution) * inblock 代表這個process從磁碟讀取文件系統的次數 * oublock 代表這個process從磁碟寫入文件系統的次數 * majflt 通過執行I/O服務的分頁錯誤數量 * minflt 未通過執行I/O服務的分頁錯誤數量 * msgrcv IPC訊息接收的次數 * msgsnd IPC訊息發送的次數 * nvcsw 這個process自願性切換上下文的次數 * nivcsw 非自願性切換上下文的次數 * nsignals 接收到信號的次數 * nswap 這個process `swapped entirely out of memory` 的次數 * total `tasks`名稱的映射,以及Worker啟動以來已接受該類型的`tasks`總數