From 713934181416d46de62816540cab83800829ee87 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 15 Jul 2020 18:35:59 +0100 Subject: [PATCH] Fix Writing Serialized Dags to DB (#9836) GitOrigin-RevId: 2d124417e66ee6a92538e537dd20f6fb63a33007 --- airflow/models/dagbag.py | 10 ++++++++-- tests/jobs/test_scheduler_job.py | 4 ++-- tests/models/test_dagbag.py | 31 ++++++++++++++++++++++++++++--- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index cc7f6bd473..c624b2bd1c 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -70,7 +70,9 @@ class DagBag(BaseDagBag, LoggingMixin): with airflow or not :type include_examples: bool :param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``. - If ``False`` DAGs are read from python files. + If ``False`` DAGs are read from python files. This property is not used when + determining whether or not to write Serialized DAGs, that is done by checking + the config ``store_serialized_dags``. :type store_serialized_dags: bool """ @@ -434,6 +436,10 @@ def sync_to_db(self): # To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel + self.log.debug("Calling the DAG.bulk_sync_to_db method") DAG.bulk_sync_to_db(self.dags.values()) - if self.store_serialized_dags: + # Write Serialized DAGs to DB if DAG Serialization is turned on + # Even though self.store_serialized_dags is False + if settings.STORE_SERIALIZED_DAGS: + self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method") SerializedDagModel.bulk_sync_to_db(self.dags.values()) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 3dc70c7651..d9eae24687 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -125,10 +125,10 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed return dag @classmethod + @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) def setUpClass(cls): # Ensure the DAGs we are looking at from the DB are up-to-date non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) - non_serialized_dagbag.store_serialized_dags = True non_serialized_dagbag.sync_to_db() cls.dagbag = DagBag(store_serialized_dags=True) @@ -1370,10 +1370,10 @@ def setUp(self): self.null_exec = MockExecutor() @classmethod + @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) def setUpClass(cls): # Ensure the DAGs we are looking at from the DB are up-to-date non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False) - non_serialized_dagbag.store_serialized_dags = True non_serialized_dagbag.sync_to_db() cls.dagbag = DagBag(store_serialized_dags=True) diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index ff96a81d64..886119b4e4 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -24,13 +24,16 @@ from tempfile import NamedTemporaryFile, mkdtemp from unittest.mock import patch +from sqlalchemy import func + import airflow.example_dags from airflow import models from airflow.models import DagBag, DagModel +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils.session import create_session from tests.models import TEST_DAGS_FOLDER +from tests.test_utils import db from tests.test_utils.config import conf_vars -from tests.test_utils.db import clear_db_dags class TestDagBag(unittest.TestCase): @@ -43,10 +46,12 @@ def tearDownClass(cls): shutil.rmtree(cls.empty_dir) def setUp(self) -> None: - clear_db_dags() + db.clear_db_dags() + db.clear_db_serialized_dags() def tearDown(self) -> None: - clear_db_dags() + db.clear_db_dags() + db.clear_db_serialized_dags() def test_get_existing_dag(self): """ @@ -632,3 +637,23 @@ def test_deactivate_unknown_dags(self): # clean up with create_session() as session: session.query(DagModel).filter(DagModel.dag_id == 'test_deactivate_unknown_dags').delete() + + @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) + def test_serialized_dags_are_written_to_db_on_sync(self): + """ + Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB + even when dagbag.store_serialized_dags is False + """ + with create_session() as session: + serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() + self.assertEqual(serialized_dags_count, 0) + + dagbag = DagBag( + dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"), + include_examples=False) + dagbag.sync_to_db() + + self.assertFalse(dagbag.store_serialized_dags) + + new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() + self.assertEqual(new_serialized_dags_count, 1)