"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.
This DAG relies on four Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
Author: Andy Lai
Playsee ETL Framework ver 0.1.8
"""
import os
from datetime import datetime, timedelta

from airflow import models
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
"""
Return micro-seconds timestamp
"""
def cal_data_ingestion_timerange():
# time truncate
today = datetime.utcnow().date()
dt_string = today.strftime("%d/%m/%Y")
end_at = datetime.strptime(dt_string, '%d/%m/%Y')
start_at = end_at - timedelta(days=1)
return int(start_at.timestamp()* 1000 * 1000), int(end_at.timestamp() * 1000 * 1000 -1)
start_at, end_at = cal_data_ingestion_timerange()
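# Example (assuming the worker clock is UTC): a run at 2021-06-15 08:00 UTC
# yields a window covering 2021-06-14 00:00:00.000000 through
# 2021-06-14 23:59:59.999999, returned as microsecond epoch timestamps.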
###### Update this table ID
bq_transform_artifact_table_id = 'None'  # <DATASET_ID>.<TABLE_ID>
dataTimestampStartAt = '0'
dataTimestampEndAt = '0'
######
###### Replaced automatically by etl-gen
table_id = 'None'
bq_staging_table_id = 'None'
schema = 'sysops-scale-autotest'
dag_id = 'sysops-scale-autotest-down'
######
###### Load predefined value from environment
project_id = os.environ.get('project_id') or models.Variable.get("project_id")
slack_notify_channel = os.environ.get('slack_channel_etl_general') or models.Variable.get("slack_channel_etl_general")
######
###### Predefined variable (generated by etl-gen)
table_schema_path = f"gs://{project_id}/datalake/schema/{schema}.json"
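# With the values above this resolves to something like
# gs://<project_id>/datalake/schema/sysops-scale-autotest.json; the GCS bucket is
# assumed to share its name with the project.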
param_start_at = "{{ task_instance.xcom_pull(task_ids='read_param', key='startAt') }}"
param_end_at = "{{ task_instance.xcom_pull(task_ids='read_param', key='endAt') }}"
######
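# The two templated strings above pull 'startAt'/'endAt' from XCom values pushed
# by a task with task_id='read_param'. No such task is defined in this generated
# DAG; a minimal sketch of one (hypothetical, names and wiring are assumptions)
# could push the ingestion window computed above:
def _push_ingestion_params(**context):
    """Push the ingestion window to XCom for downstream templated parameters."""
    context['ti'].xcom_push(key='startAt', value=str(start_at))
    context['ti'].xcom_push(key='endAt', value=str(end_at))
# Inside the DAG block below it could be wired up as, e.g.:
#   read_param = PythonOperator(task_id='read_param',
#                               python_callable=_push_ingestion_params,
#                               provide_context=True)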
def on_failure_callback(context):
    """Post a Slack notification via a Kubernetes pod when a task fails."""
    ti = context['ti']
    task_id = f'{ti.task_id}_on_failure_callback'
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id=task_id,
        name=task_id,
        namespace='default',
        is_delete_operator_pod=True,
        env_vars={
            'SLACK_WEBHOOK': slack_notify_channel,
            'SLACK_USERNAME': 'Cloud Composer',
            'SLACK_MESSAGE': '{"text":"[%s] ETL job failed, task id = %s"}' % (dag_id, task_id),
        },
        image='gcr.io/repo-backend/slack-notify')
    return kubernetes_min_pod.execute(context=context)
default_args = {
    # Tell Airflow to start one day ago, so that it runs as soon as you upload it.
    'start_date': days_ago(1),
    'on_failure_callback': on_failure_callback,
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The ID you will see in the Airflow DAG page.
    dag_id,
    default_args=default_args,
    max_active_runs=1,
    # The interval with which to schedule the DAG.
    schedule_interval=None,  # Override to match your needs
) as dag:
    scale = BashOperator(
        task_id='scale',
        bash_command='echo "done"',
    )
    notify = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id="notify",
        name="notify",
        namespace='default',
        is_delete_operator_pod=True,
        env_vars={
            'SLACK_WEBHOOK': slack_notify_channel,
            'SLACK_USERNAME': 'Cloud Composer',
            'SLACK_MESSAGE': f'{{"text":"[{dag_id}] ETL job done"}}',
        },
        image='gcr.io/repo-backend/slack-notify')
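    # The trailing "# >> end_template_job" below suggests a Dataflow template step
    # that was removed from this generated DAG. A minimal sketch of such a task,
    # assuming the public GCS_Text_to_BigQuery template and placeholder parameter
    # values (none of these GCS paths are defined in this file):
    #
    #   end_template_job = DataflowTemplateOperator(
    #       task_id='end_template_job',
    #       template='gs://dataflow-templates/latest/GCS_Text_to_BigQuery',
    #       parameters={
    #           'javascriptTextTransformGcsPath': f'gs://{project_id}/datalake/udf/transform.js',
    #           'javascriptTextTransformFunctionName': 'transform',
    #           'JSONPath': table_schema_path,
    #           'inputFilePattern': f'gs://{project_id}/datalake/input/*.txt',
    #           'outputTable': f'{project_id}:{bq_staging_table_id}',
    #           'bigQueryLoadingTemporaryDirectory': f'gs://{project_id}/tmp/',
    #       },
    #   )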
    scale >> notify  # >> end_template_job