Skip to content

Commit

Permalink
Fix warning when using xcomarg dependencies (#26801)
Browse files Browse the repository at this point in the history
This warning was invisible before 2.4 due to a bug in our logging config
(fixed by commit 7363e35) and AIP-45 which suddenly made this appear.

The problem was being caused by set_xcomargs_dependencies being called
once for each class in the hierarchy, and each of them doing the same
logic.

The fix is to look at the _actual_ function of `self.__init__` and
compare it to the function we're about to call so that we don't set
dependencies until we have finished the "outer" most class's
apply_defaults invocation.
  • Loading branch information
ashb authored Sep 30, 2022
1 parent 14b38d7 commit d77f056
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
5 changes: 3 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,9 @@ def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any:
# Store the args passed to init -- we need them to support task.map serialzation!
self._BaseOperator__init_kwargs.update(kwargs) # type: ignore

if not instantiated_from_mapped:
# Set upstream task defined by XComArgs passed to template fields of the operator.
# Set upstream task defined by XComArgs passed to template fields of the operator.
# BUT: only do this _ONCE_, not once for each class in the hierarchy
if not instantiated_from_mapped and func == self.__init__.__wrapped__: # type: ignore[misc]
self.set_xcomargs_dependencies()
# Mark instance as instantiated.
self._BaseOperator__instantiated = True
Expand Down
15 changes: 15 additions & 0 deletions tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,3 +817,18 @@ def down(a, b):
assert len(decision.schedulable_tis) == 1 # "down"
decision.schedulable_tis[0].run(session=session)
assert result == "'example' None"


@pytest.mark.filterwarnings("error")
def test_no_warnings(reset_logging_config, caplog):
@task_decorator
def some_task():
return 1

@task_decorator
def other(x):
...

with DAG(dag_id='test', start_date=DEFAULT_DATE, schedule=None):
other(some_task())
assert caplog.messages == []

0 comments on commit d77f056

Please sign in to comment.