Send email from airflow | Setup airflow email in 2023

This tutorial explains how to send email from Airflow using the EmailOperator. Airflow is a widely used open-source workflow management platform. Airbnb developed Airflow in 2014 and later open-sourced it and donated it to the Apache Software Foundation.

You can build a data pipeline with Airflow and schedule it to run at set intervals. Airflow provides a wide variety of operators for performing different kinds of tasks.

Please follow this link to install Airflow on your system. I will be using Gmail to send emails, but you can follow the same steps for another email provider. Now let’s set up Gmail to send an email.

Airflow email notification

First, we need to set up the SMTP configuration to send email from Airflow. We will use Gmail to set up the SMTP user and password in this section.

Airflow SMTP configuration

Go to this URL and log in with your username and password. After a successful login, you will see the below window.

NOTE: If you are not getting this window, please enable the 2-step verification for your Gmail account.

Now select the app and device, then click Generate. I am using a MacBook to generate the password.

An app password will be generated for your device. Make sure to copy it somewhere safe, and do not share it with anyone.
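Before wiring the app password into Airflow, it can help to confirm that it works on its own. The following is a minimal sketch using only the Python standard library, assuming Gmail's SMTP server at smtp.gmail.com on port 587 with STARTTLS. The function names and the placeholder addresses are illustrative and not part of Airflow.

```python
import smtplib
from email.message import EmailMessage


def build_test_message(sender: str, recipient: str) -> EmailMessage:
    """Build a small plain-text message used only to check the credentials."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = "SMTP credential check"
    msg.set_content("If you can read this, the app password works.")
    return msg


def send_test_message(user: str, app_password: str, recipient: str) -> None:
    """Log in to Gmail over STARTTLS (port 587) and send one test message."""
    msg = build_test_message(user, recipient)
    with smtplib.SMTP("smtp.gmail.com", 587, timeout=30) as server:
        server.starttls()  # upgrade the connection to TLS before logging in
        server.login(user, app_password)
        server.send_message(msg)


# Example call (placeholders, replace with your own values):
# send_test_message("<your_gmail_address>", "<app_password>", "<to_email_address>")
```

If the login step raises smtplib.SMTPAuthenticationError, the app password or the account name is most likely wrong.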

Now our Gmail setup is completed. Let’s proceed further and set up the airflow.

Setup airflow config file to send email

To send an email from Airflow, we need to add the SMTP configuration to the airflow.cfg file. The configuration file is located in the $AIRFLOW_HOME directory:

airflow$ cd $AIRFLOW_HOME

airflow$ ls -ltr
total 56
drwxr-xr-x 2 default root    64 May 22 05:21 dags
drwxr-xr-x 2 default root    64 May 22 05:21 plugins
-rw-rw-r-- 1 default root 42186 May 22 05:58 airflow.cfg
-rw-rw-r-- 1 default root  4700 May 22 05:58 webserver_config.py
-rw-r--r-- 1 default root     2 May 22 06:01 airflow-worker.pid
drwxr-xr-x 5 default root   160 May 22 06:18 logs

Now edit the airflow.cfg file and update the SMTP properties: smtp_user, smtp_port, smtp_mail_from, and smtp_password.

NOTE: Set smtp_password to the app password we generated above.

[smtp]

# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = <your_gmail_address>
# Example: smtp_password = airflow
smtp_password = <pswd>
smtp_port = 587
smtp_mail_from = <your_gmail_address>
smtp_timeout = 30
smtp_retry_limit = 5
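As an alternative to storing the password in airflow.cfg, Airflow also reads configuration options from environment variables named AIRFLOW__{SECTION}__{KEY}. The sketch below only prints export lines for the [smtp] values used in this tutorial. The helper name airflow_env_name and the placeholder values are illustrative and not part of Airflow.

```python
def airflow_env_name(section: str, key: str) -> str:
    """Return the environment variable name Airflow checks for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Placeholder values; replace them with your own before use.
smtp_settings = {
    "smtp_host": "smtp.gmail.com",
    "smtp_starttls": "True",
    "smtp_ssl": "False",
    "smtp_port": "587",
    "smtp_user": "<your_gmail_address>",
    "smtp_password": "<pswd>",
    "smtp_mail_from": "<your_gmail_address>",
}

# Print export lines that could go in a shell profile or service environment.
for key, value in smtp_settings.items():
    print(f"export {airflow_env_name('smtp', key)}='{value}'")
```

The variables have to be set in the environment of the Airflow processes that send the email, such as the scheduler and the workers.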

Save the configuration file and restart the Airflow instance. Now let’s proceed further and create an Airflow job.
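Before moving on, a quick check of the [smtp] section can catch a missing or empty value. This is a minimal sketch using the standard configparser module. The list of required keys is an assumption based on the properties edited in this tutorial, and missing_smtp_keys is a hypothetical helper, not an Airflow function.

```python
import configparser

# Keys this tutorial edits; treated here as required (an assumption).
REQUIRED_SMTP_KEYS = ("smtp_host", "smtp_port", "smtp_user", "smtp_password", "smtp_mail_from")


def missing_smtp_keys(cfg_text: str) -> list:
    """Return required [smtp] keys that are absent or empty in the config text."""
    # interpolation=None keeps '%' characters in other sections from being expanded
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        return list(REQUIRED_SMTP_KEYS)
    return [
        key for key in REQUIRED_SMTP_KEYS
        if not parser.get("smtp", key, fallback="").strip()
    ]


sample = """
[smtp]
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = someone@example.com
smtp_password =
smtp_mail_from = someone@example.com
"""
print(missing_smtp_keys(sample))  # → ['smtp_password']
```

To check the real file, read the text of airflow.cfg from the $AIRFLOW_HOME directory and pass it to the function instead of the sample string.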

Email operator airflow example

To create the Airflow job, we first need to import EmailOperator:

from airflow.operators.email import EmailOperator

Let’s create an EmailOperator with the following properties:

    send_email_notification = EmailOperator(
        task_id="send_test_email",
        to="<to_email_address>",
        subject="Test email",
        html_content="<h2>this is test email</h2>",
    )

The complete code can be found below:

from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['<your_email_address>'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'email_tutorial',
    default_args=default_args,
    description='A simple email ',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021,1,1),
    catchup=False
) as dag:

    send_email_notification = EmailOperator(
        task_id="send_test_email",
        to="<to_email_address>",
        subject="Test email",
        html_content="<h2>this is test email</h2>",
    )

Create a file named testemail.py inside your DAGs folder and paste in the code above. Make sure to update to_email_address and save the file.

Now go to the airflow URL and verify the DAG.

Now, let’s open the DAG and go to the tree view.

Let’s trigger the job manually.

Verify the job status. The run should be marked as successful.

Now check your inbox and see if you have received any emails.

Awesome, we have successfully sent an email from Airflow.

Send email from airflow on Success

Now that we know how to send an email, let’s proceed further and understand how to send custom emails from Airflow using EmailOperator.

To send the custom alert, we need to define the callbacks for success and failure, and inside those callbacks, we will implement logic to send an email.

The Airflow DAG with callbacks looks like this:

from airflow.operators.email import EmailOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow import DAG

def failure_function(context):
    dag_run = context.get('dag_run')
    msg = "DAG Failed"
    subject = f"DAG {dag_run} Failed"
    # Instantiating the operator alone sends nothing; execute() sends the email
    EmailOperator(task_id='failure_email', to='<to_email_address>',
                  subject=subject, html_content=msg).execute(context)

def success_function(context):
    dag_run = context.get('dag_run')
    msg = "DAG ran successfully"
    subject = f"DAG {dag_run} has completed"
    EmailOperator(task_id='success_email', to='<to_email_address>',
                  subject=subject, html_content=msg).execute(context)


default_args = {
    'owner': 'airflow'   
}

with DAG(
    'check_pass_fail',
    default_args=default_args,
    description='A simple job to notify user on DAG pass/failure',
    schedule_interval='@daily',
    start_date=datetime(2021,1,1),
    catchup=False
) as dag:

    bash_date = BashOperator(
        task_id='bash_date',
        on_failure_callback=failure_function,
        on_success_callback=success_function,
        # 'dates' is not a standard command, so this task fails
        # and triggers failure_function
        bash_command='dates'
    )

    bash_list = BashOperator(
        task_id='bash_list',
        on_failure_callback=failure_function,
        on_success_callback=success_function,
        bash_command='ls -ltr'
    )

Code explanation

  • First, we import the packages we need: EmailOperator, BashOperator, DAG, etc.
  • Next, we define a function called failure_function, which is used as the on_failure_callback and sends an email when a task fails.
  • Similarly, we define another function, success_function, which is used as the on_success_callback and sends an email when a task succeeds.
  • Then we define two tasks, bash_date and bash_list, and pass on_failure_callback and on_success_callback to each so that the matching function sends an email when the task fails or succeeds.
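The callbacks above send a fixed message. A callback can also build the subject and body from the context it receives. The following is a rough sketch, assuming the context contains the task_instance and dag_run entries and, for failures, an exception entry. The helper name build_alert and the stand-in context values are hypothetical, so the example runs outside Airflow.

```python
from html import escape
from types import SimpleNamespace


def build_alert(context: dict, status: str) -> tuple:
    """Return (subject, html_content) describing the task in the callback context."""
    ti = context["task_instance"]
    run_id = context["dag_run"].run_id
    subject = f"Airflow: {ti.dag_id}.{ti.task_id} {status}"
    rows = [("DAG", ti.dag_id), ("Task", ti.task_id), ("Run", run_id)]
    # The exception entry is only expected when the task has failed.
    if context.get("exception") is not None:
        rows.append(("Error", str(context["exception"])))
    # Escape values so characters like '<' do not break the HTML body.
    body = "".join(f"<li><b>{escape(k)}:</b> {escape(str(v))}</li>" for k, v in rows)
    return subject, f"<h2>Task {escape(status)}</h2><ul>{body}</ul>"


# Stand-in context for a dry run outside Airflow (values are made up).
fake_context = {
    "task_instance": SimpleNamespace(dag_id="check_pass_fail", task_id="bash_date"),
    "dag_run": SimpleNamespace(run_id="manual__2021-01-01"),
    "exception": RuntimeError("Bash command failed"),
}
subject, html = build_alert(fake_context, "failed")
print(subject)  # → Airflow: check_pass_fail.bash_date failed
```

Inside a callback, the returned subject and HTML body could be passed as the subject and html_content arguments of the email task.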

Send email from airflow on failure

Most of the time, we want to be notified if any of our Airflow DAGs fail. This section shows how to send an email from Airflow whenever a failure happens, without using EmailOperator.

Airflow comes with the email_on_failure property, which you can set to send an email if a task fails; you do not need a separate operator for this.

For this demo, I am creating an Airflow job that sends an email when any task fails. Copy the code snippet below into a file named email_failure.py.

from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'email': '<your_email_address>',
    'email_on_failure': True
}

with DAG(
    'failure_email_dag',
    default_args=default_args,
    description='A simple email on failure',
    schedule_interval='@daily',
    start_date=datetime(2021,1,1),
    catchup=False
) as dag:

    simple_bash = BashOperator(
        task_id='simple_bash_task',
        bash_command='dates' 
    )

Code Explanation

  • First, we import the libraries we need: BashOperator, DAG, etc.
  • In default_args, we set the email_on_failure flag to True and provide the email address. With this property set, Airflow sends an email whenever a task in the DAG fails.
  • In the final step, we define a simple bash task using BashOperator that tries to print the date.

NOTE: In the final step, I have purposely misspelled the date command as dates so that the task fails.

Trigger the Airflow DAG from the UI. If the DAG fails, Airflow sends a failure email to the configured address.

Conclusion

When you run multiple batch jobs in production with Airflow, setting up alert notifications for all your jobs is highly recommended. Email is one of the most common ways to get notified.

I hope you liked this tutorial and learned how to send an email from Airflow. If you face any issues with sending emails, please let me know in the comment box.

