diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index c750757cb048d..281ffcfd67654 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -22,7 +22,6 @@ __all__ = [ "DAG", "ID_LEN", - "XCOM_RETURN_KEY", "Base", "BaseOperator", "BaseOperatorLink", @@ -89,7 +88,6 @@ def __getattr__(name): __lazy_imports = { "DAG": "airflow.models.dag", "ID_LEN": "airflow.models.base", - "XCOM_RETURN_KEY": "airflow.models.xcom", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", "BaseOperatorLink": "airflow.models.baseoperator", @@ -143,4 +141,4 @@ def __getattr__(name): from airflow.models.taskreschedule import TaskReschedule from airflow.models.trigger import Trigger from airflow.models.variable import Variable - from airflow.models.xcom import XCOM_RETURN_KEY, XCom + from airflow.models.xcom import XCom diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index a120168685381..d78152d9906e9 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -75,7 +75,6 @@ from airflow.models.pool import Pool from airflow.models.taskinstance import TaskInstance, clear_task_instances from airflow.models.taskmixin import DAGNode, DependencyMixin -from airflow.models.xcom import XCOM_RETURN_KEY from airflow.serialization.enums import DagAttributeTypes from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep @@ -92,6 +91,7 @@ from airflow.utils.setup_teardown import SetupTeardownContext from airflow.utils.trigger_rule import TriggerRule from airflow.utils.weight_rule import WeightRule +from airflow.utils.xcom import XCOM_RETURN_KEY if TYPE_CHECKING: import jinja2 # Slow import. diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index 2cbd46cd57384..a10fd10cdc04b 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -66,6 +66,7 @@ from airflow.utils.operator_resources import Resources from airflow.utils.trigger_rule import TriggerRule from airflow.utils.types import NOTSET +from airflow.utils.xcom import XCOM_RETURN_KEY if TYPE_CHECKING: import jinja2 # Slow import. @@ -111,7 +112,7 @@ def validate_mapping_kwargs(op: type[BaseOperator], func: ValidationSource, valu def ensure_xcomarg_return_value(arg: Any) -> None: - from airflow.models.xcom_arg import XCOM_RETURN_KEY, XComArg + from airflow.models.xcom_arg import XComArg if isinstance(arg, XComArg): for operator, key in arg.iter_references(): diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 0406d2158ba0e..e4dbcb0e5eeb4 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -93,7 +93,7 @@ from airflow.models.taskfail import TaskFail from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule -from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom +from airflow.models.xcom import LazyXComAccess, XCom from airflow.plugins_manager import integrate_macros_plugins from airflow.sentry import Sentry from airflow.stats import Stats @@ -122,6 +122,7 @@ ) from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.timeout import timeout +from airflow.utils.xcom import XCOM_RETURN_KEY TR = TaskReschedule diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index 1ecd21f405e49..a9ecae27a4ad9 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -58,12 +58,14 @@ from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime -log = logging.getLogger(__name__) +# XCom constants below are needed for providers backward compatibility, +# which should import the constants directly after apache-airflow>=2.6.0 +from airflow.utils.xcom import ( + MAX_XCOM_SIZE, # noqa: F401 + XCOM_RETURN_KEY, +) -# MAX XCOM Size is 48KB -# https://github.com/apache/airflow/pull/1618#discussion_r68249677 -MAX_XCOM_SIZE = 49344 -XCOM_RETURN_KEY = "return_value" +log = logging.getLogger(__name__) if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstanceKey diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py index 133fd4280bb42..7f65cd3d945dc 100644 --- a/airflow/models/xcom_arg.py +++ b/airflow/models/xcom_arg.py @@ -28,12 +28,12 @@ from airflow.models.abstractoperator import AbstractOperator from airflow.models.mappedoperator import MappedOperator from airflow.models.taskmixin import DAGNode, DependencyMixin -from airflow.models.xcom import XCOM_RETURN_KEY from airflow.utils.context import Context from airflow.utils.edgemodifier import EdgeModifier from airflow.utils.mixins import ResolveMixin from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.types import NOTSET, ArgNotSet +from airflow.utils.xcom import XCOM_RETURN_KEY if TYPE_CHECKING: from airflow.models.dag import DAG diff --git a/airflow/utils/xcom.py b/airflow/utils/xcom.py new file mode 100644 index 0000000000000..b05b881fa63a3 --- /dev/null +++ b/airflow/utils/xcom.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# MAX XCOM Size is 48KB +# https://github.com/apache/airflow/pull/1618#discussion_r68249677 +from __future__ import annotations + +MAX_XCOM_SIZE = 49344 +XCOM_RETURN_KEY = "return_value" diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 20dc883106ed0..2da1a0dd2a9fd 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -32,12 +32,12 @@ from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstance import TaskInstance from airflow.models.taskmap import TaskMap -from airflow.models.xcom import XCOM_RETURN_KEY from airflow.models.xcom_arg import PlainXComArg, XComArg from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.task_group import TaskGroup from airflow.utils.types import DagRunType +from airflow.utils.xcom import XCOM_RETURN_KEY from tests.operators.test_python import BasePythonTest DEFAULT_DATE = timezone.datetime(2016, 1, 1) diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py index bdfcf8bc7f809..931d262d24c7a 100644 --- a/tests/models/test_mappedoperator.py +++ b/tests/models/test_mappedoperator.py @@ -28,10 +28,10 @@ from airflow.models.param import ParamsDict from airflow.models.taskinstance import TaskInstance from airflow.models.taskmap import TaskMap -from airflow.models.xcom import XCOM_RETURN_KEY from airflow.models.xcom_arg import XComArg from airflow.utils.state import TaskInstanceState from airflow.utils.trigger_rule import TriggerRule +from airflow.utils.xcom import XCOM_RETURN_KEY from tests.models import DEFAULT_DATE from tests.test_utils.mapping import expand_mapped_task from tests.test_utils.mock_operators import MockOperator, MockOperatorWithNestedFields, NestedFields diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 6c331e5ae7361..2d86cdfde5ffe 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -63,7 +63,7 @@ from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.models.variable import Variable -from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom +from airflow.models.xcom import LazyXComAccess, XCom from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator @@ -84,6 +84,7 @@ from airflow.utils.state import State, TaskInstanceState from airflow.utils.task_group import TaskGroup from airflow.utils.types import DagRunType +from airflow.utils.xcom import XCOM_RETURN_KEY from airflow.version import version from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER from tests.test_utils import db @@ -1483,7 +1484,7 @@ def test_xcom_push_flag(self, dag_maker): ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task ti.run() - assert ti.xcom_pull(task_ids=task_id, key=models.XCOM_RETURN_KEY) is None + assert ti.xcom_pull(task_ids=task_id, key=XCOM_RETURN_KEY) is None def test_post_execute_hook(self, dag_maker): """ diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py index 5dc153dee13f9..6abdd344acaed 100644 --- a/tests/models/test_xcom.py +++ b/tests/models/test_xcom.py @@ -28,11 +28,12 @@ from airflow.configuration import conf from airflow.models.dagrun import DagRun, DagRunType from airflow.models.taskinstance import TaskInstance, TaskInstanceKey -from airflow.models.xcom import XCOM_RETURN_KEY, BaseXCom, XCom, resolve_xcom_backend +from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend from airflow.operators.empty import EmptyOperator from airflow.settings import json from airflow.utils import timezone from airflow.utils.session import create_session +from airflow.utils.xcom import XCOM_RETURN_KEY from tests.test_utils.config import conf_vars diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 622cd9f5fcb5b..96cf19aa25b37 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -47,7 +47,7 @@ from airflow.models.expandinput import EXPAND_INPUT_EMPTY from airflow.models.mappedoperator import MappedOperator from airflow.models.param import Param, ParamsDict -from airflow.models.xcom import XCOM_RETURN_KEY, XCom +from airflow.models.xcom import XCom from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.security import permissions @@ -65,6 +65,7 @@ from airflow.utils.context import Context from airflow.utils.operator_resources import Resources from airflow.utils.task_group import TaskGroup +from airflow.utils.xcom import XCOM_RETURN_KEY from tests.test_utils.config import conf_vars from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable