# [Introduction to Airflow in Python](https://app.datacamp.com/learn/courses/introduction-to-airflow-in-python)

###### tags: `DataCamp`

Delivering data on a schedule can be a manual process. You write scripts, add complex cron tasks, and try various ways to meet an ever-changing set of requirements—and it’s even trickier to manage everything when working with teammates. Airflow can remove this headache by adding scheduling, error handling, and reporting to your workflows. In this course, you’ll master the basics of Airflow and learn how to implement complex data engineering pipelines in production. You'll also learn how to use Directed Acyclic Graphs (DAGs), automate data engineering workflows, and implement data engineering tasks in an easy and repeatable fashion—helping you to maintain your sanity.

## Intro to Airflow

### Introduction to Airflow

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/intro-to-airflow?ex=1)

### Running a task in Airflow

You've just started looking at using Airflow within your company and would like to try to run a task within the Airflow platform. You remember that you can use the `airflow run` command to execute a specific task within a workflow; the general form is `airflow run <dag_id> <task_id> <execution_date>`. Note that an error while using `airflow run` will return `airflow.exceptions.AirflowException:` on the last line of output.

An Airflow DAG is set up for you with a `dag_id` of `etl_pipeline`. The `task_id` is `download_file` and the `start_date` is 2020-01-08. All other components needed are defined for you.

Which command would you enter in the console to run the desired task?

***Answer:*** `airflow run etl_pipeline download_file 2020-01-08`

### Examining Airflow commands

While researching how to use Airflow, you start to wonder about the `airflow` command in general. You realize that by simply running `airflow` you can get further information about the various sub-commands that are available.

Which of the following is NOT an Airflow sub-command?

1. list_dags
2. edit_dag *(not a valid sub-command; this is the answer)*
3. test
4. scheduler

HELP: `airflow -h`

### Airflow DAGs

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/intro-to-airflow?ex=4)

### Defining a simple DAG

You've spent some time reviewing the Airflow components and are interested in testing out your own workflows. To start, you decide to define the default arguments and create a DAG object for your workflow.

The `datetime` object has been imported for you.

***Instructions***

* Import the Airflow DAG object. Note that it is case-sensitive.
* Define the `default_args` dictionary with a key `owner` and a value of 'dsmith'.
* Add a `start_date` of January 14, 2020 to `default_args` using the value `1` for the month of January.
* Add a `retries` count of 2 to `default_args`.
* Instantiate the DAG object to a variable called `etl_dag` with a DAG named `example_etl`.
* Add the `default_args` dictionary to the appropriate argument.

***Solution***

```
# Import the DAG object
from airflow.models import DAG

# Define the default_args dictionary
default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 1, 14),
  'retries': 2
}

# Instantiate the DAG object
etl_dag = DAG('example_etl', default_args=default_args)
```
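As a side note (not required by the exercise), Airflow also lets you create a DAG with Python's context-manager syntax; a minimal sketch using the same names as the solution above:

```
# A minimal sketch, equivalent to the solution above
from datetime import datetime
from airflow.models import DAG

default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 1, 14),
  'retries': 2
}

# Operators defined inside the with-block are attached to etl_dag automatically
with DAG('example_etl', default_args=default_args) as etl_dag:
    pass  # tasks would be defined here
```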
### Working with DAGs and the Airflow shell

While working with Airflow, sometimes it can be tricky to remember what DAGs are defined and what they do. You want to gain some further knowledge of the Airflow shell command, so you'd like to see what options are available.

Multiple DAGs are already defined for you. How many DAGs are present in the Airflow system from the command line?

***Solution:*** `airflow list_dags`

### Troubleshooting DAG creation

Now that you've successfully worked with a couple of workflows, you notice that sometimes there are issues making a workflow appear within Airflow. You'd like to be able to better troubleshoot the behavior of Airflow when there may be something wrong with the code.

Two DAGs are defined for you and Airflow is set up. Note that any changes you make within the editor are automatically saved.

***Instructions:***

* Use the `airflow` shell command to determine which DAG is not being recognized correctly. (`airflow list_dags` reports an error for any DAG file that fails to import, which points you to the broken file.)
* After you determine the broken DAG, open the file and fix any Python errors.
* Once modified, verify that the DAG now appears within Airflow's output.

### Airflow web interface

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/intro-to-airflow?ex=8)

### Starting the Airflow webserver

You've successfully created some DAGs within Airflow using the command-line tools, but notice that it can be a bit tricky to handle scheduling, troubleshooting, and so on. After reading the documentation further, you realize that you'd like to access the Airflow web interface. For security reasons, you'd like to start the webserver on port `9090`.

Which `airflow` command would you use to start the webserver on port `9090`?

Airflow is installed and accessible from the command line. Remember to use the `airflow -h` command if needed. `airflow <subcommand> -h` will provide further detail.

***Answer:*** `airflow webserver -p 9090`

### Navigating the Airflow UI

To gain some familiarity with the Airflow UI, you decide to explore the various pages. You'd like to know what has happened on your Airflow instance thus far.

Which of the following events have not run on your Airflow instance?

### Examining DAGs with the Airflow UI

You've become familiar with the basics of an Airflow DAG and the basics of interacting with Airflow on the command line. Your boss would like you to show others on your team how to examine any available DAGs. In this instance, she would like to know which operator is **NOT** in use with the DAG called `update_state`, as your team is trying to verify the components used in production workflows.

Remember that the Airflow UI allows various methods to view the state of DAGs. The `Tree View` lists the tasks and any ordering between them in a tree structure, with the ability to compress / expand the nodes. The `Graph View` shows any tasks and their dependencies in a graph structure, along with the ability to access further details about task runs. The `Code View` provides full access to the Python code that makes up the DAG.

Remember to select the operator NOT used in this DAG.

## Implementing Airflow DAGs

### Airflow operators

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/implementing-airflow-dags?ex=1)

### Defining a BashOperator task

The `BashOperator` allows you to specify any given shell command or script and add it to an Airflow workflow. This can be a great start to implementing Airflow in your environment.

As such, you've been running some scripts manually to clean data (using a script called `cleanup.sh`) prior to delivery to your colleagues in the Data Analytics group. As you get more of these tasks assigned, you've realized it's becoming difficult to keep up with running everything manually, much less dealing with errors or retries. You'd like to implement a simple script as an Airflow operator.
The Airflow DAG `analytics_dag` is already defined for you and has the appropriate configurations in place.

***Instructions:***

* Import the `BashOperator` object.
* Define a `BashOperator` called `cleanup` with the `task_id` of `cleanup_task`.
* Use the command `cleanup.sh`.
* Add the operator to the DAG.

***Solution:***

```
# Import the BashOperator
from airflow.operators.bash_operator import BashOperator

# Define the BashOperator
cleanup = BashOperator(
    task_id='cleanup_task',
    # Define the bash_command
    bash_command='cleanup.sh',
    # Add the task to the dag
    dag=analytics_dag
)
```

### Multiple BashOperators

Airflow DAGs can contain many operators, each performing their defined tasks. You've successfully implemented one of your scripts as an Airflow task and have decided to continue migrating your individual scripts to a full Airflow DAG. You now want to add more components to the workflow. In addition to the `cleanup.sh` used in the previous exercise, you have two more scripts, `consolidate_data.sh` and `push_data.sh`. These further process your data and copy it to its final location.

The DAG `analytics_dag` is available as before, and your `cleanup` task is still defined. The `BashOperator` is already imported.

***Instructions:***

* Define a `BashOperator` called `consolidate` to run `consolidate_data.sh` with a `task_id` of `consolidate_task`.
* Add a final `BashOperator` called `push_data`, running `push_data.sh` with a `task_id` of `pushdata_task`.

***Solution:***

```
# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag)

# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag=analytics_dag)
```

### Airflow tasks

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/implementing-airflow-dags?ex=4)

### Define order of BashOperators

Now that you've learned about the bitshift operators, it's time to modify your workflow to include a pull step and to define the task ordering. You have three currently defined components: `cleanup`, `consolidate`, and `push_data`.

The DAG `analytics_dag` is available as before and the `BashOperator` is already imported.

***Instructions:***

* Define a `BashOperator` called `pull_sales` with a bash command of `wget https://salestracking/latestinfo?json`.
* Set the `pull_sales` operator to run before the `cleanup` task.
* Configure `consolidate` to run next, using the downstream operator.
* Set `push_data` to run last using either bitshift operator.

***Solution:***

```
# Define a new pull_sales task
pull_sales = BashOperator(
    task_id='pullsales_task',
    bash_command='wget https://salestracking/latestinfo?json',
    dag=analytics_dag
)

# Set pull_sales to run prior to cleanup
pull_sales >> cleanup

# Configure consolidate to run after cleanup
cleanup >> consolidate

# Set push_data to run last
consolidate >> push_data
```
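For reference (not part of the exercise), the bitshift operators are shorthand for the explicit `set_downstream()` / `set_upstream()` task methods; a minimal sketch of the equivalent calls for the solution above:

```
# Equivalent ordering using the explicit methods (sketch only)
pull_sales.set_downstream(cleanup)    # same as: pull_sales >> cleanup
cleanup.set_downstream(consolidate)   # same as: cleanup >> consolidate
push_data.set_upstream(consolidate)   # same as: consolidate >> push_data
```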
### Determining the order of tasks

While looking through a colleague's workflow definition, you're trying to decipher exactly in which order the defined tasks run. The code in question shows the following:

```
pull_data << initialize_process
pull_data >> clean >> run_ml_pipeline
generate_reports << run_ml_pipeline
```

***Instructions:***

* Order the tasks in the sequence defined by the bitshift code, with the first task to run on top and the last task to run on the bottom.

***Solution:***

![image](https://i.imgur.com/3ousMO6.png)

### Troubleshooting DAG dependencies

You've created a DAG with intended dependencies based on your workflow, but for some reason Airflow won't load or execute the DAG.

Try using the terminal to:

* List the DAGs.
* Decipher the error message.
* Use `cat workspace/dags/codependent.py` to view the Python code.
* Determine which of the following lines should be removed from the Python code. You may want to consider the last line of the file.

```
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 2, 12),
  'retries': 1
}

codependency_dag = DAG('codependency', default_args=default_args)

task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=codependency_dag)

task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=codependency_dag)

task3 = BashOperator(task_id='third_task',
                     bash_command='echo 3',
                     dag=codependency_dag)

# task1 must run before task2 which must run before task3
task1 >> task2
task2 >> task3
task3 >> task1
```

***Answer:*** The last line, `task3 >> task1`, creates a cycle (`task1 >> task2 >> task3` and back to `task1`), which a directed *acyclic* graph cannot contain; removing it allows the DAG to load.

### Additional operators

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/implementing-airflow-dags?ex=8)

### Using the PythonOperator

You've implemented several Airflow tasks using the `BashOperator` but realize that a couple of specific tasks would be better implemented using Python. You'll implement a task to download and save a file to the system within Airflow.

The `requests` library is imported for you, and the DAG `process_sales_dag` is already defined.

***Instructions:***

* Define a function called `pull_file` with two parameters, `URL` and `savepath`.
* Use the `print()` function and Python f-strings to write a message to the logs.
* Import the necessary object to use the `PythonOperator`.
* Create a new task assigned to the variable `pull_file_task`, with the id `pull_file`.
* Add the `pull_file(URL, savepath)` function defined previously to the operator.
* Define the arguments needed for the task.

***Solution:***

```
def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python_operator import PythonOperator

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'},
    dag=process_sales_dag
)
```
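For reference (not part of the exercise), when `pull_file_task` executes, the `PythonOperator` expands `op_kwargs` into keyword arguments for the callable, so the call it makes is roughly:

```
# Roughly what Airflow does at execution time (illustration only)
pull_file(URL='http://dataserver/sales.json', savepath='latestsales.json')
```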
***Solution:*** ``` # Add another Python task parse_file_task = PythonOperator( task_id='parse_file', # Set the function to call python_callable=parse_file, # Add the arguments op_kwargs={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'}, # Add the DAG dag=process_sales_dag ) ``` ### EmailOperator and dependencies Now that you've successfully defined the PythonOperators for your workflow, your manager would like to receive a copy of the parsed JSON file via email when the workflow completes. The previous tasks are still defined and the DAG `process_sales_dag` is configured. ***Instructions:*** * Import the class to send emails. * Define the Operator and add the appropriate arguments (`to`, `subject`, `files`). * Set the task order so the tasks run sequentially (Pull the file, parse the file, then email your manager). ***Solution:*** ``` # Import the Operator from airflow.operators.email_operator import EmailOperator # Define the task email_manager_task = EmailOperator( task_id='email_manager', to='manager@datacamp.com', subject='Latest sales JSON', html_content='Attached is the latest sales JSON file as requested.', files='parsedfile.json', dag=process_sales_dag ) # Set the order of tasks pull_file_task >> parse_file_task >> email_manager_task ``` ### Airflow scheduling * [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/implementing-airflow-dags?ex=12) ### Schedule a DAG via Python You've learned quite a bit about creating DAGs, but now you would like to schedule a specific DAG on a specific day of the week at a certain time. You'd like the code include this information in case a colleague needs to reinstall the DAG to a different server. The Airflow `DAG` object and the appropriate `datetime` methods have been imported for you. ***Instructions:*** * Set the start date of the DAG to November 1, 2019. * Configure the retry_delay to 20 minutes. You will learn more about the timedelta object in Chapter 3. For now, you just need to know it expects an integer value. *Use the cron syntax to configure a schedule of every Wednesday at 12:30pm. ***Solution:*** ``` # Update the scheduling arguments as defined default_args = { 'owner': 'Engineering', 'start_date': datetime(2019, 11, 1), 'email': ['airflowresults@datacamp.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=20) } dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3') ``` ### Deciphering Airflow schedules Given the various options for Airflow's `schedule_interval`, you'd like to verify that you understand exactly how intervals relate to each other, whether it's a cron format, `timedelta` object, or a preset. ***Instructions:*** * Order the schedule intervals from least to greatest amount of time. ***Solution:*** ![](https://i.imgur.com/o2aM9fe.png) ### Troubleshooting DAG runs You've scheduled a DAG called `process_sales` which is set to run on the first day of the month and email your manager a copy of the report generated in the workflow. The `start_date` for the DAG is set to February 15, 2020. Unfortunately it's now March 2nd and your manager did not receive the report and would like to know what happened. Use the information you've learned about Airflow scheduling to determine what the issue is. 
## Maintaining and monitoring Airflow workflows

### Airflow sensors

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/maintaining-and-monitoring-airflow-workflows?ex=1)

### Sensors vs operators

As you've just learned about sensors, you want to verify you understand what they have in common with normal operators and where they differ.

***Instructions:***

* Move each entry into the Sensors, Operators, or Both bucket.

***Solution:***

![](https://i.imgur.com/qpG6tRv.png)

### Sensory deprivation

You've recently taken over for another Airflow developer and are trying to learn about the various workflows defined within the system. You come across a DAG that you can't seem to make run properly using any of the normal tools. Try exploring the DAG for any information about what it might be looking for before continuing.

### Airflow executors

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/maintaining-and-monitoring-airflow-workflows?ex=4)

### Determining the executor

While developing your DAGs in Airflow, you realize you're not certain about the configuration of the system. Using the commands you've learned, determine which of the following statements is true.

***Answer:*** Run `airflow list_dags`; its log output includes a line indicating the executor in use (for example, `SequentialExecutor`, which runs only one task at a time).

### Executor implications

You're learning quite a bit about running Airflow DAGs and are gaining some confidence at developing new workflows. That said, your manager has mentioned that on some days the workflows are taking a lot longer to finish and asks you to investigate. She also mentions that the `salesdata_ready.csv` file is taking longer to generate these days and the time of day it is completed is variable.

This exercise requires information from the previous two lessons: remember the implications of the available arguments and modify the workflow accordingly. Note that for this exercise, you're expected to modify one line of code, not add any extra code.

***Instructions:***

* Determine the level of parallelism available on this system. You can do this by listing the DAGs (`airflow list_dags`) and noting the executor reported in the output.
* Look at the source for the DAG file and fix the entry that is causing the problem.

### Debugging and troubleshooting in Airflow

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/maintaining-and-monitoring-airflow-workflows?ex=7)

### DAGs in the bag

You've taken over managing an Airflow cluster that you did not set up and are trying to learn a bit more about the system. Which of the following is true?

### Missing DAG

Your manager calls you before you're about to leave for the evening and wants to know why a new DAG workflow she's created isn't showing up in the system. She needs this DAG, called `execute_report`, to appear in the system so she can properly schedule it for some tests before she leaves on a trip.

Airflow is configured using the `~/airflow/airflow.cfg` file.

### SLAs and reporting in Airflow

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/maintaining-and-monitoring-airflow-workflows?ex=7)

### Defining an SLA

You've successfully implemented several Airflow workflows into production, but you don't currently have any method of determining if a workflow takes too long to run. After consulting with your manager and your team, you decide to implement an SLA at the DAG level on a test workflow.

All appropriate Airflow libraries have been imported for you.

***Instructions***

* Import the `timedelta` object.
* Define an SLA of 30 minutes.
* Add the SLA to the DAG.
***Solution***

```
# Import the timedelta object
from datetime import timedelta

# Create the dictionary entry
default_args = {
  'start_date': datetime(2020, 2, 20),
  'sla': timedelta(minutes=30)
}

# Add to the DAG
test_dag = DAG('test_workflow',
               default_args=default_args,
               schedule_interval='@None')
```

### Defining a task SLA

After completing the SLA on the entire workflow, you realize you really only need the SLA timing on a specific task instead of the full workflow.

The appropriate Airflow libraries are imported for you.

***Instructions***

* Import the `timedelta` object.
* Add a 3-hour SLA to the task object.

***Solution***

```
# Import the timedelta object
from datetime import timedelta

test_dag = DAG('test_workflow',
               start_date=datetime(2020, 2, 20),
               schedule_interval='@None')

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)
```

### Generate and email a report

Airflow provides the ability to automate almost any style of workflow. You would like to receive a report from Airflow when tasks complete without requiring constant monitoring of the UI or log files. You decide to use the email functionality within Airflow to provide this message.

All the typical Airflow components have been imported for you, and a DAG is already defined as `report_dag`.

***Instructions***

* Define the proper operator for the `email_report` task.
* Fill in the missing details for the operator. Use the file named `monthly_report.pdf`.
* Set the `email_report` task to occur after the `generate_report` task.

***Solution***

```
# Define the email task
email_report = EmailOperator(
    task_id='email_report',
    to='airflow@datacamp.com',
    subject='Airflow Monthly Report',
    html_content="""Attached is your monthly workflow report - please refer to it for more detail""",
    files=["monthly_report.pdf"],
    dag=report_dag
)

# Set the email task to run after the report is generated
email_report << generate_report
```

### Adding status emails

You've worked through most of the Airflow configuration for setting up your workflows, but you realize you're not getting any notifications when DAG runs complete or fail. You'd like to set up email alerting for the success and failure cases, but you want to send it to two addresses.

***Instructions***

* Edit the `execute_report_dag.py` workflow.
* Add the emails `airflowalerts@datacamp.com` and `airflowadmin@datacamp.com` to the appropriate key in `default_args`.
* Set the failure email option to True.
* Configure the success email to send you messages as well.

***Solution***

```
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

default_args = {
  'email': ['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
  'email_on_failure': True,
  'email_on_success': True
}

report_dag = DAG(
    dag_id='execute_report',
    schedule_interval="0 0 * * *",
    default_args=default_args
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020, 2, 20),
    mode='reschedule',
    dag=report_dag)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020, 2, 20),
    dag=report_dag
)

precheck >> generate_report_task
```

## Building production pipelines in Airflow

### Working with templates

* [Video](https://campus.datacamp.com/courses/introduction-to-airflow-in-python/building-production-pipelines-in-airflow?ex=1)
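As a rough preview of what this section covers (the DAG name, filename, and parameter below are purely hypothetical): `bash_command` and several other operator fields accept Jinja templates, which Airflow renders just before the task executes. A minimal sketch:

```
# A minimal sketch of a templated BashOperator (names are hypothetical)
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

cleandata_dag = DAG('cleandata',
                    start_date=datetime(2020, 4, 1),
                    schedule_interval=None)

# {{ ds }} is the run date; {{ params.filename }} comes from the params dict below
templated_command = 'echo "Cleaning {{ params.filename }} for run date {{ ds }}"'

clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filename': 'salesdata.txt'},
                          dag=cleandata_dag)
```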