
Introduction to Airflow in Python

tags: DataCamp

Delivering data on a schedule can be a manual process. You write scripts, add complex cron tasks, and try various ways to meet an ever-changing set of requirements—and it’s even trickier to manage everything when working with teammates. Airflow can remove this headache by adding scheduling, error handling, and reporting to your workflows. In this course, you’ll master the basics of Airflow and learn how to implement complex data engineering pipelines in production. You'll also learn how to use Directed Acyclic Graphs (DAGs), automate data engineering workflows, and implement data engineering tasks in an easy and repeatable fashion—helping you to maintain your sanity.

Intro to Airflow

Introduction to Airflow

Running a task in Airflow

You've just started looking at using Airflow within your company and would like to try to run a task within the Airflow platform. You remember that you can use the airflow run command to execute a specific task within a workflow. Note that an error while using airflow run will return airflow.exceptions.AirflowException: on the last line of output.

An Airflow DAG is set up for you with a dag_id of etl_pipeline. The task_id is download_file and the start_date is 2020-01-08. All other components needed are defined for you.

Which command would you enter in the console to run the desired task?

Ans: airflow run etl_pipeline download_file 2020-01-08
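
For reference, a minimal sketch of what the etl_pipeline DAG might look like in Python. The dag_id, task_id, and start_date come from the exercise; the BashOperator and its command are assumptions for illustration only.

# Hypothetical sketch of the etl_pipeline DAG described above
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

etl_dag = DAG('etl_pipeline', default_args={'start_date': datetime(2020, 1, 8)})

download_file = BashOperator(
    task_id='download_file',
    bash_command='echo "downloading file"',   # placeholder command
    dag=etl_dag
)

With a file like this in the DAGs folder, the airflow run command above targets the download_file task within etl_pipeline for the 2020-01-08 execution date.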

Examining Airflow commands

While researching how to use Airflow, you start to wonder about the airflow command in general. You realize that by simply running airflow you can get further information about various sub-commands that are available.

Which of the following is NOT an Airflow sub-command?

  1. list_dags
  2. edit_dag (* not a command)
  3. test
  4. scheduler

HELP: airflow -h

Airflow DAGs

Defining a simple DAG

You've spent some time reviewing the Airflow components and are interested in testing out your own workflows. To start, you decide to define the default arguments and create a DAG object for your workflow.

The datetime object has been imported for you.

Instructions

  • Import the Airflow DAG object. Note that it is case-sensitive.
  • Define the default_args dictionary with a key owner and a value of 'dsmith'.
  • Add a start_date of January 14, 2020 to default_args using the value 1 for the month of January.
  • Add a retries count of 2 to default_args.
  • Instantiate the DAG object to a variable called etl_dag with a DAG named example_etl.
  • Add the default_args dictionary to the appropriate argument.

Solution

# Import the DAG object
from airflow.models import DAG

# Define the default_args dictionary
default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 1, 14),
  'retries': 2
}

# Instantiate the DAG object
etl_dag = DAG('example_etl', default_args=default_args)

Working with DAGs and the Airflow shell

While working with Airflow, sometimes it can be tricky to remember what DAGs are defined and what they do. You want to gain some further knowledge of the Airflow shell command so you'd like to see what options are available.

Multiple DAGs are already defined for you. How many DAGs are present in the Airflow system from the command-line?

Solution:
airflow list_dags

Troubleshooting DAG creation

Now that you've successfully worked with a couple of workflows, you notice that sometimes there are issues making a workflow appear within Airflow. You'd like to be able to better troubleshoot the behavior of Airflow when there may be something wrong with the code.

Two DAGs are defined for you and Airflow is set up. Note that any changes you make within the editor are automatically saved. A sketch of the kind of error involved follows the instructions.

Instructions:

  • Use the airflow shell command to determine which DAG is not being recognized correctly.
  • After you determine the broken DAG, open the file and fix any Python errors.
  • Once modified, verify that the DAG now appears within Airflow's output.
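
As a hypothetical illustration (the actual broken DAG from the exercise is not reproduced here), a typo in an import line is a typical reason a DAG is not recognized; airflow list_dags prints the Python error and the DAG only appears once the code is fixed. All names below are placeholders.

from datetime import datetime
# from airflow.model import DAG       # broken: the module is airflow.models, so the file fails to import
from airflow.models import DAG        # fixed import; the DAG now shows up in airflow list_dags

default_args = {'owner': 'dsmith', 'start_date': datetime(2020, 1, 20)}
example_dag = DAG('example_workflow', default_args=default_args)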

Airflow web interface

Starting the Airflow webserver

You've successfully created some DAGs within Airflow using the command-line tools, but notice that it can be a bit tricky to handle scheduling, troubleshooting, and so on. After reading the documentation further, you realize that you'd like to access the Airflow web interface. For security reasons, you'd like to start the webserver on port 9090.

Which airflow command would you use to start the webserver on port 9090?

Airflow is installed and accessible from the command line. Remember to use the airflow -h command if needed. airflow <subcommand> -h will provide further detail.

Answer:
airflow webserver -p 9090

To gain some familiarity with the Airflow UI, you decide to explore the various pages. You'd like to know what has happened on your Airflow instance thus far.

Which of the following events have not run on your Airflow instance?

Examining DAGs with the Airflow UI

You've become familiar with the basics of an Airflow DAG and the basics of interacting with Airflow on the command-line. Your boss would like you to show others on your team how to examine any available DAGs. In this instance, she would like to know which operator is NOT in use with the DAG called update_state, as your team is trying to verify the components used in production workflows.

Remember that the Airflow UI allows various methods to view the state of DAGs. The Tree View lists the tasks and any ordering between them in a tree structure, with the ability to compress / expand the nodes. The Graph View shows any tasks and their dependencies in a graph structure, along with the ability to access further details about task runs. The Code view provides full access to the Python code that makes up the DAG.

Remember to select the operator NOT used in this DAG.

Implementing Airflow DAGs

Airflow operators

Defining a BashOperator task

The BashOperator allows you to specify any given shell command or script and add it to an Airflow workflow. This can be a great start to implementing Airflow in your environment.

As such, you've been running some scripts manually to clean data (using a script called cleanup.sh) prior to delivery to your colleagues in the Data Analytics group. As you get more of these tasks assigned, you've realized it's becoming difficult to keep up with running everything manually, much less dealing with errors or retries. You'd like to implement a simple script as an Airflow operator.

The Airflow DAG analytics_dag is already defined for you and has the appropriate configurations in place.

Instructions:

  • Import the BashOperator object.
  • Define a BashOperator called cleanup with the task_id of cleanup_task.
  • Use the command cleanup.sh.
  • Add the operator to the DAG.

Solution:

# Import the BashOperator
from airflow.operators.bash_operator import BashOperator

# Define the BashOperator 
cleanup = BashOperator(
    task_id='cleanup_task',
    # Define the bash_command
    bash_command='cleanup.sh',
    # Add the task to the dag
    dag=analytics_dag
)

Multiple BashOperators

Airflow DAGs can contain many operators, each performing their defined tasks.

You've successfully implemented one of your scripts as an Airflow task and have decided to continue migrating your individual scripts to a full Airflow DAG. You now want to add more components to the workflow. In addition to the cleanup.sh used in the previous exercise, you have two more scripts, consolidate_data.sh and push_data.sh. These further process your data and copy it to its final location.

The DAG analytics_dag is available as before, and your cleanup task is still defined. The BashOperator is already imported.

Instructions:

  • Define a BashOperator called consolidate, to run consolidate_data.sh with a task_id of consolidate_task.
  • Add a final BashOperator called push_data, running push_data.sh and a task_id of pushdata_task.

Solution:

# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag)

# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag=analytics_dag)

Airflow tasks

Define order of BashOperators

Now that you've learned about the bitshift operators, it's time to modify your workflow to include a pull step and to include the task ordering. You have three currently defined components, cleanup, consolidate, and push_data.

The DAG analytics_dag is available as before and the BashOperator is already imported.

Instructions:

  • Define a BashOperator called pull_sales with a bash command of wget https://salestracking/latestinfo?json.
  • Set the pull_sales operator to run before the cleanup task.
  • Configure consolidate to run next, using the downstream operator.
  • Set push_data to run last using either bitshift operator.

Solution:

# Define a new pull_sales task
pull_sales = BashOperator(
    task_id='pullsales_task',
    bash_command='wget https://salestracking/latestinfo?json',
    dag=analytics_dag
)

# Set pull_sales to run prior to cleanup
pull_sales >> cleanup

# Configure consolidate to run after cleanup
cleanup >> consolidate

# Set push_data to run last
consolidate >> push_data

Determining the order of tasks

While looking through a colleague's workflow definition, you're trying to decipher exactly in which order the defined tasks run. The code in question shows the following:

pull_data << initialize_process
pull_data >> clean >> run_ml_pipeline
generate_reports << run_ml_pipeline

Instructions:

  • Order the tasks in the sequence defined by the bitshift code, with the first task to run on top and the last task to run on the bottom.

Solution:

  1. initialize_process
  2. pull_data
  3. clean
  4. run_ml_pipeline
  5. generate_reports

Troubleshooting DAG dependencies

You've created a DAG with intended dependencies based on your workflow but for some reason Airflow won't load / execute the DAG. Try using the terminal to:

  • List the DAGs.
  • Decipher the error message.
  • Use cat workspace/dags/codependent.py to view the Python code.
  • Determine which of the following lines should be removed from the Python code. You may want to consider the last line of the file.

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 2, 12),
  'retries': 1
}

codependency_dag = DAG('codependency', default_args=default_args)

task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=codependency_dag)

task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=codependency_dag)

task3 = BashOperator(task_id='third_task',
                     bash_command='echo 3',
                     dag=codependency_dag)

# task1 must run before task2 which must run before task3
task1 >> task2
task2 >> task3
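# The next line links task3 back to task1, creating a cycle (task1 -> task2 -> task3 -> task1),
# so Airflow cannot load the DAG; this is the line to remove.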
task3 >> task1

Additional operators

Using the PythonOperator

You've implemented several Airflow tasks using the BashOperator but realize that a couple of specific tasks would be better implemented using Python. You'll implement a task to download and save a file to the system within Airflow.

The requests library is imported for you, and the DAG process_sales_dag is already defined.

Instructions:

  • Define a function called pull_file with two parameters, URL and savepath.
  • Use the print() function and Python f-strings to write a message to the logs.
  • Import the necessary object to use the Python Operator.
  • Create a new task assigned to the variable pull_file_task, with the id pull_file.
  • Add the pull_file(URL, savepath) function defined previously to the operator.
  • Define the arguments needed for the task.

Solution:

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)   
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python_operator import PythonOperator

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'},
    dag=process_sales_dag
)

More PythonOperators

To continue implementing your workflow, you need to add another step to parse and save the changes of the downloaded file. The DAG process_sales_dag is defined and has the pull_file task already added. In this case, the Python function is already defined for you, parse_file(inputfile, outputfile).

Note that often when implementing Airflow tasks, you won't necessarily understand the individual steps given to you. As long as you understand how to wrap the steps within Airflow's structure, you'll be able to implement a desired workflow.

Instructions:

  • Define the Python task and assign it to the variable parse_file_task with the id parse_file.
  • Add the parse_file(inputfile, outputfile) to the Operator.
  • Define the arguments to pass to the callable.
  • Add the task to the DAG.

Solution:

# Add another Python task
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'},
    # Add the DAG
    dag=process_sales_dag
)

EmailOperator and dependencies

Now that you've successfully defined the PythonOperators for your workflow, your manager would like to receive a copy of the parsed JSON file via email when the workflow completes. The previous tasks are still defined and the DAG process_sales_dag is configured.

Instructions:

  • Import the class to send emails.
  • Define the Operator and add the appropriate arguments (to, subject, files).
  • Set the task order so the tasks run sequentially (Pull the file, parse the file, then email your manager).

Solution:

# Import the Operator
from airflow.operators.email_operator import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_manager_task

Airflow scheduling

Schedule a DAG via Python

You've learned quite a bit about creating DAGs, but now you would like to schedule a specific DAG on a specific day of the week at a certain time. You'd like the code to include this information in case a colleague needs to reinstall the DAG on a different server.

The Airflow DAG object and the appropriate datetime methods have been imported for you.

Instructions:

  • Set the start date of the DAG to November 1, 2019.
  • Configure the retry_delay to 20 minutes. You will learn more about the timedelta object in Chapter 3. For now, you just need to know it expects an integer value.
  • Use the cron syntax to configure a schedule of every Wednesday at 12:30pm.

Solution:

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')
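
# The cron fields in schedule_interval are minute, hour, day of month, month, and day of week,
# so '30 12 * * 3' means 12:30pm every Wednesday (day of week 3).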

Deciphering Airflow schedules

Given the various options for Airflow's schedule_interval, you'd like to verify that you understand exactly how intervals relate to each other, whether it's a cron format, timedelta object, or a preset.
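
For reference, a daily schedule can be written in each of the three styles mentioned here. This is a rough sketch; the DAG names and start date below are placeholders, and the three forms are only approximately equivalent.

from datetime import datetime, timedelta
from airflow.models import DAG

default_args = {'start_date': datetime(2020, 1, 1)}

dag_cron = DAG('daily_cron', default_args=default_args, schedule_interval='0 0 * * *')           # cron syntax
dag_delta = DAG('daily_delta', default_args=default_args, schedule_interval=timedelta(days=1))   # timedelta object
dag_preset = DAG('daily_preset', default_args=default_args, schedule_interval='@daily')          # Airflow preset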

Instructions:

  • Order the schedule intervals from least to greatest amount of time.

Solution:

Troubleshooting DAG runs

You've scheduled a DAG called process_sales which is set to run on the first day of the month and email your manager a copy of the report generated in the workflow. The start_date for the DAG is set to February 15, 2020. Unfortunately, it's now March 2nd, your manager did not receive the report, and she would like to know what happened.

Use the information you've learned about Airflow scheduling to determine what the issue is.
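
A hedged sketch of the likely cause, assuming classic Airflow 1.x scheduling semantics and a monthly cron schedule (the exact schedule string is not given in the exercise): Airflow only executes a scheduled run once the full schedule interval following that run's start has elapsed.

from datetime import datetime
from airflow.models import DAG

# Hypothetical reconstruction of the scheduling arguments described above
default_args = {'start_date': datetime(2020, 2, 15)}
process_sales = DAG('process_sales', default_args=default_args,
                    schedule_interval='0 0 1 * *')   # assumed: midnight on the 1st of each month

# The first interval begins March 1, 2020 and only completes (and therefore runs)
# around April 1, 2020, so no report has been generated by March 2nd.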

Maintaining and monitoring Airflow workflows

Airflow sensors

Sensors vs operators

As you've just learned about sensors, you want to verify you understand what they have in common with normal operators and where they differ.
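
As a concrete reminder of what a sensor looks like, here is a minimal sketch; all names are placeholders, and the FileSensor import matches the one used later in these notes.

from datetime import datetime
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor

sensor_dag = DAG('sensor_example', default_args={'start_date': datetime(2020, 2, 20)})

# A sensor is an operator that waits for a condition (here, a file existing) before succeeding
wait_for_file = FileSensor(
    task_id='wait_for_datafile',
    filepath='salesdata_ready.csv',
    poke_interval=60,          # re-check the condition every 60 seconds
    dag=sensor_dag
)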

Instructions:

  • Move each entry into the Sensors, Operators, or Both bucket.

Solution:

Sensory deprivation

You've recently taken over for another Airflow developer and are trying to learn about the various workflows defined within the system. You come across a DAG that you can't seem to make run properly using any of the normal tools. Try exploring the DAG for any information about what it might be looking for before continuing.

Airflow executors

Determining the executor

While developing your DAGs in Airflow, you realize you're not certain about the configuration of the system. Using the commands you've learned, determine which of the following statements is true.

  • Answer: airflow list_dags (the command output includes a line noting which executor is in use).

Executor implications

You're learning quite a bit about running Airflow DAGs and are gaining some confidence at developing new workflows. That said, your manager has mentioned that on some days, the workflows are taking a lot longer to finish and asks you to investigate. She also mentions that the salesdata_ready.csv file is taking longer to generate these days and the time of day it is completed is variable.

This exercise requires information from the previous two lessons - remember the implications of the available arguments and modify the workflow accordingly. Note that for this exercise, you're expected to modify one line of code, not add any extra code.

Instructions:

  • Determine the level of parallelism available on this system. You can do this by listing dags (airflow list_dags).
  • Look at the source for the DAG file and fix the entry that is causing the problem (see the sketch below for a likely fix).
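
The solution code is not recorded in these notes, but based on the later "Adding status emails" exercise (which uses a FileSensor on salesdata_ready.csv with mode='reschedule'), a likely one-line change is switching the sensor's mode from 'poke' to 'reschedule' so it does not hold the executor's only slot while waiting. A hedged sketch:

from datetime import datetime
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor

report_dag = DAG('execute_report', default_args={'start_date': datetime(2020, 2, 20)})

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    # mode='poke',             # default: the sensor occupies a worker slot the entire time it waits
    mode='reschedule',         # frees the slot between checks, so other tasks can run
    dag=report_dag)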

Debugging and troubleshooting in Airflow

DAGs in the bag

You've taken over managing an Airflow cluster that you did not set up and are trying to learn a bit more about the system. Which of the following is true?

Missing DAG

Your manager calls you before you're about to leave for the evening and wants to know why a new DAG workflow she's created isn't showing up in the system. She needs this DAG called execute_report to appear in the system so she can properly schedule it for some tests before she leaves on a trip.

Airflow is configured using the ~/airflow/airflow.cfg file.
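
A hedged way to check where Airflow expects DAG files, assuming Airflow 1.x and its standard configuration module: a DAG file saved outside this directory will not show up in the system.

# Print the configured DAGs directory from ~/airflow/airflow.cfg
from airflow.configuration import conf
print(conf.get('core', 'dags_folder'))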

SLAs and reporting in Airflow

Defining an SLA

You've successfully implemented several Airflow workflows into production, but you don't currently have any method of determining if a workflow takes too long to run. After consulting with your manager and your team, you decide to implement an SLA at the DAG level on a test workflow.

All appropriate Airflow libraries have been imported for you.

Instructions

  • Import the timedelta object.
  • Define an SLA of 30 minutes.
  • Add the SLA to the DAG.

Solution

# Import the timedelta object
from datetime import timedelta

# Create the dictionary entry
default_args = {
  'start_date': datetime(2020, 2, 20),
  'sla': timedelta(minutes=30)
}

# Add to the DAG
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval=None)

Defining a task SLA

After completing the SLA on the entire workflow, you realize you really only need the SLA timing on a specific task instead of the full workflow.

The appropriate Airflow libraries are imported for you.

Instructions

  • Import the timedelta object.
  • Add a 3-hour SLA to the task object.

Solution

# Import the timedelta object
from datetime import timedelta

test_dag = DAG('test_workflow', start_date=datetime(2020,2,20), schedule_interval=None)

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)

Generate and email a report

Airflow provides the ability to automate almost any style of workflow. You would like to receive a report from Airflow when tasks complete without requiring constant monitoring of the UI or log files. You decide to use the email functionality within Airflow to provide this message.

All the typical Airflow components have been imported for you, and a DAG is already defined as report_dag.

Instructions

  • Define the proper operator for the email_report task.
  • Fill in the missing details for the Operator. Use the file named monthly_report.pdf.
  • Set the email_report task to occur after the generate_report task.

Solution

# Define the email task
email_report = EmailOperator(
        task_id='email_report',
        to='airflow@datacamp.com',
        subject='Airflow Monthly Report',
        html_content="""Attached is your monthly workflow report - please refer to it for more detail""",
        files=["monthly_report.pdf"],
        dag=report_dag
)

# Set the email task to run after the report is generated
email_report << generate_report

Adding status emails

You've worked through most of the Airflow configuration for setting up your workflows, but you realize you're not getting any notifications when DAG runs complete or fail. You'd like to set up email alerting for the success and failure cases, but you want to send it to two addresses.

Instructions

  • Edit the execute_report_dag.py workflow.
  • Add the emails airflowalerts@datacamp.com and airflowadmin@datacamp.com to the appropriate key in default_args.
  • Set the failure email option to True.
  • Configure the success email to send you messages as well.

Solution

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

default_args={
    'email': ['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    'email_on_failure': True,
    'email_on_success': True
}
report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *",
    default_args=default_args
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='reschedule',
    dag=report_dag)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)

precheck >> generate_report_task

Building production pipelines in Airflow

Working with templates