Skip to content

Commit

Permalink
[AIRFLOW-3238] Fix models.DAG to deactivate unknown DAGs on initdb (#…
Browse files Browse the repository at this point in the history
…4073)

Unknown dags are now deactivated on initdb
  • Loading branch information
jason-udacity authored and kaxil committed Oct 21, 2018
1 parent fdda024 commit 39a9365
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4358,6 +4358,7 @@ def deactivate_unknown_dags(active_dag_ids, session=None):
DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all():
dag.is_active = False
session.merge(dag)
session.commit()

@staticmethod
@provide_session
Expand Down
25 changes: 25 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,31 @@ def test_kill_zombies(self, mock_ti):
configuration.getboolean('core', 'unit_test_mode'),
ANY)

def test_deactivate_unknown_dags(self):
"""
Test that dag_ids not passed into deactivate_unknown_dags
are deactivated when function is invoked
"""
dagbag = models.DagBag(include_examples=True)
expected_active_dags = dagbag.dags.keys()

session = settings.Session
session.add(DagModel(dag_id='test_deactivate_unknown_dags', is_active=True))
session.commit()

models.DAG.deactivate_unknown_dags(expected_active_dags)

for dag in session.query(DagModel).all():
if dag.dag_id in expected_active_dags:
self.assertTrue(dag.is_active)
else:
self.assertEquals(dag.dag_id, 'test_deactivate_unknown_dags')
self.assertFalse(dag.is_active)

# clean up
session.query(DagModel).filter(DagModel.dag_id == 'test_deactivate_unknown_dags').delete()
session.commit()


class TaskInstanceTest(unittest.TestCase):

Expand Down

0 comments on commit 39a9365

Please sign in to comment.