# Airflow 介紹 ###### tags: `Airflow` > Author: Axot > 教程連結: https://hackmd.io/@yNxpVdiDR1ym4SdTyfux3g/Hki7kBTlP # 聲明 - 介紹時不會與其他ETL和workflow management的Tool來去做比較。 - 此介紹僅包含Airflow的一部分,並非全貌,因此看完這個介紹以後,Airflow還有很多東東等著你去發掘呢! - 人生第一次寫文章,寫太爛請多包涵。 # Introduction [Airflow官方文件](https://airflow.apache.org/docs/stable/) ## 基本資料: - From Airbnb,現為Apache頂級專案 - 程式語言: python - 是個工作流程管理系統 - 有很棒的GUI - 開源 - 遵循著”configuration as code”原則,workflow及task都用python寫 ## 簡介(From Leemeng) - Airflow是以Python寫成的工作流程管理系統(Workflow Management System) - 近年不管是資料科學家、資料工程師還是任何需要處理數據的軟體工程師,Airflow 都是他們用來建構可靠的 ETL 以及定期處理批量資料的首選之一。 - 大致UI長相: - ![](https://i.imgur.com/mr4Gn60.png) - ![](https://i.imgur.com/Hf9f5or.gif) - 大致結構: - ![](https://i.imgur.com/q2Fxw6k.png) - Workers: 負責執行task的工作單位 - Scheduler: 負責排task給Workers,以及即時回報排程狀況給Webserver - Database: 上游task裡能將執行結果存到Database,下游結果能從Database裡取得上游task的結果 - Webserver: 接收Scheduler以及task的logs來顯示UI - 基本上屌打其他寫workflow的東西(書是這樣講的)。 - ![](https://i.imgur.com/15fqBA9.png) ## DAG(Directed Acyclic Graphs) - workflow 可以被視為一個graph的結構, tasks為nodes,而task dependencies為 directed edges between nodes. - 而在Airflow裡,一個workflow是一個DAG(Directed Acyclic Graph), 其graph不能有cycle。 - ![](https://i.imgur.com/x0eCkHn.png) - 幾種設定task的Dependency的語法(之後還會再看到) ```python= # 上到下游 op1 >> op2 >> op3 # or op1.set_downstream(op2) op2.set_downstream(op3) # 下到上游 op3 << op2 << op1 # or op2.set_upstream(op1) op3.set_upstream(op2) # 也可以到一個list op1 >> [op2, op3] ``` ## 大致功能: - 為workflow工作排程、處理task dependencies, 平行執行, error handling, etc. - Airflow可以連到、執行各種不同語言、系統的tasks - 若是把某個task更新後,通過backfilling能把過去workflow的執行的狀態的該task與其下游task都重新執行一遍。 - ![](https://i.imgur.com/Kimwj0C.png) # 入門 ## Requirements - 作業系統: Linux, MacOS, WSL(Windows Subsystem for Linux) - 有pip - 其實也可以用docker啦,不過這篇文不會教你怎麼用docker來跑Airflow。 ## 安裝 ```bash= # 安裝 pip install apache-airflow # 若有其他相依套件要安裝(像mongo),可以: pip install apache-airflow[mongo] ``` ## 第一次執行 ```bash= # 設定Airflow至當前路徑 export AIRFLOW_HOME=$(pwd) # 產生config檔及基本設定 airflow initdb # 打開webserver airflow webserver -p 8080 ``` 接著在瀏覽器打開`localhost:8080`,就可以看到以下畫面 ![](https://i.imgur.com/r8yB3jC.png) - 圖中的example_xxxx是官方提供的DAG範例,看著他們的Code也可以學習到一些東西唷~! - 若是不想看到examples,需更改airflow.cfg的內容,將`load_examples`設為False,接著執行 ```bash= # 重製設定 airflow resetdb # 打開Web Server airflow webserver -p 8080 ``` `resetdb`的操作會將之前所設定的Connections、Pool給清掉,因此最好一開始就先改好airflow.cfg - 打開webserver後,再開一個cmd,執行 ```bash= export AIRFLOW_HOME=$(pwd) # 開啟排程器 airflow scheduler ``` 再來在UI上點選隨便一個![](https://i.imgur.com/mQeMHlr.png)將其設為ON,scheduler即會將其排程(若時間到的話),若你等不及了想執行(測試)看看,![](https://i.imgur.com/PE2xEdp.png)點選圖中最左邊符號 *trigger*,即可人工trigger一個dag,將其執行。 - 這裡講一下scheduler與webserver的關係,在沒有開啟scheduler的情況下,webserver會每過一段時間(多久我不知道,有時候根本不會動)從`$AIRFLOW_HOME/dags`底下看dags的狀況,若有更新或新增便會更新WebUI的Dags。而在開啟scheduler的狀況下,則會即時更新。 > **WebUI畫面並非即時,需要重新整理畫面才會看到新的狀態。** ## 程式基本架構 ### 範例 1. 首先將這個程式碼放到`$AIRFLOW_HOME/dags`底下。 ```python= import os import time import logging from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator def check_weekday(date_stamp): today = datetime.strptime(date_stamp, '%Y%m%d') if today.isoweekday() <= 5: return 'is_workday' else: return 'is_holiday' def get_metadata(): logging.info('get_metadata'+'~'*30+'!!!') def clean_data(): logging.info('clean_data'+'~'*30+'!!!') default_args = { 'owner': 'AxotZero', 'start_date': datetime(2020, 8, 21), 'schedule_interval': '@daily', 'retries': 2, 'retry_delay': timedelta(minutes=5) } with DAG(dag_id='tutorial', default_args=default_args) as dag: tw_stock_start = DummyOperator( task_id='tw_stock_start' ) check_weekday = BranchPythonOperator( task_id='check_weekday', python_callable=check_weekday, op_args=['{{ ds_nodash }}'] ) is_holiday = DummyOperator( task_id='is_holiday' ) is_workday = DummyOperator( task_id='is_workday' ) get_metadata = PythonOperator( task_id='get_metadata', python_callable=get_metadata, ) clean_data = PythonOperator( task_id='clean_data', python_callable=clean_data, ) tw_stock_end = DummyOperator( task_id='tw_stock_end', trigger_rule='one_success' ) tw_stock_start >> check_weekday >> [is_workday, is_holiday] is_holiday >> tw_stock_end is_workday >> get_metadata >> clean_data >> tw_stock_end ``` 2. 若你的webserver和scheduler有開啟的話,即可在你的web上看到一個叫做tutorial的dag。![](https://i.imgur.com/dMFDnUd.png) 3. 這個簡單的程式碼能產生以下的workflow ![](https://i.imgur.com/AyjUhTw.png) 4. 打開Dag旁邊的開關,即可執行~! ### 介紹 ``` 這個dag的功能: 1. 一開始先判斷execution_date是假日還是平日,若是平日則跳到2,若為假日跳到3 2. logger print出一些資訊,跳到3 3. 結束。 ``` 1. 我們先看到以下程式碼片段, ```python=35 default_args = { 'owner': 'AxotZero', 'start_date': datetime(2020, 8, 21), 'schedule_interval': '@daily', 'retries': 2, 'retry_delay': timedelta(minutes=5) } with DAG(dag_id='tutorial', default_args=default_args) as dag: ... ``` - 這個default_args除了拿來當參數給dag以外,也會作為參數傳給該dag底下的各個operator。因此這個default_dags其實包含了dag和operator的參數。 - 給dag的 - owner: 就擁有者 - start_date: 該dag開始的執行時間(可以是過去的時間,會從過去的時間點開始執行) - schedule_interval: 多久執行一次 - 給operator - retries: 執行失敗時,要重試多少次 - retry_delay: 執行失敗時,過多久才重試 > *dag和operator還有一大堆參數等著你去發掘唷~* > dag: [連結](https://airflow.apache.org/docs/stable/_api/airflow/models/dag/index.html) > operator: [連結](https://airflow.apache.org/docs/stable/_api/airflow/models/baseoperator/index.html) 2. 再看到以下程式碼片段 ```python=37 tw_stock_start = DummyOperator( task_id='tw_stock_start' ) check_weekday = BranchPythonOperator( task_id='check_weekday', python_callable=check_weekday, op_args=['{{ ds_nodash }}'] ) is_holiday = DummyOperator( task_id='is_holiday' ) is_workday = DummyOperator( task_id='is_workday' ) get_metadata = PythonOperator( task_id='get_metadata', python_callable=get_metadata, ) clean_data = PythonOperator( task_id='clean_data', python_callable=clean_data, ) tw_stock_end = DummyOperator( task_id='tw_stock_end', trigger_rule='one_success' ) tw_stock_start >> check_weekday >> [is_workday, is_holiday] is_holiday >> tw_stock_end is_workday >> get_metadata >> clean_data >> tw_stock_end ``` - 一個operator其實就是一個task - task之間的dependency可以用 `>>` 和 `<<`符號來定義 - 其實有一大堆operator,舉幾個例子 - 用來執行python程式碼的 PythonOperator - 用來執行shell script的 BashOperator - 等待外部DAG的某個task執行完的 ExternalTaskSensor - .... 超級多的啦 > *還有更多operator等著你去發掘* > 官方提供的operator: [連結](https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html) > contributer們提供的operator: [連結](https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/index.html) > ***operator其實也可以自己寫,把程式碼抽象化。*** - 在第44行大家可以看到 ```python=44 op_args=['{{ ds_nodash }}'] ``` 其中`'{{ds_nodash}}'`其實是macro,其代表"YYYYMMDD"格式的dag執行時間。 > 關於更多的macro請看這裡: [連結](https://airflow.apache.org/docs/stable/macros-ref.html) # UI介紹 > 以下所說的各種功能,絕大部分都能通過程式碼或指令達成。 > 不過我很笨,我只會用UI ## DAGs ![](https://i.imgur.com/Em4swmF.png) 1. 在經過剛剛的排程執行過後,自己先點點看DAG頁面的東西吧~ 2. 唯一比較需要注意的是最左邊的紅色按鈕`Delete Dag`,會將過去DAG Runs的紀錄砍掉,若該DAG依然存在且Scheduler還開著,便代表要將這個DAG從頭重跑一遍。 **點進DAG裡能夠看到下面這一行,皆是在表達該DAG的執行狀況或設定** ![](https://i.imgur.com/zIex3m5.png) ### Tree View ![](https://i.imgur.com/JibXBLw.png) - Pros: 能看到各個時間點該Dag的執行狀況(最右邊) - Cons: 圖較難理解 ### Graph View ![](https://i.imgur.com/entCSRd.png) - Pros: 圖好理解 - Cons: 一次只能一個時間點的執行狀況 ### Task ![](https://i.imgur.com/l71wHny.png) - 若是點擊了Workflow底下的Task,能跑出以下頁面, - `Zoom into SubDAG` : 只有在該Operator為SubDagOperator時才會出現,其代表這個Operator會執行一個Dag,範例請參考[連結](https://github.com/AxotZero/Airflow-MongoDB/tree/master/TW-Stock-practice/dags)** - `Task Instances Details` : 跟你說這個Task的Detail。 - `Rendered` 又名 `Rendered Template` : Template其實就是你自行想執行的其他語言的程式碼,如sql, hql....。如下圖![](https://i.imgur.com/93MmPa6.png) > ***關於Template的設定,後面會更細節討論*** - `View Log` : 就看該Task的Log - 下面有四個Function分別是Run, Clear, Mark Failed, Mark Sucess,其中 - `Run` 直接執行這個Task - `Ignore Task State` : 代表不論他的State如何就直接執行他(Default只有無狀態的Task才能被執行) - `Ignore Task Deps` : 代表不論該Task的上游執行了沒,就直接執行該Task - `Ignore All Deps` : 以上皆是 - `Clear`,將Task狀態清空,讓Scheduler能重新將該Task丟去排程,也就是所謂的BackFilling,當你Task的程式碼有所變動且想重新執行時,這就是個方便好用的東西。 - `Past`,每個DAG會有他被執行的時間(execution time),每個DAG執行過後會產生此task的task instance,將你所選的Task Instance所有之前(含自己)執行過的TaskInstance的狀態清掉。 - `Future`,跟上面一樣,不過是未來所有的Task Instance。 - `Upstream`,就上游的task。 - `Downstream`,就下游的task。 - `Recursive`,須和Upstream或Downstream配合使用,將該task所有上游或下游task清掉。 - `Failed`,只清掉state為fail的task。 以上六點可自行組合,假設我勾選了Past, Future, Downstream, Recursive -> 則代表將所有DAG的該Task和其下游全部清掉。 - `Mark Failed`和`Mark Success`,就只是標記而已,沒有其他功能。 ## Browse ![](https://i.imgur.com/64iuhfS.png) 查看你過去工作狀況的地方 ### SLA Misses ![](https://i.imgur.com/1OMTjEv.png) - SLA Misses是通常用於連線逾時時,寄信通知用的。 - 而這裡只是紀錄了過去SLA Misses發生的狀況。 - 如何設置SLA Misses? 1. 在你的airflow.cfg設定你的email設定 ```config= [smtp] smtp_host = smtp.gmail.com smtp_starttls = True smtp_ssl = False smtp_user = YOUR_EMAIL_ADDRESS smtp_password = 16_DIGIT_APP_PASSWORD smtp_port = 587 smtp_mail_from = YOUR_EMAIL_ADDRESS ``` 2. 在你的cli執行 ```bash= # 使你的airflow得到新的config設定 airflow resetdb ``` 4. 在你的Operator裡設定 `sla` 參數 ```python= mid = BashOperator( task_id='mid', sla=timedelta(seconds=5), bash_command='sleep 10', retries=0, ) ``` - 一旦時間超過sla的timedelta,就會寄信囉~ - 若是沒有設定email也沒關係,就只是不會寄信而已,不過在SLA Misses的地方還是會有紀錄唷~ ### Task Instances ![](https://i.imgur.com/1veYurG.png) - 過去所有Task執行的狀況,在這裡可以做搜尋、更改狀態...等。 ### Dag Runs ![](https://i.imgur.com/cvweo0n.png) - 過去所有Dags執行的狀況,在這裡可以做搜尋、更改、刪除狀態...等。 > ***關於Browse裡的Jobs和Logs我自己也不知道他是幹嘛的...。*** > **TO BE DONE.** ## Admin ![](https://i.imgur.com/AVdfQJb.png) ### Pools ![](https://i.imgur.com/bc1mpGY.png) - 資源控管用的變數,能夠限定執行的operator數量。 - 假設你希望你的postgres_db同一時間只能接受5個operator連接,那Pool就可以派上用場了 1. 先在UI上新增一個Pool `postgres_dwh` ![](https://i.imgur.com/bNLJw1K.png) 2. 在operator設定pool參數 ```python= extract_product = MyPostgresOperator( sql='select_product.sql' task_id='extract_product', pool='postgres_dwh') ``` 其中第四行的 `pool='postgres_dwh'`及代表將這個operator加入postgres_dwh的pool裡面。 ### Connections ![](https://i.imgur.com/V3sgWHo.png) ![](https://i.imgur.com/TsK5hpi.png) - 與外部平台連線用的變數 - 在後面的Hook介紹裡面會再提到。 ### Variables ![](https://i.imgur.com/rCavyZm.png) - 單純的變數 - 在設定完上圖的sql_path之後,若是我想在程式裡面引用他,只需要: ```python= from airflow.models import Variable tmpl_search_path = Variable.get("sql_path") ``` > 在UI上設定Pool, Connection, Variable這三種變數的優缺點: > 優: 設定一些機密的東西像Connection的設定能夠不用以明文的方式寫在程式碼裡面 > 缺: 要設定的東西太多的話會很累 > *這些設定其實也可以自行寫程式碼來設定,相關例子請看[這裡](https://github.com/gtoonstra/etl-with-airflow/blob/master/examples/etl-example/dags/init_etl_example.py),比較方便,不過就要以明文寫在程式碼裡面* ### XCOM ![](https://i.imgur.com/bfUh4VT.png) - 前面有提到過,上游task會將執行結果存到database,下游task從database來取得其執行結果,這便是airflow上下游task溝通的方式,也就是所謂的Xcom - 使用方式(以PythonOperator為例): 1. 設定operator的參數 `provide_context=True` ```python= my_operator = PythonOperator( task_id='my_task', python_callable=my_func, provide_context=True ) ``` 2. 在你所寫的functoin裡的使用方式 ```python= def my_func(**context): ti = context['task_instance'] # 得到上游的執行結果(key的default值為'return_value') previous_result = ti.xcom_pull(task_ids='previous_work') # 得到的新結果 new_result = previous_result + 100 # 存入Xcom(key='xcom_push') ti.xcom_push(key='xcom_push', value=new_result) # return 其實也會存到Xcom(key='return_value') return new_result ``` - 若是你是想讓別種Operator甚至別種語言來存取xcom怎麼辦? 若是你還記得前面曾提到過的macro的話,其實就是靠macro來達成這件事情唷! ```python= t2 = BashOperator( task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"') ``` > 有關Xcom更多資訊,請看[這裡](https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom) > ***有看過有人說若是資料太大的話,用這種方式會出錯,因此或許需要自行用別種方式來讓task溝通(我自己用過最大的資料是一個shape為 (13000\*20)&其值都是字串的dataframe,還沒有出錯過啦~~)*** ### Configuration 拿來看你airflow.cfg用的 ### Users > 不知道有甚麼用,不過有一個東西叫做 ***Webserver-RBAC mode*** 可能有相關,也是近期才發現的,我自己目前只有碰一下,但還有去沒摸索。 > **TO BE DONE** ## Data Profiling 一個讓你用sql來快速檢視數據的地方。 ![](https://i.imgur.com/dS5Tecb.png) ### Ad Hoc Query 讓你以sql query來看數據的地方 ![](https://i.imgur.com/EEZS3aX.png) ### Charts 讓你用sql query + 額外的一些設定,產生圖表的地方 1. 進到這個頁面,選擇更改或create一個圖表 ![](https://i.imgur.com/KNAJmPn.png) 2. edit/create頁面太大我就不放了,只要按照他給的layout來寫你的sql query,即可產生以下圖表。 ![](https://i.imgur.com/9gebKcU.png) ### Events > **我現在依然不知道Events是幹嘛用的。** > **TO BE DONE.** # 其他細項介紹 ## Hook [連結](https://airflow.apache.org/docs/stable/_api/index.html#hooks) - Airflow有提供一個叫Hook的東西,讓使用者可以先設定好Connections設定後,只需要在Hook裡丟conn_id,即可連線。 - 有哪些Hook以及有哪些function需要自行去看文件 - [airflow.hooks](https://airflow.apache.org/docs/stable/_api/airflow/hooks/index.html) - [airflow.contrib.hooks](https://airflow.apache.org/docs/stable/_api/airflow/contrib/hooks/index.html) - 以下以Mongodb為例 - ![](https://i.imgur.com/mcBmv3B.png) ### UI上設定 1. 在webserver的上方找到Admin/Connections,並點擊 - ![](https://i.imgur.com/8L8kXjo.png) 2. 在該頁面找到你要用的conn_id(我們這裡是"mongo_default"),並點擊左邊鉛筆的圖案;或也可以自行建立一個conn_id,在最上方點Create。 - ![](https://i.imgur.com/tPR2t6e.png) 3. 到這頁面就可以自己改啦~~~ - ![](https://i.imgur.com/tjVwr7A.png) ### 如何使用? - 給定Hook你的conn_id,剩下就看你要執行甚麼操作。 - [Mongodb文件連結](https://docs.mongodb.com/manual/) - 範例: ```python= # convert to double convert_query = [{ "$set":{ col: { '$convert':{ 'input': "$"+col, 'to': 'double', 'onError': None } } } }] MongoHook(conn_id='mongo_default').update_many( filter_doc={}, update_doc=convert_query, mongo_db=tmp_config['db'], mongo_collection=tmp_config['collection'] ) ``` ## Template Template的功用通常是讓你執行已經寫好的其他程式語言的程式碼,如之前已經寫好的sql, hql...等,就不需要將一切都寫在airflow裡面。 ### 我們直接看範例 1. 設定Operator要去哪裡找他的template,以及template的檔名。 ```python= my_operator = MyPostgresOperator( sql='my_sql.sql', template_searchpath='/sql_dir',) ``` 2. 前面說過了Operator可以自行定義了對吧,這裡就來順便看個範例Code吧。 ```python= class MyPostgresOperator(BaseOperator): template_fields = ('sql', 'parameters') template_ext = ('.sql',) ui_color = '#ededed' @apply_defaults def __init__( self, sql, postgres_conn_id='postgres_default' *args, **kwargs): super(MyPostgresOperator, self).__init__(*args, **kwargs) self.sql = sql self.postgres_conn_id = postgres_conn_id def execute(self, context): logging.info('Executing: ' + str(self.sql)) self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id) self.hook.run(self.sql) ``` - 這個Operator很簡單,就是通過你給的連線設定連線,並執行你所提供sql檔案。 - template_fields是會顯示在ui上的feature,可以自行設定 - 若template_fields裡出現以template_ext結尾的字串,則會自動將該檔案檔名替代成其內容 - 假設my_sql.sql的內容為 ```sql= SELECT * FROM my_table ``` - 則MyPostgresOperator的init,會將 `sql="my_sql.sql"` 替換成 `sql="SELECT * FROM my_table"` - 你可以在UI上點擊有使用template的task,接著點擊Render便會出現以下類似的畫面 ![](https://i.imgur.com/FN9lvvB.png) ## Executor [連結](https://airflow.apache.org/docs/stable/executor/debug.html) - Sequential Executor - default的Executor - 只能一次處理一個task - 遇到sensor的operator會直接卡住 - Debug Executor - 一次只能處理一個 - 遇到sensor的operator會reschedule而不會卡住 - 可以在config裡設定 'fail_fast'參數,只要遇到一個task fail,整個dag就fail,讓你好debug一點 - DaskExecutor - 支援cluster運算 - Dask是一個python的平行運算library,主要用於大資料(Dataframe, numpy, etc)處理及分析,而cluster運算只是他的其中一個功能。 - LocalExecutor - 支援multithread運算,但依然只能在local端做 - CeleryExecutor - 支援cluster運算 - Celery是一個用Python的分散式任務佇列(Distributed Task Queue),通常以RabbitMQ或Redis做為其Broker [Celery介紹](https://codertw.com/%E7%A8%8B%E5%BC%8F%E8%AA%9E%E8%A8%80/704976/) - Flower是airflow一個拿來監控Celery佇列的工具(也是一個server) - [Airflow CeleryExecutor 多台機器部署教學](https://blog.csdn.net/q383965374/article/details/104537121) - KubernetesExecutor - 支援cluster運算 - Kubernetes是一個能夠部署container的Tool,提供以下功能: - Deployment: 同時部署多個容器到多台機器上 - Scaling: 服務的乘載量有變化時,可以對容器做自動擴展 - Management: 管理多個容器的狀態,自動偵測並重啟故障的容器 - [Kubernetes介紹](https://medium.com/@C.W.Hu/kubernetes-basic-concept-tutorial-e033e3504ec0) > 關於在Airflow設定Executor的方法我自己也不是很清楚,只有試過別人寫好的Docker來去跑跑看 > LocalExecutor和CeleryExecutor: > - https://github.com/puckel/docker-airflow > - https://registry.hub.docker.com/r/puckel/docker-airflow > docker似乎有一個叫docker swarm的東西,是做cluster用的,不過我還沒看 > ***關於Executor的使用方法,還需讀者自行Google看看。*** # 其他 ## Airflow相關文章 - 入門 [Airflow 動手玩](https://www.coderbridge.com/@FrankYang0529) [一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載](https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html) - 進階 [ETL-best-practice](https://gtoonstra.github.io/etl-with-airflow/index.html) ## Windows10 WSL(Windows Subsystem Linux)跑airflow的教程 [WSL安裝及將資料夾移動至C:的教學](https://medium.com/@chenwy0806/%E5%9C%A8-windows-10-%E4%B8%8A%E9%81%8B%E8%A1%8C-ubuntu-linux-%E5%AD%90%E7%B3%BB%E7%B5%B1%E4%B8%A6%E5%B0%87%E7%B3%BB%E7%B5%B1%E7%A7%BB%E5%8B%95%E8%87%B3%E5%85%B6%E4%BB%96%E7%A1%AC%E7%A2%9F-57e8faaa3e04) [WSL安裝airflow的教學](https://coding-stream-of-consciousness.com/2018/11/06/apache-airflow-windows-10-install-ubuntu/) [WSL2+docker安裝](https://medium.com/@zex555.a198719/wsl2-docker-docker-compose%E7%92%B0%E5%A2%83%E6%9E%B6%E8%A8%AD%E8%A8%98%E9%8C%84-429f4bf5d68a) ## Mongodb教程 [如何安裝及run mongodb](https://docs.microsoft.com/en-us/windows/wsl/tutorials/wsl-database#install-mongodb) [pymongo](https://pypi.org/project/pymongo/) [aggregate stage](https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/) [aggregate operator介紹](https://docs.mongodb.com/manual/reference/operator/aggregation/)