Airflow command line | Airflow commands every developer should be aware of

Apache Airflow is an excellent open-source tool that lets you manage and run complex data pipelines. It has a rich user interface for managing DAGs, users, and configuration, and for performing many types of operations on a DAG. We can also perform various admin tasks with the Airflow command-line interface (CLI).

In this tutorial, I will explain how to use the Airflow command-line interface. Please follow this link to set up Airflow.

Check airflow version

To check the Airflow version, connect to the server where Airflow is installed and type the command below. To get the latest features, we recommend using the latest Airflow version.

airflow version
2.1.0

Airflow config

The airflow config list command prints the complete Airflow configuration.

It gives information about the DAG folder, logging, metrics, API, etc. It is a handy command if you wish to verify your Airflow configuration.

airflow config list
[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
default_timezone = utc
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:[email protected]/airflow
sql_engine_encoding = utf-8
.......<suppressed o/p>...........
.......<suppressed o/p>...........
[smart_sensor]
use_smart_sensor = False
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor
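
Because the listing is INI-style, it can be inspected programmatically. The following is a minimal sketch, assuming the output has been captured as text (for example by redirecting it to a file); the sample string is abbreviated from the listing above, and parse_config_listing is a hypothetical helper name, not part of Airflow.

```python
import configparser

# Abbreviated sample, copied from the listing above.
SAMPLE_OUTPUT = """\
[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
default_timezone = utc
executor = CeleryExecutor

[smart_sensor]
use_smart_sensor = False
shards = 5
"""


def parse_config_listing(text):
    """Parse INI-style config output into {section: {key: value}} (hypothetical helper)."""
    # interpolation=None keeps literal '%' characters (common in log formats) intact.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return {section: dict(parser[section]) for section in parser.sections()}


config = parse_config_listing(SAMPLE_OUTPUT)
print(config["core"]["executor"])
print(config["smart_sensor"]["shards"])
```

All values come back as strings, so numeric or boolean options need an explicit conversion. Real output may contain values that this simple parser does not handle, so treat it as a starting point.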

Airflow initdb command

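The initdb command initializes the Airflow metadata database. We generally use this command while setting up Airflow for the first time. Note that airflow initdb is the Airflow 1.10 syntax; in Airflow 2.x the equivalent command is airflow db init.

I am pasting the output of the initdb command just for reference.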

airflow initdb
[2020-01-01 21:49:21,603] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=20917
DB: postgresql+psycopg2://[email protected]:5432/airflow_mdb
[2020-01-04 20:19:22,257] {db.py:368} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
Done.

Airflow Resetdb command

The resetdb command (airflow db reset in Airflow 2.x) drops and recreates the metadata database, deleting all records, including all DAG runs, Variables, and Connections. Do not run this command after your Airflow instance is successfully set up; otherwise, you will lose the entire Airflow metadata.

Airflow Connections command

Airflow encrypts the passwords stored in connections when a Fernet key is configured, so that they cannot be read or manipulated without the key. Connections can be managed from the user interface (Menu –> Admin –> Connections) or from the command line.
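
Encryption only happens when a Fernet key is set; without one, Airflow logs an "empty cryptography key" warning and stores values unencrypted. A Fernet key is 32 random bytes in URL-safe base64, so one can be generated with the standard library alone. This is a sketch, assuming the key is then supplied through the fernet_key option in the [core] section or the AIRFLOW__CORE__FERNET_KEY environment variable; generate_fernet_key is a hypothetical helper name.

```python
import base64
import os


def generate_fernet_key():
    """Return 32 random bytes encoded as URL-safe base64 (the Fernet key format)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


key = generate_fernet_key()
# Assumption: the key is passed to Airflow via this environment variable
# (or the equivalent fernet_key option in airflow.cfg).
print(f"AIRFLOW__CORE__FERNET_KEY={key}")
```

Keep the generated key secret and back it up: values encrypted with one key cannot be decrypted with another.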

We can add, delete, and export connections with the connections command. Let’s run the command below and see what the connections command offers:

airflow connections -h
usage: airflow connections [-h] COMMAND ...

Manage connections

positional arguments:
  COMMAND
    add       Add a connection
    delete    Delete a connection
    export    Export all connections
    get       Get a connection
    import    Import connections from a file
    list      List connections

optional arguments:
  -h, --help  show this help message and exit

Airflow List connections

Use the connections list command to check all connections present in Airflow.

airflow connections list
id | conn_id    | conn_type | description | host | schema | login | password                              | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri
===+============+===========+=============+======+========+=======+=======================================+======+==============+====================+==============+=======================================
1  | slack_conn | http      |             |      |        |       | https://hooks.slack.com/services/T023 | None | False        | False              | {}           | http://:https%3A%2F%2Fhooks.slack.com%

As you can see, we have a slack_conn present in airflow, which I have created from the UI. Let’s try to create another connection using the command line.

Create an airflow connection

Use the connections add command to create a new connection named test_connection. Provide the connection type, login, and password.

airflow connections add test_connection --conn-type=http --conn-login=test --conn-password=test
[2021-05-29 13:34:18,319] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
Successfully added `conn_id`=test_connection : http://test:******@:
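
When connections are created from a deployment script, it can help to assemble the command as an argument list rather than a shell string. The sketch below only builds and prints the command; build_add_connection_cmd is a hypothetical helper, and the flags are the ones used in the example above.

```python
import shlex


def build_add_connection_cmd(conn_id, conn_type, login=None, password=None):
    """Build the argument list for `airflow connections add` (hypothetical helper)."""
    args = ["airflow", "connections", "add", conn_id, "--conn-type", conn_type]
    if login is not None:
        args += ["--conn-login", login]
    if password is not None:
        args += ["--conn-password", password]
    return args


cmd = build_add_connection_cmd("test_connection", "http", login="test", password="test")
# shlex.join quotes each argument safely for display (Python 3.8+).
print(shlex.join(cmd))
```

To actually run it, the list can be passed to subprocess.run(cmd, check=True) on a host where the airflow CLI is available. An argument list avoids shell-quoting problems with special characters in passwords.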

Now let’s verify that the new connection was created.

airflow connections list
id | conn_id         | conn_type | description | host | schema | login | password                            | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri
===+=================+===========+=============+======+========+=======+=====================================+======+==============+====================+==============+====================================
1  | slack_conn      | http      |             |      |        |       | https://hooks.slack.com/services/T0 | None | False        | False              | {}           | http://:https%3A%2F%2Fhooks.slack.c          
2  | test_connection | http      | None        | None | None   | test  | test                                | None | False        | False              | {}           | http://test:[email protected]

Awesome. Now let’s try to delete the connection from the command line.

Delete a connection in airflow

The connections delete command lets you delete a connection from Airflow. Before running this command, ensure that you are not using this connection in any of your DAGs; otherwise, the tasks that depend on it will fail.

airflow connections delete test_connection
Successfully deleted connection with `conn_id`=test_connection

Let’s proceed further and check another remarkable Airflow command, which is dags.

Airflow DAGs command

As per the official Airflow documentation, a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

You can find all the DAGs under the DAGs tab.

The DAGs tab displays all DAGs present in Airflow, both active and paused. We can get the same details from the command line using the dags command.

Let’s run the dags help command and check what options it provides:

airflow dags -h
usage: airflow dags [-h] COMMAND ...

Manage DAGs

positional arguments:
  COMMAND
    backfill      Run subsections of a DAG for a specified date range
    delete        Delete all DB records related to the specified DAG
    list          List all the DAGs
    list-jobs     List the jobs
    list-runs     List DAG runs given a DAG id
    next-execution
                  Get the next execution datetimes of a DAG
    pause         Pause a DAG
    report        Show DagBag loading report
    show          Displays DAG's tasks with their dependencies
    state         Get the status of a dag run
    test          Execute one single DagRun
    trigger       Trigger a DAG run
    unpause       Resume a paused DAG

optional arguments:
  -h, --help      show this help message and exit

We can list all DAGs, delete a DAG, list the jobs of a DAG, etc. Let’s try some of the most common DAG commands.

Airflow list all DAG

The dags list command lists all the DAGs in Airflow. It shows each DAG's file path and owner, and whether the DAG is paused.

airflow dags list
dag_id                                  | filepath                                                                                                         | owner   | paused
========================================+==================================================================================================================+=========+=======
airflow_slack_notification_tutorial     | test_slack_alert.py                                                                                              | airflow | False
email_tutorial                          | testemail.py                                                                                                     | airflow | True
example_bash_operator                   | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_bash_operator.py                   | airflow | True
example_branch_datetime_operator_2      | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_branch_datetime_operator.py        | airflow | True
example_branch_dop_operator_v3          | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_branch_python_dop_operator_3.py    | airflow | True
example_branch_labels                   | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_branch_labels.py                   | airflow | True
.......<suppressed o/p>...........
.......<suppressed o/p>...........
tutorial_taskflow_api_etl_virtualenv    | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py    | airflow | True
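
The table output is easy to post-process, for instance to find paused DAGs. The following is a rough sketch, assuming the pipe-separated layout shown above; parse_cli_table is a hypothetical helper and the sample is shortened from the listing. Before relying on text parsing, check airflow dags list -h: recent Airflow 2.x releases may offer an --output option with machine-readable formats.

```python
def parse_cli_table(text):
    """Turn a pipe-separated CLI table into a list of dicts (hypothetical helper)."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = [cell.strip() for cell in lines[0].split("|")]
    rows = []
    for line in lines[1:]:
        if set(line.strip()) <= set("=+"):
            continue  # separator row made of '=' and '+'
        cells = [cell.strip() for cell in line.split("|")]
        if len(cells) != len(header):
            continue  # skip suppressed or truncated lines
        rows.append(dict(zip(header, cells)))
    return rows


# Shortened sample based on the listing above.
SAMPLE = """\
dag_id                              | filepath            | owner   | paused
====================================+=====================+=========+=======
airflow_slack_notification_tutorial | test_slack_alert.py | airflow | False
email_tutorial                      | testemail.py        | airflow | True
"""

rows = parse_cli_table(SAMPLE)
paused_dags = [row["dag_id"] for row in rows if row["paused"] == "True"]
print(paused_dags)
```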

Airflow list-jobs command

The dags list-jobs command lists the jobs of a DAG. Let’s run this command and verify the output.

airflow dags list-jobs -d airflow_slack_notification_tutorial
dag_id                              | state   | job_type     | start_date                       | end_date
====================================+=========+==============+==================================+=================================
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:22:22.697377+00:00 | 2021-05-28 14:22:23.924578+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:22:19.270604+00:00 | 2021-05-28 14:22:22.514622+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:15.783816+00:00 | 2021-05-28 14:17:17.621517+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:12.587375+00:00 | 2021-05-28 14:17:15.417111+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:11.581316+00:00 | 2021-05-28 14:17:14.024990+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:06.681462+00:00 | 2021-05-28 14:17:10.110889+00:00

Airflow list runs command

The dags list-runs command takes a DAG id as input and lists the DAG runs of that DAG. If you also pass the state option, it lists only the DAG runs with the given state.

airflow dags list-runs -d airflow_slack_notification_tutorial
dag_id                              | run_id                                      | state  | execution_date                   | start_date                       | end_date
====================================+=============================================+========+==================================+==================================+=================================
airflow_slack_notification_tutorial | manual__2021-05-28T14:14:19.651888+00:00    | pass   | 2021-05-28T14:14:19.651888+00:00 | 2021-05-28T14:14:19.697786+00:00 | 2021-05-28T14:22:24.931743+00:00
airflow_slack_notification_tutorial | scheduled__2021-05-27T14:16:56.676522+00:00 | pass   | 2021-05-27T14:16:56.676522+00:00 | 2021-05-28T14:16:58.075571+00:00 | 2021-05-28T14:22:23.482804+00:00

Airflow next Dag execution command

The dags next-execution command displays the next execution datetime of a DAG. To get more than one execution datetime, pass the -n parameter.

airflow dags next-execution email_tutorial -n 2
2021-05-29 13:57:56.965112+00:00
2021-05-30 13:57:56.965112+00:00

Now let’s run next-execution on a paused DAG and verify the output.

airflow dags next-execution email_tutorial -n 1
[INFO] Please be reminded this DAG is PAUSED now.
2021-05-24 10:21:15.464155+00:00

As you can see, we got the next execution information along with an INFO message reminding us that the DAG is paused.
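
A script that consumes this output has to ignore the INFO line and keep only the datetimes. The sketch below assumes the format shown above; parse_next_executions is a hypothetical helper, and the sample combines the INFO line with the two datetimes from the earlier run purely for illustration.

```python
from datetime import datetime


def parse_next_executions(output):
    """Extract execution datetimes, skipping blank and bracketed message lines."""
    times = []
    for line in output.splitlines():
        line = line.strip()
        if not line or line.startswith("["):
            continue  # e.g. "[INFO] Please be reminded this DAG is PAUSED now."
        times.append(datetime.fromisoformat(line))
    return times


# Illustrative sample assembled from the two outputs above.
SAMPLE = """\
[INFO] Please be reminded this DAG is PAUSED now.
2021-05-29 13:57:56.965112+00:00
2021-05-30 13:57:56.965112+00:00
"""

times = parse_next_executions(SAMPLE)
interval = times[1] - times[0]
print(interval)
```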

Airflow DAG report command

The dags report command shows the DagBag loading report. It shows helpful information such as each DAG file's location, its load duration, and the number of DAGs and tasks it defines.

airflow dags report
file                                                                              | duration       | dag_num | task_num | dags
==================================================================================+================+=========+==========+===================================================================================
/test_slack_alert.py                                                              | 0:00:00.041452 | 1       | 2        | airflow_slack_notification_tutorial
/testemail.py                                                                     | 0:00:00.024659 | 1       | 1        | email_tutorial
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_sub | 0:00:00.017983 | 3       | 15       | example_subdag_operator,example_subdag_operator.section-1,example_subdag_operator.
.......<suppressed o/p>...........
.......<suppressed o/p>...........
nch_labels.py                                                                     |                |         |          |
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/test_utils. | 0:00:00.002076 | 1       | 1        | test_utils
py                                                                                |                |         |          |
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/tutorial_et | 0:00:00.001613 | 1       | 3        | tutorial_etl_dag
l_dag.py                                                                          |                |         |          |
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/subdags/sub | 0:00:00.001112 | 0       | 0        |
dag.py                                                                            |                |         |          |

Airflow DAG show command

The dags show command displays a DAG's tasks and their dependencies. The output is in Graphviz DOT format.

airflow dags show airflow_slack_notification_tutorial
digraph airflow_slack_notification_tutorial {
	graph [label=airflow_slack_notification_tutorial labelloc=t rankdir=LR]
	simple_bash_task [color="#000000" fillcolor="#f0ede4" label=simple_bash_task shape=rectangle style="filled,rounded"]
	slack_notification [color="#000000" fillcolor="#f4a460" label=slack_notification shape=rectangle style="filled,rounded"]
	simple_bash_task -> slack_notification
}
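
Since the output is plain text, the dependency edges can be pulled out with a regular expression. This is a minimal sketch that only handles simple, unquoted task names like the ones above; parse_edges is a hypothetical helper, and the node attributes in the sample are shortened.

```python
import re

# Sample based on the output above, with node attributes shortened.
DOT_OUTPUT = '''digraph airflow_slack_notification_tutorial {
    graph [label=airflow_slack_notification_tutorial labelloc=t rankdir=LR]
    simple_bash_task [color="#000000" label=simple_bash_task shape=rectangle]
    slack_notification [color="#000000" label=slack_notification shape=rectangle]
    simple_bash_task -> slack_notification
}'''

# Matches lines of the form "<upstream> -> <downstream>".
EDGE_PATTERN = re.compile(r"^[ \t]*(\S+)[ \t]*->[ \t]*(\S+)[ \t]*$", re.MULTILINE)


def parse_edges(dot_text):
    """Return (upstream, downstream) task pairs found in DOT text."""
    return EDGE_PATTERN.findall(dot_text)


edges = parse_edges(DOT_OUTPUT)
for upstream, downstream in edges:
    print(f"{upstream} runs before {downstream}")
```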

Test run DAG in airflow

Testing a DAG before scheduling it is a quick way to determine whether the DAG works as expected. We can test an Airflow DAG with the dags test command.

Let’s test the .py file below from the command line.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:

    run_this_last = DummyOperator(
        task_id='run_this_last',
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id='runme_' + str(i),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id='also_run_this',
        bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
    task_id='this_will_skip',
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last

if __name__ == "__main__":
    dag.cli()

Create a DAG by copy-pasting the above code into a .py file in your DAGs folder, then run the dags test command.

airflow dags test example_bash_operator 2021-01-01
[2021-05-29 17:48:53,823] {dagbag.py:487} INFO - Filling up the DagBag from /opt/airflow/dags
[2021-05-29 17:48:54,494] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: example_bash_operator.runme_0 2021-01-01 00:00:00+00:00 [queued]>']
[2021-05-29 17:48:54,555] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: example_bash_operator.runme_1 2021-01-01 00:00:00+00:00 [queued]>']
.......<suppressed o/p>...........
.......<suppressed o/p>...........
20210529T174904
[2021-05-29 17:49:04,574] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-29 17:49:04,617] {dagrun.py:444} INFO - Marking run <DagRun example_bash_operator @ 2021-01-01 00:00:00+00:00: backfill__2021-01-01T00:00:00+00:00, externally triggered: False> successful
[2021-05-29 17:49:04,631] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 5 | running: 0 | failed: 0 | skipped: 1 | deadlocked: 0 | not ready: 1
[2021-05-29 17:49:09,414] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 5 | running: 0 | failed: 0 | skipped: 2 | deadlocked: 0 | not ready: 0
[2021-05-29 17:49:09,427] {backfill_job.py:831} INFO - Backfill done. Exiting.

Here the DAG run passed without any issue.
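
In an automated check, the final "backfill progress" line can be parsed to confirm that nothing failed. The sketch below assumes the log format shown above, which may differ between Airflow versions; parse_backfill_progress is a hypothetical helper.

```python
# Progress line copied from the output above.
LOG_LINE = (
    "[2021-05-29 17:49:09,414] {backfill_job.py:388} INFO - [backfill progress] | "
    "finished run 1 of 1 | tasks waiting: 0 | succeeded: 5 | running: 0 | "
    "failed: 0 | skipped: 2 | deadlocked: 0 | not ready: 0"
)


def parse_backfill_progress(line):
    """Return the 'name: count' fields of a progress line as a dict, or None."""
    marker = "[backfill progress]"
    if marker not in line:
        return None
    counters = {}
    for field in line.split(marker, 1)[1].split("|"):
        name, separator, value = field.partition(":")
        if separator and value.strip().isdigit():
            counters[name.strip()] = int(value.strip())
    return counters


progress = parse_backfill_progress(LOG_LINE)
print(progress["succeeded"], progress["failed"], progress["skipped"])
```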

Airflow delete a DAG

Sometimes we have to delete a DAG that is no longer required. We can delete a DAG with the dags delete command. Let’s try to delete a DAG.

airflow dags delete test_utils -y
[2021-05-29 13:46:32,100] {__init__.py:38} INFO - Loaded API auth backend: <module 'airflow.api.auth.backend.basic_auth' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/api/auth/backend/basic_auth.py'>
[2021-05-29 13:46:32,131] {delete_dag.py:42} INFO - Deleting DAG: test_utils
Removed 2 record(s)

The -y flag skips the confirmation prompt. The command drops all existing records related to the specified DAG.

Airflow Tasks command

The tasks command helps us manage tasks. With the tasks command, we can run a task, test a task, check a task’s status, and perform many more operations.

airflow tasks -h
usage: airflow tasks [-h] COMMAND ...

Manage tasks

positional arguments:
  COMMAND
    clear             Clear a set of task instance, as if they never ran
    failed-deps       Returns the unmet dependencies for a task instance
    list              List the tasks within a DAG
    render            Render a task instance's template(s)
    run               Run a single task instance
    state             Get the status of a task instance
    states-for-dag-run
                      Get the status of all task instances in a dag run
    test              Test a task instance

optional arguments:
  -h, --help          show this help message and exit

Let’s try a few of the task commands.

Airflow list all task within DAG

The tasks list command takes the DAG name as a parameter and lists all the tasks present in the DAG.

airflow tasks list example_bash_operator
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2
this_will_skip

Airflow Task run

The tasks run command helps us run any task present in a DAG. It takes the arguments below.

positional arguments:
  dag_id                The id of the dag
  task_id               The id of the task
  execution_date        The execution date of the DAG

Let’s run a simple task using the command line

airflow tasks run example_bash_operator runme_0 2021-01-01
[2021-05-29 15:59:22,239] {dagbag.py:487} INFO - Filling up the DagBag from /opt/***/dags
.......<suppressed o/p>...........
.......<suppressed o/p>...........
Running <TaskInstance: example_bash_operator.runme_0 2021-01-01T00:00:00+00:00 [success]> on host a4bd0ae3c9a0

Airflow check status of task

With the help of the tasks state command, we can check the status of a particular task.

airflow tasks state example_bash_operator runme_0 2021-01-01
success
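
The same check can be made from a script. The following sketch wraps the command with subprocess and takes the last non-empty output line as the state, on the assumption that log lines may precede it. get_task_state and fake_run are hypothetical names; the fake runner stands in for the real CLI so that the example runs without an Airflow installation.

```python
import subprocess


def get_task_state(dag_id, task_id, execution_date, run=subprocess.run):
    """Run `airflow tasks state` and return the last non-empty output line."""
    cmd = ["airflow", "tasks", "state", dag_id, task_id, execution_date]
    result = run(cmd, capture_output=True, text=True, check=True)
    lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    return lines[-1] if lines else None


def fake_run(cmd, **kwargs):
    """Stand-in for subprocess.run that returns canned output (illustration only)."""
    canned = "INFO - Filling up the DagBag from /opt/airflow/dags\nsuccess\n"
    return subprocess.CompletedProcess(cmd, 0, stdout=canned, stderr="")


state = get_task_state("example_bash_operator", "runme_0", "2021-01-01", run=fake_run)
print(state)
```

On a real host, drop the run=fake_run argument so that the default subprocess.run invokes the CLI.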

Airflow database check command

Airflow depends on a database to store its metadata. We can quickly check whether the database is reachable with the db check command.

airflow db check
[2021-05-29 15:43:02,284] {db.py:776} INFO - Connection successful.
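
A common use of this command is to wait for the database before starting other Airflow components. The sketch below retries the check a few times; it assumes that the command exits with a non-zero status when the database is unreachable, which is worth verifying on your version. wait_for_db and fake_run are hypothetical names, and the fake runner simulates two failures followed by a success so the example runs without Airflow.

```python
import subprocess
import time


def wait_for_db(attempts=5, delay_seconds=2.0, run=subprocess.run, sleep=time.sleep):
    """Retry `airflow db check` until it exits with status 0 or attempts run out."""
    for attempt in range(1, attempts + 1):
        result = run(["airflow", "db", "check"], capture_output=True, text=True)
        if result.returncode == 0:
            return True
        if attempt < attempts:
            sleep(delay_seconds)  # pause before the next attempt
    return False


calls = {"count": 0}


def fake_run(cmd, **kwargs):
    """Stand-in runner: fails twice, then succeeds (illustration only)."""
    calls["count"] += 1
    returncode = 0 if calls["count"] >= 3 else 1
    return subprocess.CompletedProcess(cmd, returncode, stdout="", stderr="")


reachable = wait_for_db(run=fake_run, sleep=lambda seconds: None)
print(reachable, calls["count"])
```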

Airflow Jobs command

With the jobs command, we can check jobs in Airflow. Let’s run it and verify whether there are any alive jobs.

airflow jobs check
Found one alive job.

Roles command in airflow

With the help of the roles command, we can easily create and list roles in Airflow. Let’s see the roles command in action.

Airflow list roles

The roles list command lists all the roles available in the Airflow instance.

airflow roles list
name
======
Admin
Op
Public
User
Viewer

Users command in airflow

users is another handy command, which enables us to manage users. Let’s see a few users commands in action.

Airflow users list command

The users list command lists all the users.

airflow users list
id | username | email                    | first_name | last_name | roles
===+==========+==========================+============+===========+======
1  | airflow  | [email protected] | Airflow    | Admin     | Admin

Airflow create a user

We can also create a user using the users create command.


Let’s see what the required parameters are:

airflow users create -h
usage: airflow users create [-h] -e EMAIL -f FIRSTNAME -l LASTNAME
                            [-p PASSWORD] -r ROLE [--use-random-password] -u
                            USERNAME

Create a user

optional arguments:
  -h, --help            show this help message and exit
  -e EMAIL, --email EMAIL
                        Email of the user
  -f FIRSTNAME, --firstname FIRSTNAME
                        First name of the user
  -l LASTNAME, --lastname LASTNAME
                        Last name of the user
  -p PASSWORD, --password PASSWORD
                        Password of the user, required to create a user without --use-random-password
  -r ROLE, --role ROLE  Role of the user. Existing roles include Admin, User, Op, Viewer, and Public
  --use-random-password
                        Do not prompt for password. Use random string instead. Required to create a user without --password
  -u USERNAME, --username USERNAME
                        Username of the user

Now create a test user using the users create command. If neither a password nor --use-random-password is specified, you will be prompted for a password.

airflow users create -r User -u test -e [email protected] -f test_first_name -l test_last_name -p test
User user test created

airflow users list
id | username | email                    | first_name      | last_name      | roles
===+==========+==========================+=================+================+======
1  | airflow  | [email protected] | Airflow         | Admin          | Admin
2  | test     | [email protected]            | test_first_name | test_last_name | User
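
For anything beyond a test account, a generated password is safer than a fixed one such as test. The sketch below builds the users create command with a random password from the secrets module. build_create_user_cmd is a hypothetical helper, the e-mail address is a placeholder, and the flags are those listed in the help output above.

```python
import secrets


def build_create_user_cmd(username, email, first_name, last_name, role, password=None):
    """Build the `airflow users create` argument list; returns (cmd, password)."""
    if password is None:
        password = secrets.token_urlsafe(16)  # random, URL-safe password
    cmd = [
        "airflow", "users", "create",
        "-r", role,
        "-u", username,
        "-e", email,
        "-f", first_name,
        "-l", last_name,
        "-p", password,
    ]
    return cmd, password


# Placeholder values for illustration.
cmd, password = build_create_user_cmd(
    "test", "test@example.com", "test_first_name", "test_last_name", "User"
)
print(" ".join(cmd[:-1]), "<generated password>")
```

A password passed on the command line may be visible in the process list and shell history; the --use-random-password flag from the help text or the interactive prompt avoids that.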

Airflow Variables command

With the variables command, we can easily manage variables. Let’s use a few variables commands.

List variables in airflow

The variables list command lists all the variable keys in Airflow.

airflow variables list
key
====
test

If you wish to get a variable's value, use the command below.

airflow variables get test
test_value

You can check the variables from the Airflow user interface as well.

Airflow create a variable

With the variables set command, we can easily create variables in Airflow. The set command takes the variable key and variable value as parameters.

airflow variables set variable_key variable_value
[2021-05-29 16:15:48,093] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
Variable variable_key created

airflow variables list
key
============
test
variable_key
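
When many variables have to be created, setting them one at a time becomes tedious. One option is to write them to a JSON file and load it with the variables import subcommand. This is a sketch that only prepares the file and prints the command; write_variables_file is a hypothetical helper, the file location is arbitrary, and you should confirm with airflow variables -h that import is available in your version.

```python
import json
import os
import tempfile


def write_variables_file(variables, path):
    """Write a flat {key: value} mapping to a JSON file and return its path."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(variables, handle, indent=2, sort_keys=True)
    return path


# Keys and values taken from the examples above.
variables = {"test": "test_value", "variable_key": "variable_value"}
path = os.path.join(tempfile.mkdtemp(), "variables.json")
write_variables_file(variables, path)

# Command to run on a host where the airflow CLI is available.
print(f"airflow variables import {path}")
```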

Conclusion

Finally, we have come to the end of this tutorial. We started with the Airflow setup commands and learned how to manage connections, DAGs, and tasks. At last, we learned how to manage users, roles, and variables in Airflow. I hope you have found this article useful. Please let me know in the comment box if you face any issues with the above commands. Happy learning.
