
Getting to Know Airflow Operators: A Comprehensive Guide in 2023

In this blog, we will learn about Airflow operators, what they are, and how they are used in Airflow workflows. We will explore various examples of Airflow operators to understand their functionality better. Let’s dive in!

What are operators in Airflow?

In Apache Airflow, operators are the building blocks of workflows: each operator represents a single task in a DAG and defines the action to be taken when that task is executed.

Operators can be used to define tasks that perform a wide range of actions, such as:

  • Extracting data from a source system
  • Transforming data
  • Loading data
  • Running a Python function
  • Running a Bash command
  • Triggering an external process or application

Airflow provides a variety of built-in operators that cover many common use cases, such as the BashOperator and PythonOperator. Additionally, users can create their own custom operators to handle specific requirements.

Airflow operators list

Apache Airflow provides a wide variety of operators that can be used to define tasks in a DAG. Below is a list of the most commonly used operators:

  1. BashOperator: Executes a bash command or script.
  2. PythonOperator: Executes a Python function.
  3. DummyOperator: Does nothing. Useful as a placeholder or for testing.
  4. EmailOperator: Sends an email.
  5. PostgresOperator: Executes a PostgreSQL query.
  6. S3FileTransformOperator: Copies a file from S3, applies a transformation, and uploads the result back to S3.
  7. MySqlOperator: Executes a MySQL query.
  8. SparkSubmitOperator: Submits a Spark job to a cluster.
  9. SSHOperator: Executes a command on a remote SSH server.
  10. SubDagOperator: Executes a sub-DAG.
  11. SlackAPIOperator: Sends a message to a Slack channel.
  12. KubernetesPodOperator: Executes a task in a Kubernetes pod.
  13. HiveOperator: Executes a Hive query.
  14. HdfsSensor: Waits for a file to be added to HDFS.
  15. FileSensor: Waits for a file to be added to a specified directory.

These are just a few of the many operators available in Airflow. Users can also create their own custom operators by subclassing the BaseOperator class.
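
As a rough sketch of what a minimal custom operator can look like (the class name and message below are purely illustrative, not part of Airflow):

from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    # execute() is what Airflow calls when the task runs
    def execute(self, context):
        print(f"Hello, {self.name}!")

Once defined, HelloOperator can be instantiated inside a DAG like any built-in operator, with a task_id plus its own name argument.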

Airflow operator example

Below is an example of a simple BashOperator in an Airflow DAG that executes a bash command:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime


test_dag = DAG(
    'test_operator',
    start_date=datetime(2023, 3, 15)
)

bash_task = BashOperator(
    task_id='test_operator',
    bash_command='echo "Test bash"',
    dag=test_dag
)

bash_task

The code defines an Airflow DAG named 'test_operator' with a start date of March 15, 2023. The DAG contains a single BashOperator task, also named 'test_operator', which prints the string 'Test bash' to the console using the echo command in a Bash shell.

When the DAG runs, the BashOperator task is triggered and executes the specified bash command. The BashOperator is a simple operator that can be used to run arbitrary Bash scripts or commands as tasks in an Airflow DAG.
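
Operators rarely run in isolation; execution order between tasks is declared with the >> and << bitshift operators. Below is a small sketch, assuming two hypothetical BashOperator tasks in one DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dep_dag = DAG('dependency_example', start_date=datetime(2023, 3, 15))

# two illustrative tasks; the commands are placeholders
extract = BashOperator(task_id='extract', bash_command='echo "extract"', dag=dep_dag)
load = BashOperator(task_id='load', bash_command='echo "load"', dag=dep_dag)

# extract runs before load
extract >> load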

Important Airflow operators

In this section, I will briefly explain a few of the most widely used Airflow operators.

Airflow Python operator

The PythonOperator executes a Python callable as a task within a DAG. It allows you to define a custom Python function that performs a specific action and then run that function as a task in your DAG.

Let’s take the below example:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def test_python_function():
    print("hello python")


my_dag = DAG(
    'sample_python_dag',
    start_date=datetime(2023, 3, 1),
    schedule_interval='@daily'
)
python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_python_function,
    dag=my_dag
)
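
The PythonOperator can also pass arguments to the callable through op_args and op_kwargs. Below is a minimal sketch extending the DAG above; the function and values are made up for illustration:

def greet(name, greeting='hello'):
    print(f"{greeting}, {name}")


greet_task = PythonOperator(
    task_id='greet_task',
    python_callable=greet,
    # op_kwargs is passed to the callable as keyword arguments
    op_kwargs={'name': 'Airflow', 'greeting': 'hi'},
    dag=my_dag
)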

Refer to my tutorial on the Airflow PythonOperator for more information.

Airflow Bash operator

The Airflow BashOperator is a basic operator in Apache Airflow that allows you to execute a Bash command or shell script within an Airflow DAG.

Below is a sample Airflow DAG using the BashOperator:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 15),
    'retries': 0,
}
test_dag = DAG(
    'test_bash_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)
# Define the BashOperator task
bash_task = BashOperator(
    task_id='bash_task_execute_script',
    bash_command='echo "Hello world"',
    dag=test_dag
)
# With a single task there are no dependencies to set
bash_task
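
The bash_command field is templated, so Jinja macros such as {{ ds }} (the run's logical date) can be embedded in the command. A small sketch reusing test_dag from above:

templated_task = BashOperator(
    task_id='bash_task_templated',
    # {{ ds }} is rendered by Airflow at runtime
    bash_command='echo "Run date: {{ ds }}"',
    dag=test_dag
)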

Follow my tutorial on the Airflow BashOperator for more details.

Airflow Spark operator

The SparkSubmitOperator allows you to submit Apache Spark jobs to a Spark cluster as part of your Airflow DAGs.

To use the SparkSubmitOperator, you need to configure it with the Spark connection information, such as the Spark master URL, the location of the Spark binaries, and any Spark configuration properties.

When the SparkSubmitOperator is executed as part of your DAG, it creates a Spark application and submits it to the Spark cluster using the configured connection information.

Let’s take the below example:

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'Naiveskill',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7),
}

my_dag = DAG(
    'spark_example',
    default_args=default_args, 
    schedule_interval='@daily'
)

spark_test_task = SparkSubmitOperator(
    task_id='spark_test_job',
    conn_id='spark_default',
    application='/<path_to_your_spark_job.py>',
    name='test_spark_job',
    executor_memory='2g',
    num_executors=4,
    driver_memory='1g',
    conf={
        'spark.app.name': 'my_spark_job',
        'spark.executorEnv.PYTHONPATH': '/usr/lib/python3.7/site-packages'
    },
    dag=my_dag,
)

spark_test_task

Airflow Kubernetes operator

The KubernetesPodOperator allows you to run tasks in pods on a Kubernetes cluster as part of your Airflow DAG. To use it, you need to configure it with the connection information for your Kubernetes cluster, such as the cluster API server URL.

Then, you can define a KubernetesPodOperator task that specifies the pod to run and any necessary configuration options.

The KubernetesPodOperator creates and runs a Kubernetes pod. It allows you to specify the Docker image to use, the command to run in the container, the arguments to pass to the command, and any necessary environment variables and volume mounts.

When the operator is executed as part of your DAG, it creates the specified pod and submits it to the Kubernetes cluster using the configured connection information.

Let’s take the below example:

from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

default_args = {
    'owner': 'naiveskill',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7),
}

my_dag = DAG('kubernetes_example', default_args=default_args, schedule_interval='@daily')

k8s_task = KubernetesPodOperator(
    task_id='kubernetes_sample_pod',
    name='kubernetes_sample_pod',
    image='<docker_image_name:latest>',
    cmds=['python', '/path/to/my/test_script.py'],
    arguments=['--arg1', 'value1', '--arg2', 'value2'],
    env_vars={
        'ENV_VAR_1': 'value1',
        'ENV_VAR_2': 'value2',
    },
    volumes=[
        k8s.V1Volume(
            name='my_volume',
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                claim_name='PVC_name',
            ),
        ),
    ],
    dag=my_dag,
)
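
The volumes argument above only attaches the volume to the pod spec; to make it visible inside the container, the operator also accepts a volume_mounts argument. Below is a small sketch with a hypothetical /data mount path, reusing the imports and placeholders from the example above:

k8s_mount_task = KubernetesPodOperator(
    task_id='kubernetes_pod_with_mount',
    name='kubernetes_pod_with_mount',
    image='<docker_image_name:latest>',
    cmds=['python', '/path/to/my/test_script.py'],
    volumes=[
        k8s.V1Volume(
            name='my_volume',
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                claim_name='PVC_name',
            ),
        ),
    ],
    # volume_mounts exposes 'my_volume' inside the container at /data
    volume_mounts=[
        k8s.V1VolumeMount(name='my_volume', mount_path='/data'),
    ],
    dag=my_dag,
)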

Conclusion

I hope you have liked this tutorial on Airflow operators. Please let me know if you face any issues while following along.

More to explore

Airflow Scheduler

Airflow database

Understand airflow max_active_runs

Airflow ShortCircuitOperator
