Airflow PythonOperator with examples in 2023

In this blog, we will learn about the Airflow PythonOperator. We will understand the PythonOperator with several examples, so let's get started:

What is the Airflow PythonOperator?

In Airflow, the PythonOperator executes a Python callable as a task within a DAG. The PythonOperator allows you to define a custom Python function that performs some specific work and then run that function as a task in your DAG.

With the PythonOperator, you define a Python function and pass it to the python_callable parameter. You can also pass positional and keyword arguments to the function.

How to import the Airflow PythonOperator

To use the PythonOperator in Airflow 2.x, import it from the airflow.operators.python module (the older airflow.operators.python_operator path still works but is deprecated). Here's an example of how to import the PythonOperator:

from airflow.operators.python import PythonOperator

Once you have imported the PythonOperator, you can create an instance of it to define a task in your DAG.

Below is an example of how to define a PythonOperator task in a DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator

def python_fun():
    # your task logic goes here
    ...

dag = DAG(
    # DAG definition goes here (dag_id, start_date, schedule, ...)
)

python_task = PythonOperator(
    task_id='<task>',
    python_callable=python_fun,
    dag=dag
)
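
You can also attach tasks to a DAG with a with block instead of passing dag= to every operator. Below is a minimal sketch (the dag_id here is just for illustration):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def python_fun():
    print("running inside a with-DAG block")

with DAG(
    'context_manager_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=python_fun,
        # no dag= argument needed; the with block attaches the task automatically
    )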

Airflow PythonOperator arguments

The PythonOperator in Apache Airflow takes several arguments to customize its behavior. Here are some of the most commonly used ones:

  • task_id: (Required) A unique identifier for the task.
  • python_callable: (Required) The Python function (or any callable) that will be executed when the task runs.
  • dag: The DAG object to which the task belongs. It can also be set implicitly by creating the task inside a with DAG(...) block, as shown earlier.
  • op_args: A list of positional arguments that will be passed to the python_callable function when the operator calls it.
  • op_kwargs: A dictionary of keyword arguments that will be passed to the python_callable function when the operator calls it.
  • provide_context: A Boolean flag that indicates whether the operator should pass the Airflow context to the function. It is only needed in Airflow 1.10; in Airflow 2.x the context is passed automatically and the flag is deprecated.
  • templates_dict: A dictionary whose values are rendered by the templating engine and then made available to the python_callable function (see the sketch after this list).
  • templates_exts: A list of file extensions to resolve while rendering templated fields.
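
The templates_dict values are rendered by the Jinja templating engine before the callable runs, so you can hand templated values such as the run date to your function. Below is a minimal sketch (assuming Airflow 2.x, where the rendered templates_dict is passed to the callable's keyword arguments; the dag_id and function name are just for illustration):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_templated(templates_dict=None, **kwargs):
    # By the time the callable runs, '{{ ds }}' has been rendered
    # to the run's logical date, e.g. '2023-02-01'
    print(f"Rendered date: {templates_dict['run_date']}")

my_dag = DAG(
    'templates_dict_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=print_templated,
    templates_dict={'run_date': '{{ ds }}'},
    dag=my_dag
)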

Airflow PythonOperator examples

In this section, we will understand the PythonOperator in Airflow with several examples.

Basic Airflow PythonOperator example

Below is an example of a simple Airflow PythonOperator implementation.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def test_function():
    print("hello world python")

my_dag = DAG(
    'simple_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    dag=my_dag
)

The DAG is named ‘simple_python_dag’, and it is scheduled to run daily starting from February 1, 2023. The PythonOperator, named ‘python_task’, is defined to execute the function ‘test_function’ when the DAG is triggered.

The function ‘test_function’ simply prints out the message “hello world python” when it is executed.

Overall, this DAG is a very basic example of how to define and schedule a task in Airflow using a PythonOperator, and it can be used as a starting point for more complex workflows.
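
Once the file is in your dags folder, you can also exercise the task by hand. Airflow's CLI can run a single task instance for a given date without recording any state in the metadata database (in Airflow 1.10 the equivalent command is airflow test):

airflow tasks test simple_python_dag python_task 2023-02-01

This is a quick way to confirm that the callable prints its message before you enable the schedule.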

Airflow PythonOperator with op_args parameter

You can use the op_args parameter of the PythonOperator to pass arguments to the Python function that will be executed as a task. The op_args parameter is a list of positional arguments that will be passed to the python_callable function.

Below is an example of how to use op_args in a PythonOperator:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def test_function(name, age):
    print(f"My name is {name}, and I am {age} years old")

my_dag = DAG(
    'op_args_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    op_args=['jack', 32],
    dag=my_dag
)

In the above example, we define a Python function test_function that takes two arguments, name and age. The function prints a string that includes the values of name and age.

We create a PythonOperator called python_task that runs the test_function function, passing in the arguments ‘jack’ and 32 using the op_args parameter. When the python_task operator runs, it will call test_function with the arguments ‘jack’ and 32, and the output will be:

My name is jack, and I am 32 years old

Airflow PythonOperator with op_kwargs parameter

In addition to the op_args parameter, you can also use the op_kwargs parameter in the PythonOperator to pass keyword arguments to the Python function.

The op_kwargs parameter is a dictionary of keyword arguments that will be passed to the python_callable function. Below is an example of how to use op_kwargs in a PythonOperator.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def test_function(name, age):
    print(f"My name is {name}, and I am {age} years old")

my_dag = DAG(
    'op_kwargs_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    op_kwargs={'name': 'jack', 'age': 32},
    dag=my_dag
)

In the above example, we define a Python function test_function that takes two arguments, name and age. We create a PythonOperator called python_task that runs the test_function function, passing in the keyword arguments {'name': 'jack', 'age': 32} using the op_kwargs parameter. When the python_task operator runs, it will call test_function with those keyword arguments, and the output will be:

My name is jack, and I am 32 years old
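
You can also combine op_args and op_kwargs on the same task: the operator calls the function as python_callable(*op_args, **op_kwargs), so positional parameters are filled from op_args and the rest from op_kwargs. Below is a minimal sketch (the function and dag_id are just for illustration):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def greet(greeting, name, age):
    # greeting comes from op_args; name and age come from op_kwargs
    print(f"{greeting}, {name}! You are {age} years old")

my_dag = DAG(
    'op_args_kwargs_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=greet,
    op_args=['Hello'],
    op_kwargs={'name': 'jack', 'age': 32},
    dag=my_dag
)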

Airflow PythonOperator with provide_context

In Airflow 1.10, the PythonOperator has a provide_context argument that, when set to True, passes the Airflow context to the Python function when it runs. In Airflow 2.x this flag is deprecated and no longer needed: the context is passed automatically to any keyword arguments the callable accepts. The context can be useful for passing information between tasks or for customizing the behavior of a task based on the current state of the DAG.

To use the context, define a Python function that accepts keyword arguments, for example via **context. Let's take the below example:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def test_function(**context):
    print(f"Execution date is {context['ds']}")
    print(f"DAG ID is {context['dag'].dag_id}")

my_dag = DAG(
    'python_context_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    provide_context=True,  # needed in Airflow 1.10 only; deprecated and ignored in Airflow 2.x
    dag=my_dag
)

In the above example, we have created a PythonOperator called python_task that runs the test_function function and sets provide_context=True (required only in Airflow 1.10). When the python_task operator runs, Airflow passes context information, including the execution date and the DAG ID, to test_function. For the run with logical date 2023-02-01, the output will be:

Execution date is 2023-02-01
DAG ID is python_context_dag
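
Instead of collecting everything through **context, in Airflow 2.x the callable can declare just the context variables it needs as named parameters, and Airflow will match them by name. A minimal sketch (assuming Airflow 2.x):

def test_function(ds, dag):
    # Airflow matches 'ds' and 'dag' against the context by parameter name
    print(f"Execution date is {ds}")
    print(f"DAG ID is {dag.dag_id}")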

Conclusion

Overall, the PythonOperator is a key component of Airflow that allows you to define complex, custom tasks using Python code. With its flexibility and power, the PythonOperator is an essential tool for any data pipeline that requires custom logic or interaction with external systems.

Please do let me know if you are facing any issues while following along.

More to Explore

Airflow BashOperator

How to send email from Airflow

How to integrate Airflow with Slack

Install Airflow using Docker

Airflow command line commands
