Skip to content

Commit

Permalink
use the approach from check_permissions to avoid n plus 1 query issue
Browse files Browse the repository at this point in the history
  • Loading branch information
burnettk committed Dec 18, 2023
1 parent e161c3d commit 63f5bdb
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.process_entity_not_found_error import ProcessEntityNotFoundError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_file_data import ProcessInstanceFileDataModel
Expand All @@ -31,7 +29,6 @@
from spiffworkflow_backend.services.process_caller_service import ProcessCallerService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService

process_api_blueprint = Blueprint("process_api", __name__)

Expand All @@ -49,14 +46,7 @@ def permissions_check(body: dict[str, dict[str, list[str]]]) -> flask.wrappers.R
requests_to_check = body["requests_to_check"]

user = g.user
principals = UserService.all_principals_for_user(user)
principal_ids = [p.id for p in principals]

permission_assignments = (
PermissionAssignmentModel.query.filter(PermissionAssignmentModel.principal_id.in_(principal_ids))
.options(db.joinedload(PermissionAssignmentModel.permission_target))
.all()
)
permission_assignments = AuthorizationService.all_permission_assignments_for_user(user=user)

for target_uri, http_methods in requests_to_check.items():
if target_uri not in response_dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.principal import MissingPrincipalError
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.service_account import SPIFF_SERVICE_ACCOUNT_AUTH_SERVICE
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
Expand Down Expand Up @@ -118,18 +117,20 @@ def has_permission(cls, principals: list[PrincipalModel], permission: str, targe

@classmethod
def user_has_permission(cls, user: UserModel, permission: str, target_uri: str) -> bool:
if user.principal is None:
raise MissingPrincipalError(f"Missing principal for user with id: {user.id}")

principals = [user.principal]

for group in user.groups:
if group.principal is None:
raise MissingPrincipalError(f"Missing principal for group with id: {group.id}")
principals.append(group.principal)

principals = UserService.all_principals_for_user(user)
return cls.has_permission(principals, permission, target_uri)

@classmethod
def all_permission_assignments_for_user(cls, user: UserModel) -> list[PermissionAssignmentModel]:
principals = UserService.all_principals_for_user(user)
principal_ids = [p.id for p in principals]
permission_assignments: list[PermissionAssignmentModel] = (
PermissionAssignmentModel.query.filter(PermissionAssignmentModel.principal_id.in_(principal_ids))
.options(db.joinedload(PermissionAssignmentModel.permission_target))
.all()
)
return permission_assignments

@classmethod
def permission_assignments_include(
cls, permission_assignments: list[PermissionAssignmentModel], permission: str, target_uri: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,22 @@ def process_model_identifiers_with_permission_for_user(
permission=permission_to_check,
target_uri=f"{permission_base_uri}/{guid_of_non_existent_item_to_check_perms_against}",
)

# if user has access to uri/* with that permission then there's no reason to check each one individually
if has_permission:
return process_model_identifiers

permission_assignments = AuthorizationService.all_permission_assignments_for_user(user=user)

permitted_process_model_identifiers = []
for process_model_identifier in process_model_identifiers:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model_identifier)
uri = f"{permission_base_uri}/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(user=user, permission=permission_to_check, target_uri=uri)
has_permission = AuthorizationService.permission_assignments_include(
permission_assignments=permission_assignments,
permission=permission_to_check,
target_uri=uri,
)
if has_permission:
permitted_process_model_identifiers.append(process_model_identifier)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def get_permission_targets_for_user(cls, user: UserModel, check_groups: bool = T
def all_principals_for_user(cls, user: UserModel) -> list[PrincipalModel]:
if user.principal is None:
raise MissingPrincipalError(f"Missing principal for user with id: {user.id}")

principals = [user.principal]

for group in user.groups:
Expand Down

0 comments on commit 63f5bdb

Please sign in to comment.