Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor out xcom constants from models #30180

Merged
merged 5 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
__all__ = [
"DAG",
"ID_LEN",
"XCOM_RETURN_KEY",
"Base",
"BaseOperator",
"BaseOperatorLink",
Expand Down Expand Up @@ -89,7 +88,6 @@ def __getattr__(name):
__lazy_imports = {
"DAG": "airflow.models.dag",
"ID_LEN": "airflow.models.base",
"XCOM_RETURN_KEY": "airflow.models.xcom",
"Base": "airflow.models.base",
"BaseOperator": "airflow.models.baseoperator",
"BaseOperatorLink": "airflow.models.baseoperator",
Expand Down Expand Up @@ -143,4 +141,4 @@ def __getattr__(name):
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
from airflow.models.variable import Variable
from airflow.models.xcom import XCOM_RETURN_KEY, XCom
from airflow.models.xcom import XCom
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.models.taskmixin import DAGNode, DependencyMixin
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.serialization.enums import DagAttributeTypes
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
Expand All @@ -92,6 +91,7 @@
from airflow.utils.setup_teardown import SetupTeardownContext
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import WeightRule
from airflow.utils.xcom import XCOM_RETURN_KEY

if TYPE_CHECKING:
import jinja2 # Slow import.
Expand Down
3 changes: 2 additions & 1 deletion airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.utils.operator_resources import Resources
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET
from airflow.utils.xcom import XCOM_RETURN_KEY

if TYPE_CHECKING:
import jinja2 # Slow import.
Expand Down Expand Up @@ -111,7 +112,7 @@ def validate_mapping_kwargs(op: type[BaseOperator], func: ValidationSource, valu


def ensure_xcomarg_return_value(arg: Any) -> None:
from airflow.models.xcom_arg import XCOM_RETURN_KEY, XComArg
from airflow.models.xcom_arg import XComArg

if isinstance(arg, XComArg):
for operator, key in arg.iter_references():
Expand Down
3 changes: 2 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
from airflow.models.taskfail import TaskFail
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
from airflow.models.xcom import LazyXComAccess, XCom
from airflow.plugins_manager import integrate_macros_plugins
from airflow.sentry import Sentry
from airflow.stats import Stats
Expand Down Expand Up @@ -122,6 +122,7 @@
)
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.timeout import timeout
from airflow.utils.xcom import XCOM_RETURN_KEY

TR = TaskReschedule

Expand Down
12 changes: 7 additions & 5 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

log = logging.getLogger(__name__)
# XCom constants below are needed for providers backward compatibility,
# which should import the constants directly after apache-airflow>=2.6.0
from airflow.utils.xcom import (
MAX_XCOM_SIZE, # noqa: F401
XCOM_RETURN_KEY,
)

# MAX XCOM Size is 48KB
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
MAX_XCOM_SIZE = 49344
XCOM_RETURN_KEY = "return_value"
log = logging.getLogger(__name__)

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstanceKey
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskmixin import DAGNode, DependencyMixin
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.utils.context import Context
from airflow.utils.edgemodifier import EdgeModifier
from airflow.utils.mixins import ResolveMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.types import NOTSET, ArgNotSet
from airflow.utils.xcom import XCOM_RETURN_KEY

if TYPE_CHECKING:
from airflow.models.dag import DAG
Expand Down
24 changes: 24 additions & 0 deletions airflow/utils/xcom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# MAX XCOM Size is 48KB
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
from __future__ import annotations

MAX_XCOM_SIZE = 49344
XCOM_RETURN_KEY = "return_value"
2 changes: 1 addition & 1 deletion tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskmap import TaskMap
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.models.xcom_arg import PlainXComArg, XComArg
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.operators.test_python import BasePythonTest

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskmap import TaskMap
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.models.xcom_arg import XComArg
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.models import DEFAULT_DATE
from tests.test_utils.mapping import expand_mapped_task
from tests.test_utils.mock_operators import MockOperator, MockOperatorWithNestedFields, NestedFields
Expand Down
5 changes: 3 additions & 2 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.variable import Variable
from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
from airflow.models.xcom import LazyXComAccess, XCom
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
Expand All @@ -84,6 +84,7 @@
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType
from airflow.utils.xcom import XCOM_RETURN_KEY
from airflow.version import version
from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER
from tests.test_utils import db
Expand Down Expand Up @@ -1483,7 +1484,7 @@ def test_xcom_push_flag(self, dag_maker):
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
ti.run()
assert ti.xcom_pull(task_ids=task_id, key=models.XCOM_RETURN_KEY) is None
assert ti.xcom_pull(task_ids=task_id, key=XCOM_RETURN_KEY) is None

def test_post_execute_hook(self, dag_maker):
"""
Expand Down
3 changes: 2 additions & 1 deletion tests/models/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
from airflow.configuration import conf
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.xcom import XCOM_RETURN_KEY, BaseXCom, XCom, resolve_xcom_backend
from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils.config import conf_vars


Expand Down
3 changes: 2 additions & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from airflow.models.expandinput import EXPAND_INPUT_EMPTY
from airflow.models.mappedoperator import MappedOperator
from airflow.models.param import Param, ParamsDict
from airflow.models.xcom import XCOM_RETURN_KEY, XCom
from airflow.models.xcom import XCom
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
Expand All @@ -65,6 +65,7 @@
from airflow.utils.context import Context
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import TaskGroup
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator
from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable
Expand Down