"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a text file and adds the rows to a BigQuery table. This DAG relies on four Airflow variables https://airflow.apache.org/concepts.html#variables * project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster. * gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be created. * gce_region - Google Compute Engine region where Cloud Dataflow cluster should be created. Learn more about the difference between the two here: https://cloud.google.com/compute/docs/regions-zones * bucket_path - Google Cloud Storage bucket where you've stored the User Defined Function (.js), the input file (.txt), and the JSON schema (.json). Author: Andy Lai Playsee ETL Framework ver 0.1.8 """ from airflow import models from airflow.utils.dates import days_ago from airflow.contrib.operators import bigquery_operator from airflow.operators.bash_operator import BashOperator from airflow.contrib.operators import kubernetes_pod_operator from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator from airflow.operators.python_operator import PythonOperator import datetime import time from datetime import datetime, timedelta import os import re """ Return micro-seconds timestamp """ def cal_data_ingestion_timerange(): # time truncate today = datetime.utcnow().date() dt_string = today.strftime("%d/%m/%Y") end_at = datetime.strptime(dt_string, '%d/%m/%Y') start_at = end_at - timedelta(days=1) return int(start_at.timestamp()* 1000 * 1000), int(end_at.timestamp() * 1000 * 1000 -1) start_at, end_at = cal_data_ingestion_timerange() ###### Update this table ID bq_transform_artifact_table_id = 'None' #<DATASET_ID>.<TABLE_ID> dataTimestampStartAt = '0' dataTimestampEndAt = '0' ###### ###### Replace automatically by etl-gen table_id = 'None' bq_staging_table_id = 'None' schema = 'sysops-scale-autotest' dag_id = 'sysops-scale-autotest-down' ###### ###### Load predefined value from environment project_id = os.environ.get('project_id') if os.environ.get('project_id') else models.Variable.get("project_id") slack_notify_channel= os.environ.get('slack_channel_etl_general') if os.environ.get('slack_channel_etl_general') else models.Variable.get("slack_channel_etl_general") ###### ###### Predefined variable (generated by elt-gen) table_schema_path = f"gs://{project_id}/datalake/schema/{schema}.json" param_start_at = "{{ task_instance.xcom_pull(task_ids='read_param', key='startAt') }}" param_end_at = "{{ task_instance.xcom_pull(task_ids='read_param', key='endAt') }}" ###### def on_failure_callback(context): ti = context['ti'] task_id = f'{ti.task_id}_on_failure_callback' kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator( # The ID specified for the task. task_id=task_id, name=task_id, namespace='default', is_delete_operator_pod=True, env_vars= { 'SLACK_WEBHOOK': slack_notify_channel, 'SLACK_USERNAME': 'Cloud Composer', 'SLACK_MESSAGE': '{"text":"[%s] ETL job failed, task id = %s"}' % (dag_id,task_id), }, image='gcr.io/repo-backend/slack-notify') return kubernetes_min_pod.execute(context=context) default_args = { # Tell airflow to start one day ago, so that it runs as soon as you upload it "start_date": days_ago(1), 'on_failure_callback': on_failure_callback, } # Define a DAG (directed acyclic graph) of tasks. # Any task you create within the context manager is automatically added to the # DAG object. 
with models.DAG(
        # The id you will see in the Airflow DAG page.
        dag_id,
        default_args=default_args,
        max_active_runs=1,
        # The interval with which to schedule the DAG.
        schedule_interval=None,  # Override to match your needs.
) as dag:

    scale = BashOperator(
        task_id='scale',
        bash_command="""
        echo "done"
        """,
    )

    notify = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id="notify",
        name="notify",
        namespace='default',
        is_delete_operator_pod=True,
        env_vars={
            'SLACK_WEBHOOK': slack_notify_channel,
            'SLACK_USERNAME': 'Cloud Composer',
            'SLACK_MESSAGE': f'{{"text":"[{dag_id}] ETL job done"}}',
        },
        image='gcr.io/repo-backend/slack-notify')

    scale >> notify  # >> end_template_job
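# The module docstring and the DataflowTemplateOperator import suggest that the
# generated DAG normally ends with a Dataflow template job (see the
# commented-out `end_template_job` dependency above). A minimal, illustrative
# sketch is left here as a comment: it assumes the Google-provided
# GCS_Text_to_BigQuery template, and the UDF function name, GCS paths and
# output table are placeholders to be adjusted before enabling the task.
#
# end_template_job = DataflowTemplateOperator(
#     task_id="end_template_job",
#     # Google-provided template that reads a text file from GCS, applies a
#     # JavaScript UDF, and writes the resulting rows to BigQuery.
#     template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
#     parameters={
#         "javascriptTextTransformFunctionName": "transformCSVtoJSON",
#         "JSONPath": table_schema_path,
#         "javascriptTextTransformGcsPath": f"gs://{project_id}/datalake/udf/{schema}.js",
#         "inputFilePattern": f"gs://{project_id}/datalake/input/{schema}.txt",
#         "outputTable": f"{project_id}:{bq_staging_table_id}",
#         "bigQueryLoadingTemporaryDirectory": f"gs://{project_id}/tmp/",
#     },
# )
#
# scale >> notify >> end_template_job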