# Flask實作_消息隊列_Celery_03_入門_WorkFlow ###### tags: `flask` `celery` [Celery_Next Steps](http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps) 因為官方文件說明中的Next Steps擁有太多的資訊了,所以在說明記錄上拆成兩份,接續著理解Celery中的WorkFlow概念。 ## 練習 專案架構與[Flask實作_消息隊列_Celery_02_入門](/AUPlEQVnRkGSt8_ER48dZg)相同,不另外說明,可直接參閱。 ### Canvas: Designing Work-flows(工作流程) 多數情況下,我們在Calling Tasks中提到的delay與apply_async已經足夠我們應用了,但如果我們想把任務調用給另外的任務或是函數的時候,就需要用到另一個方式`signature`,signature包裝了單個參數以及執行選項再以某種方式來傳遞給函數。 ```python= >>> tasks.add.signature((2,2), countdown=10) proj.tasks.add(2, 2) >>> tasks.add.s((2,2), countdown=10) proj.tasks.add(2, 2) ``` 第1行:對add任務建立signature,並提供參數(2,2)以及執行選項`countdown=10` 第3行:可以減寫以s代替signature Signature支援呼叫api,這代表它擁有兩個method,`delay`與`apply_async`,但有一個地方不同,Signature也許已經指定參數設置。 tasks.add中需要兩個參數,兩個Signature組合成完整的Signature執行,這類似於partial function ```python= # 先宣告一個partial function(2, ) >>> s1 = tasks.add.s(2) >>> s1.values() dict_values(['proj.tasks.add', (2,), {}, {}, None, False, None]) # 再用delay加另一個參數(2, 8) >>> res = s1.delay(8) >>> res.get() 10 ``` 第2行:利用Signature加入一個參數 第3行:查詢目前s1的values 第6行:利用delay再加入一個參數 * sig.apply_async(args=(), kwargs={}, \**options) * sig.delay(*args, \**kwargs) ### The Primitives(基元?) 註:翻譯用語是在國家教育研究院查詢到的 Primitives本身就是Signature,可以用來組合成複雜的任務型態。 * Groups ```python= >>> from celery import group >>> from proj.tasks import add >>> g = group(add.s(i) for i in range(10)) >>> g group([proj.tasks.add(0), add(1), add(2), add(3), add(4), add(5), add(6), add(7), add(8), add(9)]) >>> g(10).get() [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] ``` 第3行: `i for i in range(10)`這是一個標準的串列生成式,會產生0-9的值 `add.s(i) for i in range(10)`產生0-9的partial function Signature 利用`group`包裝這個Signature之後,賦值`g`,所以可以看到g本身包含的資訊 第6行:給另外一個參數的值並利用`get`取得任務結果 ```python= >>> group(add.s(i, i) for i in range(10))().get() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] ``` 第1行:將值兩兩相加 * Chains 將tasks鏈結在一起,一個完成之後呼叫另外一個 ```python= >>> from celery import chain >>> from proj.tasks import add, mul >>> chain(add.s(4,4) | mul.s(8))().get() 64 ``` 第3行:將4+4=8的結果傳到下一個任務中與8相乘,即8x8=64 ```python= >>> g = chain(add.s(4) | mul.s(8)) >>> g(4).get() 64 ``` 第1行:利用partial function實作 ```python= >>> (add.s(4, 4) | mul.s(8))().get() ``` 第1行:也可以直接鏈結 * Chords Chords就是Group帶有Callback ```python= >>> from celery import chord >>> from proj.tasks import add, xsum >>> chord((add.s(i, i) for i in range(10)), xsum.s())().get() ``` 當Group通過Chain,就會變成是Chords ```python= >>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get() ``` ### Routing 預設上,Celery在派送任務的時候所使用的Routing是celery,但這部份可以利用配置來設置,後續再 使用`apply_async`的時候再宣告要走的Queue ```python= app.conf.update( task_routes = { 'proj.tasks.add': {'queue': 'hipri'}, }, ) ``` 利用參數設置我們新增了一個`hipri`的隊列,這可以讓我們在後續指定調用。 ```python= >>> from proj.tasks import add >>> add.apply_async((2, 2), queue='hipri') ``` 也可以在啟動的時候指定要處理的隊列名稱 ```python= celery -A proj worker -Q hipri,celery ``` [Routing](http://docs.celeryproject.org/en/latest/userguide/routing.html#guide-routing) ### 時區設置 ```python= app.conf.timezone = 'Europe/London' ``` 記得調整成自己國家的就可以了 ## 總結 按著官方教學說明我們認識了Signature以及六種Primitives中的三種,其餘三種可以直接再參考官方說明,也了解到怎麼設置Routing(對應RabbitMQ的Queue)。 到這邊,也算是有基礎了,剩下的就是實作中學習了。