# Celery_Routing Tasks ###### tags: `celery` `celery 5.2` `python` [官方連結_Routing Tasks](https://docs.celeryq.dev/en/master/userguide/routing.html) ## Routing Tasks :::info **Note:** Alternate routing(替代路由),像是topic與fanout並不適用於所有傳輸。請參考[transport comparison table](https://docs.celeryq.dev/projects/kombu/en/master/introduction.html#transport-comparison) ::: ### Basics #### Automatic routing 使用routing最簡單的方法就是使用[task_create_missing_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_create_missing_queues)設置(預設開啟) 開啟這設置之後,將自動建立尚未在[task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)中定義的命名佇列。這讓執行簡單的路由任務變得容易。 假設你有兩台伺服器,`x`與`y`處理常規任務,另一台伺服器`z`只處理feed的相關任務。你可以使用這個配置: ```python task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}} ``` 啟用這個路由之後,feed的相關任務就會被路由到`feeds`這個佇列,其它的任務會被路由到預設的佇列(基於歷史原因,命名為`celery`)。 又或者,你可以使用glob的模式匹配,甚至使用正則表達式來匹配在命名空間為`feed.tasks`的所有任務: ```python app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}} ``` 如果匹配模式的順序很重要的話,你就應該以項目格式來指定路由: ```python task_routes = ([ ('feed.tasks.*', {'queue': 'feeds'}), ('web.tasks.*', {'queue': 'web'}), (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}), ],) ``` :::info **Note:** [task_routes](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_routes)設置也可以使用dict,或路由物件的清單(router object list),這種情況下,我們應該指定這個設置是包含list的tuple格式。 ::: 在安裝路由之後,你可以啟動伺服器`z`單純的處理佇列`feeds`,像這樣: ```shell user@z:/$ celery -A proj worker -Q feeds ``` 你可以根據需求指定你要的佇列,因此你也可以讓這台伺服器處理預設佇列: ```shell user@z:/$ celery -A proj worker -Q feeds,celery ``` ##### Changing the name of the default queue 你可以利用下面配置來調整預設佇列的名稱: ```python app.conf.task_default_queue = 'default' ``` ##### How the queues are defined 這個功能的重點在於替單純需要基本功能的使用者隱藏複雜的AMQP協定。只是,也許你會對如何宣告這些佇列感到興趣。 下面的設定會建立一個名稱為`video`的佇列: ```python {'exchange': 'video', 'exchange_type': 'direct', 'routing_key': 'video'} ``` 非AMQP的backends,像是Redis或SQS並不支援exchanges,所以他們將exchange的名稱設置與佇列相同。使用這個設置可以確保它們可以正常作業。 #### Manual routing 假設你有兩台伺服器,`x`與`y`在處理常規任務,另一台伺服器`z`單純處理`feed`相關的任務,你可以使用這個配置: ```python from kombu import Queue app.conf.task_default_queue = 'default' app.conf.task_queues = ( Queue('default', routing_key='task.#'), Queue('feed_tasks', routing_key='feed.#'), ) app.conf.task_default_exchange = 'tasks' app.conf.task_default_exchange_type = 'topic' app.conf.task_default_routing_key = 'task.default' ``` [task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)是[Queue instance](https://docs.celeryq.dev/projects/kombu/en/master/reference/kombu.html#kombu.Queue)的清單,如果你沒有替routing_key設置exchange或exchange_type,那它們就會從[task_default_exchange](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_default_exchange)與[task_default_exchange_type](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_default_exchange_type)取得設置。 要將任務路由到`feed_tasks`這個佇列,你可以在[task_routes](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_routes)增加一個項目設置: ```python task_routes = { 'feeds.tasks.import_feed': { 'queue': 'feed_tasks', 'routing_key': 'feed.import', }, } ``` 你也可以在`Task.apply_async()`或`send_task()`使用參數`routing_key`來覆寫: ```python >>> from feeds.tasks import import_feed >>> import_feed.apply_async(args=['http://cnn.com/rss'], ... queue='feed_tasks', ... routing_key='feed.import') ``` 要讓伺服器`z`單純的使用佇列`feed`,你可以啟動它的時候使用選項[celery worker -Q](https://docs.celeryq.dev/en/master/reference/cli.html#cmdoption-celery-worker-Q): ```shell user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h ``` 伺服器`x`、`y`必須被配置為使用預設佇列: ```shell user@x:/$ celery -A proj worker -Q default --hostname=x@%h user@y:/$ celery -A proj worker -Q default --hostname=y@%h ``` 如果你想要,你甚至可以讓你處理`feed`的Worker也處理常規任務,也許有時候工作量大的時候需要這麼做: ```shell user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h ``` 如果你有另外的佇列並且想增加其它的exchange,只需要指定自定義的exchange與exchange type: ```python from kombu import Exchange, Queue app.conf.task_queues = ( Queue('feed_tasks', routing_key='feed.#'), Queue('regular_tasks', routing_key='task.#'), Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'), routing_key='image.compress'), ) ``` 如果你對這部份感到疑慮,那你應該瞭解AMQP。 :::info **See also:** In addition to the Redis Message Priorities below, there’s Rabbits and Warrens, an excellent blog post describing queues and exchanges. There’s also The CloudAMQP tutorial, For users of RabbitMQ the RabbitMQ FAQ could be useful as a source of information. ::: ### Special Routing Options #### RabbitMQ Message Priorities **supported transports:** RabbitMQ *New in version 4.0.* 佇列可以透過設置參數`x-max-priority`來支援優先序: ```python from kombu import Exchange, Queue app.conf.task_queues = [ Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10}), ] ``` 對所有佇列設定預設值可以使用[task_queue_max_priority](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queue_max_priority)來設置: ```python app.conf.task_queue_max_priority = 10 ``` 也可以使用[task_default_priority](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_default_priority)對所有任務指定優先序的預設值: ```python app.conf.task_default_priority = 5 ``` #### Redis Message Priorities **supported transports:** Redis 雖然Celery Redis的傳輸確實遵守優先序欄位,但是Redis本身並沒有優先序的概念。在嚐試使用Redis實作優先序之前,請先閱讀本說明,因為你可能會遇到非預期行為。 為了能夠基於優先序來調度任務,你需要配置傳輸選項`queue_order_strategy`: ```python app.conf.broker_transport_options = { 'queue_order_strategy': 'priority', } ``` 這優先序的支援是透過為每個佇列建立`n`個list來實現的。這意味著,即使有十個(0-9)優先級別,預設情況下還是的為了節省資源而合併為四個級別。這意味著名稱為`celery`的佇列會被分成四個佇列。 最高優先序的佇列會被命名為`celery`,其它的佇列就會有一個分隔符(預設為`x06x16`),然後它們的優先序號碼就會被附加到佇列名稱。 ```shell ['celery', 'celery\x06\x163', 'celery\x06\x166', 'celery\x06\x169'] ``` 如果你需要更多的優先序級別或是不同的分隔符,那麼你可以設置傳輸選項`priority_steps`與`sep`: ```python app.conf.broker_transport_options = { 'priority_steps': list(range(10)), 'sep': ':', 'queue_order_strategy': 'priority', } ``` 上面的配置會給你這些佇列名稱: ``` ['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9'] ``` 也就是說,請注意,這永遠不會像在伺服器級別實現的優先順序那樣好,而且充其量也可能只是近似。但對你的應用程式而言可能是足夠的了。 ### AMQP Primer #### Messages 訊息由header與body組成。Celery使用headers來保存訊息的內容類型以及它的內容編碼。內容類型通常是用於序列化訊息的序列化格式。body包含要執行的任務名稱,任務的id`(UUID)`,執行參數以及額外的meta-data,像是重試次數或ETA。 這是一個以Python dict表示的任務訊息: ```python {'task': 'myapp.tasks.add', 'id': '54086c5e-6193-4575-8308-dbab76798756', 'args': [4, 4], 'kwargs': {}} ``` #### Producers, consumers, and brokers 發送訊息的Client端通常稱為publisher,或producer,接收訊息的實體稱為consumer。 broker是訊息服務,將訊息由producers路由到consumers。 你可能會在AMQP相關材料中看到這些術語被大量使用。 #### Exchanges, queues, and routing keys 1. 訊息發送到exchanges。 2. exchange會把訊息路由到一個或多個佇列。有很多種exchange type,提供不同的路由方式或實現不同的訊息傳遞方案。 3. 訊息會保留在佇列中,一直到有人取用(consumes)。 4. 訊息確認之後就會從佇列刪除。 發送與接收訊息的步驟: 1. 建立exchange 2. 建立queue 3. 將queue綁定到exchange Celery為[task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)中的佇列自動建立需求實體(除非佇列設置`auto_declar=False`) 這邊是一個帶有三個佇列的配置;一個用於video,一個用於images,一個是提供其它的預設佇列: ```python from kombu import Exchange, Queue app.conf.task_queues = ( Queue('default', Exchange('default'), routing_key='default'), Queue('videos', Exchange('media'), routing_key='media.video'), Queue('images', Exchange('media'), routing_key='media.image'), ) app.conf.task_default_queue = 'default' app.conf.task_default_exchange_type = 'direct' app.conf.task_default_routing_key = 'default' ``` #### Exchange types exchange type定義了訊息如何透過ehchange做路由。標準exchange type的定義有`direct`、`topic`、`fanout`與`headers`。也有非標準exchange type可以做為RabbitMQ的外掛,像是由Michael Bridgen所寫的[`last-value-cache plug-in`](https://github.com/squaremo/rabbitmq-lvc-plugin)。 ##### Direct exchanges direct exchanges透過精確的routing key做匹配,由routing key-`video`所綁定的佇列只會單純的接收具有該routing key的訊息。 ##### Topic exchanges Topic exchanges匹配routing key,使用`.`來分隔字串,萬用字元`*`(匹配一個單字),`#`(匹配零或多個單字)。 使用routing key像是`usa.news`、`usa.weather`、`norway.news`與`norway.weather`,綁定可能是`*.news`(所有news),`usa.#`(所有在usa項目)或`usa.weather`(所有usa weather項目) #### Related API commands * **exchange.declare(exchange_name, type, passive, durable, auto_delete, internal)** * 按名稱宣告exchange * 參考[amqp:Channel.exchange_declare](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.exchange_declare) * **Keyword Arguments:** * **passive** – Passive意味著exchange不會被建立,但你可以用它來確認exchange是否早就存在。 * **durable** – durable exchanges是持久化的。(它們在broker重啟之後依然存在) * **auto_delete** – 這意味著exchange在沒有佇列使用的時候會由broker自動刪除。 * queue.declare(queue_name, passive, durable, exclusive, auto_delete) * 按名稱宣告exchange * 參考[amqp:Channel.queue_declare](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.queue_declare) * Exclusive queues只能被當前連線取用。Exclusive意味著`auto_delete`。 * queue.bind(queue_name, exchange_name, routing_key) * 利用routing key來綁定佇列與exchange * 未綁定的佇列不會收到訊息,因此這是必要的。 * 參考[amqp:Channel.queue_bind](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.queue_bind) * queue.delete(name, if_unused=False, if_empty=False) * 刪除佇列與它的綁定。 * 參考[amqp:Channel.queue_delete](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.queue_delete) * exchange.delete(name, if_unused=False) * 刪除一個exchange。 * 參考[amqp:Channel.exchange_delete](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.exchange_delete) :::info **Note:** 宣告並不一定代表"建立"。當你宣告的時候,你斷言(assert)那實體是存在並且可操作的。關於`exchange/queue/binding`之間誰應該先建立是沒有規定的,不論是`consumer`或`producer`。通常第一個需要它的人就是建立它的那個人。 ::: #### Hands-on with the API Celery帶有一個名為`celery amqp`的工具,用來透過命令行訪問AMQP的API,允許用命令行來管理任務,像是建立/刪除佇列與exchange,清除佇列並發送訊息。它也可以用非AMQP的broker,但不同的broker可能無法實現所有的命令。 你可以在`celery amqp`的參數直接寫入命令,或以沒參數的方式在`shell-mode`下啟動它: ```shell $ celery -A proj amqp -> connecting to amqp://guest@localhost:5672/. -> connected. 1> ``` 這邊`1>`是一個提示符號。數值`1`,代表你目前為止有執行過的命令。輸入`help`取得可用命令清單。它也支援自動完成(auto-completion),因此你可以開始輸入命令,然後按一下`tab`鍵來顯示可用的命令清單。 建立佇列,你可以發送訊息到: ```shell $ celery -A proj amqp 1> exchange.declare testexchange direct ok. 2> queue.declare testqueue ok. queue:testqueue messages:0 consumers:0. 3> queue.bind testqueue testexchange testkey ok. ``` 這建立了direct exchange `testexchange`,然後佇列命名為`testqueue`。然後使用routing key `testkey`將佇列綁定到exchange。 現在開始,使用routing key `testkey`發送到exchange `testexchange`的所有訊息都將移動到此佇列。你可以使用命令`basic.publis`發送訊息: ```shell 4> basic.publish 'This is a message!' testexchange testkey ok. ``` 現在訊息已經發送,你可以再次的檢索它。你可以使用命令`basic.get`,以同步方式輪詢佇列中的新消息。 從佇列中彈出訊息: ```shell 5> basic.get testqueue {'body': 'This is a message!', 'delivery_info': {'delivery_tag': 1, 'exchange': u'testexchange', 'message_count': 0, 'redelivered': False, 'routing_key': u'testkey'}, 'properties': {}} ``` AMQP使用`ack`來表示訊息已經被成功地接收並處理。如果訊息沒有`ack`而consumer頻道是關閉狀況下,那這個訊息就會被傳送到另外的consumer。 注意上面的`delivery_tag`;在連接通道中,每個接收到的訊息都有一個唯一的`delivery_tag`,該標記用於確認訊息。也要注意到,`delivery_tag`在連接之間並非唯一的,因此在另一個Client中,`delivery_tag 1`可能指向與此頻道不同的訊息。 你可以使用`basic.ack`確認接收到的訊息: ```shell 6> basic.ack 1 ok. ``` 測試之後應該刪除你所建立的物件: ```shell 7> queue.delete testqueue ok. 0 messages deleted. 8> exchange.delete testexchange ok. ``` ### Routing Tasks #### Defining queues Celery中,可用佇列的定義由[task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)設置: 這邊是一個帶有三個佇列的佇列配置;`videl`、`images`、`default`: ```python default_exchange = Exchange('default', type='direct') media_exchange = Exchange('media', type='direct') app.conf.task_queues = ( Queue('default', default_exchange, routing_key='default'), Queue('videos', media_exchange, routing_key='media.video'), Queue('images', media_exchange, routing_key='media.image') ) app.conf.task_default_queue = 'default' app.conf.task_default_exchange = 'default' app.conf.task_default_routing_key = 'default' ``` 這邊,[task_default_queue](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_default_queue)用於沒有明確指定路由的任務。 `task_default_queue`、`task_default_exchange`、`task_default_routing_key`用來設置任務的預設路由,也做為[task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)內的預設值。 也支援對單個佇列做多個綁定。下面是兩個routing key綁定到相同佇列的範例: ```python from kombu import Exchange, Queue, binding media_exchange = Exchange('media', type='direct') CELERY_QUEUES = ( Queue('media', [ binding(media_exchange, routing_key='media.video'), binding(media_exchange, routing_key='media.image'), ]), ) ``` #### Specifying task destination 任務的目的由以下設置按序決定: 1. `Task.apply_async()`的路由參數 2. 定義於[Task](https://docs.celeryq.dev/en/master/reference/celery.app.task.html#celery.app.task.Task)本身的相關路由屬性 3. 定義於[task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)的[Routers](https://docs.celeryq.dev/en/master/userguide/routing.html#routers) 最好的方式就是不要hard-code這些設置,而是使用[Routers](https://docs.celeryq.dev/en/master/userguide/routing.html#routers)做為配置選項;這是最靈活的作法,但合理的預設值依然可以設置為任務屬性。 #### Routers 路由(router)是決定任務的路由選項的函數(功能)。 定義一個新的路由要做的事,就是定義一個帶有簽章(signature)的函數(name, args, kwargs, options, task=None, \*\*kw) ```python def route_task(name, args, kwargs, options, task=None, **kw): if name == 'myapp.tasks.compress_video': return {'exchange': 'video', 'exchange_type': 'topic', 'routing_key': 'video.compress'} ``` 如果你回傳佇列鍵值,它將使用[task_queues](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_queues)中該佇列已定義的設置展開: ```python {'queue': 'video', 'routing_key': 'video.compress'} ``` 變成-> ```python {'queue': 'video', 'exchange': 'video', 'exchange_type': 'topic', 'routing_key': 'video.compress'} ``` 您可以通過將路由類別加到設置[task_routes](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_routes)來安裝它們: ```python task_routes = (route_task,) ``` 路由函數也透過名稱添加: ```python task_routes = ('myapp.routers.route_task',) ``` 對於簡單的任務名稱到路由的關聯,如上範例,你可以放一個dict到[task_routes](https://docs.celeryq.dev/en/master/userguide/configuration.html#std-setting-task_routes)以獲得相同行為: ```python task_routes = { 'myapp.tasks.compress_video': { 'queue': 'video', 'routing_key': 'video.compress', }, } ``` 然後,路由將按順序被遍歷,它會在第一個回傳True的路由停止,並將它做為任務的最終路由。 你也可以在序列中定義多個路由: ```python task_routes = [ route_task, { 'myapp.tasks.compress_video': { 'queue': 'video', 'routing_key': 'video.compress', }, ] ``` 然後依次訪問路由,並且將選擇第一個返回值的路由器。 如果你使用Redis或RabbitMQ,你還可以在路由中指定佇列的預設優先序: ```python task_routes = { 'myapp.tasks.compress_video': { 'queue': 'video', 'routing_key': 'video.compress', 'priority': 10, }, } ``` 同樣的,在任務上調用`apply_async`將覆寫預設的優先序: ```python task.apply_async(priority=0) ``` ##### Priority Order and Cluster Responsiveness: 需要注意的是,由於Worker預取(prefetching)的行為,如果在同一時間提交了一批任務(綑綁包),那這批任務可能對D在一開始就失去先後順序。禁用掉Worker預取(prefetching)的行為可以預防這個問題,但這可能會導致較小、快速的任務的效能變的不理想。多數情況下,簡單地將`worker_prefetch_multiplier`調減至1是一種更簡單、更簡潔的方法,可以在不完全禁用預取的情况下提高系統的響應能力。 請注意,使用redis做為broker的時候,優先序的值會反向排序:0為最優先。 #### Broadcast Celery也支援廣播路由(broadcast routing)。這是一個`exchange broadcast_tasks`的範例,它將任務的副本傳送給所有連接的Workers: ```python from kombu.common import Broadcast app.conf.task_queues = (Broadcast('broadcast_tasks'),) app.conf.task_routes = { 'tasks.reload_cache': { 'queue': 'broadcast_tasks', 'exchange': 'broadcast_tasks' } } ``` 現在,`task.reload_cache`會被發送到從佇列`broadcast_tasks`取用的每一個Worker。 這是另一個廣播路由的範例,這次是個`celery beat`排程: ```python from kombu.common import Broadcast from celery.schedules import crontab app.conf.task_queues = (Broadcast('broadcast_tasks'),) app.conf.beat_schedule = { 'test-task': { 'task': 'tasks.reload_cache', 'schedule': crontab(minute=0, hour='*/3'), 'options': {'exchange': 'broadcast_tasks'} }, } ``` ##### Broadcast & Results: 注意,如果兩個任務具有相同的`task_id`,那Celery result就不會去定義發生什麼事。如果相同的任務發佈到多個Wokers,那可能不會保留其狀態歷史記錄。 這種情況下,設置屬性`task.ignore_result`是一個不錯的想法。 ## History 20190823_依據4.4版本說明翻譯 20220520_依據5.2版本說明翻譯