Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix permissions that come from an external auditor role #15291

Merged
merged 7 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions awx/main/models/unified_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from polymorphic.models import PolymorphicModel

from ansible_base.lib.utils.models import prevent_search, get_type_for_model
from ansible_base.rbac import permission_registry

# AWX
from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel
Expand Down Expand Up @@ -197,9 +198,7 @@ def unique_error_message(self, model_class, unique_check):

@classmethod
def _submodels_with_roles(cls):
ujt_classes = [c for c in cls.__subclasses__() if c._meta.model_name not in ['inventorysource', 'systemjobtemplate']]
ct_dict = ContentType.objects.get_for_models(*ujt_classes)
return [ct.id for ct in ct_dict.values()]
return [c for c in cls.__subclasses__() if permission_registry.is_registered(c)]

@classmethod
def accessible_pk_qs(cls, accessor, role_field):
Expand All @@ -215,8 +214,16 @@ def accessible_pk_qs(cls, accessor, role_field):

action = to_permissions[role_field]

# Special condition for super auditor
role_subclasses = cls._submodels_with_roles()
role_cts = ContentType.objects.get_for_models(*role_subclasses).values()
all_codenames = {f'{action}_{cls._meta.model_name}' for cls in role_subclasses}
if not (all_codenames - accessor.singleton_permissions()):
qs = cls.objects.filter(polymorphic_ctype__in=role_cts)
return qs.values_list('id', flat=True)

return (
RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__startswith=action, content_type_id__in=cls._submodels_with_roles())
RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__in=all_codenames, content_type_id__in=[ct.id for ct in role_cts])
.values_list('object_id')
.distinct()
)
Expand Down
5 changes: 5 additions & 0 deletions awx/main/tests/functional/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ def deploy_jobtemplate(project, inventory, credential):
return jt


@pytest.fixture()
def execution_environment():
return ExecutionEnvironment.objects.create(name="test-ee", description="test-ee", managed=True)


@pytest.fixture
def setup_managed_roles():
"Run the migration script to pre-create managed role definitions"
Expand Down
120 changes: 120 additions & 0 deletions awx/main/tests/functional/dab_rbac/test_external_auditor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import pytest

from django.apps import apps

from ansible_base.rbac.managed import SystemAuditor
from ansible_base.rbac import permission_registry

from awx.main.access import check_user_access, get_user_queryset
from awx.main.models import User, AdHocCommandEvent
from awx.api.versioning import reverse


@pytest.fixture
def ext_auditor_rd():
info = SystemAuditor(overrides={'name': 'Alien Auditor', 'shortname': 'ext_auditor'})
rd, _ = info.get_or_create(apps)
return rd


@pytest.fixture
def ext_auditor(ext_auditor_rd):
u = User.objects.create(username='external-auditor-user')
ext_auditor_rd.give_global_permission(u)
return u


@pytest.fixture
def obj_factory(request):
def _rf(fixture_name):
obj = request.getfixturevalue(fixture_name)

# special case to make obj organization-scoped
if obj._meta.model_name == 'executionenvironment':
obj.organization = request.getfixturevalue('organization')
obj.save(update_fields=['organization'])

return obj

return _rf


@pytest.mark.django_db
def test_access_qs_external_auditor(ext_auditor_rd, rando, job_template):
ext_auditor_rd.give_global_permission(rando)
jt_cls = apps.get_model('main', 'JobTemplate')
ujt_cls = apps.get_model('main', 'UnifiedJobTemplate')
assert job_template in jt_cls.access_qs(rando)
assert job_template.id in jt_cls.access_ids_qs(rando)
assert job_template.id in ujt_cls.accessible_pk_qs(rando, 'read_role')


@pytest.mark.django_db
@pytest.mark.parametrize('model', sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name))
class TestExternalAuditorRoleAllModels:
def test_access_can_read_method(self, obj_factory, model, ext_auditor, rando):
fixture_name = model._meta.verbose_name.replace(' ', '_')
obj = obj_factory(fixture_name)

assert check_user_access(rando, model, 'read', obj) is False
assert check_user_access(ext_auditor, model, 'read', obj) is True

def test_access_get_queryset(self, obj_factory, model, ext_auditor, rando):
fixture_name = model._meta.verbose_name.replace(' ', '_')
obj = obj_factory(fixture_name)

assert obj not in get_user_queryset(rando, model)
assert obj in get_user_queryset(ext_auditor, model)

def test_global_list(self, obj_factory, model, ext_auditor, rando, get):
fixture_name = model._meta.verbose_name.replace(' ', '_')
obj_factory(fixture_name)

url = reverse(f'api:{fixture_name}_list')
r = get(url, user=rando, expect=200)
initial_ct = r.data['count']

r = get(url, user=ext_auditor, expect=200)
assert r.data['count'] == initial_ct + 1

if fixture_name in ('job_template', 'workflow_job_template'):
url = reverse('api:unified_job_template_list')
r = get(url, user=rando, expect=200)
initial_ct = r.data['count']

r = get(url, user=ext_auditor, expect=200)
assert r.data['count'] == initial_ct + 1

def test_detail_view(self, obj_factory, model, ext_auditor, rando, get):
fixture_name = model._meta.verbose_name.replace(' ', '_')
obj = obj_factory(fixture_name)

url = reverse(f'api:{fixture_name}_detail', kwargs={'pk': obj.pk})
get(url, user=rando, expect=403) # NOTE: should be 401
get(url, user=ext_auditor, expect=200)


@pytest.mark.django_db
class TestExternalAuditorNonRoleModels:
def test_ad_hoc_command_view(self, ad_hoc_command_factory, rando, ext_auditor, get):
"""The AdHocCommandAccess class references is_system_auditor

this is to prove it works with other system-level view roles"""
ad_hoc_command = ad_hoc_command_factory()
url = reverse('api:ad_hoc_command_list')
r = get(url, user=rando, expect=200)
assert r.data['count'] == 0
r = get(url, user=ext_auditor, expect=200)
assert r.data['count'] == 1
assert r.data['results'][0]['id'] == ad_hoc_command.id

event = AdHocCommandEvent.objects.create(ad_hoc_command=ad_hoc_command)
url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', kwargs={'pk': ad_hoc_command.id})
r = get(url, user=rando, expect=403)
r = get(url, user=ext_auditor, expect=200)
assert r.data['count'] == 1

url = reverse('api:ad_hoc_command_event_detail', kwargs={'pk': event.id})
r = get(url, user=rando, expect=403)
r = get(url, user=ext_auditor, expect=200)
assert r.data['id'] == event.id
18 changes: 6 additions & 12 deletions awx/main/tests/functional/models/test_unified_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@
# CRUM
from crum import impersonate

# Django
from django.contrib.contenttypes.models import ContentType

# AWX
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, WorkflowApprovalTemplate, Project, WorkflowJob, Schedule, Credential
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, WorkflowJob, Schedule, Credential
from awx.api.versioning import reverse
from awx.main.constants import JOB_VARIABLE_PREFIXES


@pytest.mark.django_db
def test_subclass_types():
assert set(UnifiedJobTemplate._submodels_with_roles()) == set(
[
ContentType.objects.get_for_model(JobTemplate).id,
ContentType.objects.get_for_model(Project).id,
ContentType.objects.get_for_model(WorkflowJobTemplate).id,
ContentType.objects.get_for_model(WorkflowApprovalTemplate).id,
]
)
assert set(UnifiedJobTemplate._submodels_with_roles()) == {
JobTemplate,
Project,
WorkflowJobTemplate,
}


@pytest.mark.django_db
Expand Down
Loading