Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug that makes AirflowSecurityManagerV2 leave transactions in the idle in transaction state #39935

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions airflow/www/security_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
RESOURCE_XCOM,
)
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager
from airflow.www.utils import CustomSQLAInterface

Expand All @@ -77,8 +76,6 @@
}

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.auth.managers.models.base_user import BaseUser


Expand Down Expand Up @@ -167,9 +164,8 @@ def add_limit_view(self, baseview):
)(baseview.blueprint)

@cached_property
@provide_session
def _auth_manager_is_authorized_map(
self, session: Session = NEW_SESSION
self,
) -> dict[str, Callable[[str, str | None, BaseUser | None], bool]]:
"""
Return the map associating a FAB resource name to the corresponding auth manager is_authorized_ API.
Expand All @@ -179,6 +175,8 @@ def _auth_manager_is_authorized_map(
auth_manager = get_auth_manager()
methods = get_method_from_fab_action_map()

session = self.appbuilder.session

def get_connection_id(resource_pk):
if not resource_pk:
return None
Expand Down Expand Up @@ -226,7 +224,7 @@ def get_variable_key(resource_pk):
return None
variable = session.scalar(select(Variable).where(Variable.id == resource_pk).limit(1))
if not variable:
raise AirflowException("Connection not found")
raise AirflowException("Variable not found")
return variable.key

return {
Expand Down
31 changes: 31 additions & 0 deletions tests/www/test_security_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import json
from unittest import mock
from unittest.mock import Mock

Expand Down Expand Up @@ -135,3 +136,33 @@ def test_has_access(
if len(auth_manager_methods) > 1 and not expected:
for method_name in auth_manager_methods:
getattr(auth_manager, method_name).assert_called()

@mock.patch("airflow.utils.session.create_session")
@mock.patch("airflow.www.security_manager.get_auth_manager")
def test_manager_does_not_create_extra_db_sessions(
self,
_,
mock_create_session,
security_manager,
):
"""
Test that the Security Manager doesn't create extra DB sessions and
instead uses the session already available through the appbuilder
object that is attached to it.
"""
with mock.patch.object(security_manager.appbuilder, "session") as mock_appbuilder_session:
action_name = ACTION_CAN_READ
resource_pk = "PK"
user = Mock()
for func in security_manager._auth_manager_is_authorized_map.values():
try:
func(action_name, resource_pk, user)
except json.JSONDecodeError:
# The resource-retrieving function expects a "composite"
# PK as a JSON string. Provide a mocked one.
func(action_name, "[1, 1, 1, 1]", user)
mock_create_session.assert_not_called()

# The Security Manager's `appbuilder.session` object should have been
# put to use by many of the functions tested above.
assert len(mock_appbuilder_session.method_calls) > 0