Skip to content

Commit

Permalink
Fix bug in the implementation for example_dag4
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Dec 4, 2024
1 parent c75eb48 commit 10cc78b
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,19 +852,23 @@ def topological_sort_tasks(tasks_configs: dict[str, Any]) -> list[tuple(str, Any
(https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm)
The complexity is O(N + D) where N: total tasks and D: number of dependencies.
:returns: topologically sorted list containing tuples (task name, task config)
"""
# Step 1: Build the downstream (adjacency) tasks list and the upstream dependencies (in-degree) count
downstream_tasks = {}
upstream_dependencies_count = {}

for task_id, _ in tasks_configs.items():
downstream_tasks[task_id] = []
upstream_dependencies_count[task_id] = 0
for task_name, _ in tasks_configs.items():
downstream_tasks[task_name] = []
upstream_dependencies_count[task_name] = 0

for task_id, task_conf in tasks_configs.items():
for task_name, task_conf in tasks_configs.items():
for upstream_task in task_conf.get("dependencies", []):
downstream_tasks[upstream_task].append(task_id)
upstream_dependencies_count[task_id] += 1
# there are cases when dependencies contains references to TaskGroups and not Tasks - we skip those
if upstream_task in tasks_configs:
downstream_tasks[upstream_task].append(task_name)
upstream_dependencies_count[task_name] += 1

# Step 2: Find all tasks with no dependencies
tasks_without_dependencies = [
Expand Down

0 comments on commit 10cc78b

Please sign in to comment.