Apache Airflow is an excellent open-source tool that lets you manage and run complex data pipelines. Airflow has a straightforward and rich user interface through which we can easily manage DAGs, users, and configuration, and it supports many types of operations on a DAG. We can also perform various admin tasks with the help of the Airflow command-line interface (CLI).
In this tutorial, I will explain how to use the Airflow command-line interface. Please follow this link to set up Airflow.
Check airflow version
To check the Airflow version, connect to the server where Airflow is installed and type the below command. To get the latest features, it is recommended to use the latest Airflow version.
airflow version
2.1.0
Airflow config
With the help of the airflow config list command, you get complete information about the Airflow configuration.
The config command gives information about the DAGs folder, logging, metrics, API, etc. It is a handy command if you wish to verify or check your Airflow configuration.
airflow config list
[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
default_timezone = utc
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:[email protected]/airflow
sql_engine_encoding = utf-8
.......<suppressed o/p>...........
[smart_sensor]
use_smart_sensor = False
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor
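The configuration shown above lives in airflow.cfg, which uses plain INI syntax, and airflow config list mirrors it section by section. As a rough sketch, you can parse such a fragment with Python's standard library (the values below are copied from the output above):

```python
import configparser

# airflow.cfg is INI-formatted; parse a minimal [core] fragment
cfg = configparser.ConfigParser()
cfg.read_string("""
[core]
dags_folder = /opt/airflow/dags
executor = CeleryExecutor
""")
print(cfg.get("core", "executor"))  # CeleryExecutor
```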
Airflow initdb command
The initdb command initializes the Airflow database. We generally use this command when setting up Airflow for the first time. Note that in Airflow 2.x, this command has been replaced by airflow db init.
I am pasting the output of the initdb command just for reference.
airflow initdb
[2020-01-01 21:49:21,603] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=20917
DB: postgresql+psycopg2://[email protected]:5432/airflow_mdb
[2020-01-04 20:19:22,257] {db.py:368} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
Done.
Airflow Resetdb command
The resetdb command (airflow db reset in Airflow 2.x) deletes all records from the metadata database, including all DAG runs, Variables, and Connections. Do not run this command after your Airflow instance is successfully set up; otherwise, you will lose the entire Airflow metadata.
Airflow Connections command
Airflow encrypts the passwords stored in connections and makes sure they cannot be read or manipulated without the key. Connections can be managed in Airflow through the user interface (Menu –> Admin –> Connections) or through the command line.
We can add, delete, and export connections with the connections command. Let's run the below command and see what facilities the connections command provides:
airflow connections -h
usage: airflow connections [-h] COMMAND ...

Manage connections

positional arguments:
  COMMAND
    add     Add a connection
    delete  Delete a connection
    export  Export all connections
    get     Get a connection
    import  Import connections from a file
    list    List connections

optional arguments:
  -h, --help  show this help message and exit
Airflow List connections
Use the connections list command to check all the connections present in Airflow.
airflow connections list
id | conn_id | conn_type | description | host | schema | login | password | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri
===+============+===========+=============+======+========+=======+=======================================+======+==============+====================+==============+=======================================
1 | slack_conn | http | | | | | https://hooks.slack.com/services/T023 | None | False | False | {} | http://:https%3A%2F%2Fhooks.slack.com%
As you can see, we have a slack_conn present in airflow, which I have created from the UI. Let’s try to create another connection using the command line.
Create an airflow connection
Use the connections add command to create a new connection named test_connection. Provide the connection type, connection login, and connection password.
airflow connections add test_connection --conn-type=http --conn-login=test --conn-password=test
[2021-05-29 13:34:18,319] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
Successfully added `conn_id`=test_connection : http://test:******@:
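Instead of passing field-by-field flags, connections add also accepts a single --conn-uri in the form conn-type://login:password@host:port/schema. As a rough illustration of that URI shape (the helper function below is ours, not part of Airflow):

```python
from urllib.parse import quote

def conn_uri(conn_type, login="", password="", host="", port=None, schema=""):
    # Build the conn-type://login:password@host:port/schema form
    # accepted by `airflow connections add --conn-uri`
    auth = f"{quote(login, safe='')}:{quote(password, safe='')}@" if login or password else ""
    netloc = auth + host + (f":{port}" if port else "")
    return f"{conn_type}://{netloc}/{schema}"

print(conn_uri("http", login="test", password="test"))  # http://test:test@/
```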
Now let’s verify if a new connection gets created.
airflow connections list
id | conn_id | conn_type | description | host | schema | login | password | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri
===+=================+===========+=============+======+========+=======+=====================================+======+==============+====================+==============+====================================
1 | slack_conn | http | | | | | https://hooks.slack.com/services/T0 | None | False | False | {} | http://:https%3A%2F%2Fhooks.slack.c
2 | test_connection | http | None | None | None | test | test | None | False | False | {} | http://test:[email protected]
Awesome. Now let's try to delete the connection from the command line.
Delete a connection in the airflow
The connections delete command lets you delete a connection from Airflow. Before running this command, ensure that you are not using the connection anywhere in your DAGs; otherwise, the affected Airflow jobs will fail.
airflow connections delete test_connection
Successfully deleted connection with `conn_id`=test_connection
Let's proceed further and check another remarkable Airflow command, which is dags.
Airflow DAGs command
As per Airflow's official documentation, a DAG (Directed Acyclic Graph) "is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies."
You can find all the DAGs under the DAGs tab.
The DAGs tab displays all DAGs present in Airflow, both active and paused. We can get the same details from the Airflow command line using the dags command.
Let's run the dags help command and check what options the airflow dags command provides:
airflow dags -h
usage: airflow dags [-h] COMMAND ...

Manage DAGs

positional arguments:
  COMMAND
    backfill        Run subsections of a DAG for a specified date range
    delete          Delete all DB records related to the specified DAG
    list            List all the DAGs
    list-jobs       List the jobs
    list-runs       List DAG runs given a DAG id
    next-execution  Get the next execution datetimes of a DAG
    pause           Pause a DAG
    report          Show DagBag loading report
    show            Displays DAG's tasks with their dependencies
    state           Get the status of a dag run
    test            Execute one single DagRun
    trigger         Trigger a DAG run
    unpause         Resume a paused DAG

optional arguments:
  -h, --help  show this help message and exit
We can list all DAGs, delete a DAG, list jobs inside DAGs, etc. Let's try some of the most common dags commands.
Airflow list all DAGs
The dags list command lists all the DAGs in Airflow. It shows you each DAG's owner and status (whether the DAG is paused or active).
airflow dags list
dag_id | filepath | owner | paused
========================================+==================================================================================================================+=========+=======
airflow_slack_notification_tutorial | test_slack_alert.py | airflow | False
email_tutorial | testemail.py | airflow | True
example_bash_operator | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_bash_operator.py | airflow | True
example_branch_datetime_operator_2 | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_branch_datetime_operator.py | airflow | True
example_branch_dop_operator_v3 | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_branch_python_dop_operator_3.py | airflow | True
example_branch_labels | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_branch_labels.py | airflow | True
.......<suppressed o/p>...........
tutorial_taskflow_api_etl_virtualenv | /home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py | airflow | True
Airflow list-jobs command
The dags list-jobs command lists the jobs for a DAG. Let's run this command and verify the output.
airflow dags list-jobs -d airflow_slack_notification_tutorial
dag_id | state | job_type | start_date | end_date
====================================+=========+==============+==================================+=================================
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:22:22.697377+00:00 | 2021-05-28 14:22:23.924578+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:22:19.270604+00:00 | 2021-05-28 14:22:22.514622+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:15.783816+00:00 | 2021-05-28 14:17:17.621517+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:12.587375+00:00 | 2021-05-28 14:17:15.417111+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:11.581316+00:00 | 2021-05-28 14:17:14.024990+00:00
airflow_slack_notification_tutorial | success | LocalTaskJob | 2021-05-28 14:17:06.681462+00:00 | 2021-05-28 14:17:10.110889+00:00
Airflow list runs command
The dags list-runs command takes a DAG id as input and lists the DAG runs of the given DAG id. If you provide the state option with the dags list-runs command, it will only return the DAG runs with the given state.
airflow dags list-runs -d airflow_slack_notification_tutorial
dag_id | run_id | state | execution_date | start_date | end_date
====================================+=============================================+========+==================================+==================================+=================================
airflow_slack_notification_tutorial | manual__2021-05-28T14:14:19.651888+00:00 | pass | 2021-05-28T14:14:19.651888+00:00 | 2021-05-28T14:14:19.697786+00:00 | 2021-05-28T14:22:24.931743+00:00
airflow_slack_notification_tutorial | scheduled__2021-05-27T14:16:56.676522+00:00 | pass | 2021-05-27T14:16:56.676522+00:00 | 2021-05-28T14:16:58.075571+00:00 | 2021-05-28T14:22:23.482804+00:00
Airflow next Dag execution command
The dags next-execution command displays the next execution datetimes of a DAG. If you wish to get more than one execution datetime, pass the -n parameter.
airflow dags next-execution email_tutorial -n 2
2021-05-29 13:57:56.965112+00:00
2021-05-30 13:57:56.965112+00:00
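To build intuition for what next-execution computes, here is a simplified sketch assuming a daily '0 0 * * *' cron schedule: the next n run datetimes are simply the upcoming midnights. (Real Airflow also factors in start_date, catchup, and its data-interval model; this helper is ours, not Airflow's.)

```python
from datetime import datetime, timedelta

def next_daily_runs(after, n):
    # Floor to midnight, then step forward one day at a time
    nxt = after.replace(hour=0, minute=0, second=0, microsecond=0)
    runs = []
    for _ in range(n):
        nxt += timedelta(days=1)
        runs.append(nxt)
    return runs

print(next_daily_runs(datetime(2021, 5, 29, 13, 57), 2))
```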
Now let's run the next-execution command on a paused DAG and verify the output.
airflow dags next-execution email_tutorial -n 1
[INFO] Please be reminded this DAG is PAUSED now.
2021-05-24 10:21:15.464155+00:00
As you can see, we still got the next execution information, but with an INFO message reminding us that the DAG is paused.
Airflow DAG report command
The dags report command shows the DagBag loading report. This command gives you helpful information like the file location or directory Airflow searches for each DAG.
airflow dags report
file | duration | dag_num | task_num | dags
==================================================================================+================+=========+==========+===================================================================================
/test_slack_alert.py | 0:00:00.041452 | 1 | 2 | airflow_slack_notification_tutorial
/testemail.py | 0:00:00.024659 | 1 | 1 | email_tutorial
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_sub | 0:00:00.017983 | 3 | 15 | example_subdag_operator,example_subdag_operator.section-1,example_subdag_operator.
.......<suppressed o/p>...........
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/test_utils.py | 0:00:00.002076 | 1 | 1 | test_utils
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/tutorial_etl_dag.py | 0:00:00.001613 | 1 | 3 | tutorial_etl_dag
/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/subdags/subdag.py | 0:00:00.001112 | 0 | 0 |
Airflow DAG show command
The dags show command displays the complete DAG structure and its task dependencies, in Graphviz DOT format.
airflow dags show airflow_slack_notification_tutorial
digraph airflow_slack_notification_tutorial {
  graph [label=airflow_slack_notification_tutorial labelloc=t rankdir=LR]
  simple_bash_task [color="#000000" fillcolor="#f0ede4" label=simple_bash_task shape=rectangle style="filled,rounded"]
  slack_notification [color="#000000" fillcolor="#f4a460" label=slack_notification shape=rectangle style="filled,rounded"]
  simple_bash_task -> slack_notification
}
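Since the output is plain DOT text, the task dependencies appear as `a -> b` edges. As a quick (and deliberately non-robust) sketch, you can pull them out with a regex:

```python
import re

# Minimal DOT fragment mirroring the `dags show` output above
dot = """digraph airflow_slack_notification_tutorial {
  simple_bash_task -> slack_notification
}"""
edges = re.findall(r"(\w+)\s*->\s*(\w+)", dot)
print(edges)  # [('simple_bash_task', 'slack_notification')]
```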
Test run DAG in airflow
Testing a DAG before running it is a quick method to determine whether the DAG works as expected. We can test an Airflow DAG by running the dags test command.
Let’s test the below .py file with the command line.
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:

    run_this_last = DummyOperator(
        task_id='run_this_last',
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id='runme_' + str(i),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        )
        task >> run_this

    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id='also_run_this',
        bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last

    # [START howto_operator_bash_skip]
    this_will_skip = BashOperator(
        task_id='this_will_skip',
        bash_command='echo "hello world"; exit 99;',
        dag=dag,
    )
    # [END howto_operator_bash_skip]
    this_will_skip >> run_this_last

if __name__ == "__main__":
    dag.cli()
Create the DAG by copy-pasting the above code into a .py file, then run the dags test command.
airflow dags test example_bash_operator 2021-01-01
[2021-05-29 17:48:53,823] {dagbag.py:487} INFO - Filling up the DagBag from /opt/airflow/dags
[2021-05-29 17:48:54,494] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: example_bash_operator.runme_0 2021-01-01 00:00:00+00:00 [queued]>']
[2021-05-29 17:48:54,555] {base_executor.py:82} INFO - Adding to queue: ['<TaskInstance: example_bash_operator.runme_1 2021-01-01 00:00:00+00:00 [queued]>']
.......<suppressed o/p>...........
[2021-05-29 17:49:04,574] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-29 17:49:04,617] {dagrun.py:444} INFO - Marking run <DagRun example_bash_operator @ 2021-01-01 00:00:00+00:00: backfill__2021-01-01T00:00:00+00:00, externally triggered: False> successful
[2021-05-29 17:49:04,631] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 5 | running: 0 | failed: 0 | skipped: 1 | deadlocked: 0 | not ready: 1
[2021-05-29 17:49:09,414] {backfill_job.py:388} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 5 | running: 0 | failed: 0 | skipped: 2 | deadlocked: 0 | not ready: 0
[2021-05-29 17:49:09,427] {backfill_job.py:831} INFO - Backfill done. Exiting.
Here the DAG run passed without any issues.
Airflow delete a DAG
Sometimes we have to delete a DAG that is no longer required. We can delete a DAG with the dags delete command. Let's try to delete a DAG.
airflow dags delete test_utils -y
[2021-05-29 13:46:32,100] {__init__.py:38} INFO - Loaded API auth backend: <module 'airflow.api.auth.backend.basic_auth' from '/home/airflow/.local/lib/python3.6/site-packages/airflow/api/auth/backend/basic_auth.py'>
[2021-05-29 13:46:32,131] {delete_dag.py:42} INFO - Deleting DAG: test_utils
Removed 2 record(s)
The -y flag skips the confirmation prompt and drops all existing records related to the specified DAG.
Airflow Tasks command
The tasks command helps us manage tasks. With the tasks command, we can run a task, test a task, check a task's status, and perform many more operations.
airflow tasks -h
usage: airflow tasks [-h] COMMAND ...

Manage tasks

positional arguments:
  COMMAND
    clear               Clear a set of task instance, as if they never ran
    failed-deps         Returns the unmet dependencies for a task instance
    list                List the tasks within a DAG
    render              Render a task instance's template(s)
    run                 Run a single task instance
    state               Get the status of a task instance
    states-for-dag-run  Get the status of all task instances in a dag run
    test                Test a task instance

optional arguments:
  -h, --help  show this help message and exit
Let's try a few of the tasks commands.
Airflow list all tasks within a DAG
The tasks list command takes the DAG name as a parameter and lists all the tasks present in the DAG.
airflow tasks list example_bash_operator
also_run_this
run_after_loop
run_this_last
runme_0
runme_1
runme_2
this_will_skip
Airflow Task run
The tasks run command helps us run any task present in a DAG. The run command takes the below arguments.
positional arguments:
dag_id The id of the dag
task_id The id of the task
execution_date The execution date of the DAG
Let's run a simple task using the command line.
airflow tasks run example_bash_operator runme_0 2021-01-01
[2021-05-29 15:59:22,239] {dagbag.py:487} INFO - Filling up the DagBag from /opt/***/dags
.......<suppressed o/p>...........
Running <TaskInstance: example_bash_operator.runme_0 2021-01-01T00:00:00+00:00 [success]> on host a4bd0ae3c9a0
Airflow check the status of a task
With the help of the tasks state command, we can check the status of a particular task.
airflow tasks state example_bash_operator runme_0 2021-01-01
success
Airflow database check command
Airflow depends on a database to store its metadata. We can quickly check whether the database is reachable with the db check command.
airflow db check
[2021-05-29 15:43:02,284] {db.py:776} INFO - Connection successful.
Airflow Jobs command
With the jobs command, we can easily manage jobs in Airflow. Let's run the jobs check command and verify whether there are any alive jobs.
airflow jobs check
Found one alive job.
Roles command in airflow
With the help of the roles command, we can easily create and list roles in Airflow. Let's see the roles command in action.
Airflow list roles
The roles list command lists all the roles available in the Airflow instance.
airflow roles list
name
======
Admin
Op
Public
User
Viewer
Users command in airflow
users is another handy command that enables us to manage users. Let's see a few users commands in action.
Airflow users list command
The users list command lists all the users.
airflow users list
id | username | email | first_name | last_name | roles
===+==========+==========================+============+===========+======
1 | airflow | [email protected] | Airflow | Admin | Admin
Airflow create a user
We can also create a user using the users create command.
Let’s see what the required parameters are:
airflow users create -h
usage: airflow users create [-h] -e EMAIL -f FIRSTNAME -l LASTNAME
                            [-p PASSWORD] -r ROLE [--use-random-password]
                            -u USERNAME

Create a user

optional arguments:
  -h, --help            show this help message and exit
  -e EMAIL, --email EMAIL
                        Email of the user
  -f FIRSTNAME, --firstname FIRSTNAME
                        First name of the user
  -l LASTNAME, --lastname LASTNAME
                        Last name of the user
  -p PASSWORD, --password PASSWORD
                        Password of the user, required to create a user
                        without --use-random-password
  -r ROLE, --role ROLE  Role of the user. Existing roles include Admin, User,
                        Op, Viewer, and Public
  --use-random-password
                        Do not prompt for password. Use random string instead.
                        Required to create a user without --password
  -u USERNAME, --username USERNAME
                        Username of the user
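The --use-random-password flag means Airflow generates a password instead of prompting for one. As a rough sketch of that idea using Python's stdlib (the alphabet and length below are illustrative choices of ours, not Airflow's):

```python
import secrets
import string

# Generate a cryptographically secure random password
alphabet = string.ascii_letters + string.digits
password = "".join(secrets.choice(alphabet) for _ in range(16))
print(len(password))  # 16
```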
Now create a test user using the users create command. If the password is not specified, you will be prompted for one.
airflow users create -r User -u test -e [email protected] -f test_first_name -l test_last_name -p test
User user test created

airflow users list
id | username | email | first_name | last_name | roles
===+==========+==========================+=================+================+======
1 | airflow | [email protected] | Airflow | Admin | Admin
2 | test | [email protected] | test_first_name | test_last_name | User
Airflow Variables command
With the variables command, we can easily manage Airflow variables. Let's use a few variables commands.
List variables in airflow
The variables list command lists all the variable keys in Airflow.
airflow variables list
key
====
test
If you wish to get a variable's value, use the below command.
airflow variables get test
test_value
You can check the variables from the Airflow user interface as well.
Airflow create a variable
With the variables set command, we can easily create variables in Airflow. It takes the variable key and variable value as parameters.
airflow variables set variable_key variable_value
[2021-05-29 16:15:48,093] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
Variable variable_key created

airflow variables list
key
============
test
variable_key
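The variables command also supports export and import subcommands for moving variables between environments; the export file is, to my understanding, a flat JSON object of key/value pairs. A small sketch of that round trip using the two variables created above:

```python
import json

# Shape of an `airflow variables export` file: {"key": "value", ...}
exported = json.dumps({"test": "test_value", "variable_key": "variable_value"})
restored = json.loads(exported)
print(restored["variable_key"])  # variable_value
```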
Conclusion
Finally, we have come to the end of this tutorial. We started with the Airflow setup commands, and we learned how to manage DAGs and tasks in Airflow. At last, we learned how to manage users, roles, and variables in Airflow. I hope you have found this article useful. Please do let me know in the comment box if you face any issues with the above commands. Happy learning.
More to Read?
How to send email from airflow