Skip to content

Commit

Permalink
Fix Writing Serialized Dags to DB (#9836)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 2d124417e66ee6a92538e537dd20f6fb63a33007
  • Loading branch information
kaxil authored and Cloud Composer Team committed Sep 12, 2024
1 parent a6c37de commit 7139341
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 7 deletions.
10 changes: 8 additions & 2 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class DagBag(BaseDagBag, LoggingMixin):
with airflow or not
:type include_examples: bool
:param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``.
If ``False`` DAGs are read from python files.
If ``False`` DAGs are read from python files. This property is not used when
determining whether or not to write Serialized DAGs, that is done by checking
the config ``store_serialized_dags``.
:type store_serialized_dags: bool
"""

Expand Down Expand Up @@ -434,6 +436,10 @@ def sync_to_db(self):
# To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
self.log.debug("Calling the DAG.bulk_sync_to_db method")
DAG.bulk_sync_to_db(self.dags.values())
if self.store_serialized_dags:
# Write Serialized DAGs to DB if DAG Serialization is turned on
# Even though self.store_serialized_dags is False
if settings.STORE_SERIALIZED_DAGS:
self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method")
SerializedDagModel.bulk_sync_to_db(self.dags.values())
4 changes: 2 additions & 2 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed
return dag

@classmethod
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def setUpClass(cls):
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
non_serialized_dagbag.store_serialized_dags = True
non_serialized_dagbag.sync_to_db()
cls.dagbag = DagBag(store_serialized_dags=True)

Expand Down Expand Up @@ -1370,10 +1370,10 @@ def setUp(self):
self.null_exec = MockExecutor()

@classmethod
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def setUpClass(cls):
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
non_serialized_dagbag.store_serialized_dags = True
non_serialized_dagbag.sync_to_db()
cls.dagbag = DagBag(store_serialized_dags=True)

Expand Down
31 changes: 28 additions & 3 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
from tempfile import NamedTemporaryFile, mkdtemp
from unittest.mock import patch

from sqlalchemy import func

import airflow.example_dags
from airflow import models
from airflow.models import DagBag, DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import create_session
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils import db
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags


class TestDagBag(unittest.TestCase):
Expand All @@ -43,10 +46,12 @@ def tearDownClass(cls):
shutil.rmtree(cls.empty_dir)

def setUp(self) -> None:
clear_db_dags()
db.clear_db_dags()
db.clear_db_serialized_dags()

def tearDown(self) -> None:
clear_db_dags()
db.clear_db_dags()
db.clear_db_serialized_dags()

def test_get_existing_dag(self):
"""
Expand Down Expand Up @@ -632,3 +637,23 @@ def test_deactivate_unknown_dags(self):
# clean up
with create_session() as session:
session.query(DagModel).filter(DagModel.dag_id == 'test_deactivate_unknown_dags').delete()

@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def test_serialized_dags_are_written_to_db_on_sync(self):
"""
Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB
even when dagbag.store_serialized_dags is False
"""
with create_session() as session:
serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
self.assertEqual(serialized_dags_count, 0)

dagbag = DagBag(
dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"),
include_examples=False)
dagbag.sync_to_db()

self.assertFalse(dagbag.store_serialized_dags)

new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
self.assertEqual(new_serialized_dags_count, 1)

0 comments on commit 7139341

Please sign in to comment.