GCP Cloud Composer
===
[TOC]
## Introduction
1. Multi-cloud
   Build cross-cloud workflows that connect data, processing, and services, giving you a unified cloud environment that can also **mix with AWS**.
2. Hybrid environments
   Orchestrate workflows that span **on-premises and public cloud**, simplifying migration to the cloud or maintaining a hybrid data environment.
3. Python
   Use your existing **Python** skills to author and schedule workflows in Cloud Composer with full flexibility.
4. Fully managed
   Cloud Composer's managed nature lets you focus on authoring, scheduling, and managing workflows, **without worrying about provisioning resources**.
5. Open source
   Cloud Composer is built on **Apache Airflow**, so you are not locked into a single stack and can migrate data easily.
6. Integration
   Cloud Composer comes pre-integrated with BigQuery, Dataflow, Dataproc, Datastore, Cloud Storage, Pub/Sub, Cloud ML Engine, and more, letting you orchestrate end-to-end **GCP workloads**.
7. Reliability
   Easy-to-read **monitoring charts** plus root-cause troubleshooting make workflows more reliable.
> **TL;DR**
> Use Python to learn the Airflow idioms: author and schedule workflows, then monitor them from the web UI
## Introduction to Airflow
* Developed by Airbnb to manage all kinds of workflows
* Open-sourced to the Apache Software Foundation
* Used by many companies (see the [Github](https://github.com/apache/airflow) repo)
### Core terminology
* **DAG** (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
  * e.g., build a DAG with nodes A, B, and C and the relationship A -> B -> C, without yet assigning what A, B, and C each do
* **Operator** determines what actually gets done
  * BashOperator - executes a bash command
  * PythonOperator - calls an arbitrary Python function
  * EmailOperator - sends an email
  * SimpleHttpOperator - sends an HTTP request
  * etc.
* **Task** - Once an operator is instantiated, it is referred to as a “task”.
* **Task Instance** - represents a specific run of a task, characterized as the combination of a DAG, a task, and a point in time
  * once the workflow runs, each task becomes a task instance that records the what, when, and where, plus the resulting state ("running", "success", "failed", "skipped", "up for retry", etc.); the sketch below ties these terms together
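To make the Operator → Task → Task Instance chain concrete, here is a minimal sketch; the DAG id `concepts_demo` and the `greet` function are illustrative, not part of any tutorial:
``` python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def greet():
    # arbitrary Python function for the operator to call
    print('hello from a task')


dag = DAG('concepts_demo', start_date=datetime(2019, 1, 1),
          schedule_interval=None)

# PythonOperator is the Operator (the "what"); instantiating it with a
# task_id and a dag makes it a Task in this DAG.
hello = PythonOperator(task_id='say_hello', python_callable=greet, dag=dag)

# Each actual run of `say_hello` becomes a Task Instance, identified by
# (dag_id, task_id, execution_date) together with its state.
```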
### Airflow - Hello world
``` python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
```
* Step-by-step breakdown
  * Imports
``` python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
```
  * Declare the default arguments
``` python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),  # date of the first run
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # number of retries after a failure
    'retry_delay': timedelta(minutes=5),  # delay between retries
}
```
  * Declare the DAG
``` python
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# names the DAG and sets how often it runs
```
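One subtlety of Airflow's scheduling model worth noting here (assuming the default UTC timezone): a run for a given interval fires at the *end* of that interval, so for this DAG:
``` python
# start_date=datetime(2015, 6, 1) with schedule_interval=timedelta(days=1):
#   the first run covers 2015-06-01 and fires around 2015-06-02 00:00 UTC;
#   inside that run, the execution date (and {{ ds }}) is '2015-06-01'.
```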
  * Define the tasks
    * for the templated_command syntax, see
      * Jinja templates
      * [Airflow built-in macros](https://airflow.apache.org/macros.html)
      * e.g.
        * {{ ds }}: date stamp
``` python
t1 = BashOperator(...)  # prints today's date
t2 = BashOperator(...)  # sleeps for 5 seconds

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
```
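To see what the template expands to, it can be rendered locally with plain jinja2; a minimal sketch, where the `ds_add` helper is a hypothetical stand-in for Airflow's `macros.ds_add`:
``` python
from datetime import datetime, timedelta

from jinja2 import Template


def ds_add(ds, days):
    # stand-in for Airflow's macros.ds_add: shift a YYYY-MM-DD date stamp
    return (datetime.strptime(ds, '%Y-%m-%d')
            + timedelta(days=days)).strftime('%Y-%m-%d')


templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

print(Template(templated_command).render(
    ds='2015-06-01',
    macros={'ds_add': ds_add},
    params={'my_param': 'Parameter I passed in'},
))
# each of the five iterations prints:
#   echo "2015-06-01"
#   echo "2015-06-08"
#   echo "Parameter I passed in"
```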
  * Wire up the dependencies
``` python
t2.set_upstream(t1)  # t1 is t2's parent; can also be written t1 >> t2
t3.set_upstream(t1)  # t1 is t3's parent; can also be written t1 >> t3
# the two lines above can be combined into t1 >> [t2, t3]
# set_downstream and << express the same relationships in the other direction
```
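Before uploading anywhere, the file can be sanity-checked with the Airflow 1.x CLI; a sketch assuming `tutorial.py` sits in your local DAGs folder:
``` bash
# any syntax error in the DAG file surfaces here
python tutorial.py

# confirm the DAG and its tasks are registered
airflow list_dags
airflow list_tasks tutorial --tree

# run one task for a given date, independent of the scheduler
airflow test tutorial print_date 2015-06-01
airflow test tutorial templated 2015-06-01
```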
## Setting up and creating Cloud Composer
:::danger
:warning: Creating an environment takes about 25 minutes :rage:, go grab a cup of tea first :tea:
:::
* Click the create button

* Enter a name, a location, and the Python 3 version; the rest of the defaults are fine

* Click the environment name to view details or edit the configuration

* Open the DAGs folder and upload the finished airflow_test.py ([src](https://gist.github.com/newman1234/87eea184f0e4d28a6f53b9e86ebba05b)); this can also be scripted with gcloud, as sketched below

* Open the Airflow webserver to browse all DAGs (also reachable from the terminal, as sketched below)

* The DAG Graph View shows the state of every Task Instance


* The Gantt chart shows each task's execution window

* The run overview for a DAG supports various filters

## Pricing
1. Charges for the environment while DAG workflows run

1. Cloud Storage charges for the Airflow .py files placed in the bucket
1. Stackdriver charges for the extra logs it records (enabled by default)
## Comparison with Dataflow
If a big-data job is a **one-off**, use Dataflow directly; when you also need **management, control, and scheduling**, Dataflow can run as a single task while Composer orchestrates the whole workflow around it for the extra benefits. A sketch of this pattern follows.
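A minimal sketch using the Airflow 1.x contrib operator for Dataflow; every name, path, and bucket below is a placeholder, not something from the original:
``` python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

dag = DAG('composer_dataflow_demo',
          start_date=datetime(2019, 1, 1),
          schedule_interval='@daily')

# Dataflow runs as just one task; Composer supplies the scheduling,
# retries, and any upstream/downstream tasks around it.
run_pipeline = DataFlowPythonOperator(
    task_id='run_beam_pipeline',
    py_file='gs://my-bucket/pipelines/wordcount.py',   # placeholder pipeline
    options={'output': 'gs://my-bucket/results/out'},  # placeholder options
    dataflow_default_options={
        'project': 'my-gcp-project',                   # placeholder project
        'temp_location': 'gs://my-bucket/tmp',
    },
    dag=dag)
```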
## Use cases
1. **Data warehousing**: cleanse, organize, run data-quality checks, and publish/stream data into a growing data warehouse
1. **Machine Learning**: automate machine learning workflows
1. **Growth analytics**: compute metrics around guest and host engagement as well as growth accounting
1. **Experimentation**: compute A/B testing experimentation frameworks logic and aggregates
1. **Email targeting**: apply rules to target and engage users through email campaigns
1. **Sessionization**: compute clickstream and time spent datasets
1. **Search**: compute search ranking related metrics
1. **Data infrastructure maintenance**: database scrapes, folder cleanup, applying data retention policies, …
## Reference
1. https://cloud.google.com/composer/
1. https://airflow.apache.org/tutorial.html
1. https://stackoverflow.com/questions/54154816/using-dataflow-vs-cloud-composer
1. https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html
1. https://www.applydatascience.com/airflow/airflow-tutorial-introduction/
###### tags: `GCP` `Cloud Composer`