diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 39bc56f43bfb..1cced0a5b472 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -31,6 +31,7 @@ from polymorphic.models import PolymorphicModel from ansible_base.lib.utils.models import prevent_search, get_type_for_model +from ansible_base.rbac import permission_registry # AWX from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel @@ -197,9 +198,7 @@ def unique_error_message(self, model_class, unique_check): @classmethod def _submodels_with_roles(cls): - ujt_classes = [c for c in cls.__subclasses__() if c._meta.model_name not in ['inventorysource', 'systemjobtemplate']] - ct_dict = ContentType.objects.get_for_models(*ujt_classes) - return [ct.id for ct in ct_dict.values()] + return [c for c in cls.__subclasses__() if permission_registry.is_registered(c)] @classmethod def accessible_pk_qs(cls, accessor, role_field): @@ -215,8 +214,16 @@ def accessible_pk_qs(cls, accessor, role_field): action = to_permissions[role_field] + # Special condition for super auditor + role_subclasses = cls._submodels_with_roles() + role_cts = ContentType.objects.get_for_models(*role_subclasses).values() + all_codenames = {f'{action}_{cls._meta.model_name}' for cls in role_subclasses} + if not (all_codenames - accessor.singleton_permissions()): + qs = cls.objects.filter(polymorphic_ctype__in=role_cts) + return qs.values_list('id', flat=True) + return ( - RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__startswith=action, content_type_id__in=cls._submodels_with_roles()) + RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__in=all_codenames, content_type_id__in=[ct.id for ct in role_cts]) .values_list('object_id') .distinct() ) diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index b23a9a7d2f64..8a100b86d078 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -92,6 +92,11 @@ def deploy_jobtemplate(project, inventory, credential): return jt +@pytest.fixture() +def execution_environment(): + return ExecutionEnvironment.objects.create(name="test-ee", description="test-ee", managed=True) + + @pytest.fixture def setup_managed_roles(): "Run the migration script to pre-create managed role definitions" diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py new file mode 100644 index 000000000000..c3602d736627 --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -0,0 +1,120 @@ +import pytest + +from django.apps import apps + +from ansible_base.rbac.managed import SystemAuditor +from ansible_base.rbac import permission_registry + +from awx.main.access import check_user_access, get_user_queryset +from awx.main.models import User, AdHocCommandEvent +from awx.api.versioning import reverse + + +@pytest.fixture +def ext_auditor_rd(): + info = SystemAuditor(overrides={'name': 'Alien Auditor', 'shortname': 'ext_auditor'}) + rd, _ = info.get_or_create(apps) + return rd + + +@pytest.fixture +def ext_auditor(ext_auditor_rd): + u = User.objects.create(username='external-auditor-user') + ext_auditor_rd.give_global_permission(u) + return u + + +@pytest.fixture +def obj_factory(request): + def _rf(fixture_name): + obj = request.getfixturevalue(fixture_name) + + # special case to make obj organization-scoped + if obj._meta.model_name == 'executionenvironment': + obj.organization = request.getfixturevalue('organization') + obj.save(update_fields=['organization']) + + return obj + + return _rf + + +@pytest.mark.django_db +def test_access_qs_external_auditor(ext_auditor_rd, rando, job_template): + ext_auditor_rd.give_global_permission(rando) + jt_cls = apps.get_model('main', 'JobTemplate') + ujt_cls = apps.get_model('main', 'UnifiedJobTemplate') + assert job_template in jt_cls.access_qs(rando) + assert job_template.id in jt_cls.access_ids_qs(rando) + assert job_template.id in ujt_cls.accessible_pk_qs(rando, 'read_role') + + +@pytest.mark.django_db +@pytest.mark.parametrize('model', sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name)) +class TestExternalAuditorRoleAllModels: + def test_access_can_read_method(self, obj_factory, model, ext_auditor, rando): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj = obj_factory(fixture_name) + + assert check_user_access(rando, model, 'read', obj) is False + assert check_user_access(ext_auditor, model, 'read', obj) is True + + def test_access_get_queryset(self, obj_factory, model, ext_auditor, rando): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj = obj_factory(fixture_name) + + assert obj not in get_user_queryset(rando, model) + assert obj in get_user_queryset(ext_auditor, model) + + def test_global_list(self, obj_factory, model, ext_auditor, rando, get): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj_factory(fixture_name) + + url = reverse(f'api:{fixture_name}_list') + r = get(url, user=rando, expect=200) + initial_ct = r.data['count'] + + r = get(url, user=ext_auditor, expect=200) + assert r.data['count'] == initial_ct + 1 + + if fixture_name in ('job_template', 'workflow_job_template'): + url = reverse('api:unified_job_template_list') + r = get(url, user=rando, expect=200) + initial_ct = r.data['count'] + + r = get(url, user=ext_auditor, expect=200) + assert r.data['count'] == initial_ct + 1 + + def test_detail_view(self, obj_factory, model, ext_auditor, rando, get): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj = obj_factory(fixture_name) + + url = reverse(f'api:{fixture_name}_detail', kwargs={'pk': obj.pk}) + get(url, user=rando, expect=403) # NOTE: should be 401 + get(url, user=ext_auditor, expect=200) + + +@pytest.mark.django_db +class TestExternalAuditorNonRoleModels: + def test_ad_hoc_command_view(self, ad_hoc_command_factory, rando, ext_auditor, get): + """The AdHocCommandAccess class references is_system_auditor + + this is to prove it works with other system-level view roles""" + ad_hoc_command = ad_hoc_command_factory() + url = reverse('api:ad_hoc_command_list') + r = get(url, user=rando, expect=200) + assert r.data['count'] == 0 + r = get(url, user=ext_auditor, expect=200) + assert r.data['count'] == 1 + assert r.data['results'][0]['id'] == ad_hoc_command.id + + event = AdHocCommandEvent.objects.create(ad_hoc_command=ad_hoc_command) + url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', kwargs={'pk': ad_hoc_command.id}) + r = get(url, user=rando, expect=403) + r = get(url, user=ext_auditor, expect=200) + assert r.data['count'] == 1 + + url = reverse('api:ad_hoc_command_event_detail', kwargs={'pk': event.id}) + r = get(url, user=rando, expect=403) + r = get(url, user=ext_auditor, expect=200) + assert r.data['id'] == event.id diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 488cd2ac00f5..5e37fc985c2a 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -4,25 +4,19 @@ # CRUM from crum import impersonate -# Django -from django.contrib.contenttypes.models import ContentType - # AWX -from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, WorkflowApprovalTemplate, Project, WorkflowJob, Schedule, Credential +from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, WorkflowJob, Schedule, Credential from awx.api.versioning import reverse from awx.main.constants import JOB_VARIABLE_PREFIXES @pytest.mark.django_db def test_subclass_types(): - assert set(UnifiedJobTemplate._submodels_with_roles()) == set( - [ - ContentType.objects.get_for_model(JobTemplate).id, - ContentType.objects.get_for_model(Project).id, - ContentType.objects.get_for_model(WorkflowJobTemplate).id, - ContentType.objects.get_for_model(WorkflowApprovalTemplate).id, - ] - ) + assert set(UnifiedJobTemplate._submodels_with_roles()) == { + JobTemplate, + Project, + WorkflowJobTemplate, + } @pytest.mark.django_db