Issue
I'm trying to use Airflow's PythonVirtualenvOperator to create a virtual environment for a specific task to run in. As a first step, I'm just replicating the example given in the Airflow documentation.
My main module and my DAG definition are in two different files.
Main.py looks like this:
from airflow.decorators import task


@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=True
)
def Main_func():
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")
task_dag.py looks like this:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from Main import Main_func

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'max_active_runs': 1,
}

Main_Mod_dag = DAG(
    'Main_Mod_Run',
    catchup=False,
    default_args=default_args,
    description='Main Module Run',
    schedule_interval='48 11 * * 3',
    start_date=datetime(2022, 12, 11),
    tags=['Main_Mod'],
)

Main_Mod_Func = PythonOperator(task_id='Main_Mod', python_callable=Main_func, dag=Main_Mod_dag)

Main_Mod_Func
Expected result: the DAG is scheduled and runs successfully, and all the print statements appear in the log.
**Error:**
[2023-05-10, 10:47:13 UTC] {python.py:177} INFO - Done. Returned value was: {{ task_instance.xcom_pull(task_ids='virtualenv_python', dag_id='adhoc_airflow', key='return_value') }}
[2023-05-10, 10:47:13 UTC] {taskinstance.py:1853} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2381, in xcom_push
    XCom.set(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 206, in set
    value = cls.serialize_value(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 595, in serialize_value
    return pickle.dumps(value)
_pickle.PicklingError: Can't pickle <function Main_func at 0x7fe5c18dba60>: it's not the same object as Main.Main_func
Solution
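A function decorated with @task.virtualenv is already a task, so you cannot pass it as python_callable to a PythonOperator. Call the decorated function inside a DAG definition instead.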
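The PicklingError in the log is consistent with how pickle handles functions: it stores them by module and name, and refuses when that name now points at a different object. The sketch below reproduces the same kind of error in plain Python, without Airflow. TaskWrapper and task are hypothetical stand-ins for a decorator that replaces the function with a wrapper object; they are not Airflow's actual implementation.

```python
import pickle


class TaskWrapper:
    """Hypothetical stand-in for the object a task decorator returns."""

    def __init__(self, function):
        # Keep a reference to the original, undecorated function.
        self.function = function

    def __call__(self, *args, **kwargs):
        return self.function(*args, **kwargs)


def task(function):
    """Hypothetical decorator: replaces the function with a wrapper object."""
    return TaskWrapper(function)


@task
def main_func():
    return "Finished"


def try_pickle(obj):
    """Return "ok" if obj can be pickled, else the error message."""
    try:
        pickle.dumps(obj)
        return "ok"
    except pickle.PicklingError as exc:
        return str(exc)


# The module-level name main_func now refers to the wrapper, so pickle
# cannot find the original function under that name and refuses to pickle it.
result = try_pickle(main_func.function)
print(result)
```

The printed message should resemble the one in the traceback above, since the original function is no longer what its module-level name resolves to.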
For more examples, see:
https://github.com/apache/airflow/blob/main/airflow/example_dags/example_python_operator.py
Applied to your code, task_dag.py becomes:
from airflow.decorators import dag
from datetime import datetime
from Main import Main_func

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_runs": 1,
}


@dag(
    dag_id="Main_Mod_Run",
    catchup=False,
    default_args=default_args,
    description="Main Module Run",
    schedule_interval="48 11 * * 3",
    start_date=datetime(2022, 12, 11),
    tags=["Main_Mod"],
)
def my_dag():
    Main_Mod_Func = Main_func()


my_dag = my_dag()
Answered By - Ruscinc
Answer Checked By - David Goodson (WPSolving Volunteer)