{"id":25886,"date":"2021-05-28T16:00:35","date_gmt":"2021-05-28T16:00:35","guid":{"rendered":"https:\/\/naiveskill.com\/?p=25886"},"modified":"2023-02-04T08:17:10","modified_gmt":"2023-02-04T08:17:10","slug":"airflow-slack-alert","status":"publish","type":"post","link":"https:\/\/naiveskill.com\/airflow-slack-alert\/","title":{"rendered":"Airflow slack alert | send slack alerts from the airflow in 2023"},"content":{"rendered":"\n
Setting up notifications is as essential as deploying an application to production, because notifications keep you informed about the work you care about. In this blog, we are going to send an Airflow Slack alert<\/strong>. To follow along, I assume that Airflow is set up on your system; if not, please follow this<\/a> link to set up Airflow<\/strong>.<\/p>\n\n\n\n Before sending an Airflow Slack alert, let’s first understand callbacks in Apache Airflow.<\/p>\n\n\n\n Callbacks play a vital role if you want to perform an action when a task or DAG reaches a final state. The two important callbacks in Airflow are on_failure_callback and on_success_callback.<\/p>\n\n\n\n Below is a sample DAG that demonstrates the use of the failure callback.<\/p>\n\n\n\n Now we have a basic idea of how callbacks work. In the next section, we will set up a Slack channel and create an Airflow DAG that sends an alert whenever the DAG fails.<\/p>\n\n\n\n In this section, we will set up a Slack channel and generate a webhook for sending Airflow Slack alerts. If you already have a Slack channel, you can skip this section.<\/p>\n\n\n\n To create a Slack channel, go to this URL<\/a> and then click on try for free.<\/p>\n\n\n\nAirflow on failure callback<\/h2>\n\n\n\n
from airflow.exceptions import AirflowException\nfrom airflow.models import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom datetime import datetime\n\n\ndef on_failure_callback(context):\n    # Airflow passes in the context of the failed task or DAG run.\n    dag_run = context.get('dag_run')\n    task_instances = dag_run.get_task_instances()\n    print(f\"The task:{task_instances} failed\")\n\n\ndef fail_on_purpose():\n    # PythonOperator requires a python_callable; this one raises so the callback fires.\n    raise AirflowException('Intentional failure to demonstrate the callback')\n\n\ndag = DAG(\n    dag_id='dag_with_templated_dir',\n    start_date=datetime(2020, 1, 1),\n    on_failure_callback=on_failure_callback,  # DAG-level callback\n    catchup=False,\n    max_active_runs=1,\n    default_args={\n        'on_failure_callback': on_failure_callback,  # applied to every task\n    }\n)\n\nexample_task = BashOperator(\n    task_id='bash_task',\n    bash_command='dates',  # not a valid command on most systems, so the task fails\n    dag=dag\n)\n\nexample_python_task = PythonOperator(\n    task_id='python_task',\n    python_callable=fail_on_purpose,\n    on_failure_callback=on_failure_callback,\n    dag=dag\n)<\/pre>\n\n\n\n
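A callback receives a context dictionary, so it can help to build the alert text in a small helper that can be checked without running Airflow. The following is a minimal sketch, assuming the context exposes the `task_instance` and `exception` keys, as failure callbacks in recent Airflow 2 releases do. The helper name `build_failure_message` and the `fake_context` stand-in are hypothetical and not part of Airflow.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Build a short alert text from an Airflow callback context."""
    ti = context.get("task_instance")    # assumed key: the failed task instance
    exception = context.get("exception") # assumed key: the error that was raised
    return (
        f"Task failed: dag={ti.dag_id} task={ti.task_id} "
        f"error={exception!r}"
    )


def on_failure_callback(context):
    # Airflow would call this with the context of the failed task.
    print(build_failure_message(context))


# Hypothetical stand-in context so the sketch runs without Airflow installed.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="demo_dag", task_id="bash_task"),
    "exception": ValueError("boom"),
}
message = build_failure_message(fake_context)
```

Keeping the formatting separate from the callback means the same text can later be sent to Slack instead of printed, without changing how the message is built.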
Set up a Slack workspace and channel<\/h2>\n\n\n\n