--- lang: ja-jp tags: Python, Apache Airflow --- # Apache Airflow Tutorial [Apache Airflow: Tutorial](https://airflow.apache.org/docs/stable/tutorial.html) このチュートリアルでは Airflow の基本的な概念やオブジェクト、またそれらのパイプライン定義における利用方法について説明します。 ## Example Pipeline definition ここでは基本的なパイプラインの定義の具体例を掲載します。一見複雑に見えたとしても心配しないでください。1行ごと丁寧に追った説明を追記しています。 ```python= from datetime import timedelta # DAG オブジェクト : DAG を生成するために必要になります from airflow import DAG # Operators : 操作を行うのに必要です from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago # 以下の設定は各オペレータに渡されるものとなります # オペレータの初期化時にタスクごとにその設定を更新することも可能です default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'dag': dag, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' } dag = DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1), ) t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag, ) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, dag=dag, ) dag.doc_md = __doc__ t1.doc_md = """\ #### Task Documentation タスクノードとしての Operator インスタンスの `doc_md` (マークダウン)属性を利用してタスクの内容を文書化できます。マークダウンの他にも JSON や YAML でも記述できます。 """ templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7) }}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag, ) t1 >> [t2, t3] ``` ## It's a DAG definition file みなさんにとって最初のうちではあまり直感的ではないことの一つとして頭に入れておいて欲しいこととしては、上記のような Python スクリプトは DAG の構造を記述する単なる設定ファイルであるということです。**こちらで定義される実際のタスクはスクリプトのコンテキストとは異なるものの上で実行されることになります**。異なるタスクは異なるワーカーの異なる時系列において実行され得るということです。このことは**スクリプト内では、タスク間で互いに通信することが出来ない**ということを意味しています。タスク間の通信を実現するには、 `Xcom` というより高度な機能を利用することになります。 ## Importing Modules Airflow パイプラインは Airflow DAG オブジェクトを定義する Python スクリプトです(偶然にも Python で記述されるだけです)。必要なライブラリを利用可能にすることからすべては始まります: ```python= # 定数っぽく見えるクラス from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago ``` ## Default Arguments ライブラリから必要なものをインポートしたら、DAGやいくつかのタスクを作成することになりますが、タスクの設定について、各タスクのコンストラクタにそれぞれ明示的(そして冗長)に渡すか、あるいは各タスクを生成する際のデフォルトの設定を辞書形式で定義するか、いずれかを選択することができます。 ```python= # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'dag': dag, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' } ``` BaseOperator のパラメータとそれぞれの役割については、 `airflow.models.BaseOperator` のドキュメントを参照してください。 :::info DAG の設定ではなく、 DAG を構成するタスクのデフォルトの設定であることに注意する。また、 `BaseOperator` は全ての Operator クラスに共通の振る舞いをまとめたものであるので、パラメータの機能について詳細を知りたい場合に参照するように勧めていると思われる。 ::: ## Instantiate a DAG 次に、タスクがネストして登録される DAG オブジェクトが必要になります。 DAG の識別子として `dag_id` 文字列を第一引数に渡しています。また、デフォルト設定をまとめた辞書オブジェクトと `schedule_interval` が1日であるという設定を渡しています。 ```python= dag = DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1), ) ``` ## Tasks タスクは Operator オブジェクトを生成することにより生成されます。 Operator から初期化されるオブジェクトのことを「コンストラクタ」と呼んでいます。第一引数は `task_id` であり、タスクの一意な識別子となります。 ```python= t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag, ) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, dag=dag, ) ``` ここでは Operator に固有の( `bash_command` という)引数と BaseOperator から継承され全ての Operator に共通の設定( `retries` という)引数の両者を混合して渡している点に注意してください。逐一コンストラクタの呼び出しに対して全ての引数を渡すよりも完結です。2番目のタスクでは `retries` を上書きしていることに注意してください。 設定の優先順位は次の通りです: 1. 明示的に渡された引数 2. `default_args` 辞書オブジェクトに存在する設定値 3. (存在する場合には)Operator の固有の値 タスクは `task_id` と `owner` の引数の設定を必須とし、これらが渡されない場合にはエラーを引き起こします。 ## Templating with Jinja WIP **!!!BigQuery に対するクエリを Jinja2 テンプレート化しておけば、 `params` 引数からクエリを組み立てることができる!!!** ## Adding DAG and Tasks documentation WIP ## Setting up Dependencies `t1` , `t2` , `t3` といった3つの互いに依存しないタスクの定義があります。これらの依存関係を定義するには、次のような方法があります。 ```python= t1.set_downstream(t2) # t1 の下流に t2 を設定する t2.set_upstream(t1) # t2 の上流に t1 を設定する # シフト演算子を利用することも可能 t1 >> t2 # t1 が実行されてから t2 が実行される t2 << t1 # t2 が実行される前に t1 が実行される # 複数の依存関係をシフト演算子でより完結に記述できます t1 >> t2 >> t3 # タスクのリストから依存性を記述することも可能です # 以下は全て同じ内容です: t1.set_downstream([t2, t3]) t1 >> [t2, t3] [t2, t3] << t1 ``` DAG のタスク同士が循環したり、2度以上同一タスクが参照されたりすると Airflow は例外を送出します(トポロジカルソート可能でなければならない)。 ## Testing ### Running the Script なんらかテストを実行する時がやってきました。まずは、パイプラインが正しく解釈されることを確かめましょう。 これまでのスクリプトを `tutorial.py` として `airflow.cfg` ファイルが参照する DAG のフォルダに保存されているとします。デフォルトでの該当の設定値は `~/airflow/dags` となっています。 ```shell= % python ~/airflow/dags/tutorial.py ``` スクリプトが例外を送出しなければ、大きく間違った定義を行なっていないということになります。 ### Command Line Metadata Validation スクリプトをより詳細に検証してみることにしましょう: ```shell= # 有効な DAG のリストを表示する % airflow list_dags # "tutorial" の dag_id を持つパイプラインに含まれるタスク一覧を表示する % airflow list_tasks tutorial # DAG 内部のタスクの階層構造を表示する % airflow list_tasks tutorial --tree ``` ### Testing 特定の日付のもとで実際のタスクインスタンスを実行することで、動作確認をしてみます。このコンテキストにおいて指定される日付を `execution_date` を呼びます。 これは *理論的* な日付であり、タスクを実行するスケジューラが特定の日時において実行されるかどうかを「現在」の物理的な日時において検証するためのものです。 ```shell= # コマンド形式: command subcommand dag_id task_id date # task_id='print_date' をテストする airflow test tutorial print_date 2015-06-01 # task_id='sleep' をテストする airflow test tutorial sleep 2015-06-01 ``` `airflow test` コマンドではタスクインスタンスはローカルに実行され、ログは標準出力に表示されます。依存関係のあるタスクを起動させたり、タスクの状態がデータベースに記述されるといったこともありません。 ### Backfill :::info `backfill` は「実行履歴」くらいの意味だと思われる。 ::: ここまで全てがうまく行っているのであれば、 `backfill` を実行してみましょう。 `backfill` ではタスクの依存関係を辿り、吐き出されるログをファイルに記録し、データベースにステータスを登録します。Webサーバを起動している場合には、進行の程度を追跡することが可能です。 `backfill` の進行を可視的に追跡したい場合には、 `airflow webserver` でWebサーバを起動させてください。 `depends_on_past=True` とタスクを設定している場合には、個々のタスクインスタンスはそれよりも以前の自身のタスクインスタンス( `execution_date` よりも記録される、以前の実行結果)に依存するようになります。 `execution_date==start_date` となっている場合には、この依存関係が無視されることになります:なぜならばタスクインスタンスは自身の過去の実行履歴を持たないからです。 `depends_on_past=True` としている場合には、 `wait_for_downstream=True` とすることを考慮したくなるかもしれません。 `depends_on_past=True` では、それ自身の以前のタスクインスタンスの成功に依存させるようになりますが、 `wait_for_downstream=True` では直接依存する下流のタスクインスタンス全てが前回の自身の実行において成功していることを要求するようになります。 :::info `depends_on_past=True` $\subseteq$ `wait_for_downstream=True` であり、差分はタスクインスタンス自身の下流が成功したかどうかを要求したかどうか。 :::