Skip to content

Commit

Permalink
don't default to dags folder
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Sep 28, 2022
1 parent 6a8bb32 commit c23f8c6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
9 changes: 4 additions & 5 deletions airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def get_dag_by_file_location(dag_id: str):
return dagbag.dags[dag_id]


def _try_to_find_path(val: str | None):
def _search_for_dag_file(fileloc: str | None) -> str | None:
"""
Try to correct the path to the dag file in question.
Expand All @@ -197,11 +197,10 @@ def _try_to_find_path(val: str | None):
parsed the dags folder). If we can't find such a file, or if we find more than one, we
just parse the dags folder as a fallback.
"""
if val and Path(val).suffix in ('.zip', '.py'):
matches = list(Path(settings.DAGS_FOLDER).rglob(Path(val).name))
if fileloc and Path(fileloc).suffix in ('.zip', '.py'):
matches = list(Path(settings.DAGS_FOLDER).rglob(Path(fileloc).name))
if len(matches) == 1:
return matches[0].as_posix()
return settings.DAGS_FOLDER


def get_dag(subdir: str | None, dag_id: str) -> DAG:
Expand All @@ -217,7 +216,7 @@ def get_dag(subdir: str | None, dag_id: str) -> DAG:
first_path = process_subdir(subdir)
dagbag = DagBag(first_path)
if dag_id not in dagbag.dags:
fallback_path = _try_to_find_path(subdir)
fallback_path = _search_for_dag_file(subdir) or settings.DAGS_FOLDER
logger.warning("Dag %r not found in path %s; trying path %s", dag_id, first_path, fallback_path)
dagbag = DagBag(dag_folder=fallback_path)
if dag_id not in dagbag.dags:
Expand Down
16 changes: 9 additions & 7 deletions tests/utils/test_cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from airflow.exceptions import AirflowException
from airflow.models.log import Log
from airflow.utils import cli, cli_action_loggers, timezone
from airflow.utils.cli import _try_to_find_path
from airflow.utils.cli import _search_for_dag_file

repo_root = Path(airflow.__file__).parent.parent

Expand Down Expand Up @@ -196,15 +196,17 @@ def success_func(_):
pass


def test__try_to_find_path():
def test__search_for_dags_file():
dags_folder = settings.DAGS_FOLDER
assert _try_to_find_path('') == dags_folder
assert _try_to_find_path(None) == dags_folder
assert _search_for_dag_file('') == None
assert _search_for_dag_file(None) == None
# if it's a file, and one can be find in subdir, should return full path
assert _try_to_find_path('any/hi/test_dags_folder.py') == str(Path(dags_folder) / 'test_dags_folder.py')
assert _search_for_dag_file('any/hi/test_dags_folder.py') == str(
Path(dags_folder) / 'test_dags_folder.py'
)
# if a folder, even if exists, should return dags folder
existing_folder = Path(settings.DAGS_FOLDER, 'subdir1')
assert existing_folder.exists()
assert _try_to_find_path(existing_folder.as_posix()) == dags_folder
assert _search_for_dag_file(existing_folder.as_posix()) == None
# when multiple files found, default to the dags folder
assert _try_to_find_path('any/hi/__init__.py') == dags_folder
assert _search_for_dag_file('any/hi/__init__.py') == None

0 comments on commit c23f8c6

Please sign in to comment.