DataCamp
Delivering data on a schedule can be a manual process. You write scripts, add complex cron tasks, and try various ways to meet an ever-changing set of requirements—and it’s even trickier to manage everything when working with teammates. Airflow can remove this headache by adding scheduling, error handling, and reporting to your workflows. In this course, you’ll master the basics of Airflow and learn how to implement complex data engineering pipelines in production. You'll also learn how to use Directed Acyclic Graphs (DAGs), automate data engineering workflows, and implement data engineering tasks in an easy and repeatable fashion—helping you to maintain your sanity.
You've just started looking at using Airflow within your company and would like to try to run a task within the Airflow platform. You remember that you can use the airflow run command to execute a specific task within a workflow. Note that an error while using airflow run will return airflow.exceptions.AirflowException: on the last line of output.
An Airflow DAG is set up for you with a dag_id of etl_pipeline. The task_id is download_file and the start_date is 2020-01-08. All other components needed are defined for you.
Which command would you enter in the console to run the desired task?
Answer: airflow run etl_pipeline download_file 2020-01-08
While researching how to use Airflow, you start to wonder about the airflow command in general. You realize that by simply running airflow you can get further information about the various sub-commands that are available.
Which of the following is NOT an Airflow sub-command?
Hint: airflow -h
You've spent some time reviewing the Airflow components and are interested in testing out your own workflows. To start, you decide to define the default arguments and create a DAG object for your workflow.
The datetime object has been imported for you.
Instructions
- Define a default_args dictionary with a key owner and a value of 'dsmith'.
- Add a start_date of January 14, 2020 to default_args, using the value 1 for the month of January.
- Add a retries count of 2 to default_args.
- Instantiate a DAG object to the variable etl_dag, with a DAG named example_etl.
- Add the default_args dictionary to the appropriate argument.

Solution
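A sketch of what the solution might look like, assuming Airflow 1.x-style imports (the exact import paths may differ in your environment):

from airflow.models import DAG
from datetime import datetime

# Define the default_args dictionary
default_args = {
    'owner': 'dsmith',
    'start_date': datetime(2020, 1, 14),
    'retries': 2
}

# Instantiate the DAG object, passing default_args to the appropriate argument
etl_dag = DAG('example_etl', default_args=default_args)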
While working with Airflow, sometimes it can be tricky to remember what DAGs are defined and what they do. You want to gain some further knowledge of the Airflow shell command so you'd like to see what options are available.
Multiple DAGs are already defined for you. How many DAGs are present in the Airflow system from the command-line?
Solution:
airflow list_dags
Now that you've successfully worked with a couple workflows, you notice that sometimes there are issues making a workflow appear within Airflow. You'd like to be able to better troubleshoot the behavior of Airflow when there may be something wrong with the code.
Two DAGs are defined for you and Airflow is set up. Note that any changes you make within the editor are automatically saved.
Instructions:
- Use the airflow shell command to determine which DAG is not being recognized correctly.

You've successfully created some DAGs within Airflow using the command-line tools, but notice that it can be a bit tricky to handle scheduling / troubleshooting / etc. After reading the documentation further, you realize that you'd like to access the Airflow web interface. For security reasons, you'd like to start the webserver on port 9090.

Which airflow command would you use to start the webserver on port 9090?
Airflow is installed and accessible from the command line. Remember to use the airflow -h command if needed; airflow <subcommand> -h will provide further detail.
Answer:
airflow webserver -p 9090
To gain some familiarity with the Airflow UI, you decide to explore the various pages. You'd like to know what has happened on your Airflow instance thus far.
Which of the following events have not run on your Airflow instance?
You've become familiar with the basics of an Airflow DAG and the basics of interacting with Airflow on the command-line. Your boss would like you to show others on your team how to examine any available DAGs. In this instance, she would like to know which operator is NOT in use with the DAG called update_state, as your team is trying to verify the components used in production workflows.
Remember that the Airflow UI allows various methods to view the state of DAGs. The Tree View lists the tasks and any ordering between them in a tree structure, with the ability to compress / expand the nodes. The Graph View shows any tasks and their dependencies in a graph structure, along with the ability to access further details about task runs. The Code view provides full access to the Python code that makes up the DAG.
Remember to select the operator NOT used in this DAG.
The BashOperator allows you to specify any given shell command or script and add it to an Airflow workflow. This can be a great start to implementing Airflow in your environment.
As such, you've been running some scripts manually to clean data (using a script called cleanup.sh) prior to delivery to your colleagues in the Data Analytics group. As you get more of these tasks assigned, you've realized it's becoming difficult to keep up with running everything manually, much less dealing with errors or retries. You'd like to implement a simple script as an Airflow operator.
The Airflow DAG analytics_dag is already defined for you and has the appropriate configurations in place.
Instructions:
- Import the BashOperator object.
- Define a BashOperator called cleanup with the task_id of cleanup_task.
- Use the command cleanup.sh.

Solution:
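A sketch of the likely solution, assuming the Airflow 1.x import path for the BashOperator:

from airflow.operators.bash_operator import BashOperator

# Define the BashOperator that runs the cleanup script
cleanup = BashOperator(
    task_id='cleanup_task',
    bash_command='cleanup.sh',
    dag=analytics_dag
)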
Airflow DAGs can contain many operators, each performing their defined tasks.
You've successfully implemented one of your scripts as an Airflow task and have decided to continue migrating your individual scripts to a full Airflow DAG. You now want to add more components to the workflow. In addition to the cleanup.sh used in the previous exercise, you have two more scripts, consolidate_data.sh and push_data.sh. These further process your data and copy it to its final location.
The DAG analytics_dag is available as before, and your cleanup task is still defined. The BashOperator is already imported.
Instructions:
- Define a BashOperator called consolidate, to run consolidate_data.sh with a task_id of consolidate_task.
- Define a second BashOperator called push_data, running push_data.sh with a task_id of pushdata_task.

Solution:
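A sketch of the two additional tasks, following the same BashOperator pattern as the cleanup task:

# Run the consolidation script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag
)

# Copy the processed data to its final location
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag=analytics_dag
)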
Now that you've learned about the bitshift operators, it's time to modify your workflow to include a pull step and to include the task ordering. You have three currently defined components: cleanup, consolidate, and push_data.
The DAG analytics_dag is available as before and the BashOperator is already imported.
Instructions:
- Define a BashOperator called pull_sales with a bash command of wget https://salestracking/latestinfo?json.
- Set the pull_sales operator to run before the cleanup task.
- Configure consolidate to run next, using the downstream operator.
- Set push_data to run last using either bitshift operator.

Solution:
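A sketch of the pull step and the ordering; the task_id for pull_sales is a placeholder, since the exercise text does not specify it:

# Download the latest sales info
pull_sales = BashOperator(
    task_id='pullsales_task',  # task_id not given in the exercise - placeholder
    bash_command='wget https://salestracking/latestinfo?json',
    dag=analytics_dag
)

# pull_sales runs before cleanup
pull_sales >> cleanup

# consolidate runs after cleanup, using the downstream operator
cleanup >> consolidate

# push_data runs last (either bitshift form works: a >> b or b << a)
consolidate >> push_data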
While looking through a colleague's workflow definition, you're trying to decipher exactly in which order the defined tasks run. The code in question shows the following:
Instructions:
Solution:
You've created a DAG with intended dependencies based on your workflow but for some reason Airflow won't load / execute the DAG. Try using the terminal to:

- Open workspace/dags/codependent.py to view the Python code.

You've implemented several Airflow tasks using the BashOperator but realize that a couple of specific tasks would be better implemented using Python. You'll implement a task to download and save a file to the system within Airflow.
The requests library is imported for you, and the DAG process_sales_dag is already defined.
Instructions:
- Define a function called pull_file with two parameters, URL and savepath.
- Use the print() function and Python f-strings to write a message to the logs.

Solution:
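A sketch of the callable and how it might be wired into the DAG; the PythonOperator import path is Airflow 1.x style, and the task_id and op_kwargs values are placeholders:

from airflow.operators.python_operator import PythonOperator

def pull_file(URL, savepath):
    # Download the file and write it to the given path
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use print() with an f-string so the message appears in the task logs
    print(f"File pulled from {URL} and saved to {savepath}")

pull_file_task = PythonOperator(
    task_id='pull_file',  # assumed id
    python_callable=pull_file,
    # op_kwargs supplies the arguments to pull_file at runtime (values are placeholders)
    op_kwargs={'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'},
    dag=process_sales_dag
)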
To continue implementing your workflow, you need to add another step to parse and save the changes of the downloaded file. The DAG process_sales_dag is defined and has the pull_file task already added. In this case, the Python function is already defined for you, parse_file(inputfile, outputfile).
Note that often when implementing Airflow tasks, you won't necessarily understand the individual steps given to you. As long as you understand how to wrap the steps within Airflow's structure, you'll be able to implement a desired workflow.
Instructions:
- Define the PythonOperator called parse_file_task with the id parse_file.
- Add the parse_file(inputfile, outputfile) function to the Operator.

Solution:
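A sketch of the parsing task; the op_kwargs filenames are placeholders rather than values taken from the exercise:

parse_file_task = PythonOperator(
    task_id='parse_file',
    python_callable=parse_file,
    # Pass the input and output filenames to parse_file(inputfile, outputfile)
    op_kwargs={'inputfile': 'latestsales.json', 'outputfile': 'parsedfile.json'},
    dag=process_sales_dag
)

# Parse only after the download task has completed
pull_file_task >> parse_file_task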
Now that you've successfully defined the PythonOperators for your workflow, your manager would like to receive a copy of the parsed JSON file via email when the workflow completes. The previous tasks are still defined and the DAG process_sales_dag is configured.
Instructions:
- Define the email operator with the appropriate arguments (to, subject, files).

Solution:
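A sketch of the email task, assuming the Airflow 1.x EmailOperator; the recipient address and subject line are placeholders:

from airflow.operators.email_operator import EmailOperator

email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@example.com',           # placeholder address
    subject='Latest sales JSON',        # placeholder subject
    html_content='Attached is the latest sales JSON file as requested.',
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# Send the email only after the file has been parsed
parse_file_task >> email_manager_task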
You've learned quite a bit about creating DAGs, but now you would like to schedule a specific DAG on a specific day of the week at a certain time. You'd like the code to include this information in case a colleague needs to reinstall the DAG to a different server.
The Airflow DAG object and the appropriate datetime methods have been imported for you.
Instructions:
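The exercise details are not shown above, so the following is only a generic sketch of scheduling a DAG with cron syntax; the DAG name, start date, day, and time are all assumptions:

default_args = {
    'owner': 'dsmith',
    'start_date': datetime(2019, 11, 1)   # assumed start date
}

report_dag = DAG(
    dag_id='execute_report',              # assumed DAG name
    default_args=default_args,
    # cron fields: minute hour day-of-month month day-of-week
    # '30 12 * * 3' means every Wednesday at 12:30pm
    schedule_interval='30 12 * * 3'
)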
Given the various options for Airflow's schedule_interval, you'd like to verify that you understand exactly how intervals relate to each other, whether it's a cron format, timedelta object, or a preset.
Instructions:
Solution:
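For reference, a few equivalent ways of expressing common intervals (cron strings, Airflow presets, and timedelta objects):

from datetime import timedelta

# Equivalent ways to express common schedule_interval values:
#   '0 * * * *'  ==  '@hourly'   ==  timedelta(hours=1)
#   '0 0 * * *'  ==  '@daily'    ==  timedelta(days=1)
#   '0 0 * * 0'  ==  '@weekly'   ==  timedelta(weeks=1)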
You've scheduled a DAG called process_sales which is set to run on the first day of the month and email your manager a copy of the report generated in the workflow. The start_date for the DAG is set to February 15, 2020. Unfortunately, it's now March 2nd and your manager did not receive the report and would like to know what happened.
Use the information you've learned about Airflow scheduling to determine what the issue is.
As you've just learned about sensors, you want to verify you understand what they have in common with normal operators and where they differ.
Instructions:
Solution:
You've recently taken over for another Airflow developer and are trying to learn about the various workflows defined within the system. You come across a DAG that you can't seem to make run properly using any of the normal tools. Try exploring the DAG for any information about what it might be looking for before continuing.
While developing your DAGs in Airflow, you realize you're not certain about the configuration of the system. Using the commands you've learned, determine which of the following statements is true.
airflow list_dags
You're learning quite a bit about running Airflow DAGs and are gaining some confidence at developing new workflows. That said, your manager has mentioned that on some days, the workflows are taking a lot longer to finish and asks you to investigate. She also mentions that the salesdata_ready.csv file is taking longer to generate these days and the time of day it is completed is variable.
This exercise requires information from the previous two lessons - remember the implications of the available arguments and modify the workflow accordingly. Note that for this exercise, you're expected to modify one line of code, not add any extra code.
Instructions:
- Verify the workflow loads without errors (for example, with airflow list_dags).
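A sketch of the kind of sensor configuration this exercise points at, assuming a FileSensor waits for salesdata_ready.csv before the rest of the workflow runs; the import path is Airflow 1.x style, and the task and DAG variable names are assumptions:

from airflow.contrib.sensors.file_sensor import FileSensor

precheck = FileSensor(
    task_id='check_for_datafile',          # assumed task id
    filepath='salesdata_ready.csv',
    # mode='reschedule' frees the worker slot between checks, so a slow or
    # variably-timed file doesn't tie up resources while the sensor waits
    mode='reschedule',
    dag=report_dag                          # assumed DAG variable
)

precheck >> generate_report_task            # assumed downstream task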
You've taken over managing an Airflow cluster that you did not set up and are trying to learn a bit more about the system. Which of the following is true?
Your manager calls you before you're about to leave for the evening and wants to know why a new DAG workflow she's created isn't showing up in the system. She needs this DAG called execute_report to appear in the system so she can properly schedule it for some tests before she leaves on a trip.
Airflow is configured using the ~/airflow/airflow.cfg file.
You've successfully implemented several Airflow workflows into production, but you don't currently have any method of determining if a workflow takes too long to run. After consulting with your manager and your team, you decide to implement an SLA at the DAG level on a test workflow.
All appropriate Airflow libraries have been imported for you.
Instructions
Solution
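A sketch of a DAG-level SLA set via the sla key in default_args; the 30-minute window, start date, and DAG name are assumptions:

from datetime import datetime, timedelta

default_args = {
    # Passed to every task via default_args, effectively an SLA for the whole workflow
    'sla': timedelta(minutes=30),
    'start_date': datetime(2020, 2, 20)
}

sla_dag = DAG('sla_dag', default_args=default_args)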
After completing the SLA on the entire workflow, you realize you really only need the SLA timing on a specific task instead of the full workflow.
The appropriate Airflow libraries are imported for you.
Instructions
Solution
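A sketch of attaching the SLA to a single task instead; the command, task id, and three-hour window are assumptions, and the operator imports are assumed available per the exercise setup:

from datetime import timedelta

task_with_sla = BashOperator(
    task_id='sla_task',
    bash_command='runcleanup.sh',
    # Only this task is held to the SLA now, rather than the full workflow
    sla=timedelta(hours=3),
    dag=dag
)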
Airflow provides the ability to automate almost any style of workflow. You would like to receive a report from Airflow when tasks complete without requiring constant monitoring of the UI or log files. You decide to use the email functionality within Airflow to provide this message.
All the typical Airflow components have been imported for you, and a DAG is already defined as dag.
Instructions
- Define the email_report task.
- Attach the file monthly_report.pdf.
- Set the email_report task to occur after the generate_report task.

Solution
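A sketch of the email_report task; the recipient, subject, message body, and the generate_report variable name are assumptions, while the attached file and ordering come from the instructions above:

email_report = EmailOperator(
    task_id='email_report',
    to='airflow@example.com',            # placeholder recipient
    subject='Airflow Monthly Report',    # placeholder subject
    html_content='Attached is your monthly workflow report.',
    files=['monthly_report.pdf'],
    dag=dag
)

# Send the report email only after it has been generated
generate_report >> email_report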
You've worked through most of the Airflow configuration for setting up your workflows, but you realize you're not getting any notifications when DAG runs complete or fail. You'd like to set up email alerting for the success and failure cases, but you want to send it to two addresses.
Instructions
- Edit the execute_report_dag.py workflow.
- Add airflowalerts@datacamp.com and airflowadmin@datacamp.com to the appropriate key in default_args.

Solution
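A minimal sketch of the default_args changes, using the standard operator email settings; only failure alerting has a built-in flag here, so how the success notification is wired up is left as an assumption:

default_args = {
    # Both addresses receive any alert emails
    'email': ['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    # Send an email whenever a task in the workflow fails
    'email_on_failure': True
}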