Apache Airflow: How to Dynamically Fetch Data and Email?

This article was published as a part of the Data Science Blogathon.

Introduction

Automating repetitive jobs with workflow management tools saves considerable time and resources. Apache Airflow is one of the most widely used workflow management tools. It is open source and ships with many operators, hooks, and sensors covering a diverse set of external services.

Source: Photo by Andrew Pons on Unsplash

Airflow is a Python-based platform, originally created at Airbnb and now maintained under the Apache Software Foundation, that connects numerous data sources so their data can be analyzed for meaningful values. Airflow models each workflow as a DAG (directed acyclic graph) of tasks, which keeps task dependencies explicit and lets data and ML engineers turn use cases into automated workflows.

This post shows how to extract data from online sources, check it, and email it to the teams so they can take use-case-specific actions.

Prerequisites

  • Airflow and SMTP setup.
  • Python 3

Requirement

In this use case, our data teams must fetch data from a source in order to analyze it and build models around it. The files at the source location need parsing to determine whether they contain relevant data or are empty.

We will email the files to our data teams based on the file content availability as presented.

[Image: apache airflow]

Workflow Development

Reading files from a source can be tricky and complex without a reliable, secure authentication mechanism. To keep this demo simple and accessible to a wide audience, we will extract publicly available statistical data from a secure platform.

Data needs to be stored somewhere for reference before being analyzed. For simplicity and ease of implementation, let’s use the logging path from airflow.cfg as the destination for our files.

Apache Airflow provides a configuration object, similar to Python's ConfigParser, for reading these values. With it, we can fetch a value by its section and key.

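import os

import requests
from airflow.configuration import conf

# Reuse the log folder configured in airflow.cfg as the download directory.
files_dir = conf.get("logging", "base_log_folder")

def get_file(url: str):
    resp = requests.get(url)
    # Fail loudly on HTTP errors instead of saving an error page as data.
    resp.raise_for_status()
    # Name the local file after the last URL segment so downloads do not overwrite each other.
    file_path = os.path.join(files_dir, url.rsplit("/", 1)[-1])
    with open(file_path, "wb") as f:
        f.write(resp.content)
    return file_path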
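
The section-and-key lookup is easier to see in isolation. The sketch below uses the standard-library ConfigParser that the text compares Airflow's configuration object to. The config text and the /opt/airflow/logs path are made-up values for illustration, not settings taken from this article.

```python
from configparser import ConfigParser

# Hypothetical airflow.cfg-style content; the path is an assumed example value.
SAMPLE_CFG = """
[logging]
base_log_folder = /opt/airflow/logs
"""

parser = ConfigParser()
parser.read_string(SAMPLE_CFG)

# Look up a value by section name and key, mirroring
# conf.get("logging", "base_log_folder") in the Airflow code above.
files_dir = parser.get("logging", "base_log_folder")
print(files_dir)
```

Both calls take the section first and the key second, which is why the Airflow version reads so much like ConfigParser code.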

With our get_file function in place, the next step is to check the file contents. The approach is to open every file in files_dir, read its lines, and check the line count to ensure that the file contains data before sending it to the data team.

from os import listdir
from os.path import isfile, join

def is_file_empty(file_path: str):
    try:
        with open(file_path, 'r') as file:
            # Strip the trailing newline from every line read.
            data = [line.replace('\n', '') for line in file]
        # A lone header row (or no rows at all) counts as empty.
        return len(data) <= 1
    except FileNotFoundError:
        return 'ERROR: FILE_NOT_FOUND'

is_file_empty opens a downloaded file, reads its lines, and reports whether the file is empty based on the line count. The check_all_files function, shown below, calls our get_file function once per dataset to download the publicly available files and then runs is_file_empty on each downloaded file.

The next step is to devise a mechanism that triggers the email alert based on the return value of check_all_files. To keep state, we maintain a global results list that captures the boolean returned by each is_file_empty call. We loop over the files, record each result, and then use results to decide whether the files have content or are empty.

results = []

def check_all_files():
    global results
    get_file("https://stats.govt.nz/assets/Uploads/Annual-enterprise-survey/"
             "Annual-enterprise-survey-2021-financial-year-provisional/Download-data/"
             "annual-enterprise-survey-2021-financial-year-provisional-size-bands-csv.csv")
    get_file("https://stats.govt.nz/assets/Uploads/Business-operations-survey/"
             "Business-operations-survey-2021/Download-data/bos2021ModC.csv")
    # This third path has no scheme or host in the source, so requests cannot
    # fetch it as written; replace it with the full URL of the third dataset.
    get_file("Business-operations-survey/Business-operations-survey-2021/"
             "Download-data/bos2021ModC.csv")

    # List the directory only after the downloads have finished.
    files = [f for f in listdir(files_dir) if isfile(join(files_dir, f))]
    results = [is_file_empty(join(files_dir, f)) for f in files]

    if results and all(r is True for r in results):
        return 'NO UPDATES'
    # Keep the full paths of the files that contain data.
    result = [join(files_dir, f) for f, r in zip(files, results) if r is False]
    if len(result) == 0:
        return 'ERROR: FILES_NOT_FOUND'
    return result
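
The emptiness check can be tried without Airflow or a network connection. The following is a minimal sketch under stated assumptions: has_data and files_with_data are hypothetical helper names, and the two CSV files are throwaway samples created in a temporary directory.

```python
import os
import tempfile

def has_data(path: str) -> bool:
    """Return True when the file has more than one line (header plus at least one row)."""
    with open(path, "r") as handle:
        return sum(1 for _ in handle) > 1

def files_with_data(directory: str) -> list:
    """Return the full paths of regular files in `directory` that contain data, sorted by name."""
    paths = sorted(os.path.join(directory, name) for name in os.listdir(directory))
    return [p for p in paths if os.path.isfile(p) and has_data(p)]

# Build a throwaway directory with one header-only file and one populated file.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "empty.csv"), "w") as f:
        f.write("year,value\n")
    with open(os.path.join(tmp, "full.csv"), "w") as f:
        f.write("year,value\n2021,42\n")
    found = [os.path.basename(p) for p in files_with_data(tmp)]

print(found)
```

Only the populated file survives the filter, which is the same rule the workflow applies before deciding whether there is anything to email.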

We will call the above function using a PythonOperator.

get_files=PythonOperator(
        task_id='get_files',
        python_callable=check_all_files
    )

Now we will use the value returned by check_all_files to drive an Airflow BranchPythonOperator. The check_for_email function expects a task instance and pulls that value at runtime using the xcom_pull method.

def check_for_email(ti):
    # A PythonOperator pushes its return value to XCom under the default key,
    # so no custom key is passed here.
    check_files = ti.xcom_pull(task_ids='get_files')
    if check_files == "NO UPDATES":
        return 'no_update'
    elif check_files == 'ERROR: FILES_NOT_FOUND':
        return 'file_not_found'
    else:
        return 'email_alert'

The BranchPythonOperator is an excellent fit for triggering one operator based on a condition and skipping the rest. It accepts a python_callable that returns a task_id; the task with that task_id runs next, and the other directly downstream tasks are skipped.

# Airflow 2 passes the context to the callable automatically,
# so provide_context is not set here.
check_if_file_is_empty = BranchPythonOperator(
        task_id="get_and_check_file_contents",
        python_callable=check_for_email
    )
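
The branching decision itself is plain Python and can be exercised outside a scheduler. In the sketch below, FakeTaskInstance is a hypothetical stand-in that mimics only the xcom_pull call, and choose_branch is an illustrative name for the same decision logic; neither is part of Airflow.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance; only xcom_pull is mimicked."""

    def __init__(self, pushed_value):
        self.pushed_value = pushed_value

    def xcom_pull(self, task_ids=None):
        # Return whatever the upstream task is pretended to have returned.
        return self.pushed_value

def choose_branch(ti):
    """Map the upstream task's return value to the task_id that should run next."""
    state = ti.xcom_pull(task_ids="get_files")
    if state == "NO UPDATES":
        return "no_update"
    if state == "ERROR: FILES_NOT_FOUND":
        return "file_not_found"
    # Anything else is treated as a list of files that contain data.
    return "email_alert"

for state in ("NO UPDATES", "ERROR: FILES_NOT_FOUND", ["/tmp/data.csv"]):
    print(state, "->", choose_branch(FakeTaskInstance(state)))
```

Each returned string has to match the task_id of a task directly downstream of the branch operator, which is why the DAG defines tasks named no_update, file_not_found, and email_alert.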

After branching, the run continues into either one of the dummy operators or the PythonOperator that sends the email. We use the task instance here to pull the list of files from XCom, keep the ones that contain data, and email them as attachments with a predefined template, as below.

def email_actual_file(ti, **kwargs):
    # get_files returns the list of files that contain data; pull it from XCom.
    pull_files = ti.xcom_pull(task_ids='get_files')
    # Guard against the string states ('NO UPDATES', 'ERROR: ...') before attaching.
    actual_files = pull_files if isinstance(pull_files, list) else []
    if actual_files:
        email = EmailOperator(task_id="email_alert",
                              to='[email protected]',
                              subject="CMS Alert on Daily Update for ASP Pricing, ASP crosswalk and NOC",
                              html_content='''Hello, Here is an update. Check attachment''',
                              files=[*actual_files])
        email.execute(context=kwargs)
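
To make "email the files as attachments" concrete, here is a standard-library sketch that builds a message with attachments. It is an illustration only and makes no claim about how EmailOperator works internally. build_message is a hypothetical helper, and the example.com addresses and the sample file are placeholders.

```python
import os
import tempfile
from email.message import EmailMessage

def build_message(sender, recipient, subject, body, attachments):
    """Build an email with one attachment per file path (illustrative helper)."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    for path in attachments:
        with open(path, "rb") as handle:
            # Attach the raw bytes under the file's base name.
            msg.add_attachment(handle.read(),
                               maintype="application",
                               subtype="octet-stream",
                               filename=os.path.basename(path))
    return msg

# Create a throwaway CSV so the sketch runs without any real data.
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "data.csv")
    with open(sample, "w") as f:
        f.write("year,value\n2021,42\n")
    message = build_message("airflow@example.com", "data-team@example.com",
                            "Daily update", "Hello, here is an update.", [sample])

attached = [part.get_filename() for part in message.iter_attachments()]
print(attached)
```

The message is built but never sent; delivery would still need an SMTP connection, which in the article's setup comes from the Airflow SMTP configuration listed in the prerequisites.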

With the functions defined, the next step is to add all the operators to a DAG and chain them for execution.

Apache Airflow DAG

As is best practice, we define default args and declare the operators inside the DAG. The render_template_as_native_obj flag is set to True so that templated fields render as native Python objects (a list of files stays a list rather than becoming its string form). The email callable takes **kwargs as context; in Airflow 2 the context is passed to the callable automatically, so the provide_context flag from Airflow 1.x is no longer needed.

from datetime import datetime

# Airflow 2.x import paths.
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator

default_args = {"owner": "Jay",
                "email_on_failure": True,
                # start_date must be a datetime, not a string.
                "start_date": datetime(2022, 1, 1)}

with DAG(dag_id='cms-asp_pricing_pg', default_args=default_args,
         render_template_as_native_obj=True,
         schedule_interval='@daily') as dag:

    get_files = PythonOperator(
        task_id='get_files',
        python_callable=check_all_files
    )

    check_if_file_is_empty = BranchPythonOperator(
        task_id="get_and_check_file_contents",
        python_callable=check_for_email
    )

    no_update = DummyOperator(
        task_id='no_update'
    )

    file_not_found = DummyOperator(
        task_id='file_not_found'
    )

    email_alert = PythonOperator(
        task_id="email_alert",
        python_callable=email_actual_file
    )

    # Download and check first, then branch into exactly one of the three outcomes.
    get_files >> check_if_file_is_empty >> [no_update, file_not_found, email_alert]

Output

[Image: apache airflow]

Conclusion

Apache Airflow is among the most popular workflow management tools. Organizations are adopting Airflow into their tech stacks and treating it as a critical platform for orchestrating and automating workflows efficiently.

This post walked through how to develop an Apache Airflow DAG that extracts data and forwards it via email.

  • We architected a mechanism to download publicly available datasets, parse them and validate the file contents.

  • We have devised a method to leverage the Airflow branch python operator for solving our use case-specific problem.

  • We have developed a method to filter and email only the files which contain data.

  • In the end, we combined all the operators and built a DAG, passed python methods as callable, and chained the tasks together into a working DAG.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.
