diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index 3b9674e5aa362..bd9f59402763d 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -184,7 +184,7 @@ def get_dag_by_file_location(dag_id: str): return dagbag.dags[dag_id] -def _try_to_find_path(val: str | None): +def _search_for_dag_file(fileloc: str | None) -> str | None: """ Try to correct the path to the dag file in question. @@ -197,11 +197,10 @@ def _try_to_find_path(val: str | None): parsed the dags folder). If we can't find such a file, or if we find more than one, we just parse the dags folder as a fallback. """ - if val and Path(val).suffix in ('.zip', '.py'): - matches = list(Path(settings.DAGS_FOLDER).rglob(Path(val).name)) + if fileloc and Path(fileloc).suffix in ('.zip', '.py'): + matches = list(Path(settings.DAGS_FOLDER).rglob(Path(fileloc).name)) if len(matches) == 1: return matches[0].as_posix() - return settings.DAGS_FOLDER def get_dag(subdir: str | None, dag_id: str) -> DAG: @@ -217,7 +216,7 @@ def get_dag(subdir: str | None, dag_id: str) -> DAG: first_path = process_subdir(subdir) dagbag = DagBag(first_path) if dag_id not in dagbag.dags: - fallback_path = _try_to_find_path(subdir) + fallback_path = _search_for_dag_file(subdir) or settings.DAGS_FOLDER logger.warning("Dag %r not found in path %s; trying path %s", dag_id, first_path, fallback_path) dagbag = DagBag(dag_folder=fallback_path) if dag_id not in dagbag.dags: diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py index a2614778e4613..ecabcacfb1c27 100644 --- a/tests/utils/test_cli_util.py +++ b/tests/utils/test_cli_util.py @@ -34,7 +34,7 @@ from airflow.exceptions import AirflowException from airflow.models.log import Log from airflow.utils import cli, cli_action_loggers, timezone -from airflow.utils.cli import _try_to_find_path +from airflow.utils.cli import _search_for_dag_file repo_root = Path(airflow.__file__).parent.parent @@ -196,15 +196,17 @@ def success_func(_): pass -def test__try_to_find_path(): +def test__search_for_dags_file(): dags_folder = settings.DAGS_FOLDER - assert _try_to_find_path('') == dags_folder - assert _try_to_find_path(None) == dags_folder + assert _search_for_dag_file('') == None + assert _search_for_dag_file(None) == None # if it's a file, and one can be find in subdir, should return full path - assert _try_to_find_path('any/hi/test_dags_folder.py') == str(Path(dags_folder) / 'test_dags_folder.py') + assert _search_for_dag_file('any/hi/test_dags_folder.py') == str( + Path(dags_folder) / 'test_dags_folder.py' + ) # if a folder, even if exists, should return dags folder existing_folder = Path(settings.DAGS_FOLDER, 'subdir1') assert existing_folder.exists() - assert _try_to_find_path(existing_folder.as_posix()) == dags_folder + assert _search_for_dag_file(existing_folder.as_posix()) == None # when multiple files found, default to the dags folder - assert _try_to_find_path('any/hi/__init__.py') == dags_folder + assert _search_for_dag_file('any/hi/__init__.py') == None