In this blog, we will learn about Airflow operators, what they are, and how they are used in Airflow workflows. We will explore various examples of Airflow operators to understand their functionality better. Let’s dive in!
What are operators in Airflow?
In Apache Airflow, operators are the building blocks of workflows: each operator represents a single task in a DAG and defines the action to be taken when that task is executed.
Operators can be used to define tasks that perform a wide range of actions, such as:
- Extracting data from a source system
- Transforming data
- Loading data
- Running a Python function
- Running a Bash command
- Triggering an external process or application
Airflow provides a variety of built-in operators that cover many common use cases, such as the BashOperator, PythonOperator, etc. Additionally, users can create their own custom operators to handle specific requirements.
Airflow operators list
Apache Airflow provides a wide variety of operators that can be used to define tasks in a DAG. Below is a list of the most commonly used operators:
- BashOperator: Executes a bash command or script.
- PythonOperator: Executes a Python function.
- DummyOperator: Does nothing. Useful as a placeholder or for testing (renamed to EmptyOperator in newer Airflow versions).
- EmailOperator: Sends an email.
- PostgresOperator: Executes a PostgreSQL query.
- S3FileTransformOperator: Downloads a file from Amazon S3, transforms it locally, and uploads the result back to S3.
- MySqlOperator: Executes a MySQL query.
- SparkSubmitOperator: Submits a Spark job to a cluster.
- SSHOperator: Executes a command on a remote SSH server.
- SubDagOperator: Executes a sub-DAG.
- SlackAPIOperator: Sends a message to a Slack channel.
- KubernetesPodOperator: Executes a task in a Kubernetes pod.
- HiveOperator: Executes a Hive query.
- HdfsSensor: Waits for a file to be added to HDFS.
- FileSensor: Waits for a file to be added to a specified directory.
These are just a few of the many operators available in Airflow. Users can also create their own custom operators by subclassing the BaseOperator class.
Airflow operator example
Below is an example of a simple BashOperator in an Airflow DAG that executes a bash command:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

test_dag = DAG(
    'test_operator',
    start_date=datetime(2023, 3, 15)
)

bash_task = BashOperator(
    task_id='test_operator',
    bash_command='echo "Test bash"',
    dag=test_dag
)
```
The code defines an Airflow DAG named ‘test_operator’ with a start date of March 15th, 2023. The DAG has a single task of type BashOperator named ‘test_operator’, which simply prints the string ‘Test bash’ to the console using the ‘echo’ command in a Bash shell.
When the DAG is executed, the BashOperator task will be triggered and it will run the specified bash command. The BashOperator is a simple operator that can be used to execute arbitrary Bash scripts or commands as a task in an Airflow DAG.
Important airflow operators
In this section, I will briefly explain a few of the widely used Airflow operators.
Airflow Python operator
The PythonOperator executes a Python callable as a task within a DAG. A PythonOperator allows you to define a custom Python function that performs a specific action and then run that function as a task in your DAG.
Let’s take the below example:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def test_python_function():
    print("hello python")

my_dag = DAG(
    'sample_python_dag',
    start_date=datetime(2023, 3, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_python_function,
    dag=my_dag
)
```
Refer to my tutorial on the Airflow PythonOperator for more information.
Airflow Bash operator
The Airflow BashOperator is a basic operator in Apache Airflow that allows you to execute a Bash command or shell script within an Airflow DAG.
Below is the sample BashOperator airflow DAG:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 15),
    'retries': 0,
}

test_dag = DAG(
    'test_bash_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

# Define the BashOperator task
bash_task = BashOperator(
    task_id='bash_task_execute_script',
    bash_command='echo "Hello world"',
    dag=test_dag
)
```
Follow my tutorial on the Airflow BashOperator for more details.
Airflow spark operator
The SparkSubmitOperator allows you to submit Apache Spark jobs to a Spark cluster as part of your Airflow DAGs.
To use the SparkSubmitOperator, you need to configure it with the Spark connection information, such as the Spark master URL, the location of the Spark binaries, and any Spark configuration properties.
When the SparkSubmitOperator is executed as part of your DAG, it creates a Spark application and submits it to the Spark cluster using the configured connection information.
Let’s take the below example:
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'Naiveskill',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7),
}

my_dag = DAG(
    'spark_example',
    default_args=default_args,
    schedule_interval='@daily'
)

spark_test_task = SparkSubmitOperator(
    task_id='spark_test_job',
    conn_id='spark_default',
    application='/<path_to_your_spark_job.py>',
    name='test_spark_job',
    executor_memory='2g',
    num_executors=4,
    driver_memory='1g',
    conf={
        'spark.app.name': 'my_spark_job',
        'spark.executorEnv.PYTHONPATH': '/usr/lib/python3.7/site-packages'
    },
    dag=my_dag,
)
```
Airflow Kubernetes operator
The Kubernetes Operator allows you to run Kubernetes objects, such as pods and deployments, as part of your Airflow DAG. To use the Kubernetes Operator, you need to configure it with the connection information for your Kubernetes cluster, such as the cluster API server URL.
Then, you can define a KubernetesPodOperator to specify the Kubernetes object to run and any necessary configuration options.
The KubernetesPodOperator creates and runs a Kubernetes Pod. It allows you to specify the Docker image to use, the command to run in the container, the arguments to pass to the command, and any necessary environment variables and volume mounts.
When the Kubernetes operator is executed as part of your DAG, it creates and submits the specified Kubernetes object to the Kubernetes cluster using the configured connection information.
Let’s take the below example:
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

default_args = {
    'owner': 'naiveskill',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7),
}

my_dag = DAG(
    'kubernetes_example',
    default_args=default_args,
    schedule_interval='@daily'
)

k8s_task = KubernetesPodOperator(
    task_id='kubernetes_sample_pod',
    name='kubernetes_sample_pod',
    image='<docker_image_name:latest>',
    cmds=['python', '/path/to/my/test_script.py'],
    arguments=['--arg1', 'value1', '--arg2', 'value2'],
    env_vars={
        'ENV_VAR_1': 'value1',
        'ENV_VAR_2': 'value2',
    },
    volumes=[
        k8s.V1Volume(
            name='my_volume',
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                claim_name='PVC_name',
            ),
        ),
    ],
    dag=my_dag,
)
```
Conclusion
I hope you have liked this tutorial on Airflow operators. Please let me know if you face any issues while following along.