Expand() usage does not pickup .output if task with expand comes before in yaml def/dict keys of task referenced in expand #225

Closed
StephenOTT opened this issue Oct 1, 2024 · 2 comments · Fixed by #307
Assignees
Labels
bug Something isn't working

Comments

@StephenOTT
StephenOTT commented Oct 1, 2024

If you have something like

Tasks:
   Task1:
      ...
      Dependencies: [Task2]
      Expand: 
         Some_arg: Task2.output

   Task2:
      ...

Then, when dagbuilder parses the task dict, it does not properly convert the expand arg into a referenced XCom, and the value remains a plain string.

@tatiana tatiana added this to the DAG Factory 0.21.0 milestone Oct 21, 2024
@tatiana
Collaborator

tatiana commented Oct 21, 2024

Thanks a lot for reporting this issue, @StephenOTT! Would you be willing to contribute a fix? Once this is fixed, we should add an example DAG that exercises this case so we can avoid regressions.

@tatiana
Collaborator

tatiana commented Dec 4, 2024

I implemented a fix in #307. Its asymptotic complexity is O(N + D), where N is the total number of tasks and D is the total number of dependencies, which seems acceptable.

The issue described affected any YAML files that declared upstream tasks after downstream tasks, regardless of using dynamic task mapping.

tatiana added a commit that referenced this issue Dec 6, 2024
Any YAML files that declare upstream tasks after downstream tasks,
regardless of using dynamic task mapping, would fail.

Example of DAG that would fail:
```
test_expand:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "test expand"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    process:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: expand_task
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
      partial:
        op_kwargs:
          test_id: "test"
      expand:
        op_args:
          request.output
      dependencies: [request]
    request:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: example_task_mapping
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
```

In this example, the upstream (parent) task "request" is defined after
the downstream (child) task "process". Before this change, this DAG
would fail.

I fixed the problem by using Kahn's algorithm to sort the tasks
topologically before they are created:
https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm

It has asymptotic complexity O(N + D), where N is the total number of
tasks, and D is the total number of dependencies. This complexity seems
acceptable.
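
For illustration only, here is a minimal sketch of Kahn's algorithm applied
to a task dictionary shaped like the YAML above. It is not the code from the
fix itself: the function name `topological_sort` and the error handling are
assumptions, and only the `dependencies` key is taken from the example.

```python
from collections import deque


def topological_sort(tasks):
    """Return task names so that every upstream task precedes its downstream tasks.

    `tasks` maps each task name to its config dict. Upstream task names are
    read from the optional "dependencies" list (hypothetical helper, for
    illustration only).
    """
    # in_degree[name]: number of upstream tasks `name` is still waiting on.
    in_degree = {name: 0 for name in tasks}
    # downstream[name]: tasks that list `name` as a dependency.
    downstream = {name: [] for name in tasks}

    for name, conf in tasks.items():
        for upstream in conf.get("dependencies", []):
            if upstream not in tasks:
                raise ValueError(f"Task {name!r} depends on unknown task {upstream!r}")
            downstream[upstream].append(name)
            in_degree[name] += 1

    # Start with the tasks that have no upstream dependencies.
    queue = deque(name for name, degree in in_degree.items() if degree == 0)
    ordered = []
    while queue:
        current = queue.popleft()
        ordered.append(current)
        # "Remove" the edges leaving `current`; newly unblocked tasks are queued.
        for child in downstream[current]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    # Any task left unvisited is part of a dependency cycle.
    if len(ordered) != len(tasks):
        raise ValueError("Cyclic dependencies detected between tasks")
    return ordered


# Same shape as the failing example: "process" is declared before "request".
tasks = {
    "process": {"dependencies": ["request"]},
    "request": {},
}
print(topological_sort(tasks))  # ['request', 'process']
```

Each task is enqueued once and each dependency edge is visited once, which
is where the O(N + D) bound comes from.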

An alternative to the current approach would be to create all the tasks
without dependencies as a starting point and add the dependencies once
all tasks were made - similar to what we did in
https://github.com/astronomer/astronomer-cosmos. However, this approach
would require a bigger refactor of the DAG factory and may have issues
with dynamic task mapping.

Closes: #225