Skip to content

Commit

Permalink
Remove found_descendents param from get_flat_relative_ids (#31559)
Browse files Browse the repository at this point in the history
By the looks of it, this param is unused.  Since the class is designated private, it is permissable to remove it.

(cherry picked from commit 0cbc0dc)
  • Loading branch information
dstandish authored and eladkal committed Jun 9, 2023
1 parent 7295524 commit 2de0656
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,37 +154,38 @@ def get_direct_relative_ids(self, upstream: bool = False) -> set[str]:
return self.upstream_task_ids
return self.downstream_task_ids

def get_flat_relative_ids(
self,
upstream: bool = False,
found_descendants: set[str] | None = None,
) -> set[str]:
"""Get a flat set of relative IDs, upstream or downstream."""
def get_flat_relative_ids(self, *, upstream: bool = False) -> set[str]:
"""
Get a flat set of relative IDs, upstream or downstream.
Will recurse each relative found in the direction specified.
:param upstream: Whether to look for upstream or downstream relatives.
"""
dag = self.get_dag()
if not dag:
return set()

if found_descendants is None:
found_descendants = set()
relatives: set[str] = set()

task_ids_to_trace = self.get_direct_relative_ids(upstream)
while task_ids_to_trace:
task_ids_to_trace_next: set[str] = set()
for task_id in task_ids_to_trace:
if task_id in found_descendants:
if task_id in relatives:
continue
task_ids_to_trace_next.update(dag.task_dict[task_id].get_direct_relative_ids(upstream))
found_descendants.add(task_id)
relatives.add(task_id)
task_ids_to_trace = task_ids_to_trace_next

return found_descendants
return relatives

def get_flat_relatives(self, upstream: bool = False) -> Collection[Operator]:
"""Get a flat list of relatives, either upstream or downstream."""
dag = self.get_dag()
if not dag:
return set()
return [dag.task_dict[task_id] for task_id in self.get_flat_relative_ids(upstream)]
return [dag.task_dict[task_id] for task_id in self.get_flat_relative_ids(upstream=upstream)]

def _iter_all_mapped_downstreams(self) -> Iterator[MappedOperator | MappedTaskGroup]:
"""Return mapped nodes that are direct dependencies of the current task.
Expand Down

0 comments on commit 2de0656

Please sign in to comment.