# Celery_Calling Tasks ###### tags: `celery` `celery 5.2` `python` [官方連結_Calling Tasks](https://docs.celeryproject.org/en/stable/userguide/calling.html) ## Calling Tasks ### Basics 這份文件說明task instances與[canvas](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas)所使用的"Calling API"。 API定義了一組標準的執行選項以及三種方法: 1. `apply_async(args[, kwargs[, …]])` * 派送任務訊息 2. `delay(*args, **kwargs)` * 派送任務訊息的快捷方式,但不支援執行選項的設置 3. `calling (__call__)` * 應用物件(object)支援Calling API,代表任務並不會被worker執行,而是在當前程序執行(current process)。(沒有訊息被派送) :::info **Quick Cheat Sheet** 1. `T.delay(arg, kwarg=value)` * Star arguments shortcut to `.apply_async. (.delay(*args, **kwargs) calls .apply_async(args, kwargs))`. 2. `T.apply_async((arg,), {'kwarg': value})` 3. `T.apply_async(countdown=10)` * 現在開始10秒內執行 4. `T.apply_async(eta=now + timedelta(seconds=10))` * 現在開始10秒內執行,指定使用eta 5. `T.apply_async(countdown=60, expires=120)` * 現在開始1分鐘內執行,但2分鐘之後到期 6. `T.apply_async(expires=now + timedelta(days=2))` * 使用[datatime](https://docs.python.org/dev/library/datetime.html#datetime.datetime)設置,2天後到期 ::: #### Example 使用[delay()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.delay)很方便,它看起來就像是使用一個再普通不過的函數: ```python task.delay(arg1, arg2, kwarg1='x', kwarg2='y') ``` 使用[apply_async()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async)來代替的話可以這樣寫: ```python task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) ``` 因此,使用`delay`是很方便沒錯,不過如果你要設置額外的執行選項的話,就必需使用`apply_async`。 這文件的其餘部份會直接進入任務執行選項的細部設置。所有的範例均使用一個名為`add`的任務,函數會回傳兩個參數的加總: ```python @app.task def add(x, y): return x + y ``` :::info **There’s another way…** 後面你在讀Canvas的時候會有更深的體驗,不過signature(簽章)是用於傳遞任務調用簽章的對象(例如,透過網路派送),而且它們也支援Calling API: ```python task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async() ``` ::: ### Linking (callbacks/errbacks) Celery支援將任務連結在一起,一個跟著一個這樣。這種callback task(回呼任務)會伴隨著父任務的結果做為局部參數(partial argument),如下說明: ```python add.apply_async((2, 2), link=add.s(16)) ``` 第一個任務(父任務)的回傳值4將會被送去新任務,並將16加到第一個任務的回傳值4,最後得到(2+2)+16=20。 如果任務拋出異常(errbacks),你也可以觸發回呼函數(callback)。不過worker並不會真正的去呼叫這個errback來做為任務,而是會直接呼叫errback fucntion,這樣就可以把原始的請求(request)、例外(exception)與回溯(trackback)物件傳遞給它。 下面給出`error callback`的範例: ```python @app.task def error_handler(request, exc, traceback): print('Task {0} raised exception: {1!r}\n{2!r}'.format( request.id, exc, traceback)) ``` 可以使用`link_error`這個執行選項將之加入任務中: ```python add.apply_async((2, 2), link_error=error_handler.s()) ``` 此外,`link`與`link_error`都可以使用`list`來表示: ```python add.apply_async((2, 2), link=[add.s(16), other_task.s()]) ``` 以`list`來表示的時候,callbacks與errbacks都會被按序執行,而且所有的回呼函數都會使用父任務的回傳值做為局部參數。 :::info **What’s s?** 上面用的`add.s`就是一種簽章(signature)。如果你不知道這是什麼的話,後續可以讀讀[canvas guide](https://docs.celeryproject.org/en/stable/userguide/canvas.html#guide-canvas)。在那邊你還可以學到一些關於[chain](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.chain):一種將任務鏈結起來的簡單方法。 ::: ### On message Celery支援使用`on_message` callback來補捉所有狀態變化。 舉例來說,如果你想要一個長時間執行的任務發送它的任務進度的話,可以這麼做: ```python @app.task(bind=True) def hello(self, a, b): time.sleep(1) self.update_state(state="PROGRESS", meta={'progress': 50}) time.sleep(1) self.update_state(state="PROGRESS", meta={'progress': 90}) time.sleep(1) return 'hello world: %i' % (a+b) ``` ```python def on_raw_message(body): print(body) a, b = 1, 1 r = hello.apply_async(args=(a, b)) print(r.get(on_message=on_raw_message, propagate=False)) ``` 會得到下面的結果: ```jsonld {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 50}, 'children': [], 'status': 'PROGRESS', 'traceback': None} {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 90}, 'children': [], 'status': 'PROGRESS', 'traceback': None} {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': 'hello world: 10', 'children': [], 'status': 'SUCCESS', 'traceback': None} hello world: 10 ``` ### ETA and Countdown ETA(estimated time of arrival)<sub>(預計到達時間)</sub>允許你可以設置指定的日期與時間,這是任務執行的最早時間。`countdown`是一種以秒為單位設置ETA的快捷方式。 ```python # 下面範例設置3秒後執行 >>> result = add.apply_async((2, 2), countdown=3) >>> result.get() # this takes at least 3 seconds to return 20 ``` 任務保證在指定日期與時間之後的某個時間被執行,但不一定在那個確切的時間。當然有很多的原因會造成你無法在時間內執行,像是隊列中還有許多項目正排隊等待,或網路負載延遲。為了確認你的任務能夠被及時處理,你應該監視隊最是否擁塞。使用Munin或類似的工具,接收警示,再採取適合的調整來減輕工作量。參閱[Mnium](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#monitoring-munin) 雖然`countdown`是一個整數,不過`eta`必需要是[datatime](https://docs.python.org/dev/library/datetime.html#datetime.datetime),指定確切的日期與時間(包含毫秒精度以及時區): ```python >>> from datetime import datetime, timedelta >>> tomorrow = datetime.utcnow() + timedelta(days=1) >>> add.apply_async((2, 2), eta=tomorrow) ``` :::warning Warning: 當你使用RabbitMQ做為message broker的時候,你指定一個`countdown`超過15分鐘,你可能會遇到worker拋出一個[PreconditionFailed](https://docs.celeryproject.org/projects/amqp/en/latest/reference/amqp.exceptions.html#amqp.exceptions.PreconditionFailed)的異常: ``` amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel ``` RabbitMQ在3.8.15之後,其`consumer_timeout`預設為15分鐘。而在3.8.17之後預設為30分鐘。如果consumer沒有確認其交付(delivery)超過timeout value,那就會關閉它的channel,然後拋出一個異常`PRECONDITION_FAILED`。更多細節請參閱[Delivery Acknowledgement Tiemout](https://www.rabbitmq.com/consumers.html#acknowledgement-timeout)。 想要解決這個問題,就是要在RabbitMQ的配置文件`rabbitmq.conf`中指定參數`consumer_timeout`的值要大於等於你的countdown value(倒數計時的值)。舉例來說,你可以指定一個很大大大大的值,`consumer_timeout = 31622400000`(毫秒計算來看,這是1年的值)來避免後續有出問題。 ::: ### Expiration 參數`exipres`定義了一個可選的到期時間,可以是任務派送之後幾秒,或是使用[`datatime`](https://docs.python.org/dev/library/datetime.html#datetime.datetime)指定日期與時間 ```python >>> # Task expires after one minute from now. >>> add.apply_async((10, 10), expires=60) >>> # Also supports datetime >>> from datetime import datetime, timedelta >>> add.apply_async((10, 10), kwargs, ... expires=datetime.now() + timedelta(days=1) ``` 當一個worker接收到一個過期任務的時候,它會將該任務標記為[REVOKED](https://docs.celeryproject.org/en/stable/userguide/tasks.html#std-state-REVOKED)[TaskRevokedError](https://docs.celeryproject.org/en/stable/reference/celery.exceptions.html#celery.exceptions.TaskRevokedError) ### Message Sending Retry 在連接失敗的時候Celery會自動重新派送訊息,而且你是可以設置這種重新測試的行為,像是多久重新測試,最大重新測試次數,或是通通設置為disable。 要disabled重新測試,你可以設置`retry=False`: ```python add.apply_async((2, 2), retry=False) ``` :::info **Related Settings** * [task_publish_retry](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_publish_retry) * [task_publish_retry_policy](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_publish_retry_policy) ::: #### Retry Policy 重新測試的策略是一種著控制重新測試派送任務行為的映射,可以包含下面幾個關鍵: * max_retries * 放棄之前的最大重新測試次數,在這情況下會拋出重新測試異常 * 設置為`None`則代表會一直重新測試 * 預設值為3 * interval_start * 定義重新測試之間的等待秒數(浮點數或整數),預設為0(第一次重測是瞬間的) * interval_step * 在每次連續重新測試的時候,這個數值會被加到重新測試延值(點數或整數) * 預設值為0.2 * interval_max * 重新測試之間的等待最大秒數(浮點數或整數) * 預設值為0.2 舉例來說,預設的重新測試策略為: ```python add.apply_async((2, 2), retry=True, retry_policy={ 'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2, }) ``` 重新測試所花的最長時間為0.4秒。預設情況下相對較短,因為如果broker連接中斷,連接失敗可能會導致堆疊效應。舉例來說,許多web server程序等待重新測試,阻止了其它傳入的請求。 ### Connection Error Handling 當你派送一個任務,並且該訊息傳輸連接遺失了,或連接無法被啟動,會拋出`OperationalError`異常: ```python >>> from proj.tasks import add >>> add.delay(2, 2) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "celery/app/task.py", line 388, in delay return self.apply_async(args, kwargs) File "celery/app/task.py", line 503, in apply_async **options File "celery/app/base.py", line 662, in send_task amqp.send_task_message(P, name, message, **options) File "celery/backends/rpc.py", line 275, in on_task_call maybe_declare(self.binding(producer.channel), retry=True) File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel channel = self._channel = channel() File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect self.transport.connect() File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect self._connect(self.host, self.port, self.connect_timeout) File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect self.sock.connect(sa) kombu.exceptions.OperationalError: [Errno 61] Connection refused ``` 假如你啟用重新測試,那就只會在重新測試次數耗盡或立即禁用的時候才會發生。 你也可以這麼處理錯誤: ```python >>> from celery.utils.log import get_logger >>> logger = get_logger(__name__) >>> try: ... add.delay(2, 2) ... except add.OperationalError as exc: ... logger.exception('Sending task raised: %r', exc) ``` ### Serializers 資料在clients與workers之間的傳輸是需要被序列化的,因此所有在Celery的訊息都有著`content_type`的表頭(header),它描述著序列化的編碼方式。 預設序列化方式是`JSON`,但是你可以使用[task_serializer](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer)來改變,或是每次任務單獨的設置,甚至可以細到每個訊息的設置。 內建支援json、pickle、YAML、以及msgpack,你可以透過Kombu來自定義序列化方式。 :::info See also: [Message Serialization](https://docs.celeryproject.org/projects/kombu/en/master/userguide/serialization.html#guide-serialization) ::: :::info **Security** pickle這個模組允許可以執行任意的函數,請參閱[security guide](https://docs.celeryproject.org/en/stable/userguide/security.html#guide-security)。 Celery自帶一個特殊的序列化方式,可以針對你所派送的訊息做加密簽章的處理。 ::: 每個序列化方式都有它的優缺點: * json * JSON支援很多程式語言,是Python的標準庫,編碼相當快速,如[simplejson](https://pypi.org/project/simplejson/)。 * JSON的主要缺點就是它限制了你使用下列資料類型(Decimals與dates很明顯的是沒有的) * strings * Unicode * floats * Boolean * dictionaries * lists * 二進位資料(Binary data)會使用Base64編碼,與所支援的原生二進位類型的編碼格式相比,傳輸數據量增加了34% * 如果你的資料符合上述限制,而且你有跨語言支援需求,JSON的預設設置也許是你的最好選擇 * 更多信息請參閱[http://json.org](https://www.json.org/json-en.html) :::info **Note:** 從Python[官方文件](https://docs.python.org/3.6/library/json.html))資料來看,其JSON的鍵值結終為字串(str)類型。當你把dictionary轉為JSON的時候,所有的`keys`(鍵值)也都會強制轉為字串。所以你如果轉來轉去的把dict轉JSON,再從JSON轉回dict的話,那它可能就不是原本的dict。簡單來說就是你的dict如果有非字串類型的鍵值,那轉回去就不會是原始的dict。 ::: * pickle * 如果你沒有支援Python以外的語言需求,使用pickle將擁有所有Python內建資料類型的支援(class instance除外),派送binary files時更小的訊息,以及較JSON快一些的處理速度 * 更多信息請參閱[pickle](https://docs.python.org/dev/library/pickle.html#module-pickle) * yaml * YAML有著許多與json相同的特性,不過YAML支援更多的資料類型(包含日期、遞歸引用..等) * Python的標準庫中,YAML執行效率較JSON慢一些些 * 如果你需要更豐富的資料類型以及保持跨語言的兼容性,那YAML會較上述更適合。 * 更多信息請參閱[http://yaml.org](http://yaml.org/) * msgpack * msgpack是一個二進位序列化格式,功能面類似於JSON。還有新,剛出的,所以應該將它視為實驗性的會比較好 * 更多信息請參閱[http://msgpack.org/](http://msgpack.org/) 使用的編碼方式可以做為訊息的表頭(message header),這樣worker就知道對任務做怎麼樣的反序列化。如果你使用自定義的序列化方式,這序列化方式必須可以提供worker使用。 下面順序用於決定派送任務的時候所使用的序列化方式: 1. The `serializer execution` option 2. The [`Task.serializer`](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.serializer) attribute 3. The [`task_serializer`](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer) setting 下面說明為單個任務設置自定義序列化函式: ```python >>> add.apply_async((10, 10), serializer='json') ``` ### Compression Celery可以使用下面內建方案來壓縮訊息: * brotli * brotli針對網頁進行了最佳化,尤其是小型文字類型的文件( text documents)。對於提供靜態內容,像是字型與html頁面最有效 * 使用它,安裝Celery的時候記得如下安裝: ```shell $ pip install celery[brotli] ``` * bzip2 * bzip2可以建立比gzip更小的檔案,但壓縮與解壓縮的速度明顯比gzip還要慢 * 要使用它,請確保你的Python支援執行文件是使用bzip2編譯的 * 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的bzip2版本 ```python >>> import bz2 Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'bz2' ``` * gzip * gzip適用於記憶體空間較小的系統,適合用於記憶體有限的系統。它通常被用來產生副檔名為`.tar.gz`的檔案 * 要使用它,請確保你的Python支援執行文件是使用gzip編譯的 * 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的bzip2版本 ```python >>> import gzip Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'gzip' ``` * lzma * lzma提供了良好的壓縮比並且有更快的壓縮、解壓縮執行效率,只是這也造成記憶體用量更高 * 要使用它,請確保你的Python支援執行文件是使用lzma編譯的,而且你的Python是3.3或更新的版本 * 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的lzma版本 ```python >>> import lzma Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'lzma' ``` * 你也可以如下安裝: ```shell $ pip install celery[lzma] ``` * zlib * zlib是庫形式的Deflate演算法的抽象,它包括對gzip文件格式和API中的輕量級串流格式的支援。它是許多軟體系統的關鍵組成,像Linux kernel、Git VCS.. * 要使用它,請確保你的Python支援執行文件是使用zlib編譯的 * * 如果你得到下面異常[`ImportError`](https://docs.python.org/dev/library/exceptions.html#ImportError),這意味著你應該重新編譯你的Python支援的lzma版本 ```python >>> import zlib Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named 'zlib' ``` * zstd * zstd以zlib-level的即時壓縮以及更好的壓縮比為目標。由Huff0與FSE兩個libary所提供的非常快速的entropy stage的支持 * 使用它,安裝Celery的時候記得如下安裝: ```shell $ pip install celery[zstd] ``` 你也可以自定義壓縮方案,並且在[kombu.compression.register](https://docs.celeryproject.org/projects/kombu/en/master/reference/kombu.compression.html#kombu.compression.register)中註冊它。 下面順序用於派送任務的時候決定壓縮的方式: 1. The compression execution option 2. The `Task.compression` attribute 3. The [task_compression](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_compression) attribute 下面範例說明在呼叫任務的時候指定壓縮方式: ```python >>> add.apply_async((2, 2), compression='zlib') ``` ### Connections 你可以通過建立publisher來手動處理連接的部份: ```python results = [] with add.app.pool.acquire(block=True) as connection: with add.get_publisher(connection) as publisher: try: for args in numbers: res = add.apply_async((2, 2), publisher=publisher) results.append(res) print([res.get() for res in results]) ``` 雖然說這個特殊的例子用`group`的話可以有更好的表示方式: ```python >>> from celery import group >>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)] >>> res = group(add.s(i, j) for i, j in numbers).apply_async() >>> res.get() [4, 8, 16, 32] ``` :::info **Automatic Pool Support** 從版本2.3之後支援`automatic connection pools`,因此你不需要手動處理連接和發行者以重用連接。 `connection pools`從版本2.5之後預設狀態為啟用。 見[broker_pool_limit](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-broker_pool_limit)說明 ::: ### Routing options Celery可以路由任務到不同的隊列。 簡單的路由(name<->name)可以使用選項`queue`來完成: ```python add.apply_async(queue='priority.high') ``` 然後,你可以利用參數`-Q`來指派worker讓它加入相對應設置的隊列名稱(`celery, priority.high`): ```shell $ celery -A proj worker -l INFO -Q celery,priority.high ``` :::info **See also:** 不建議在程式碼程以硬編碼的方式來加入隊列,最好的方法是利用配置路由來設置。([task_routes](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_routes)) 更多路由可以參閱[Routing Tasks](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing) ::: ### Results options 你可以利用設置[task_ignore_result](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_ignore_result)或選項(`ignore_result`)來決定是否保存任務執行結果 ```python >>> result = add.apply_async((1, 2), ignore_result=True) >>> result.get() None >>> # Do not ignore result (default) ... >>> result = add.apply_async((1, 2), ignore_result=False) >>> result.get() 3 ``` 如果你想要額外保存任務的元資料(metadata)在result backend,那就記得將[result_extended](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_extended)設置為True。 :::info **See also:** 更多任務說明請參閱[Tasks](https://docs.celeryproject.org/en/stable/userguide/tasks.html#guide-tasks) ::: #### Advanced Options 這些選項是提供給應用AMQP完整路由功能的進階用戶,有興趣的話可以閱讀[routind guide](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing) * excnahge * 要將訊息派送到的那個交換機的名稱(或[kombu.entity.Exchange](https://docs.celeryproject.org/projects/kombu/en/master/reference/kombu.html#kombu.Exchange)) * routing_key * routing key * 用來確定的routing key * priority * 0-255的數值,255代表最高優先(RabbitMQ) * Redis的話則0為最高優化 ## History 20190725_依據4.3版本說明調整 20211210_依據5.2版本說明調整