# Flask實作_消息隊列_Celery_03_入門_WorkFlow
###### tags: `flask` `celery`
[Celery_Next Steps](http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps)
因為官方文件說明中的Next Steps擁有太多的資訊了,所以在說明記錄上拆成兩份,接續著理解Celery中的WorkFlow概念。
## 練習
專案架構與[Flask實作_消息隊列_Celery_02_入門](/AUPlEQVnRkGSt8_ER48dZg)相同,不另外說明,可直接參閱。
### Canvas: Designing Work-flows(工作流程)
多數情況下,我們在Calling Tasks中提到的delay與apply_async已經足夠我們應用了,但如果我們想把任務調用給另外的任務或是函數的時候,就需要用到另一個方式`signature`,signature包裝了單個參數以及執行選項再以某種方式來傳遞給函數。
```python=
>>> tasks.add.signature((2,2), countdown=10)
proj.tasks.add(2, 2)
>>> tasks.add.s((2,2), countdown=10)
proj.tasks.add(2, 2)
```
第1行:對add任務建立signature,並提供參數(2,2)以及執行選項`countdown=10`
第3行:可以減寫以s代替signature
Signature支援呼叫api,這代表它擁有兩個method,`delay`與`apply_async`,但有一個地方不同,Signature也許已經指定參數設置。
tasks.add中需要兩個參數,兩個Signature組合成完整的Signature執行,這類似於partial function
```python=
# 先宣告一個partial function(2, )
>>> s1 = tasks.add.s(2)
>>> s1.values()
dict_values(['proj.tasks.add', (2,), {}, {}, None, False, None])
# 再用delay加另一個參數(2, 8)
>>> res = s1.delay(8)
>>> res.get()
10
```
第2行:利用Signature加入一個參數
第3行:查詢目前s1的values
第6行:利用delay再加入一個參數
* sig.apply_async(args=(), kwargs={}, \**options)
* sig.delay(*args, \**kwargs)
### The Primitives(基元?)
註:翻譯用語是在國家教育研究院查詢到的
Primitives本身就是Signature,可以用來組合成複雜的任務型態。
* Groups
```python=
>>> from celery import group
>>> from proj.tasks import add
>>> g = group(add.s(i) for i in range(10))
>>> g
group([proj.tasks.add(0), add(1), add(2), add(3), add(4), add(5), add(6), add(7), add(8), add(9)])
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
第3行:
`i for i in range(10)`這是一個標準的串列生成式,會產生0-9的值
`add.s(i) for i in range(10)`產生0-9的partial function Signature
利用`group`包裝這個Signature之後,賦值`g`,所以可以看到g本身包含的資訊
第6行:給另外一個參數的值並利用`get`取得任務結果
```python=
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
第1行:將值兩兩相加
* Chains
將tasks鏈結在一起,一個完成之後呼叫另外一個
```python=
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> chain(add.s(4,4) | mul.s(8))().get()
64
```
第3行:將4+4=8的結果傳到下一個任務中與8相乘,即8x8=64
```python=
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
```
第1行:利用partial function實作
```python=
>>> (add.s(4, 4) | mul.s(8))().get()
```
第1行:也可以直接鏈結
* Chords
Chords就是Group帶有Callback
```python=
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
```
當Group通過Chain,就會變成是Chords
```python=
>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
```
### Routing
預設上,Celery在派送任務的時候所使用的Routing是celery,但這部份可以利用配置來設置,後續再 使用`apply_async`的時候再宣告要走的Queue
```python=
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
```
利用參數設置我們新增了一個`hipri`的隊列,這可以讓我們在後續指定調用。
```python=
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
```
也可以在啟動的時候指定要處理的隊列名稱
```python=
celery -A proj worker -Q hipri,celery
```
[Routing](http://docs.celeryproject.org/en/latest/userguide/routing.html#guide-routing)
### 時區設置
```python=
app.conf.timezone = 'Europe/London'
```
記得調整成自己國家的就可以了
## 總結
按著官方教學說明我們認識了Signature以及六種Primitives中的三種,其餘三種可以直接再參考官方說明,也了解到怎麼設置Routing(對應RabbitMQ的Queue)。
到這邊,也算是有基礎了,剩下的就是實作中學習了。