# Celery_Workers Guide
###### tags: `celery` `celery 5.2` `python`
[官方連結_Workers Guide](https://docs.celeryproject.org/en/master/userguide/workers.html)
## Workers Guide
### Starting the worker
你可以透過執行下面命令來啟動worker:
```shell
$ celery -A proj worker -l info
```
:::info
**Daemonizing**
你可以會希望可以使用daemonization tool(常駐程式的工具?)在背景啟動worker。關於使用服務管理器將worker做為常駐程式來改動部份,可以參考[Daemonization](https://docs.celeryproject.org/en/master/userguide/daemonizing.html#daemonizing)。
:::
關於command-line所有可用選項的完整清單,請參閱[worker](https://docs.celeryproject.org/en/master/reference/celery.bin.worker.html#module-celery.bin.worker),或這麼做:
```shell
$ celery worker --help
```
你可以在同一台機器上面啟動多個workers,但請務必使用參數[`--hostname`](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-n)來為每個worker指定節點名稱:
```shell
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
```
參數`hostname`可以擴展下面變數:
* `%h`:主機名稱,包含網域名稱
* `%n`:單純主機名稱
* `%d`:單純網域名稱
如果當前的主機名稱是`george.example.com`,這會擴展為:
| Variable | Template | Result |
|----------|------------|----------------------------|
| %h | worker1@%h | worker1@george.example.com |
| %n | worker1@%n | worker1@george |
| %d | worker1@%d | worker1@example.com |
:::info
**Note for [supervisor](https://pypi.org/project/supervisor/) users:**
對使用supervisor管理背景執行的使用者,必須使用第二個%來做轉譯:%%h
:::
### Stopping the worker
你應該使用**TERM** signal來完成關閉worker。
當你啟動關閉的時候,worker在實際終止之前會把所有當前執行的任務完成。如果這些任務是重要的,在你做一些激烈操作之前你應該等待它完成,像是送出**KILL** signal。
如果worker在等待一段時間之後還是不會關閉,像是卡在無限迴圈或類似情況的話,你可以使用**KILL** signal來強制終止worker:但是,請注意,當前執行的任務會因此而遺失(除非你的任務有設置[`acks_late`](https://docs.celeryproject.org/en/master/reference/celery.app.task.html#celery.app.task.Task.acks_late))。
此外,由於processes無法覆蓋**KILL** signal,
因此worker無法取得它的children processes;一定要手動執行。下面命令通常可以解決這個問題:
```shell
$ pkill -9 -f 'celery worker'
```
如果你的系統沒有**pkill**,那可以使用較長一點的版本:
```shell
$ ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
```
version 5.2變更:在Linux systems上,Celery現在已經支援在worker終止之後,發送**KILL** signal給所有的child processes。這可以透過prctl(2)的選項PR_SET_PDEATHSIG來完成。
### Restarting the worker
如果你想要重新啟動worker,那就應該給它一個**TERM** signal,並啟動一個新的實例。管理worker發展最簡單的方法就是使用`celery multi`:
```shell
$ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
```
對生產佈署而言,你應該使用init-scripts或是process supervision(進程監控)系統(參考[Daemonization](https://docs.celeryproject.org/en/master/userguide/daemonizing.html#daemonizing))
除了停止,然後讓worker重新啟動,你也可以使用**HUP** signal來重新啟動worker。要注意,worker將負責重新啟動本身,這很容易出現問題,我們並不建議在生產環境這麼用:
```shell
$ kill -HUP $pid
```
:::info
**Note:**
只有在worker是背景執行的常駐程式的時候才使用**HUP**來重新啟動worker(它沒有控制終端)。
由於平台的限制,maxOS上禁止使用**UPH**。
:::
### Process Signals
worker的主要過程會覆蓋下面的信號(signals):
| TERM | Warm shutdown, wait for tasks to complete. |
|------|--------------------------------------------|
| QUIT | Cold shutdown, terminate ASAP |
| USR1 | Dump traceback for all active threads. |
| USR2 | Remote debug, see [celery.contrib.rdb](https://docs.celeryproject.org/en/master/reference/celery.contrib.rdb.html#module-celery.contrib.rdb) |
### Variables in file paths
檔案路徑的參數,像是[--logfile](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-f), [--pidfile](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-pidfile), [--statedb](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-S)可以包含worker要擴展的變數:
#### Node name replacements
* %p: 完整的節點名稱
* %h: 主機名稱,包含網域名稱
* %n: 單純主機名稱
* %d: 單純網域名稱
* %i: Prefork pool process index or 0 if MainProcess.
* %I: Prefork pool process index with separator.
舉例來說,如果當前的主機名稱是`george@foo.example.com`,那它會擴展為:
* --logfile=%p.log -> george@foo.example.com.log
* --logfile=%h.log -> foo.example.com.log
* --logfile=%n.log -> george.log
* --logfile=%d.log -> example.com.log
#### Prefork pool process index
`prefork pool process index specifier`會根據最終需要打開文件的process來擴展為不同的檔案名稱。
這可以用來為每個child process指定一個日誌文件(log file)。
注意到,即使processes退出,或者是使用`autoscale/maxtasksperchild/time limits`,這數值仍然保持在process的界限內。也就是說,這數值是*process index*,並不是process count,也不是pid。
* %i - Pool process index or 0 if MainProcess.
* `-n worker1@example.com -c2 -f %n-%i.log`會產生三個log file:
* worker1-0.log (main process)
* worker1-1.log (pool process 1)
* worker1-2.log (pool process 2)
* %I - Pool process index with separator.
* `n worker1@example.com -c2 -f %n%I.log`會產生三個log file:
* worker1.log (main process)
* worker1-1.log (pool process 1)
* worker1-2.log (pool process 2)
### Concurrency
預設情況下,[多元處理](http://terms.naer.edu.tw/detail/151347/)(multiprocessing)用來處理任務的同步執行,不過你也可以使用[Eventlet](https://docs.celeryproject.org/en/master/userguide/concurrency/eventlet.html#concurrency-eventlet)。worker的processes/threads的數量可以使用參數[--concurrency](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-c)來調整,預設為機器上可用的CPU的數量。
#### Number of processes (multiprocessing/prefork pool):
愈多的pool processes通常愈好,不過還是會有一個截止點,過了這個截止點繼續增加更多的pool processes就只會影響效能。甚至會有一些證據顯示,多個worker instance執行,也許效能會比單個worker還要好。舉例,3個workers,每個worker有10個pool processes。你需要經過測試來找到適合你的worker數量,因為這取決你的應用場景,工作負載,任務執行次數,與其它因素。
### Remote control
*New in version 2.0.*
**pool support:** prefork, eventlet, gevent, blocking:solo (see note)
**broker support:** amqp, redis
:::info
**The celery command**
celery program是用來從command-line做遠端控制命令。它支援下面列出的所有的命令。更多信息請參閱[Management Command-line Utilities(inspect/control)](https://docs.celeryproject.org/en/master/userguide/monitoring.html#monitoring-control)
:::
你可以使用高優先廣播訊息佇列(high-priority broadcast message queue)對workers做遠端控制。命令可以直接下給所有的workers或清單內的特定workers。
命令也可以有回應。Client可以等待並且收集這些回應。因為沒有一個中央機構可以知道你的cluster內有多少的workers可供應用,所以沒有辦法估計有多少workers可以發送回應,因此Client可以設置timeout - 回應到達的截止時間(以秒為單位)。預設timeout為一秒。如果worker在截止時間前沒有回應,那並不代表它們沒有回應,或許更糟的是它們掛了,或者可能只是因為網路延遲或worker正在處理命令而慢了,所以就看情況調整timeout吧。
除了timeout,Client還可以指定最大等待回應數。如果有指定目標,則將此限制設置為目標主機的數量。
:::info
**Note:**
solo pool支援遠端控制命令,但是任何執行中的任務都會阻斷任何等待控制的命令,因此如果worker非常繁忙的話,那它的用途是有限的。這種情況下,你應該增加Client等待回應的timeout。
:::
#### The [broadcast()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.broadcast) function
這是用來向worker發送命令的用戶端函數(client function)。部份遠端控制命令也提供背景執行的高階接口[broadcast()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.broadcast),像是[rate_limit](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.rate_limit)與[ping](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.ping)。
發送命令[rate_limit](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-rate_limit)與關鍵參數(keyword arguments):
```python
>>> app.control.broadcast('rate_limit',
... arguments={'task_name': 'myapp.mytask',
... 'rate_limit': '200/m'})
```
這將非同步的發送命令,而且不需要等待回應。如果要求回應,你必須使用參數`reply`:
```python
>>> app.control.broadcast('rate_limit', {
... 'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
{'worker2.example.com': 'New rate limit set successfully'},
{'worker3.example.com': 'New rate limit set successfully'}]
```
使用參數`destination`,你可以指定接收命令的worker list:
```python
>>> app.control.broadcast('rate_limit', {
... 'task_name': 'myapp.mytask',
... 'rate_limit': '200/m'}, reply=True,
... destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]
```
當然,使用高階接口來設置速率限制更為方便,不過有些命令只能用[broadcast()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.broadcast)來請求。
### Commands
#### revoke: Revoking tasks
**pool support:** all, terminate only supported by prefork
**broker support:** `amqp`, `redis`
**command:** `celery -A proj control revoke <task_id>`
所有的worker節點都會保留已撤銷的task ids,不論是記憶體內還是寫入磁碟的(見[Persistent revokes](https://docs.celeryproject.org/en/master/userguide/workers.html#worker-persistent-revokes))。
當worker接收到撤銷的請求,它將跳過執行該任務,但是它不會終止已經在執行的任務,除非有設置`terminate`這個選項。
:::info
**Note:**
選項`terminate`是當任務卡住的時候管理人員最後的手段。這並不是用來終止任務,而是終止執行該任務的process,而且當終止信號傳到的時候,該process在當下有可能已經開始處理其它任務,基於這個原因,你永遠都不能在程式碼中調用它。
:::
如果設置`terminate`,那處理該任務的child process將會被終止。預設傳遞訊號是`TERM`,但你可以用參數`signal`來指定。信號號可以是Python標準庫中[signal](https://docs.python.org/dev/library/signal.html#module-signal)模組中定義的任何信號的大寫名稱。
終止任務的同時也會撤銷它。
**Example**
```python
>>> result.revoke()
>>> AsyncResult(id).revoke()
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
... terminate=True)
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
... terminate=True, signal='SIGKILL')
```
#### Revoking multiple tasks
*New in version 3.1.*
`revoke`也接受list格式的參數,它將一次撤銷多個任務。
**Example**
```python
>>> app.control.revoke([
... '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
... 'f565793e-b041-4b2b-9ca4-dca22762a55d',
... 'd9d35e03-2997-42d0-a13e-64a66b88a618',
])
```
`GroupResult.revoke`在3.1版之後就開始利用這一點了。
#### Persistent revokes
撤銷任務的工作方式是向所有workers發送一個廣播訊息,然後workers會在記憶體中保留撤銷的任務清單。當一個worker啟動的時候,它會將撤銷的任務同步給cluster內的其它workers。
撤銷的任務清單保存在記憶體內,因此如果所有的workers都重新啟動,那記錄清單也會跟著消失。如果你想重新啟動之間保留這個清單,那你要利用參數`-statedb`來指定保存的檔案給celery worker知道:
```shell
$ celery -A proj worker -l info --statedb=/var/run/celery/worker.state
```
或者,如果你使用**celery multi**,你想要為每個worker instance都建立一個檔案,你可以`%n`來格式化擴展當前的節點名稱:
```shell
celery multi start 2 -l info --statedb=/var/run/celery/%n.state
```
參考[Variables in file paths](https://docs.celeryproject.org/en/master/userguide/workers.html#worker-files)
注意,遠端控制命令必須能夠正常作業,撤銷才能作業。這情況下,僅RabbitMQ(amqp)與Redis支援遠端控制命令。
### Time Limits
*New in version 2.0.*
pool support: `prefork/gevent (see note below)`
:::info
**Soft, or hard?**
time limit,時間限制的設置有兩個值,也就是soft與hard。soft time limit允許任務在被清掉之前拋出例外,而hard timeout並不允許,而且會強制任務終止。
:::
單個任務可能會永久的執行,如果你有很多任務在等待永遠不會發生的某些事件,那你的worker會一輩子無法處理新任務。要預防這種劇情發生的最好方法就是啟用時間限制(time limit)
時間限制意指任務在process被終止並且替換之前的可執行最大秒數。你也可以啟用soft time limit,它會拋出例外讓任務可以在hard time limit殺掉它之前來處理它:
```python
from myapp import app
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
do_work()
except SoftTimeLimitExceeded:
clean_up_in_a_hurry()
```
時間限制也可以使用[task_time_limit](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_time_limit)/[task_soft_time_limit](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_soft_time_limit)來設置。
:::info
**Note:**
時間限制目前在不支援`SIGUSR1 signal`的平台上無法執行。
:::
:::info
**Note:**
gavent pool無法實作soft time limtes。此外,如果任務阻塞,那它也不會強制執行hard time limit。
:::
#### Changing time limits at run-time
*New in version 2.3.*
**broker support:** `amqp`, `redis`
有一個遠端控制命令可以讓你同時調整任務的sort、hard time limit,其名稱為`time_limit`。
範例調整`tasks.crawl_the_web`的時間限制,調整soft time limit為一分鐘,hard time limit為兩分鐘:
```shell
>>> app.control.time_limit('tasks.crawl_the_web',
soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]
```
只有在時間限制改變之後才開始執行的任務才會受影響。
### Rate Limits
#### Changing rate-limits at run-time
下面範例調整`myapp.mytask`的速率限制,每分鐘最多執行200次該類型的任務:
```shell
>>> app.control.rate_limit('myapp.mytask', '200/m')
```
上面並沒有特別指定目標,因此這個調整請求會影響cluster內的所有worker。如果你只想要影響清單內的workers,那你可以使用參數`destination`:
```shell
>>> app.control.rate_limit('myapp.mytask', '200/m',
... destination=['celery@worker1.example.com'])
```
:::warning
**Warning:**
這並不影響worker啟用的[worker_disable_rate_limits](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_disable_rate_limits)設置
:::
### Max tasks per child setting
*New in version 2.0.*
**pool support:** prefork
使用這個選項,你可以配置一個worker在被新的process取代之前的可執行最大任務數量。
如果你有那種無法控制來自C extensions的記憶體漏失問題的話,這非常有用。
這個選項可以用workers的參數[--max-tasks-per-child](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-max-tasks-per-child)設置,或使用[worker_max_tasks_per_child](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_max_tasks_per_child)設置。
### Max memory per child setting
*New in version 4.0.*
pool support: prefork
使用這個選項,你可以配置一個worker在被新的process取代之前的可執行最大常駐記憶體用量。
如果你有那種無法控制來自C extensions的記憶體漏失問題的話,這非常有用。
這個選項可以用workers的參數[--max-memory-per-child](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-max-memory-per-child)設置,或使用[worker_max_memory_per_child](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_max_memory_per_child)設置。
### Autoscaling
*New in version 2.2.*
**pool support:** `prefork`, `gevent`
組件autoscaler用於依據負載動態調整pool的大小:
* 當有工作要做的時候,autoscaler會增加更多的pool processes
* 當工作負載降低的時候開始移除processes
它利用選項[--autoscale](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-autoscale)啟用,這需要兩個數值:最大、最小的pool processes設置:
```python
--autoscale=AUTOSCALE
Enable autoscaling by providing
max_concurrency,min_concurrency. Example:
--autoscale=10,3 (always keep 3 processes, but grow to
10 if necessary).
```
你也可以透過繼承`Autoscaler`來自定義autoscaler的規則。關於指標的一些想法包括平衡負載或記憶體可用量。你可以用[worker_autoscaler](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-worker_autoscaler)指定自定義的autoscaler。
### Queues
worker instance可以使用任意數量的佇列(queue)。預設情況下,它將使用[task_queues](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_queues)所定義的所有佇列(如果沒有指定,則返回名稱為celery的預設佇列)。
你可以在啟動worker的時候指定佇列,利用comma(,)來區隔選項[-Q](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-worker-Q)後面的佇列清單:
```shell
$ celery -A proj worker -l info -Q foo,bar,baz
```
如果佇列名稱定義在[task_queues](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_queues),那就會使用該配置,但如果它沒在佇列清單中定義,Celery會自動生成一個新的佇列(取決於選項[task_create_missing_queues](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-task_create_missing_queues))。
你還可以在運行中利用遠端控制命令[add_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-add_consumer)與[cancel_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-cancel_consumer)告訴worker開始與結束使用某一個佇列。
#### Queues: Adding consumers
控制命令[add_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-add_consumer)會告訴一個或多個workers開始使用某一個佇列,這個操作是冪等的(idempotent)。
要告訴cluster中的所有workers開始使用佇列`foo`,你可以使用celery control:
```shell
$ celery -A proj control add_consumer foo
-> worker1.local: OK
started consuming from u'foo'
```
如果你要特別指定worker,你可以使用參數[--destination](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-control-d):
```shell
$ celery -A proj control add_consumer foo -d celery@worker1.local
```
相同的操作可以使用[app.control.add_consumer()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.add_consumer)動態地完成:
```python
>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]
>>> app.control.add_consumer('foo', reply=True,
... destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]
```
目前為止,我們只有演示使用自動佇列的範例,如果你需要更多的控制,你還可以指定`exchange`、 `routing_key`、甚至其它選項:
```python
>>> app.control.add_consumer(
... queue='baz',
... exchange='ex',
... exchange_type='topic',
... routing_key='media.*',
... options={
... 'queue_durable': False,
... 'exchange_durable': False,
... },
... reply=True,
... destination=['w1@example.com', 'w2@example.com'])
```
#### Queues: Canceling consumers
你可以使用控制命令[cancel_consumer](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-cancel_consumer)來取消worker使用某一個佇列。
強制cluster中的所有worker取消使用某一個佇列,你可以使用celery contrl:
```shell
$ celery -A proj control cancel_consumer foo
```
參數[--destination](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-control-d)可以用於指定作用於該命令的worker或worker list:
```shell
$ celery -A proj control cancel_consumer foo -d celery@worker1.local
```
你也可以使用[app.control.cancel_consumer()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.cancel_consumer)用寫程式的方式來取消使用佇列:
```python
>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]
```
#### Queues: List of active queues
你可以用控制命令[active_queues](https://docs.celeryproject.org/en/master/userguide/workers.html#std-control-active_queues)來取得worker使用的佇列清單:
```shell
$ celery -A proj inspect active_queues
[...]
```
如同其它遠端控制命令,它也支援使用參數[--destination](https://docs.celeryproject.org/en/master/reference/cli.html#cmdoption-celery-inspect-d)指定worker來回覆這個請求:
```shell
$ celery -A proj inspect active_queues -d celery@worker1.local
[...]
```
這也可以透過使用[active_queues()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.active_queues)以寫程式的方式完成:
```python
>>> app.control.inspect().active_queues()
[...]
>>> app.control.inspect(['worker1.local']).active_queues()
[...]
```
### Inspecting workers
[app.control.inspect](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.inspect)讓你檢核執行中的workers。它其實就是使用遠端控制命令。
你也可以使用celery command來檢查workers,它支援與[app.control](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control)接口相同的命令。
```python
>>> # Inspect all nodes.
>>> i = app.control.inspect()
>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
'worker2.example.com'])
>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')
```
#### Dump of registered tasks
你可以用[registered()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.registered)來取得worker所註冊的任務清單:
```shell
>>> i.registered()
[{'worker1.example.com': ['tasks.add',
'tasks.sleeptask']}]
```
#### Dump of currently executing tasks
你可以用[active()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.active)來取得活動任務清單:
```shell
>>> i.active()
[{'worker1.example.com':
[{'name': 'tasks.sleeptask',
'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
'args': '(8,)',
'kwargs': '{}'}]}]
```
#### Dump of scheduled (ETA) tasks
你可以用[scheduled()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.scheduled)來取得等待調度的任務清單:
```shell
>>> i.scheduled()
[{'worker1.example.com':
[{'eta': '2010-06-07 09:07:52', 'priority': 0,
'request': {
'name': 'tasks.sleeptask',
'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
'args': '[1]',
'kwargs': '{}'}},
{'eta': '2010-06-07 09:07:53', 'priority': 0,
'request': {
'name': 'tasks.sleeptask',
'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
'args': '[2]',
'kwargs': '{}'}}]}]
```
:::info
**Note:**
這是帶有參數`ETA/countdown`的任務,並非指週期性任務(periodic tasks)
:::
#### Dump of reserved tasks
保留任務(reserved tasks)是指任務已被接收,但仍然等待執行。
你可以使用[reserved()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.reserved)來取得清單:
```shell
>>> i.reserved()
[{'worker1.example.com':
[{'name': 'tasks.sleeptask',
'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
'args': '(8,)',
'kwargs': '{}'}]}]
```
#### Statistics
遠端控制命令`inspect stats`(或`status()`)會提供你一個關於worker有效統計的清單(或者不是那麼有幫助):
```shell
$ celery -A proj inspect stats
```
輸出資訊請見[stats()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Inspect.stats)
### Additional Commands
#### Remote shutdown
這個命令將優雅的遠端關閉worker:
```shell
>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')
```
#### Ping
這個命令從活動中的workers要求一個回應。Workers的回應字串為`pong`,差不多就這樣。除了你指定自定義timeout,不然它會使用預設的一秒來回應。
```shell
>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
{'worker2.example.com': 'pong'},
{'worker3.example.com': 'pong'}]
```
[ping()](https://docs.celeryproject.org/en/master/reference/celery.app.control.html#celery.app.control.Control.ping)也支援參數`destination`,因此你可以指定worker來ping:
```shell
>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
{'worker3.example.com': 'pong'}]
```
#### Enable/disable events
你可以使用`enable_events`、`disable_events`命令來啟動/禁用事件。這對於使用**celery events/celerymon**來臨時監視worker非常有用。
```shell
>>> app.control.enable_events()
>>> app.control.disable_events()
```
### Writing your own remote control commands
有兩種遠端控制的命令類型:
* Inspect command
沒有副作用,通常只是回傳在worker中找到的一些值,像是目前註冊的任務清單,活動中的任務清單,等等。
* Control command
執行會有副作用,比方說,你想要增加從中使用的新佇列之類的
遠端控制命令註冊在控制台,而且它們只有一個參數:當前的`ControlDispatch` instance。如果有必要,你可以從那邊進入活動中的[Consumer](https://docs.celeryproject.org/en/master/reference/celery.worker.consumer.html#celery.worker.consumer.Consumer)
這邊有一個增加任務預取計數器的控制命令範例:
```python
from celery.worker.control import control_command
@control_command(
args=[('n', int)],
signature='[N=1]', # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
state.consumer.qos.increment_eventually(n)
return {'ok': 'prefetch count incremented'}
```
確認你增加這段程式碼到由worker所導入的模組中:這可以與定義Celery應用程式的模組相同,也可以將模組加到到[imports](https://docs.celeryproject.org/en/master/userguide/configuration.html#std-setting-imports)設置中。
重新啟動worker以便註冊控制命令,現在你可以使用`celery control`工具來調用你的命令:
```shell
$ celery -A proj control increase_prefetch_count 3
```
你也可以加入動作到`celery instpec`,下面給出一個讀取當前預取計數器的範例:
```python
from celery.worker.control import inspect_command
@inspect_command
def current_prefetch_count(state):
return {'prefetch_count': state.consumer.qos.value}
```
在重新啟動worker之後你就可以使用`celery inspect`來查詢當前預取計數器的值:
```shell
$ celery -A proj inspect current_prefetch_count
```
## History
20190814_依據4.4版本說明翻譯
20220121_依據5.2版本說明翻譯
## 暫時保留區
輸出資訊
* broker
* 關於broker的訊息
* connect_timeout
以秒為單位,建立新連線的Timeout設置(int/float)
* heartbeat
當前的heartbeat數值(client設置)
* hostname
遠端broker的節點名稱
* insist
No longer used.(似乎已棄用)
* login_method
連接broker的登入方式
* port
遠端broker的Port
* ssl
SSL enabled/disabled
* transport
transport使用的名稱(e.g., `amqp` or `redis`)
* transport_options
傳遞給transport的選項
* uri_prefix
部份transport預期以主機名稱做為URL
```shell
redis+socket:///tmp/redis.sock
```
這個範例中,URI-prefix會是`redis`
* userid
用來連接到broker的user id
* virtual_host
使用虛擬主機
* clock
* Worker logical clock的值。這是一個正整數,並且每次收到統計信息時都應該增加。
* pid
* Worker實例的process id(Main process)
* pool
* Pool-specific section
* max-concurrency
`processes/threads/green threads`的最大數量
* max-tasks-per-child
thread在被回收之前可以執行的最大`tasks`數。
* processes
List of PIDs (or thread-id’s)
* put-guarded-by-semaphore
Internal
* timeouts
時間限制的預設值
* writes
針對prefork pool,這說明使用非同步I/O時pool中每個process的寫入分佈。
* prefetch_count
* `task consumer`的當前預取計數值
* rusage
* 系統使用統計,這欄位可能因為平台不同所有差異。
* From `getrusage(2):`
* stime
代表這個process花費在作業系統程式碼上的時間。
* utime
執行用戶指令所花費的時間
* maxrss
這個process所使用的最大常駐(KB)
* idrss
資料使用的非共享記憶體量(in kilobytes times ticks of execution)
* isrss
用於堆疊空間的非共享記憶體量(in kilobytes times ticks of execution)
* ixrss
用於其它processes的共享記憶體量(in kilobytes times ticks of execution)
* inblock
代表這個process從磁碟讀取文件系統的次數
* oublock
代表這個process從磁碟寫入文件系統的次數
* majflt
通過執行I/O服務的分頁錯誤數量
* minflt
未通過執行I/O服務的分頁錯誤數量
* msgrcv
IPC訊息接收的次數
* msgsnd
IPC訊息發送的次數
* nvcsw
這個process自願性切換上下文的次數
* nivcsw
非自願性切換上下文的次數
* nsignals
接收到信號的次數
* nswap
這個process `swapped entirely out of memory` 的次數
* total
`tasks`名稱的映射,以及Worker啟動以來已接受該類型的`tasks`總數