airflow scheduler

Complete airflow scheduler tutorial with examples in 2023

In this tutorial, we will learn everything about the airflow scheduler. We will understand the airflow scheduler with multiple examples. So let’s get started.

What is an airflow scheduler?


In Apache Airflow, the scheduler is a critical component. It is responsible for managing the execution of Airflow DAGs (Directed Acyclic Graphs).

The scheduler continuously queries the metadata database to identify which tasks are ready to be executed, based on their dependencies and the schedule defined in the DAG. It then hands those tasks to the executor to run.

The scheduler makes sure that tasks are executed in the correct order. It also supports backfill and catchup features, allowing users to schedule and execute past or missed DAG runs. The Airflow scheduler is highly configurable, allowing users to tune its behavior through a variety of parameters.
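
For example, missed intervals can be backfilled manually from the command line. A minimal sketch, assuming a DAG with the id my_dag already exists in your DAGs folder and using placeholder dates:

# backfill one week of past schedule intervals for my_dag
airflow dags backfill -s 2023-02-01 -e 2023-02-07 my_dag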

Airflow scheduler configuration

The scheduler in Airflow is configured through the airflow.cfg file, which contains a variety of settings that control the scheduler's behavior.

Airflow scheduler parameters

The Airflow scheduler has various configuration parameters that can be customized to fit your requirements. Some of the most commonly used ones are listed below, followed by a sample airflow.cfg excerpt:

  • dag_dir_list_interval: how often (in seconds) the DAGs folder is scanned for new DAG files
  • catchup_by_default: whether newly created DAGs should backfill missed past intervals by default
  • run_duration: how long the scheduler process runs before exiting, so it can be restarted periodically (used in older Airflow versions)
  • num_runs: number of scheduling loops the scheduler executes before exiting (-1 means run indefinitely)
  • max_threads: number of processes the scheduler uses to parse and schedule DAG files (renamed parsing_processes in Airflow 2)
  • parallelism: maximum number of task instances allowed to run in parallel across the whole installation ([core] setting)
  • dag_concurrency: maximum number of task instances allowed to run in parallel within a single DAG ([core] setting)
  • min_file_process_interval: minimum interval (in seconds) before the same DAG file is parsed again
  • dag_orientation: default orientation of the graph view in the web UI, e.g. LR or TB (a [webserver] setting rather than a scheduler one)
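
For reference, here is how a few of these settings might appear in airflow.cfg. This is an illustrative sketch; option names and the sections they belong to vary between Airflow versions, so check the configuration reference for the version you run:

[core]
# maximum number of task instances that can run at once across the installation
parallelism = 32
# maximum number of task instances that can run at once within a single DAG
dag_concurrency = 16

[scheduler]
# scan the DAGs folder for new files every 300 seconds
dag_dir_list_interval = 300
# do not re-parse the same DAG file more often than every 30 seconds
min_file_process_interval = 30
# new DAGs do not backfill missed intervals unless catchup=True is set on the DAG
catchup_by_default = False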

Most of these parameters are set in airflow.cfg (as shown above); a few can also be passed as command-line arguments when starting the Airflow scheduler.
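
For example, num_runs has a command-line counterpart. The sketch below starts a scheduler that exits after ten scheduling loops, which lets a process supervisor restart it periodically:

airflow scheduler --num-runs 10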

Users can also override some of these settings for an individual DAG, either as arguments to the DAG object itself or via the default_args dictionary that is applied to every task. Consider the example below:

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'naiveskill',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 20),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

my_dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
    concurrency=3,  # at most 3 task instances of this DAG run at the same time
)

In this example, concurrency is set to 3 on the DAG, which means that a maximum of 3 task instances can run in parallel for this DAG, while settings such as retries and retry_delay come from default_args and apply to every task.

Airflow scheduler example

Let’s understand how the Airflow scheduler works with the example below:

Suppose you have a data pipeline consisting of two tasks: first_task and second_task. second_task depends on the first_task being completed successfully.

You can define these tasks in an Airflow DAG as follows:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator in Airflow 1.x

default_args = {
    'owner': 'naiveskill',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 20),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

my_dag = DAG(
    'scheduler_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

first_task = BashOperator(
    task_id='first_task',
    bash_command='echo "Hello first_task"',
    dag=my_dag,
)

second_task = BashOperator(
    task_id='second_task',
    bash_command='echo "Hello second_task"',
    dag=my_dag,
)

first_task >> second_task

  • In this example, first_task prints a message to the console, while second_task depends on first_task; second_task will only start once first_task has completed successfully.
  • When you run this DAG, the Airflow scheduler polls the metadata database to identify which tasks are ready to be executed. Since first_task has no dependencies, it can start immediately. The scheduler then sends instructions to the executor to begin running first_task.
  • Once first_task has been completed successfully, the Airflow scheduler identifies that second_task can now be executed since its dependency first_task has been completed. It sends instructions to the executor to start running second_task.
  • In this way, the Airflow scheduler manages the execution of tasks in the DAG, ensuring that dependencies are met and tasks are executed in the correct order.
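
To watch this happen locally, you can have a scheduler running and then unpause and trigger the DAG from the CLI. A small sketch, assuming the file above has been placed in your DAGs folder:

airflow dags unpause scheduler_dag
airflow dags trigger scheduler_dag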

Airflow scheduler cron syntax

In Apache Airflow, you can schedule DAGs using cron expressions. The cron expression is passed to the DAG's schedule_interval parameter, and Airflow supports standard cron syntax.

Below is an example of a cron expression that runs a DAG every day at 5 AM:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator in Airflow 1.x

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 20),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

my_dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='0 5 * * *')

sample_task = BashOperator(
    task_id='task_a',
    bash_command='echo "hello world"',
    dag=my_dag,
)


sample_task
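
A few more standard cron expressions you could use as schedule_interval values (five fields: minute, hour, day of month, month, day of week):

schedule_interval='0 */6 * * *'   # every six hours, on the hour
schedule_interval='30 8 * * 1-5'  # at 08:30 on weekdays (Monday to Friday)
schedule_interval='0 0 1 * *'     # at midnight on the first day of every month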

Airflow scheduler interval

The schedule_interval parameter in Airflow specifies how often the DAG should run.

The schedule_interval can be set to either a timedelta object, a cron expression, or one of the predefined scheduling constants available in Airflow. Some examples of predefined scheduling constants include:

  • @once: Run the DAG only once
  • @hourly: Run the DAG every hour
  • @daily: Run the DAG once per day at midnight
  • @weekly: Run the DAG once per week on Sunday at midnight
  • @monthly: Run the DAG once per month on the first day of the month at midnight
  • @yearly: Run the DAG once per year on January 1st at midnight

For example, to set the DAG to run once per day, you could use a timedelta object to specify a 24-hour interval, like below:

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'my_username',
    'start_date': datetime(2023, 2, 20),
    'retries': 0,
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)
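
Equivalently, you could use one of the presets listed above instead of a timedelta object; for example, '@daily' runs the DAG once per day at midnight:

dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@daily'  # roughly timedelta(days=1), but anchored at midnight
)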

How to restart an airflow scheduler

In Apache Airflow, you can restart a scheduler by using the airflow scheduler command.

airflow scheduler

When you run the scheduler, it will look for any DAGs that are due to be run and schedule them accordingly. If a scheduler is already running, stop that process first and then start it again with the same command.
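
If the scheduler was started as a daemon, one common approach is to kill the old process and start a new one. This is a sketch and assumes the default pid file location under $AIRFLOW_HOME, which may differ in your deployment:

# stop the currently running scheduler daemon
kill $(cat $AIRFLOW_HOME/airflow-scheduler.pid)

# start a fresh scheduler in the background
airflow scheduler --daemon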

It’s worth noting that restarting the scheduler will not re-run any tasks that have already been executed.

Note that the webserver and the scheduler are separate processes, so restarting the Airflow webserver does not restart the scheduler; each needs to be restarted on its own. If you also want to restart the webserver, run it with the -p flag to specify the port (8080 is the default):

airflow webserver -p 8080

Conclusion

In conclusion, the scheduler is a crucial component of Apache Airflow that allows for the automation of data processing workflows. It offers a range of scheduling options, including cron and interval-based scheduling, and can be configured with various parameters to optimize performance and manage resource utilization.

More to Explore

Airflow BashOperator

Airflow database

Airflow DummyOperator

Airflow PythonOperator
