GCP Cloud Composer
===
[TOC]

## Introduction

1. **Multi-cloud**
   Author cross-cloud workflows that connect data, processing, and services, providing a unified cloud environment; it can even be **combined with AWS**.
2. **Hybrid**
   Orchestrate workflows that span **on-premises and public cloud** environments, simplifying migration to the cloud or maintaining a hybrid data setup.
3. **Python**
   Use your existing **Python** skills to author and schedule workflows in Cloud Composer with great flexibility.
4. **Fully managed**
   Cloud Composer's managed nature lets you focus on authoring, scheduling, and monitoring your workflows, **with no resource provisioning** to worry about.
5. **Open source**
   Cloud Composer is built on **Apache Airflow**, freeing users from lock-in to a single framework and making data easy to move.
6. **Integrated**
   Cloud Composer is pre-integrated with BigQuery, Dataflow, Dataproc, Datastore, Cloud Storage, Pub/Sub, Cloud ML Engine, and more, letting you orchestrate end-to-end **GCP workloads**.
7. **Reliable**
   Easy-to-read **monitoring charts** and root-cause troubleshooting make workflows more reliable.

> **TL;DR**
> Learn the Airflow idiom in Python to author and schedule workflows, and monitor them from a web UI.

## Airflow overview

* Developed at Airbnb to manage a wide variety of workflows
* Open-sourced to the Apache Foundation
* Used by many companies (see the [Github](https://github.com/apache/airflow) repository)

### Core terminology

* A **DAG** (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
  * For example: create a DAG containing nodes A, B, and C with the relationship A -> B -> C, without yet assigning what A, B, and C each do.
* An **Operator** determines what actually gets done.
  * BashOperator - executes a bash command
  * PythonOperator - calls an arbitrary Python function (a sketch appears after the walkthrough below)
  * EmailOperator - sends an email
  * SimpleHttpOperator - sends an HTTP request
  * etc.
* **Task** - once an operator is instantiated, it is referred to as a "task".
* **Task Instances** represent a specific run of a task and are characterized as the combination of a DAG, a task, and a point in time.
  * When the workflow runs, each task becomes a task instance that records what ran, when and where it ran, and the resulting state ("running", "success", "failed", "skipped", "up for retry", etc.).

### Airflow - Hello world

``` python
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('tutorial', default_args=default_args,
          schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
```

* Section-by-section walkthrough
  * Import the libraries

    ``` python
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    ```

  * Declare the default arguments

    ``` python
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),  # date of the first run
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,  # number of retries after a failure
        'retry_delay': timedelta(minutes=5),  # interval between retries
    }
    ```

  * Declare the DAG

    ``` python
    dag = DAG('tutorial', default_args=default_args,
              schedule_interval=timedelta(days=1))  # name the DAG and set how often it runs
    ```

  * Define the tasks
    * For the `templated_command` syntax, refer to
      * Jinja templates (a standalone rendering sketch follows this walkthrough)
      * [Airflow's predefined macros](https://airflow.apache.org/macros.html)
      * e.g. `{{ ds }}`: the execution date stamp

    ``` python
    t1 = BashOperator(...)  # prints today's date

    t2 = BashOperator(...)  # sleeps for 5 seconds

    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
    ```

  * Wire up the dependencies

    ``` python
    t2.set_upstream(t1)  # t1 is t2's parent; can also be written t1 >> t2
    t3.set_upstream(t1)  # t1 is t3's parent; can also be written t1 >> t3
    # the two lines above can be combined into t1 >> [t2, t3]
    # set_downstream and << express the opposite direction
    ```
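To make the templating step concrete, here is a small standalone sketch that renders the same `templated_command` with plain Jinja2 (the library Airflow itself uses). This is only an illustration: the `_Macros` shim and the hard-coded `ds` value are stand-ins for what Airflow's scheduler injects at run time, and the local `ds_add` re-implements the behavior of the built-in `macros.ds_add`.

``` python
# Illustrative only: render the tutorial's template with plain Jinja2 to show
# what Airflow substitutes into bash_command before handing it to the shell.
from datetime import datetime, timedelta
from jinja2 import Template

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

def ds_add(ds, days):
    # Stand-in for Airflow's built-in macros.ds_add: shift a YYYY-MM-DD stamp.
    return (datetime.strptime(ds, '%Y-%m-%d')
            + timedelta(days=days)).strftime('%Y-%m-%d')

class _Macros:
    # Minimal shim exposing just the one macro this template uses.
    ds_add = staticmethod(ds_add)

print(Template(templated_command).render(
    ds='2015-06-01',  # Airflow injects the real execution date stamp here
    macros=_Macros,
    params={'my_param': 'Parameter I passed in'}))
```

Running it prints five groups of `echo` lines containing the date stamp, the stamp shifted by 7 days, and the passed-in parameter, which is essentially the shell script `BashOperator` receives at execution time.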
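The tutorial only instantiates `BashOperator`, but `PythonOperator` from the terminology list follows the same pattern. Below is a minimal sketch, assuming the same Airflow 1.x release as the tutorial; the DAG id `python_example`, the `greet` function, and its argument are hypothetical names invented for illustration.

``` python
# Minimal illustrative sketch: a one-task DAG built with PythonOperator.
# The DAG id, function name, and argument below are hypothetical.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('python_example', default_args=default_args,
          schedule_interval=timedelta(days=1))


def greet(name):
    # An arbitrary Python function; its return value is pushed to XCom by default.
    print('Hello, {}!'.format(name))


t1 = PythonOperator(
    task_id='greet',
    python_callable=greet,           # the function to call
    op_kwargs={'name': 'Composer'},  # keyword arguments passed to the function
    dag=dag)
```

With a local Airflow 1.x installation, a single task can be exercised without recording any state via `airflow test python_example greet 2015-06-01`, a handy sanity check before uploading a DAG file to the Composer DAGs folder.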
## Setting up and creating Cloud Composer

:::danger
:warning: Creating an environment takes about 25 minutes :rage:, so go grab a cup of tea first :tea:
:::

* Click the create button
![](https://i.imgur.com/CpfllvE.png)
* Enter a name and a location, pick Python 3, and leave the rest at the defaults
![](https://i.imgur.com/eU25pys.png)
* Click the environment name to view its details, edit its configuration, and so on
![](https://i.imgur.com/yXAqNnI.png)
* Open the DAGs folder and upload the finished airflow_test.py ([src](https://gist.github.com/newman1234/87eea184f0e4d28a6f53b9e86ebba05b))
![](https://i.imgur.com/l7kuVBy.png)
* Open the Airflow webserver to see all DAGs
![](https://i.imgur.com/dA3o4FK.png)
* The DAG Graph View shows the state of every Task Instance
![](https://i.imgur.com/OcvZ7OL.png)
![](https://i.imgur.com/VmP1i1Z.png)
* The Gantt view shows each task's execution interval
![](https://i.imgur.com/ubvZWrx.png)
* The DAG run overview can be narrowed down with filter conditions
![](https://i.imgur.com/Xu6H8sX.png)

## Pricing

1. Compute charges while a DAG workflow runs
![](https://i.imgur.com/71pMnwv.png)
1. Cloud Storage charges for hosting the Airflow .py files
1. Stackdriver charges for the extra logs it records (enabled by default)

## Comparison with Dataflow

If a big-data job is **one-off**, using Dataflow directly is enough. If you also need **management, control, and scheduling**, Dataflow can serve as a single task while Composer orchestrates the whole workflow, giving you those extra benefits.

## Use cases

1. **Data warehousing**: cleanse, organize, data quality check, and publish/stream data into our growing data warehouse
1. **Machine Learning**: automate machine learning workflows
1. **Growth analytics**: compute metrics around guest and host engagement as well as growth accounting
1. **Experimentation**: compute A/B testing experimentation frameworks logic and aggregates
1. **Email targeting**: apply rules to target and engage users through email campaigns
1. **Sessionization**: compute clickstream and time spent datasets
1. **Search**: compute search ranking related metrics
1. **Data infrastructure maintenance**: database scrapes, folder cleanup, applying data retention policies, …

## Reference

1. https://cloud.google.com/composer/
1. https://airflow.apache.org/tutorial.html
1. https://stackoverflow.com/questions/54154816/using-dataflow-vs-cloud-composer
1. https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html
1. https://www.applydatascience.com/airflow/airflow-tutorial-introduction/

###### tags: `GCP` `Cloud Composer`