From 79cde239fd538cc97a16dbef0a7a168e8228533b Mon Sep 17 00:00:00 2001 From: quy Date: Fri, 11 Oct 2024 12:39:25 +0700 Subject: [PATCH] Build DagFactory DAGs when there is an invalid YAML in the DAGs folder (#184) Before, when the Airflow DAGs folder had an invalid YAML file, no DagFactory DAGs would be loaded. This PR changes this behaviour to log any invalid YAML file paths but render valid DagFactory YAML-based DAGs. Co-authored-by: Tatiana Al-Chueyr --- .gitignore | 2 ++ dagfactory/dagfactory.py | 11 +++++++---- examples/invalid.yaml | 9 +++++++++ tests/test_dagfactory.py | 16 +++++++++------- tests/test_example_dags.py | 28 ++++++++++++++++++++++++++++ tox.ini | 6 ++++++ 6 files changed, 61 insertions(+), 11 deletions(-) create mode 100644 examples/invalid.yaml create mode 100644 tests/test_example_dags.py diff --git a/.gitignore b/.gitignore index 7a3971fa..405ff309 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,5 @@ logs/ # VIM *.sw[a-z] +# Airflow +examples/.airflowignore diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index b469e551..0864c1ca 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -178,9 +178,12 @@ def load_yaml_dags( suffix = [".yaml", ".yml"] candidate_dag_files = [] for suf in suffix: - candidate_dag_files = chain(candidate_dag_files, Path(dags_folder).rglob(f"*{suf}")) - + candidate_dag_files = list(chain(candidate_dag_files, Path(dags_folder).rglob(f"*{suf}"))) for config_file_path in candidate_dag_files: config_file_abs_path = str(config_file_path.absolute()) - DagFactory(config_file_abs_path).generate_dags(globals_dict) - logging.info("DAG loaded: %s", config_file_path) + logging.info("Loading %s", config_file_abs_path) + try: + DagFactory(config_file_abs_path).generate_dags(globals_dict) + logging.info("DAG loaded: %s", config_file_path) + except Exception: # pylint: disable=broad-except + logging.exception("Failed to load dag from %s", config_file_path) diff --git a/examples/invalid.yaml b/examples/invalid.yaml new file mode 100644 index 00000000..19859e50 --- /dev/null +++ b/examples/invalid.yaml @@ -0,0 +1,9 @@ +name: John Doe +age: 30 +is_student: yes +address: + street: 123 Main St + city: New York + postal_code 10001 +- phone: 555-1234 +email: johndoe@example.com diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 2a715e42..e1574e55 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -1,5 +1,6 @@ import os import datetime +import logging import pytest from airflow.models.variable import Variable @@ -431,13 +432,14 @@ def test_set_callback_after_loading_config(): td.generate_dags(globals()) -def test_load_yaml_dags_fail(): - with pytest.raises(Exception): - load_yaml_dags( - globals_dict=globals(), - dags_folder="tests/fixtures", - suffix=["invalid_yaml.yml"], - ) +def test_load_invalid_yaml_logs_error(caplog): + caplog.set_level(logging.ERROR) + load_yaml_dags( + globals_dict=globals(), + dags_folder="tests/fixtures", + suffix=["invalid_yaml.yml"], + ) + assert caplog.messages == ['Failed to load dag from tests/fixtures/invalid_yaml.yml'] def test_load_yaml_dags_succeed(): diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py new file mode 100644 index 00000000..65262c71 --- /dev/null +++ b/tests/test_example_dags.py @@ -0,0 +1,28 @@ +from __future__ import annotations +from pathlib import Path + +import airflow +from airflow.models.dagbag import DagBag +from packaging.version import Version + + +EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "examples" +AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" +AIRFLOW_VERSION = Version(airflow.__version__) + + +MIN_VER_DAG_FILE_VER: dict[str, list[str]] = { + "2.3": ["example_dynamic_task_mapping.py"], +} + + +def test_no_import_errors(): + with open(AIRFLOW_IGNORE_FILE, "w+") as file: + for min_version, files in MIN_VER_DAG_FILE_VER.items(): + if AIRFLOW_VERSION < Version(min_version): + print(f"Adding {files} to .airflowignore") + file.writelines([f"{file}\n" for file in files]) + + db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) + assert db.dags + assert not db.import_errors diff --git a/tox.ini b/tox.ini index e5203094..73f6f24f 100644 --- a/tox.ini +++ b/tox.ini @@ -20,6 +20,8 @@ deps = markupsafe>=1.1.1,<2.1.0 setenv = AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db + CONFIG_ROOT_DIR = {toxinidir}/examples + PYTHONPATH = {toxinidir}:{toxinidir}/examples:{env:PYTHONPATH} commands = airflow db init pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml @@ -34,6 +36,8 @@ deps = markupsafe>=1.1.1,<2.1.0 setenv = AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db + CONFIG_ROOT_DIR = {toxinidir}/examples + PYTHONPATH = {toxinidir}:{toxinidir}/examples:{env:PYTHONPATH} commands = airflow db init pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml @@ -48,6 +52,8 @@ deps = markupsafe>=1.1.1,<2.1.0 setenv = AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:////tmp/airflow.db + CONFIG_ROOT_DIR = {toxinidir}/examples + PYTHONPATH = {toxinidir}:{toxinidir}/examples:{env:PYTHONPATH} commands = airflow db init pytest --cov=dagfactory tests -p no:warnings --verbose --color=yes --cov-report=xml