
Add fail stop feature for DAGs #29406

Merged: 5 commits into apache:main from fail-fast-pr, Apr 23, 2023

Conversation

RachitSharma2001
Contributor

@RachitSharma2001 RachitSharma2001 commented Feb 7, 2023

The corresponding issue: #26854

I have added the fail_fast parameter for DAGs. When this is set and any task in the DAG fails, every task currently running within the DAG will be forcefully failed immediately and any task that hasn't already completed will be set to SKIPPED.
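For illustration, opting a DAG into this behaviour might look like the following sketch (a hypothetical example, not taken from the PR):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical example: with fail_fast=True, the failure of "flaky" immediately
# force-fails "long_running" if it is still running and skips "downstream".
with DAG(
    dag_id="example_fail_fast",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    fail_fast=True,
):
    flaky = BashOperator(task_id="flaky", bash_command="exit 1")
    long_running = BashOperator(task_id="long_running", bash_command="sleep 600")
    downstream = BashOperator(task_id="downstream", bash_command="echo done")

    flaky >> downstream
```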

The way I approached this: within the handle_failure method of the TaskInstance class, if a task reaches the FAILED state and its DAG is in fail_fast mode, it fails every task that is currently running within its DAG run.
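A minimal sketch of that logic, written as a standalone helper purely for illustration (the PR does this inline in handle_failure; the helper name is hypothetical):

```python
from airflow.utils.state import TaskInstanceState


def fail_dag_run_fast(failed_ti, session):
    # Once one task in a fail_fast DAG has failed, force-fail whatever is still
    # running and skip everything that has not completed yet.
    for ti in failed_ti.get_dagrun(session).get_task_instances():
        if ti.task_id == failed_ti.task_id or ti.state in (
            TaskInstanceState.SUCCESS,
            TaskInstanceState.FAILED,
        ):
            continue
        if ti.state == TaskInstanceState.RUNNING:
            ti.error(session)  # force the running task instance into the FAILED state
        else:
            ti.set_state(TaskInstanceState.SKIPPED, session)  # skip anything not yet finished
```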

I have added unit tests and have also tested several different fail_fast DAGs. I have confirmed that when a task within these DAGs fails, the currently running tasks within them immediately fail and all other non-completed tasks get skipped, thus quickly stopping the DAG.

One suggested approach was to modify base_executor to kill all tasks within a DAG once one task fails. The problem I found with this approach is, first, that the different executors each track which tasks are running, queued, or completed in different ways, making it hard to write a unified function that kills all running tasks across all executor types. More importantly, my tests showed that when a task fails, the executor often doesn't yet have full information about all the tasks currently running or queued within that DAG run, and thus cannot properly fail the DAG.

Let me know if there are flaws to my approach or if there are any changes that should be added.

@RachitSharma2001 RachitSharma2001 force-pushed the fail-fast-pr branch 3 times, most recently from c50b334 to c824b83, February 9, 2023 03:34
@RachitSharma2001 RachitSharma2001 marked this pull request as ready for review February 9, 2023 15:57
@RachitSharma2001 RachitSharma2001 force-pushed the fail-fast-pr branch 2 times, most recently from 3439824 to b94119d, February 14, 2023 19:09
@RachitSharma2001
Contributor Author

Hi everyone, I was wondering if I could get some feedback on the approach I used for adding this feature. Does the approach look good, or do you have any suggestions for changes?

@RachitSharma2001
Contributor Author

Hi everybody, it looks like all the tests pass now. I was wondering if I could get some feedback on the approach I used for adding this feature. Does the approach look good, or do you have any suggestions for changes?

):
    continue
if ti.state == TaskInstanceState.RUNNING:
    ti.error(session)
Member


We should add some logging saying that we are doing it.

Member


Otherwise it will be quite magical

Contributor Author


Sounds good! I have added a logging statement for when a running task is being force-failed, and for when a task is being set to the skipped state. Let me know if these log statements are good or if there is anything to change.
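For reference, the added log lines might look roughly like this (the exact messages below are illustrative, not quoted from the PR):

```python
if ti.state == TaskInstanceState.RUNNING:
    self.log.info("Forcing task %s to FAILED because DAG %s is in fail_fast mode", ti.task_id, ti.dag_id)
    ti.error(session)
else:
    self.log.info("Setting task %s to SKIPPED because DAG %s is in fail_fast mode", ti.task_id, ti.dag_id)
    ti.set_state(TaskInstanceState.SKIPPED, session)
```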

@potiuk
Member

potiuk commented Feb 20, 2023

@ashb @uranusjr - can you think of any hidden side-effects? This implementation of "fail-fast" seems simple and might just work - unless I missed something.

@uranusjr
Member

How should this interact with trigger rules?

@RachitSharma2001
Contributor Author

Hi everyone, does anyone have an opinion on how a fail-fast DAG should interact with trigger rules? Should there be some tests for this? Or maybe documentation outlining how to use a fail-fast DAG with trigger rules?

@potiuk
Member

potiuk commented Mar 1, 2023

I think it needs a bit of analysis and design. It's hard to get the logic around it without visualising it, but @uranusjr is right that some thinking should be done around it. I guess we have two options:

  • fail everything regardless of the rules (this is the current solution)
  • be "smart" about it and include triggering rules in it

How the "smart" should look like - hard to say. probably each rule should be analysed how it should behave and good strategy for each rule shoudl be worked out (if that's even possible) - trying to logically attempt to follow the rules and choose for example a strategy of failing currently run tasks and propagating it furhter. That would be rather complex and there are some non-obvious traps we can fall in (what happens with the tasks that succeed when one failed?). And what about the tasks that follow them?

I think it very much depends on the definition of "fail fast". If we define it as "fail all the tasks if any of those tasks fail", then the current solution is good. If we try to follow trigger rules, then, well, it's complex and "fail fast" is likely hard to define in general ("fail fast all the currently eligible tasks and propagate it across the DAG").

But maybe there is a middle ground. Maybe we only make fail-fast work if the only trigger rule used is "all_success" (i.e. the default). And if you add another trigger rule to a "fail-fast" DAG, this could cause an error.

I think there is a big class of DAGs which fall into that category and those are precisely the DAGs where "fail fast" defined as "fail all the tasks if any of them fail" makes perfect sense.

@uranusjr ? WDYT? I guess that would only require adding the error in case any non-default trigger rule is added to a "fail-fast" DAG (and documenting it properly).

@uranusjr
Member

uranusjr commented Mar 9, 2023

If we define “fail” on the DAG level as “a task will not be triggered when all its upstreams finish” I think it can work logically; the question is whether this is what users expect (and we probably should invent a term other than “fail” to avoid confusion).

But no matter how we decide on this, having a fail-fast flag (assuming we don’t use the “fail” term directly) would work the same if all tasks use the all-success rule, so implementing strictly only that scenario would be a good first step. But this still requires better terminology.

@potiuk
Member

potiuk commented Mar 9, 2023

If we define “fail” on the DAG level as “a task will not be triggered when all its upstreams finish” I think it can work logically; the question is whether this is what users expect (and we probably should invent a term other than “fail” to avoid confusion).

Maybe "cancel" or "stop" ? Yes I think this is a useful cases in a number of scenarios especially when users are cost or timing (or both) conscious. There might be a number of cases, especially when you have dynamic cloud infrastructure, and long running tasks, where you know that failure of any task will generally make any other task results useless - so running them would be a waste of money (or time waiting for things to complete).

@uranusjr
Member

Yeah, both sound right to me.

@potiuk
Member

potiuk commented Mar 10, 2023

fast_stop then, instead of fail_fast?

@RachitSharma2001
Contributor Author

That makes sense. I was thinking of making the following changes, let me know if this sounds good:

  1. Change fail_fast flag to fast_stop
  2. Throw an error if a user tries to add a trigger rule other than all_success to a fast_stop DAG
  3. Add documentation explaining what fast_stop does

@potiuk
Member

potiuk commented Mar 11, 2023

Sounds great!

@RachitSharma2001
Contributor Author

RachitSharma2001 commented Mar 25, 2023

Hi everyone, I have added the following changes:

  1. Changed DAG 'fail_fast' to 'fail_stop' (here)
  2. dag.add_task throws an exception if the task has a trigger rule that is not 'all_success' (here)
    a. I have also added a test for this here.
  3. BaseOperator throws an exception if the task has a trigger rule that is not 'all_success' and the given DAG is a 'fail_stop' DAG. (here)
    a. I have added a test for this here
  4. Documented what the fail stop parameter does, with a warning about trigger rules. (here)

Let me know if there is anything else I should add or anything of concern.
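To illustrate how the changes above fit together, here is a sketch of a DAG that would now fail to parse (hypothetical example, not taken from the PR):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="strict_etl", start_date=datetime(2023, 1, 1), schedule=None, fail_stop=True):
    extract = EmptyOperator(task_id="extract")  # the default "all_success" rule is allowed

    # A non-default trigger rule on a task in a fail_stop DAG now raises
    # DagInvalidTriggerRule when the task is added to the DAG.
    cleanup = EmptyOperator(task_id="cleanup", trigger_rule="all_done")

    extract >> cleanup
```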

@potiuk
Member

potiuk commented Mar 27, 2023

Static checks?

@RachitSharma2001 RachitSharma2001 force-pushed the fail-fast-pr branch 2 times, most recently from 29aa15f to db7406d, April 1, 2023 17:57
Comment on lines 770 to 771
if dag is not None and dag.fail_stop and trigger_rule != DEFAULT_TRIGGER_RULE:
    raise DagInvalidTriggerRule()
Member


It seems awkward to me that fail_stop and DEFAULT_TRIGGER_RULE are repeated both when we check whether to raise the exception and when the exception is rendered.

Would something like a check function on DagInvalidTriggerRule encapsulating the logic be a good idea, so the logic exists only in one class?

Contributor Author


That makes sense. I have added a static check method to the DagInvalidTriggerRule class, which checks the necessary condition and throws an exception if needed. Let me know if this was what you had in mind or if there is a better possible implementation for this.

Comment on lines 213 to 218
@staticmethod
def check(dag: DAG | None, trigger_rule: str):
    from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE

    if dag is not None and dag.fail_stop and trigger_rule != DEFAULT_TRIGGER_RULE:
        raise DagInvalidTriggerRule()
Member


Seems simpler if this were a classmethod that uses raise cls?

Contributor Author


Good point. I have updated the code to be a class method rather than a static method.
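For reference, a sketch of what the classmethod form might look like (the module layout and docstring are assumptions, not the exact PR code):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException

if TYPE_CHECKING:
    from airflow.models.dag import DAG


class DagInvalidTriggerRule(AirflowException):
    """Raised when a task in a fail_stop DAG defines a non-default trigger rule."""

    @classmethod
    def check(cls, dag: DAG | None, trigger_rule: str):
        # Imported here to avoid a circular import at module import time.
        from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE

        if dag is not None and dag.fail_stop and trigger_rule != DEFAULT_TRIGGER_RULE:
            raise cls()
```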

@RachitSharma2001
Contributor Author

Hi everyone, I was wondering if these changes look good, or if there is anything else I should add?

@potiuk potiuk merged commit 4c5cebf into apache:main Apr 23, 2023
@potiuk
Member

potiuk commented Apr 23, 2023

Nice one :)

@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label May 8, 2023
@ephraimbuddy ephraimbuddy added this to the Airflow 2.7.0 milestone May 8, 2023
@dstandish dstandish changed the title from "Add Fail Fast feature for DAGs" to "Add fail stop feature for DAGs" Jul 31, 2023
@dstandish
Contributor

One suggested approach was to modify base_executor to kill all tasks within a DAG once one task fails. The problem I found with this approach is, first, that the different executors each track which tasks are running, queued, or completed in different ways, making it hard to write a unified function that kills all running tasks across all executor types. More importantly, my tests showed that when a task fails, the executor often doesn't yet have full information about all the tasks currently running or queued within that DAG run, and thus cannot properly fail the DAG.

Yeah, it does seem that the current approach allows for races between the scheduler and the task.

What if tis are expanded after tis = self.get_dagrun(session).get_task_instances() is called and before anything is killed?

Separately, what if here we skip a TI and the scheduler sets it to queued?

I'm not sure how frequently this kind of thing would manifest, but the conditions aren't hard to imagine.
