From 965a41fd982bcc52073ee04e12a4b664d991dead Mon Sep 17 00:00:00 2001 From: Yingbo Wang Date: Wed, 3 Oct 2018 06:32:55 -0700 Subject: [PATCH] [AIRFLOW-2951] Update dag_run table end_date when state change (#3990) Previously, Airflow only changed the dag_run table's end_date value when a user terminated a DAG in the web UI. The end_date was not updated when Airflow itself detected that a DAG run had finished and updated its state. This commit adds the end_date update to DagRun's set_state function to fix the problem described above. --- airflow/models.py | 10 ++-- tests/models.py | 118 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 4 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index 8fc259d1b5a29..428923ff9e73f 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4845,6 +4845,8 @@ def get_state(self): def set_state(self, state): if self._state != state: self._state = state + self.end_date = timezone.utcnow() if self._state in State.finished() else None + if self.dag_id is not None: # FIXME: Due to the scoped_session factor we we don't get a clean # session here, so something really weird goes on: @@ -5068,7 +5070,7 @@ def update_state(self, session=None): if (not unfinished_tasks and any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)): self.log.info('Marking run %s failed', self) - self.state = State.FAILED + self.set_state(State.FAILED) dag.handle_callback(self, success=False, reason='task_failure', session=session) @@ -5076,20 +5078,20 @@ def update_state(self, session=None): elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) for r in roots): self.log.info('Marking run %s successful', self) - self.state = State.SUCCESS + self.set_state(State.SUCCESS) dag.handle_callback(self, success=True, reason='success', session=session) # if *all tasks* are deadlocked, the run failed elif (unfinished_tasks and none_depends_on_past and none_task_concurrency and no_dependencies_met): 
self.log.info('Deadlock; marking run %s failed', self) - self.state = State.FAILED + self.set_state(State.FAILED) dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session) # finally, if the roots aren't done, the dag is still running else: - self.state = State.RUNNING + self.set_state(State.RUNNING) # todo: determine we want to use with_for_update to make sure to lock the run session.merge(self) diff --git a/tests/models.py b/tests/models.py index 60aee3c84fcb1..55fa41bd90bab 100644 --- a/tests/models.py +++ b/tests/models.py @@ -915,6 +915,124 @@ def on_failure_callable(context): updated_dag_state = dag_run.update_state() self.assertEqual(State.FAILED, updated_dag_state) + def test_dagrun_set_state_end_date(self): + session = settings.Session() + + dag = DAG( + 'test_dagrun_set_state_end_date', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + dag.clear() + + now = timezone.utcnow() + dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date', + state=State.RUNNING, + execution_date=now, + start_date=now) + + # Initial end_date should be NULL + # State.SUCCESS and State.FAILED are all ending state and should set end_date + # State.RUNNING set end_date back to NULL + session.add(dr) + session.commit() + self.assertIsNone(dr.end_date) + + dr.set_state(State.SUCCESS) + session.merge(dr) + session.commit() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_set_state_end_date' + ).one() + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + + dr.set_state(State.RUNNING) + session.merge(dr) + session.commit() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_set_state_end_date' + ).one() + + self.assertIsNone(dr_database.end_date) + + dr.set_state(State.FAILED) + session.merge(dr) + session.commit() + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_set_state_end_date' + ).one() + + 
self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + + def test_dagrun_update_state_end_date(self): + session = settings.Session() + + dag = DAG( + 'test_dagrun_update_state_end_date', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + # A -> B + with dag: + op1 = DummyOperator(task_id='A') + op2 = DummyOperator(task_id='B') + op1.set_upstream(op2) + + dag.clear() + + now = timezone.utcnow() + dr = dag.create_dagrun(run_id='test_dagrun_update_state_end_date', + state=State.RUNNING, + execution_date=now, + start_date=now) + + # Initial end_date should be NULL + # State.SUCCESS and State.FAILED are all ending state and should set end_date + # State.RUNNING set end_date back to NULL + session.merge(dr) + session.commit() + self.assertIsNone(dr.end_date) + + ti_op1 = dr.get_task_instance(task_id=op1.task_id) + ti_op1.set_state(state=State.SUCCESS, session=session) + ti_op2 = dr.get_task_instance(task_id=op2.task_id) + ti_op2.set_state(state=State.SUCCESS, session=session) + + dr.update_state() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_update_state_end_date' + ).one() + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + + ti_op1.set_state(state=State.RUNNING, session=session) + ti_op2.set_state(state=State.RUNNING, session=session) + dr.update_state() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_update_state_end_date' + ).one() + + self.assertEqual(dr._state, State.RUNNING) + self.assertIsNone(dr.end_date) + self.assertIsNone(dr_database.end_date) + + ti_op1.set_state(state=State.FAILED, session=session) + ti_op2.set_state(state=State.FAILED, session=session) + dr.update_state() + + dr_database = session.query(DagRun).filter( + DagRun.run_id == 'test_dagrun_update_state_end_date' + ).one() + + self.assertIsNotNone(dr_database.end_date) + self.assertEqual(dr.end_date, dr_database.end_date) + 
def test_get_task_instance_on_empty_dagrun(self): """ Make sure that a proper value is returned when a dagrun has no task instances