# Celery_First Steps with Celery ###### tags: `celery` `celery 5.2` `python` [First Steps with Celery](https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html) Celery是一個包含batteries的任務隊列。它很簡單,簡單到你不需要瞭解它怎麼解決問題的情況下來使用它。它圍繞在最佳化設計,所以你的產品可以擴展並且與其它語言整合,而且提供生產中執行此系統所需要的工具與支援。 在這個教程中,你將學到使用Celery的絕對基礎。 學習關於: * 選擇、安裝訊息傳輸(message transport),也就是broker * 安裝Celery,並且建立你的第一個任務(task) * 啟動worker並且呼叫任務(tasks) * 追蹤任務在不同任務狀態之間的轉換,並且檢查其回傳查 Celery第一次用會讓你怕怕滴,不過不用擔心,這個教程會讓你立刻開始使用。這個教程會刻意的保持簡單,避免你跟進階功能有所混淆。在你完成這個教程之後,不如試著去讀讀文件的其它部份。就像是,[Next Steps](https://docs.celeryproject.org/en/stable/getting-started/next-steps.html#next-steps)這個就程就會進一步的說明Celery的能力。 ## Choosing a Broker Celery需要一個解決方案來發送與接收訊息;通常這會以一種稱為message broker的單獨服務的形式出現。 這有很多種選擇,包含下面幾種。 ### RabbitMQ [RabbitMQ](https://www.rabbitmq.com/)的功能完善、穩定、可靠度高而且容易安裝。它是生產環境的最佳選擇。關於在Celery中使用Rabbit的細節可以參考: * [Using RabbitMQ](https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/rabbitmq.html#broker-rabbitmq) 如果你用的是Ubuntu或是Debian的話,你可以用下面的指令來安裝RabbitMQ: ```shell= sudo apt-get install rabbitmq-server ``` 或者,如果你想要在Docker上執行的話,那可以這麼做: ```shell= docker run -d -p 5672:5672 rabbitmq ``` 當命令執行完成之後,broker就會在背景執行,準備好為你運送訊息:Starting rabbitmq-server: SUCCESS。 如果你不是用上述方法的話也不用太擔心,到它的官方網站你就找的到相關的安裝說明,包含Microsoft Windows: * [http://www.rabbitmq.com/download.html](http://www.rabbitmq.com/download.html) ### Redis [Redis](https://redis.io/)的功能也算是完善,不過很容易在突然終止或是斷電的情況下丟失資料。相關細節可以參考using Redis: * [Using Redis](https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/redis.html#broker-redis) 如果你希望有Docker上執行這個的話,可以執行: ```shell= docker run -d -p 6379:6379 redis ``` ### Other brokers 除了上述兩種方法,還有其它實驗性的傳輸實現可供選擇,包含[Amazon SQS](https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/sqs.html#broker-sqs)。 完整清單可見[Broker Overview](https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/index.html#broker-overview)。 ## Install Celery Celery在Python Package Index(PyPI)上,因此你可以用標準的Python工具,像是pip或是easy_install來安裝: ```shell= pip install celery ``` ## Application 你所需要的第一件事就是一個Celery的實例(instace)。我們稱之為Celery application,或簡稱為app。這個實例(instace)會拿來做為你希望在Celery中做的任何事的進入點、像是建立任務(tasks)與管理workers,其它的模組必需要能夠import這個instance。 在這個教程中,我們會維持在單一模組(single module)的作法,不過如果是大型專案的話,你就必需要建立[專用的模組](https://docs.celeryproject.org/en/stable/getting-started/next-steps.html#proj-celery-py)。 讓我們建立檔案`tasks.py`: ```python= from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') @app.task def add(x, y): return x + y ``` Celery的第一個引數就是當前模組的名稱。這只是為了在 `__main__`模組中定義任務時可以自動生成名稱。 第二個參數則為關鍵字引數,broker,指定你想要使用的message broker的URL。這邊我們用的是RabbitMQ(也是預設選項)。 上面Choosing a Broker有更多的選擇,RabbitMQ的話可以使用`amqp://localhost`,或是Redis,你可以用`redis://localhost`。 這邊你定義了一個單一任務(single task),命名為`add`,這個任務會回傳兩個數值的加總。 ## Running the Celery worker server 你現在可以利用執行我們的程式然後加上引數`worker`來執行worker: ```shell= celery -A tasks worker --loglevel=INFO ``` :::warning Note: 如果worker沒有啟動的話,可以參考下面的異常排除。 ::: 在生產環境中,你會需要讓worker在背景做為常駐程式來執行。這一點你可以利用你的平台所提供的工具,像是[supervisord](http://supervisord.org/)(更多訊息可參考[Daemonization](https://docs.celeryproject.org/en/stable/userguide/daemonizing.html#daemonizing)) 如果你想看可用的完整的命令選項清單,你可以執行: ```shell= celery worker --help ``` 還有其它幾個可用命令,你可以執行: ```shell= celery --help ``` ## Calling the task 要執行我們剛剛定義的任務,我們可以使用[delay()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.delay)這個方法。 這個方法是[apply_async()](https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async)的一種快捷模式的操作,可以更好的控制任務的執行(見[Calling Tasks](https://docs.celeryproject.org/en/stable/userguide/calling.html#guide-calling)): ```python= >>> from tasks import add >>> add.delay(4, 4) ``` 這個任務現在已經被你剛剛所啟動的worker處理。你可以透過查詢worker的控制台輸出來驗證這一點。 呼叫任務會回傳一個[AsyncResult](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult)的實例。這可以用來確認任務的狀態,到底這個任務是等待完成中,或是已經得到它的回傳值(或者如果任務失敗,那就會得到例外(exception)與回溯(traceback)) 預設情況下我們並不會啟動記錄結果。為了能夠做遠端程序調用或是持續追蹤資料庫中的任務結果,你將會需要去配置Celery以便你能夠使用結果的後端(result backend)。這會在下一章說明。 ## Keeping Results 如果你希望能夠持續追蹤任務的狀態,那Celery就會需要保存或發送狀態到某個地方。有幾個建內的result backends可以選擇:SQLAlchemy/Django ORM、MongoDB、Memcached、Redis、RPC(RabbitMQ/AMQP),又或者你可以自己定義。 以這個範例來說,我們使用rpc來做為result backend,這會將狀態做為transient messages(瞬態訊息?)來發回。這個backend會透過Celery的backend這個引數來設置(或者如果你選擇使用組態模組,那就會通過[result_backend](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_backend)來設置)。因此,你可以調整`tasks.py`內的實例化設置來啟用`rpc://backend`: ```python= app = Celery('tasks', backend='rpc://', broker='pyamqp://') ``` 或者,如果你想要使用Redis來做為result backend,然後要使用RabbitMQ來做為message broker(常見組合): ```python= app = Celery('tasks', backend='redis://localhost', broker='pyamqp://') ``` 更多關於result backends可參考[Result Backends](https://docs.celeryproject.org/en/stable/userguide/tasks.html#task-result-backends)。 現在我們使用result backend這個配置,關閉當前的python session,然後再次的載入tasks這個模組讓剛剛的調整生效。這一次,當你執行呼叫任務的時候,你就可以保留住AsyncResult這個回傳的實例: ```python= >>> from tasks import add # close and reopen to get updated 'app' >>> result = add.delay(4, 4) ``` [ready()](https://docs.celeryproject.org/en/stable/reference/celery.result.html#celery.result.AsyncResult.ready)這個方法會回傳任務是否執行完成: ```python= >>> result.ready() False ``` ## Configuration 基本上,Celery就像是一個consumer appliance(消費型家用電器)一樣,不需要太多的設置就可以操作它。它有一個輸入與一個輸出。輸入的部份必需連接到broker,而輸出的部份則是可以選擇性的連接到result backend。然而,如果你仔細的看看它的背面,那你就會發現有一個蓋子,上面有很多的silders、dials、與buttons:這就是組態配置。 大部份的情境來說,預設的組態配置已經夠用了,不過你還是可以做很多的組態設置來讓Celery按照你的需求作業。當然啦,去看看有那些可用的選項是一個好主意,這可以讓你熟悉有那些配置是可以設置的。更多可參考[Configuration and defaults](https://docs.celeryproject.org/en/stable/userguide/configuration.html#configuration)。 你可以在app上直接的設置組態,或者是利用專用的配置模組來設置。下面範例說明的是,你可以透過更改[task_serializer](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_serializer)設置來調整用於序列化任務有效負載的預設序列化器: ```python= app.conf.task_serializer = 'json' ``` 如果你一次要做很多配置設置的話,那你可以用`update`: ```python= app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, ) ``` 對於大型專案來說,我們是比較建議採用專用配置模組的方式。我們並不是那麼建議週期性任務的間隔(peridoic task intervals)與任務路由選項(task routing options)是採用hard coding的方式。最好把它們保存在一個集中位置。對於libraries來說尤其如此,因為這讓users能夠控制任務的行為。這種集中配置的方式還允許你的系統管理員在系統故障的時候做簡單的調整。 你可以利用呼叫[app.config_from_object()](https://docs.celeryproject.org/en/stable/reference/celery.html#celery.Celery.config_from_object)讓你的Celery知道,是時候使用配置模組了: ```python= app.config_from_object('celeryconfig') ``` 這模組通常稱為"celeryconfig",就當做是一種約定俗成吧,不過你當然還是可以用其它名稱就是。 上面的範例中,一個模組名為`celeryconfig.py`,你必需要有辦法在當前目錄或Python path中去載入它。它看起來就像下面這樣: `celeryconfig.py`: ```python= broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True ``` 要驗證你的配置文件是否能夠正常作業而且不會有任何語法錯誤,你可以試著import看看: ```shell= $ python -m celeryconfig ``` 相關配置的完整資訊,你可以參考[Configuration and defaults](https://docs.celeryproject.org/en/stable/userguide/configuration.html#configuration)。 為了彰顯我大配置文件的能耐,你可以利用下面的方式把一個行為異常的任務路由到一個專用的佇列: `celeryconfig.py`: ```python= task_routes = { 'tasks.add': 'low-priority', } ``` 又或者你也可以對任務的執行速率做一些限制,而不是控制它的路由,就像是讓這類型任務每分鐘只能執行10次(10/m): `celeryconfig.py`: ```python= task_annotations = { 'tasks.add': {'rate_limit': '10/m'} } ``` 如果你的broker是RabbitMQ或是Redis,你還可以指示workers在執行的時候為任務設置新的執行速率限制: ```shell= $ celery -A tasks control rate_limit tasks.add 10/m worker@example.com: OK new rate limit set successfully ``` 更多關於任務路由的資訊可參考[Routing Tasks](https://docs.celeryproject.org/en/stable/userguide/routing.html#guide-routing),更多關於註解的部份也可參考[task_annotations](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_annotations),或遠端控制命令以及如何監控你的workers在做什麼的[Monitoring and Management Guide](https://docs.celeryproject.org/en/stable/userguide/monitoring.html#guide-monitoring)。 ## Where to go from here 如果你想學更多,那就請繼續向下看[Next Stpes](https://docs.celeryproject.org/en/stable/getting-started/next-steps.html#next-steps)這個教程,然後接著就可以去看看[User Guide](https://docs.celeryproject.org/en/stable/userguide/index.html#guide)。 ## Troubleshooting [Frequently Asked Questions](https://docs.celeryproject.org/en/stable/faq.html#faq)中還有問題排除的部份。 ### Worker doesn't start: Permission Error * 如果你使用的是Debian、Ubuntu或是其它Debian-based distributions: * Debian最近把`/dev/shm`重新命名為`/run/shm` 一個簡單的解法就是建立一個符號鏈接: ```shell # ln -s /run/shm /dev/shm ``` * Others: * 如果你提供任何`--pidfile, --logfile, --statedb`參數,那就要確保你啟動worker的user是有權限讀取、寫入相關文件或目錄。 ### Result backend doesn’t work or tasks are always in PENDING state 預設情況下,所有的任務都是[PENDING(待辦)](https://docs.celeryproject.org/en/stable/userguide/tasks.html#std-state-PENDING)的,因此最好將狀態命名為"unknown"。當任務派送出去的時候,Celery並不會去更新任務的狀態,而且只要是沒有歷史記錄的任務都會被假設狀態為pending(畢竟你是知道任務id的)。 1. 確認任務沒有啟用`ignore_result` * 啟用這個選項就可以強制worker跳過更新狀態 2. 確認[task_ignore_result](https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-task_ignore_result)這個設置沒有被啟用 3. 確認你沒有舊的workers仍然在執行中 * 你很容易一個不小心就啟動多個workers,所以在你啟動一個新的worker之前,請先確認上一個worker已經正確的關閉 * 那種沒有配置預期的result backend的舊的worker可能正在執行中,而且還會綁架你派出去的任務 * 參數`--pidfile`可以設置為絕對路徑來確保不會發生這種事 4. 確認client端配置正確的backend * 如果出於某種原因,client配置成跟worker使用不同的backend,那你就無法接收到result(結果)。所以厚,你一定要確保你的backend配置正確: ```python >>> result = task.delay() >>> print(result.backend) ``` ## History 20211205_依據5.2版本說明調整