diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index af8ebd1c4e37c..101e6cc31393f 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -106,7 +106,7 @@ class _DagRunTaskStatus:
         failed: set[TaskInstanceKey] = attr.ib(factory=set)
         not_ready: set[TaskInstanceKey] = attr.ib(factory=set)
         deadlocked: set[TaskInstance] = attr.ib(factory=set)
-        active_runs: list[DagRun] = attr.ib(factory=list)
+        active_runs: set[DagRun] = attr.ib(factory=set)
         executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set)
         finished_runs: int = 0
         total_runs: int = 0
@@ -526,6 +526,8 @@ def _per_task_process(key, ti: TaskInstance, session):
                         ti_status.running.pop(key)
                     # Reset the failed task in backfill to scheduled state
                     ti.set_state(TaskInstanceState.SCHEDULED, session=session)
+                    if ti.dag_run not in ti_status.active_runs:
+                        ti_status.active_runs.add(ti.dag_run)
             else:
                 # Default behaviour which works for subdag.
                 if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
@@ -746,7 +748,7 @@ def to_keep(key: TaskInstanceKey) -> bool:
             session.commit()
 
             # update dag run state
-            _dag_runs = ti_status.active_runs[:]
+            _dag_runs = ti_status.active_runs.copy()
             for run in _dag_runs:
                 run.update_state(session=session)
                 if run.state in State.finished_dr_states:
@@ -848,7 +850,7 @@ def _execute_dagruns(
             dag_run = self._get_dag_run(dagrun_info, dag, session=session)
             if dag_run is not None:
                 tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
-                ti_status.active_runs.append(dag_run)
+                ti_status.active_runs.add(dag_run)
                 ti_status.to_run.update(tis_map or {})
 
         processed_dag_run_dates = self._process_backfill_task_instances(
diff --git a/tests/dags/test_backfill_with_upstream_failed_task.py b/tests/dags/test_backfill_with_upstream_failed_task.py
new file mode 100644
index 0000000000000..d2cb6353bfaa3
--- /dev/null
+++ b/tests/dags/test_backfill_with_upstream_failed_task.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+
+dag = DAG(
+    dag_id="test_backfill_with_upstream_failed_task",
+    default_args={"retries": 0, "start_date": datetime.datetime(2010, 1, 1)},
+    schedule="0 0 * * *",
+)
+
+failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
+downstream_task = BashOperator(task_id="downstream_task", bash_command="echo 1", dag=dag)
+downstream_task.set_upstream(failing_task)
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 88ce758b578d0..0802f11aa9b31 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1916,7 +1916,7 @@ def consumer(value):
         executor = MockExecutor()
 
         ti_status = BackfillJobRunner._DagRunTaskStatus()
-        ti_status.active_runs.append(dr)
+        ti_status.active_runs.add(dr)
         ti_status.to_run = {ti.key: ti for ti in dr.task_instances}
 
         job = Job(executor=executor)
@@ -2103,3 +2103,31 @@ def test_backfill_disable_retry(self, dag_maker, disable_retry, try_number, exce
         assert dag_run.state == DagRunState.FAILED
 
         dag.clear()
+
+    def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
+        self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
+        dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
+
+        # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in
+        # the original one. However, instead of using the original version of perform_heartbeat,
+        # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now,
+        # but it can be replaced with checking the state of the LocalTaskJob.
+        def fake_perform_heartbeat(*args, **kwargs):
+            import time
+
+            time.sleep(1)
+
+        with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
+            job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
+            job_runner = BackfillJobRunner(
+                job=job,
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                rerun_failed_tasks=True,
+            )
+            with pytest.raises(BackfillUnfinished):
+                run_job(job=job, execute_callable=job_runner._execute)
+
+        dr: DagRun = dag.get_last_dagrun()
+        assert dr.state == State.FAILED
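For context, a minimal standalone sketch (plain Python with a toy FakeDagRun stand-in, not Airflow code) of the idea behind switching active_runs from a list to a set: when rerun_failed_tasks resets a failed task instance to SCHEDULED, its dag run has to be registered for state updates again, and a set keeps that re-registration idempotent instead of accumulating duplicate entries.

```python
from __future__ import annotations


class FakeDagRun:
    """Toy stand-in for DagRun; the real SQLAlchemy model hashes by identity, so it also works in a set."""

    def __init__(self, run_id: str) -> None:
        self.run_id = run_id


active_runs: set[FakeDagRun] = set()
run = FakeDagRun("backfill__2010-01-01")

# Initial registration when the backfill picks up the dag run.
active_runs.add(run)

# After a failed task in this run is reset to SCHEDULED, the run is registered
# again; with a set this is a no-op, so the run is only tracked once.
active_runs.add(run)

assert len(active_runs) == 1
```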