In this blog, we will learn about the Airflow PythonOperator and walk through several examples. So let's get started:
What is airflow PythonOperator?
In Airflow, the PythonOperator executes a Python callable as a task within a DAG. The PythonOperator allows you to define a custom Python function that performs some specific logic and then run that function as a task in your DAG.
With the PythonOperator, you define a Python function and pass it via the python_callable parameter. You can also pass positional and keyword arguments to the function.
How to import airflow PythonOperator
To use the PythonOperator, you need to import it from the airflow.operators.python_operator module (in Airflow 2 this module was renamed to airflow.operators.python; the old path still works but emits a deprecation warning). Here’s an example of how to import the PythonOperator.
from airflow.operators.python_operator import PythonOperator
Once you have imported the PythonOperator, you can create an instance of it to define a task in your DAG.
Below is an example of how to define a PythonOperator task in a DAG:
```python
from airflow.operators.python_operator import PythonOperator

def python_fun():
    # your task logic here
    ...

dag = DAG(
    # DAG definition
)

python_task = PythonOperator(
    task_id='<task>',
    python_callable=python_fun,
    dag=dag
)
```
Airflow PythonOperator arguments
The PythonOperator in Apache Airflow takes several arguments to customize its behavior. Here are some of the most commonly used:
- task_id: (Required) A unique identifier for the task.
- python_callable: (Required) The Python function that will be executed when the task runs.
- dag: The DAG object to which the task belongs.
- op_args: A list of positional arguments that will be passed to the python_callable function when the operator calls it.
- op_kwargs: A dictionary of keyword arguments that will be passed to the python_callable function when the operator calls it.
- provide_context: A Boolean flag that indicates whether the operator should pass the Airflow context to the function (only needed in Airflow 1; in Airflow 2 the context is passed automatically).
- templates_dict: A dictionary of values that are rendered as Jinja templates and then made available to the python_callable function.
- templates_exts: A list of file extensions that should be treated as templates by the operator.
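To make templates_dict concrete, here is a minimal sketch of how a templated value reaches the callable. The function name and the 'run_date' key are hypothetical, and the Airflow invocation is simulated with a plain dictionary rather than a running scheduler:

```python
# In the DAG you would write something like:
#
#   PythonOperator(
#       task_id='templated_task',
#       python_callable=print_run_date,
#       templates_dict={'run_date': '{{ ds }}'},
#       dag=dag,
#   )
#
# Airflow renders the Jinja templates first, then hands the result
# to the callable through the context.
def print_run_date(templates_dict=None, **context):
    # By the time the callable runs, '{{ ds }}' has been rendered
    # to a concrete 'YYYY-MM-DD' string.
    return f"run_date is {templates_dict['run_date']}"

# Simulate the rendered dictionary Airflow would pass in:
print(print_run_date(templates_dict={'run_date': '2023-02-01'}))
```

The advantage over hard-coding the date is that '{{ ds }}' is re-rendered for every DAG run.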
Airflow PythonOperator examples
In this section, we will walk through the PythonOperator in Airflow with several examples.
Basic airflow PythonOperator example
Below is a simple example of an Airflow PythonOperator implementation.
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def test_function():
    print("hello world python")

my_dag = DAG(
    'simple_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    dag=my_dag
)
```
The DAG is named ‘simple_python_dag’, and it is scheduled to run daily starting from February 1, 2023. The PythonOperator, named ‘python_task’, is defined to execute the function ‘test_function’ when the DAG is triggered.
The function ‘test_function’ simply prints out the message “hello world python” when it is executed.
Overall, this DAG is a very basic example of how to define and schedule a task in Airflow using a PythonOperator, and it could be used as a starting point for more complex workflows.
Airflow PythonOperator with op_args parameter
You can use the op_args parameter of the PythonOperator to pass arguments to the Python function that will be executed as a task. The op_args parameter is a list of positional arguments that will be passed to the python_callable function.

Below is an example of how to use op_args in a PythonOperator:
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def test_function(name, age):
    print(f"My name is {name}, and I am {age} years old")

my_dag = DAG(
    'op_args_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    op_args=['jack', 32],
    dag=my_dag
)
```
In the above example, we define a Python function test_function that takes two arguments, name and age. The function prints a string that includes the values of name and age.
We create a PythonOperator called python_task that runs the test_function function, passing in the arguments ‘jack’ and 32 using the op_args parameter. When the python_task operator runs, it will call test_function with those arguments, and the output will be:

My name is jack, and I am 32 years old
Airflow PythonOperator with op_kwargs parameter
In addition to the op_args parameter, you can also use the op_kwargs parameter of the PythonOperator to pass keyword arguments to the Python function.
The op_kwargs parameter is a dictionary of keyword arguments that will be passed to the python_callable function. Below is an example of how to use op_kwargs in a PythonOperator.
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def test_function(name, age):
    print(f"My name is {name}, and I am {age} years old")

my_dag = DAG(
    'op_kwargs_python_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    op_kwargs={'name': 'jack', 'age': 32},
    dag=my_dag
)
```
In the above example, we define a Python function test_function that takes two arguments, name and age. We create a PythonOperator called python_task that runs the test_function function, passing in the keyword arguments {‘name’: ‘jack’, ‘age’: 32} using the op_kwargs parameter. When the python_task operator runs, it will call test_function with those keyword arguments, and the output will be:

My name is jack, and I am 32 years old
Airflow PythonOperator with provide_context
In Airflow, the PythonOperator has a provide_context argument that, when set to True, allows the operator to pass the Airflow context to the Python function when it runs. This can be useful for passing information between tasks or for customizing the behavior of a task based on the current state of the DAG. Note that provide_context is only needed in Airflow 1; in Airflow 2 the context is passed automatically.
To use the provide_context argument, you need to define a Python function that accepts the Airflow context, for example via **kwargs. Let’s look at the example below.
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def test_function(**context):
    print(f"Execution date is {context['ds']}")
    print(f"DAG ID is {context['dag'].dag_id}")

my_dag = DAG(
    'python_context_dag',
    start_date=datetime(2023, 2, 1),
    schedule_interval='@daily'
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=test_function,
    provide_context=True,
    dag=my_dag
)
```
In the above example, we have created a PythonOperator called python_task that runs the test_function function and sets the provide_context parameter to True. When the python_task operator runs, it will pass context information, including the execution date and the DAG ID, to test_function. For a run on February 1, 2023, the output would look like:

Execution date is 2023-02-01
DAG ID is python_context_dag
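In Airflow 2, provide_context is no longer needed: the operator inspects the callable's signature and passes matching context variables as keyword arguments automatically. Here is a minimal sketch of the same callable written in that style; the invocation is simulated here with a plain function call, so the date is just an illustrative value:

```python
# In Airflow 2 the operator matches context keys to the callable's
# parameters, so 'ds' can simply be declared as a keyword argument
# instead of being dug out of a **context dictionary.
def test_function(ds=None, **kwargs):
    # 'ds' is the logical (execution) date as a 'YYYY-MM-DD' string
    return f"Execution date is {ds}"

# Simulate how Airflow would invoke the callable for a given run:
print(test_function(ds='2023-02-01'))
```

This style also makes the callable easier to unit-test, since you can call it directly with plain values.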
Conclusion
Overall, the PythonOperator is a key component of Airflow that allows you to define complex, custom tasks using Python code. With its flexibility and power, the PythonOperator is an essential tool for any data pipeline that requires custom logic or interaction with external systems.
Please do let me know if you are facing any issues while following along.
More to Explore
How to send email from airflow
How to integrate airflow with slack