Skip to content

Commit

Permalink
[AIRFLOW-2951] Update dag_run table end_date when state change (apach…
Browse files Browse the repository at this point in the history
…e#3990)

The existing airflow only change dag_run table end_date value when
a user teminate a dag in web UI. The end_date will not be updated
if airflow detected a dag finished and updated its state.

This commit add end_date update in DagRun's set_state function to
make up tho problem mentioned above.
  • Loading branch information
YingboWang authored and ashb committed Oct 22, 2018
1 parent 9541e7c commit 630b1c1
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 4 deletions.
10 changes: 6 additions & 4 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4845,6 +4845,8 @@ def get_state(self):
def set_state(self, state):
if self._state != state:
self._state = state
self.end_date = timezone.utcnow() if self._state in State.finished() else None

if self.dag_id is not None:
# FIXME: Due to the scoped_session factor we we don't get a clean
# session here, so something really weird goes on:
Expand Down Expand Up @@ -5068,28 +5070,28 @@ def update_state(self, session=None):
if (not unfinished_tasks and
any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
self.log.info('Marking run %s failed', self)
self.state = State.FAILED
self.set_state(State.FAILED)
dag.handle_callback(self, success=False, reason='task_failure',
session=session)

# if all roots succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
self.log.info('Marking run %s successful', self)
self.state = State.SUCCESS
self.set_state(State.SUCCESS)
dag.handle_callback(self, success=True, reason='success', session=session)

# if *all tasks* are deadlocked, the run failed
elif (unfinished_tasks and none_depends_on_past and
none_task_concurrency and no_dependencies_met):
self.log.info('Deadlock; marking run %s failed', self)
self.state = State.FAILED
self.set_state(State.FAILED)
dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
session=session)

# finally, if the roots aren't done, the dag is still running
else:
self.state = State.RUNNING
self.set_state(State.RUNNING)

# todo: determine we want to use with_for_update to make sure to lock the run
session.merge(self)
Expand Down
118 changes: 118 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,124 @@ def on_failure_callable(context):
updated_dag_state = dag_run.update_state()
self.assertEqual(State.FAILED, updated_dag_state)

def test_dagrun_set_state_end_date(self):
session = settings.Session()

dag = DAG(
'test_dagrun_set_state_end_date',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

dag.clear()

now = timezone.utcnow()
dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date',
state=State.RUNNING,
execution_date=now,
start_date=now)

# Initial end_date should be NULL
# State.SUCCESS and State.FAILED are all ending state and should set end_date
# State.RUNNING set end_date back to NULL
session.add(dr)
session.commit()
self.assertIsNone(dr.end_date)

dr.set_state(State.SUCCESS)
session.merge(dr)
session.commit()

dr_database = session.query(DagRun).filter(
DagRun.run_id == 'test_dagrun_set_state_end_date'
).one()
self.assertIsNotNone(dr_database.end_date)
self.assertEqual(dr.end_date, dr_database.end_date)

dr.set_state(State.RUNNING)
session.merge(dr)
session.commit()

dr_database = session.query(DagRun).filter(
DagRun.run_id == 'test_dagrun_set_state_end_date'
).one()

self.assertIsNone(dr_database.end_date)

dr.set_state(State.FAILED)
session.merge(dr)
session.commit()
dr_database = session.query(DagRun).filter(
DagRun.run_id == 'test_dagrun_set_state_end_date'
).one()

self.assertIsNotNone(dr_database.end_date)
self.assertEqual(dr.end_date, dr_database.end_date)

def test_dagrun_update_state_end_date(self):
session = settings.Session()

dag = DAG(
'test_dagrun_update_state_end_date',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

# A -> B
with dag:
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op1.set_upstream(op2)

dag.clear()

now = timezone.utcnow()
dr = dag.create_dagrun(run_id='test_dagrun_update_state_end_date',
state=State.RUNNING,
execution_date=now,
start_date=now)

# Initial end_date should be NULL
# State.SUCCESS and State.FAILED are all ending state and should set end_date
# State.RUNNING set end_date back to NULL
session.merge(dr)
session.commit()
self.assertIsNone(dr.end_date)

ti_op1 = dr.get_task_instance(task_id=op1.task_id)
ti_op1.set_state(state=State.SUCCESS, session=session)
ti_op2 = dr.get_task_instance(task_id=op2.task_id)
ti_op2.set_state(state=State.SUCCESS, session=session)

dr.update_state()

dr_database = session.query(DagRun).filter(
DagRun.run_id == 'test_dagrun_update_state_end_date'
).one()
self.assertIsNotNone(dr_database.end_date)
self.assertEqual(dr.end_date, dr_database.end_date)

ti_op1.set_state(state=State.RUNNING, session=session)
ti_op2.set_state(state=State.RUNNING, session=session)
dr.update_state()

dr_database = session.query(DagRun).filter(
DagRun.run_id == 'test_dagrun_update_state_end_date'
).one()

self.assertEqual(dr._state, State.RUNNING)
self.assertIsNone(dr.end_date)
self.assertIsNone(dr_database.end_date)

ti_op1.set_state(state=State.FAILED, session=session)
ti_op2.set_state(state=State.FAILED, session=session)
dr.update_state()

dr_database = session.query(DagRun).filter(
DagRun.run_id == 'test_dagrun_update_state_end_date'
).one()

self.assertIsNotNone(dr_database.end_date)
self.assertEqual(dr.end_date, dr_database.end_date)

def test_get_task_instance_on_empty_dagrun(self):
"""
Make sure that a proper value is returned when a dagrun has no task instances
Expand Down

0 comments on commit 630b1c1

Please sign in to comment.