# Celery_Signals ###### tags: `celery` `celery 5.2` `python` [官方連結_Signals](https://docs.celeryq.dev/en/master/userguide/signals.html) ## Signals 信號允許分離的應用程式在應用程式中的其他地方發生某些操作時接收通知。 Celery帶有很多信號,你的應用程式可以使用這些信號來增強某些操作行為。 ### Basics 幾種事件觸發信號,你可以連接到這些信號,當它們觸發的時候執行某些操作。 一個連接到信號[after_task_publish](https://docs.celeryq.dev/en/master/userguide/signals.html#std-signal-after_task_publish)的範例: ```python from celery.signals import after_task_publish @after_task_publish.connect def task_sent_handler(sender=None, headers=None, body=None, **kwargs): # information about task are located in headers for task messages # using the task protocol version 2. info = headers if 'task' in headers else body print('after_task_publish for task id {info[id]}'.format( info=info, )) ``` 部份信號會有一個`sender`(發送者),你可以用它來過濾。舉例來說,信號[after_task_publish](https://docs.celeryq.dev/en/master/userguide/signals.html#std-signal-after_task_publish)以任務名稱做為`sender`,因此通過提供參數`sender`來進行連接,你可以在每次發佈名稱為"proj.task.add"的任務時將處理程序連接到該請求: ```python @after_task_publish.connect(sender='proj.tasks.add') def task_sent_handler(sender=None, headers=None, body=None, **kwargs): # information about task are located in headers for task messages # using the task protocol version 2. info = headers if 'task' in headers else body print('after_task_publish for task id {info[id]}'.format( info=info, )) ``` 信號使用與**django.core.dispatch**相同的實現,因此,預設情況下,其它的關鍵參數(像是`signal`)會傳送給所有的信號處理程序(signal handlers)。 信號處理程序最好的實踐方式就是接受任意的關鍵參數(像是`**kwargs`)。這樣,新版的Celery就可以增加額外的參數而不會破壞使用者的程式碼。 ### Signals #### Task Signals ##### before_task_publish *New in version 3.1.* 任務發佈之前分派(dispatched)。注意,這是在派送任務的process中執行的。 `sender`為要派送的任務名稱。 提供參數: * body * 任務訊息的表身(task message body) * 這是一個包含任務訊息欄位的映射,關於可定義的可能欄位請參考[Version 2](https://docs.celeryq.dev/en/master/internals/protocol.html#message-protocol-task-v2)與[Version 1](https://docs.celeryq.dev/en/master/internals/protocol.html#message-protocol-task-v1) * exchange * 要派送到的exchange名稱或是[Exchange](https://docs.celeryq.dev/projects/kombu/en/master/reference/kombu.html#kombu.Exchange)物件 * routing_key * 派送訊息的時候所使用的routing key * headers * 應用程式表頭(application headers)的映射(可以修改) * properties * 訊息屬性(可以修改) * declare * 在發佈訊息之前要宣告的實體清單([Exchange](https://docs.celeryq.dev/projects/kombu/en/master/reference/kombu.html#kombu.Exchange), [Queue](https://docs.celeryq.dev/projects/kombu/en/master/reference/kombu.html#kombu.Queue), or binding)。可以修改。 * retry_policy * 重試選項的映射。可以是[kombu.Connection.ensure()](https://docs.celeryq.dev/projects/kombu/en/master/reference/kombu.html#kombu.Connection.ensure)的任意參數,並且是可修改的。 ##### after_task_publish 當任務被派送至broker的時候送出,這是在派送任務的process中執行的。 `sender`為要派送的任務名稱。 提供參數: * headers * 任務訊息的表頭(task message headers),關於可定義的可能欄位請參考[Version 2](https://docs.celeryq.dev/en/master/internals/protocol.html#message-protocol-task-v2)與[Version 1](https://docs.celeryq.dev/en/master/internals/protocol.html#message-protocol-task-v1) * body * 任務訊息的表身(task message body),關於可定義的可能欄位請參考[Version 2](https://docs.celeryq.dev/en/master/internals/protocol.html#message-protocol-task-v2)與[Version 1](https://docs.celeryq.dev/en/master/internals/protocol.html#message-protocol-task-v1) * exchange * 使用的exchange或[Exchange](https://docs.celeryq.dev/projects/kombu/en/master/reference/kombu.html#kombu.Exchange)物件的名稱 * routing_key * 使用的routing key ##### task_prerun 任務執行之前派出。 `sender`是執行中的任務物件。 提供參數: * task_id * 被執行的任務Id * task * 執行中的任務 * args * 任務的位置參數(positional arguments) * kwargs * 任務的關鍵參數(keyword arguments) ##### task_postrun 任務執行之後派出。 `sender`是執行的任務物件。 提供參數: * task_id * 被執行的任務Id * task * 執行中的任務 * args * 任務的位置參數(positional arguments) * kwargs * 任務的關鍵參數(keyword arguments) * retval * 任務的回傳值 * state * 結果狀態的名稱 ##### task_retry 任務重試的時候派出。 `sender`為任務物件。 提供參數: * request * 當前任務的請求 * reason * 重試的原因(通常為例外實例,但可以強制為字串) * einfo * 詳細的異常訊息,包含追溯(一個**billiard.einfo.ExceptionInfo**物件) ##### task_success 任務成功的時候派出。 `sender`是執行的任務物件。 提供參數: * result * 任務的回傳值 ##### task_failure 任務失敗的時候派出。 `sender`是執行的任務物件。 提供參數: * task_id * 任務的Id * exception * 拋出的異常實例 * args * 調用任務的時候使用的位置參數(positional arguments) * kwargs * 調用任務的時候使用的關鍵參數(keyword arguments) * traceback * 堆壘跟蹤物件 * einfo * **billiard.einfo.ExceptionInfo**實例 ##### task_internal_error 在執行任務的時候發生內部Celery錯誤的時候派出。 `sender`是執行的任務物件。 提供參數: * task_id * 任務的Id * args * 調用任務的時候使用的位置參數(positional arguments) * kwargs * 調用任務的時候使用的關鍵參數(keyword arguments) * request * 原始的request dictionary。這是因為在拋出異常時`task.request`可能還沒有準備好 * exception * 拋出的異常實例 * traceback * 堆壘跟蹤物件 * einfo * **billiard.einfo.ExceptionInfo**實例 ##### task_received 在接收到Broker發出的任務並且準備好可以執行的時候派出。 `sender`是consumer object 提供參數: * request * 這是一個[Request](https://docs.celeryq.dev/en/master/reference/celery.worker.request.html#celery.worker.request.Request)實例,並不是`task.requst`。當使用`prefork pool`的時候,這個信號是在[親代處理](http://terms.naer.edu.tw/detail/18677545/)(parent process)中派出,因此`task.request`不能也不應該被使用。改用這個物件,因為它們共享很多相同欄位。 ##### task_revoked 當任務由Worker撤銷/終止的時候派出。 `sender`為被撤銷/終止任務物件。 提供參數: * request * 這是一個[Request](https://docs.celeryq.dev/en/master/reference/celery.worker.request.html#celery.worker.request.Request)實例,並不是`task.requst`。當使用`prefork pool`的時候,這個信號是在[親代處理](http://terms.naer.edu.tw/detail/18677545/)(parent process)中派出,因此`task.request`不能也不應該被使用。改用這個物件,因為它們共享很多相同欄位。 * terminated * 如果任務已終止,設置為True * signum * 用於終止任務的信號編號。如果為None而且`terminated=True`,那應該使用TERM * expired * 如果任務已過期,設置為True ##### task_unknown 當Worker接收到一個沒有註冊的任務的時候派出。 `sender`為Worker [Consumer](https://docs.celeryq.dev/en/master/reference/celery.worker.consumer.html#celery.worker.consumer.Consumer) 提供參數: * name * 不存在於註冊表的任務名稱 * id * 訊息中發現的任務id * message * 原生訊息物件 * exc * 發生的錯誤 ##### task_rejected 當Worker接收到一個未知的訊息類型到它的任務隊列的時候派出。 `sender`為Worker [Consumer](https://docs.celeryq.dev/en/master/reference/celery.worker.consumer.html#celery.worker.consumer.Consumer) 提供參數: * message * 原生訊息物件 * exc * 發生的錯誤(如果有的話) #### App Signals ##### import_modules 當一段程式(Worker、beat、shell等),要求模組在include與imports設置被導入的時候,信號會被送出。 `sender`為app實例。 #### Worker Signals ##### celeryd_after_setup 信號會在設置Worker實例之後,並且在被調用之前送出。這意味著[celery worker -Q](https://docs.celeryq.dev/en/master/reference/cli.html#cmdoption-celery-worker-Q)選項的所有佇列都已經啟用,日誌記錄也已經設置,依此類推。 這可以被用來新增應該總是被使用的自定義佇列,而忽略掉選項[celery worker -Q](https://docs.celeryq.dev/en/master/reference/cli.html#cmdoption-celery-worker-Q)。這是一個為每一個Worker設置`direct queue`的範例,這些佇列可以被用來路由任務到任何一個指定的Worker: ```python from celery.signals import celeryd_after_setup @celeryd_after_setup.connect def setup_direct_queue(sender, instance, **kwargs): queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker instance.app.amqp.queues.select_add(queue_name) ``` 提供參數: * sender * Worker節點名稱 * instance * 這是要被初始化的[celery.apps.worker.Worker](https://docs.celeryq.dev/en/master/reference/celery.apps.worker.html#celery.apps.worker.Worker)實例。注意到,目前為止只有[app](https://docs.celeryq.dev/en/master/userguide/extending.html#id0)與[hostname](https://docs.celeryq.dev/en/master/userguide/extending.html#id3)(節點名稱)被設定,其餘的`__init__`尚未被執行。 * conf * 當前app的配置 ##### celeryd_init 這是Celery Worker啟動所發送的第一個信號。`sender`為Worker主機名稱,因此這個信號可以用來設置Worker的指定配置: ```python from celery.signals import celeryd_init @celeryd_init.connect(sender='worker12@example.com') def configure_worker12(conf=None, **kwargs): conf.task_default_rate_limit = '10/m' ``` 或者可以設置多個Worker的配置,當你連接的時候可以省略指定`sender`: ```python from celery.signals import celeryd_init @celeryd_init.connect def configure_workers(sender=None, conf=None, **kwargs): if sender in ('worker1@example.com', 'worker2@example.com'): conf.task_default_rate_limit = '10/m' if sender == 'worker3@example.com': conf.worker_prefetch_multiplier = 0 ``` 提供參數: * sender * Worker的節點名稱 * instance * 這是要被初始化的[celery.apps.worker.Worker](https://docs.celeryq.dev/en/master/reference/celery.apps.worker.html#celery.apps.worker.Worker)實例。注意到,目前為止只有[app](https://docs.celeryq.dev/en/master/userguide/extending.html#id0)與[hostname](https://docs.celeryq.dev/en/master/userguide/extending.html#id3)(節點名稱)被設定,其餘的`__init__`尚未被執行。 * conf * 當前app的配置 * options * 從命令列參數(包含預設置)傳送給Worker的選項 ##### worker_init 啟動Worker之前派出。 ##### worker_ready 當Worker準備好接收工作的時候派出。 ##### heartbeat_sent Dispatched when Celery sends a worker heartbeat. `Sender`為[`celery.worker.heartbeat.Heart`](https://docs.celeryq.dev/en/master/internals/reference/celery.worker.heartbeat.html#celery.worker.heartbeat.Heart)實例。 ##### worker_shutting_down 當Worker開始關閉process的時候派出。 提供參數: * sig * 收到的POSIX信號 * how * 關機的方式,暖關機或冷關機 * exitcode * 主要process退出的時候要使用的exitcode ##### worker_process_init 當所有pool內的子處理(child processes)啟動的時候就分派它們。 注意,附加到這個信號的處理程序的阻塞時間不能超過4秒,否則將在process啟動失敗的情况下終止該process。 ##### worker_process_shutdown 在所有pool內的子處理退出之前分派給它們。 注意:並不保證這個信號一定被派出,類似於[finally](https://docs.python.org/dev/reference/compound_stmts.html#finally)blocks,無法保證在關閉時調用處理程序,即使調用也可能會在期間被中斷。 提供參數: * pid * 即將關閉的child process的pid * exitcode * [子處理](http://terms.naer.edu.tw/detail/1274415/)退出的時候要使用的exitcode ##### worker_shutdown 當Worker要關閉的時候派出。 #### Beat Signals ##### beat_init 當celery beat啟動的時候派出(不論是獨立或嵌入) `sender`為[celery.beat.Service](https://docs.celeryq.dev/en/master/reference/celery.beat.html#celery.beat.Service)實例 ##### beat_embedded_init 當celery beat做為embedded process啟動的時候,除了[beat_init](https://docs.celeryq.dev/en/master/userguide/signals.html#std-signal-beat_init)之外,還會派出這個信號。 `sender`為[celery.beat.Service](https://docs.celeryq.dev/en/master/reference/celery.beat.html#celery.beat.Service)實例 #### Eventlet Signals ##### eventlet_pool_started 當eventlet pool啟動的時候發送。 `sender`為[celery.concurrency.eventlet.TaskPool](https://docs.celeryq.dev/en/master/internals/reference/celery.concurrency.eventlet.html#celery.concurrency.eventlet.TaskPool)實例 ##### eventlet_pool_preshutdown 當Worker關閉,就在eventlet pool請求等待其它Worker的時候發送。 `sender`為[celery.concurrency.eventlet.TaskPool](https://docs.celeryq.dev/en/master/internals/reference/celery.concurrency.eventlet.html#celery.concurrency.eventlet.TaskPool)實例 ##### eventlet_pool_postshutdown 當pool已經加入而且Worker已經準備好關閉的時候發送。 `sender`為[celery.concurrency.eventlet.TaskPool](https://docs.celeryq.dev/en/master/internals/reference/celery.concurrency.eventlet.html#celery.concurrency.eventlet.TaskPool)實例 ##### eventlet_pool_apply 當任務應用於pool的時候發送。 `sender`為[celery.concurrency.eventlet.TaskPool](https://docs.celeryq.dev/en/master/internals/reference/celery.concurrency.eventlet.html#celery.concurrency.eventlet.TaskPool)實例 提供參數: * target * 目標函數 * args * 位置參數(Positional arguments) * kwargs * 關鍵參數(Keyword arguments) #### Logging Signals ##### setup_logging 如果連接這個信號,Celery將不會配置日誌記錄,因此你可以使用這個功能來完成依你需求做日誌記錄的配置。 如果您想通過Celery擴充日誌記錄配置,那你可以使用信號[after_setup_logger](https://docs.celeryq.dev/en/master/userguide/signals.html#std-signal-after_setup_logger)和[after_setup_task_logger](https://docs.celeryq.dev/en/master/userguide/signals.html#std-signal-after_setup_task_logger)。 提供參數: * loglevel * 日誌記錄等級 * logfile * 日誌文件檔案名稱 * format * 日誌格式 * colorize * 指定日誌是否為彩色 ##### after_setup_logger 設置每個全域日誌記錄(非任務日誌記錄)後發送。用於擴充日誌記錄配置。 提供參數: * logger * The logger object * loglevel * 日誌記錄等級 * logfile * 日誌文件檔案名稱 * format * 日誌格式 * colorize * 指定日誌是否為彩色 ##### after_setup_task_logger 設置每個單獨任務日誌記錄後發送。用於擴充日誌記錄配置。 提供參數: * logger * The logger object * loglevel * 日誌記錄等級 * logfile * 日誌文件檔案名稱 * format * 日誌格式 * colorize * 指定日誌是否為彩色 #### Command signals ##### user_preload_options 任何的Celery命令行程序完成解析用戶預載選項後,都會發送這個信號。 它可以用來增加額外的命令行參數到celery umbrella command: ```python from celery import Celery from celery import signals from celery.bin.base import Option app = Celery() app.user_options['preload'].add(Option( '--monitoring', action='store_true', help='Enable our external monitoring utility, blahblah', )) @signals.user_preload_options.connect def handle_preload_options(options, **kwargs): if options['monitoring']: enable_monitoring() ``` `sender`為Command instance,其值取決於調用的程式(例如,對於umbrella command而言,它將是物件CeleryCommand)。 提供參數: * app * app實例 * options * 解析的用戶預載選項的映射(具有默認值)。 #### Deprecated Signals ##### task_sent 這已經不再使用了,請使用[after_task_publish](https://docs.celeryq.dev/en/master/userguide/signals.html#std-signal-after_task_publish) ## History 20190923_依據4.4版本說明翻譯 20220523_依據5.2版本說明翻譯