Skip to content

Commit

Permalink
Fix DAG-level permissions disappearance on DAG sync
Browse files Browse the repository at this point in the history
  • Loading branch information
TruthIsNear authored and potiuk committed Aug 6, 2023
1 parent 17a9473 commit 22f680e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
29 changes: 24 additions & 5 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):

found_dags = []

for (dag, mod) in top_level_dags:
for dag, mod in top_level_dags:
dag.fileloc = mod.__file__
try:
dag.validate()
Expand Down Expand Up @@ -686,11 +686,30 @@ def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW
@classmethod
@provide_session
def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
"""Sync DAG specific permissions, if necessary."""
from airflow.auth.managers.fab.models import Action, Permission, Resource
from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag

"""Sync DAG specific permissions."""
root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager
def needs_perms(dag_id: str) -> bool:
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_ACTIONS:
if not (
session.query(Permission)
.join(Action)
.join(Resource)
.filter(Action.name == permission_name)
.filter(Resource.name == dag_resource_name)
.one_or_none()
):
return True
return False

if dag.access_control or needs_perms(root_dag_id):
cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
3 changes: 2 additions & 1 deletion tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ def _sync_to_db():
def test_sync_perm_for_dag(self, mock_security_manager):
"""
Test that dagbag._sync_perm_for_dag will call ApplessAirflowSecurityManager.sync_perm_for_dag
when DAG specific perm views don't exist already or the DAG has access_control set.
"""
db_clean_up()
with create_session() as session:
Expand All @@ -931,7 +932,7 @@ def _sync_perms():

# perms now exist
_sync_perms()
mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None)
mock_sync_perm_for_dag.assert_not_called()

# Always sync if we have access_control
dag.access_control = {"Public": {"can_read"}}
Expand Down

0 comments on commit 22f680e

Please sign in to comment.