From 6656e6c03cf97d846148eafa2b01af4a207cf109 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 20 Jun 2024 14:07:35 -0400 Subject: [PATCH 1/7] Add tests for external auditor --- awx/main/tests/functional/conftest.py | 5 ++ .../dab_rbac/test_external_auditor.py | 73 +++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 awx/main/tests/functional/dab_rbac/test_external_auditor.py diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index b23a9a7d2f64..8a100b86d078 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -92,6 +92,11 @@ def deploy_jobtemplate(project, inventory, credential): return jt +@pytest.fixture() +def execution_environment(): + return ExecutionEnvironment.objects.create(name="test-ee", description="test-ee", managed=True) + + @pytest.fixture def setup_managed_roles(): "Run the migration script to pre-create managed role definitions" diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py new file mode 100644 index 000000000000..9a8270c9604c --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -0,0 +1,73 @@ +import pytest + +from django.apps import apps + +from ansible_base.rbac.managed import SystemAuditor +from ansible_base.rbac import permission_registry + +from awx.main.access import check_user_access, get_user_queryset +from awx.api.versioning import reverse + + +@pytest.fixture +def ext_auditor_rd(): + info = SystemAuditor(overrides={'name': 'Alien Auditor', 'shortname': 'ext_auditor'}) + rd, _ = info.get_or_create(apps) + return rd + + +@pytest.fixture +def obj_factory(request): + def _rf(fixture_name): + obj = request.getfixturevalue(fixture_name) + + # special case to make obj organization-scoped + if obj._meta.model_name == 'executionenvironment': + obj.organization = request.getfixturevalue('organization') + obj.save(update_fields=['organization']) + + return obj + + return _rf + + +@pytest.mark.django_db +@pytest.mark.parametrize('model', permission_registry.all_registered_models) +class TestExternalAuditorRole: + def test_access_can_read_method(self, obj_factory, model, ext_auditor_rd, rando): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj = obj_factory(fixture_name) + + assert check_user_access(rando, model, 'read', obj) is False + ext_auditor_rd.give_global_permission(rando) + assert check_user_access(rando, model, 'read', obj) is True + + def test_access_get_queryset(self, obj_factory, model, ext_auditor_rd, rando): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj = obj_factory(fixture_name) + + assert obj not in get_user_queryset(rando, model) + ext_auditor_rd.give_global_permission(rando) + assert obj in get_user_queryset(rando, model) + + def test_global_list(self, obj_factory, model, ext_auditor_rd, rando, get): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj_factory(fixture_name) + + url = reverse(f'api:{fixture_name}_list') + r = get(url, user=rando, expect=200) + initial_ct = r.data['count'] + + ext_auditor_rd.give_global_permission(rando) + r = get(url, user=rando, expect=200) + assert r.data['count'] == initial_ct + 1 + + def test_detail_view(self, obj_factory, model, ext_auditor_rd, rando, get): + fixture_name = model._meta.verbose_name.replace(' ', '_') + obj = obj_factory(fixture_name) + + url = reverse(f'api:{fixture_name}_detail', kwargs={'pk': obj.pk}) + r = get(url, user=rando, expect=403) # TODO: should be 401 + + ext_auditor_rd.give_global_permission(rando) + r = get(url, user=rando, expect=200) From fe593263587bb5fc4f379675cd4887f5fc3a4898 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 20 Jun 2024 14:08:32 -0400 Subject: [PATCH 2/7] Add TODO --- awx/main/tests/functional/dab_rbac/test_external_auditor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py index 9a8270c9604c..d598c226eec2 100644 --- a/awx/main/tests/functional/dab_rbac/test_external_auditor.py +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -71,3 +71,6 @@ def test_detail_view(self, obj_factory, model, ext_auditor_rd, rando, get): ext_auditor_rd.give_global_permission(rando) r = get(url, user=rando, expect=200) + + +# TODO: test non-RBAC models, jobs, ad hoc commands, etc. From e5ce4d40751a63b9b93a7aa2f64287a9c29298b6 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 24 Jun 2024 21:00:37 -0400 Subject: [PATCH 3/7] Add assertion for unified JTs which fails --- awx/main/tests/functional/dab_rbac/test_external_auditor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py index d598c226eec2..391de7cd0172 100644 --- a/awx/main/tests/functional/dab_rbac/test_external_auditor.py +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -62,6 +62,11 @@ def test_global_list(self, obj_factory, model, ext_auditor_rd, rando, get): r = get(url, user=rando, expect=200) assert r.data['count'] == initial_ct + 1 + if fixture_name in ('job_template', 'workflow_job_template'): + url = reverse(f'api:unified_job_template_list') + r = get(url, user=rando, expect=200) + assert r.data['count'] == initial_ct + 1 + def test_detail_view(self, obj_factory, model, ext_auditor_rd, rando, get): fixture_name = model._meta.verbose_name.replace(' ', '_') obj = obj_factory(fixture_name) From f952141a44249a16c6b5d8b8eefe9146a6053a4f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 24 Jun 2024 21:07:31 -0400 Subject: [PATCH 4/7] Fix test parameter ordering issue --- awx/main/tests/functional/dab_rbac/test_external_auditor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py index 391de7cd0172..d354a37b5e52 100644 --- a/awx/main/tests/functional/dab_rbac/test_external_auditor.py +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -32,7 +32,7 @@ def _rf(fixture_name): @pytest.mark.django_db -@pytest.mark.parametrize('model', permission_registry.all_registered_models) +@pytest.mark.parametrize('model', sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name)) class TestExternalAuditorRole: def test_access_can_read_method(self, obj_factory, model, ext_auditor_rd, rando): fixture_name = model._meta.verbose_name.replace(' ', '_') From 9bb02f407e345185dbf1afaddac9b7f31e44ce3e Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 24 Jun 2024 22:10:13 -0400 Subject: [PATCH 5/7] Fix UJT listing bug --- awx/main/models/unified_jobs.py | 7 +++++++ .../functional/dab_rbac/test_external_auditor.py | 12 +++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 39bc56f43bfb..f2734e9b6911 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -215,6 +215,13 @@ def accessible_pk_qs(cls, accessor, role_field): action = to_permissions[role_field] + # Special condition for super auditor and other future super permissions + role_subclasses = [cls for cls in cls.__subclasses__() if hasattr(cls, 'read_role')] + all_codenames = {f'{action}_{cls._meta.model_name}' for cls in role_subclasses} + if not (all_codenames - accessor.singleton_permissions()): + qs = cls.objects.filter(polymorphic_ctype__in=ContentType.objects.get_for_models(*role_subclasses).values()) + return qs.values_list('id', flat=True) + return ( RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__startswith=action, content_type_id__in=cls._submodels_with_roles()) .values_list('object_id') diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py index d354a37b5e52..3304803d3ed3 100644 --- a/awx/main/tests/functional/dab_rbac/test_external_auditor.py +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -31,9 +31,19 @@ def _rf(fixture_name): return _rf +@pytest.mark.django_db +def test_access_qs_external_auditor(ext_auditor_rd, rando, job_template): + ext_auditor_rd.give_global_permission(rando) + jt_cls = apps.get_model('main', 'JobTemplate') + ujt_cls = apps.get_model('main', 'UnifiedJobTemplate') + assert job_template in jt_cls.access_qs(rando) + assert job_template.id in jt_cls.access_ids_qs(rando) + assert job_template.id in ujt_cls.accessible_pk_qs(rando, 'read_role') + + @pytest.mark.django_db @pytest.mark.parametrize('model', sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name)) -class TestExternalAuditorRole: +class TestExternalAuditorRoleAllModels: def test_access_can_read_method(self, obj_factory, model, ext_auditor_rd, rando): fixture_name = model._meta.verbose_name.replace(' ', '_') obj = obj_factory(fixture_name) From fd4e7b20eb260849159540d55112cb8dad003a47 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 25 Jun 2024 09:25:40 -0400 Subject: [PATCH 6/7] Method syntax editing and linters --- awx/main/models/unified_jobs.py | 14 +++++++------- .../dab_rbac/test_external_auditor.py | 6 +++--- .../functional/models/test_unified_job.py | 18 ++++++------------ 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index f2734e9b6911..1cced0a5b472 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -31,6 +31,7 @@ from polymorphic.models import PolymorphicModel from ansible_base.lib.utils.models import prevent_search, get_type_for_model +from ansible_base.rbac import permission_registry # AWX from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel @@ -197,9 +198,7 @@ def unique_error_message(self, model_class, unique_check): @classmethod def _submodels_with_roles(cls): - ujt_classes = [c for c in cls.__subclasses__() if c._meta.model_name not in ['inventorysource', 'systemjobtemplate']] - ct_dict = ContentType.objects.get_for_models(*ujt_classes) - return [ct.id for ct in ct_dict.values()] + return [c for c in cls.__subclasses__() if permission_registry.is_registered(c)] @classmethod def accessible_pk_qs(cls, accessor, role_field): @@ -215,15 +214,16 @@ def accessible_pk_qs(cls, accessor, role_field): action = to_permissions[role_field] - # Special condition for super auditor and other future super permissions - role_subclasses = [cls for cls in cls.__subclasses__() if hasattr(cls, 'read_role')] + # Special condition for super auditor + role_subclasses = cls._submodels_with_roles() + role_cts = ContentType.objects.get_for_models(*role_subclasses).values() all_codenames = {f'{action}_{cls._meta.model_name}' for cls in role_subclasses} if not (all_codenames - accessor.singleton_permissions()): - qs = cls.objects.filter(polymorphic_ctype__in=ContentType.objects.get_for_models(*role_subclasses).values()) + qs = cls.objects.filter(polymorphic_ctype__in=role_cts) return qs.values_list('id', flat=True) return ( - RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__startswith=action, content_type_id__in=cls._submodels_with_roles()) + RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__in=all_codenames, content_type_id__in=[ct.id for ct in role_cts]) .values_list('object_id') .distinct() ) diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py index 3304803d3ed3..8a439862a637 100644 --- a/awx/main/tests/functional/dab_rbac/test_external_auditor.py +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -73,7 +73,7 @@ def test_global_list(self, obj_factory, model, ext_auditor_rd, rando, get): assert r.data['count'] == initial_ct + 1 if fixture_name in ('job_template', 'workflow_job_template'): - url = reverse(f'api:unified_job_template_list') + url = reverse('api:unified_job_template_list') r = get(url, user=rando, expect=200) assert r.data['count'] == initial_ct + 1 @@ -82,10 +82,10 @@ def test_detail_view(self, obj_factory, model, ext_auditor_rd, rando, get): obj = obj_factory(fixture_name) url = reverse(f'api:{fixture_name}_detail', kwargs={'pk': obj.pk}) - r = get(url, user=rando, expect=403) # TODO: should be 401 + get(url, user=rando, expect=403) # TODO: should be 401 ext_auditor_rd.give_global_permission(rando) - r = get(url, user=rando, expect=200) + get(url, user=rando, expect=200) # TODO: test non-RBAC models, jobs, ad hoc commands, etc. diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 488cd2ac00f5..5e37fc985c2a 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -4,25 +4,19 @@ # CRUM from crum import impersonate -# Django -from django.contrib.contenttypes.models import ContentType - # AWX -from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, WorkflowApprovalTemplate, Project, WorkflowJob, Schedule, Credential +from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, WorkflowJob, Schedule, Credential from awx.api.versioning import reverse from awx.main.constants import JOB_VARIABLE_PREFIXES @pytest.mark.django_db def test_subclass_types(): - assert set(UnifiedJobTemplate._submodels_with_roles()) == set( - [ - ContentType.objects.get_for_model(JobTemplate).id, - ContentType.objects.get_for_model(Project).id, - ContentType.objects.get_for_model(WorkflowJobTemplate).id, - ContentType.objects.get_for_model(WorkflowApprovalTemplate).id, - ] - ) + assert set(UnifiedJobTemplate._submodels_with_roles()) == { + JobTemplate, + Project, + WorkflowJobTemplate, + } @pytest.mark.django_db From 2b3023f5d036dd6ee796f91f931cc54f6fc57c14 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 25 Jun 2024 09:40:46 -0400 Subject: [PATCH 7/7] Add test for ad hoc commands just to be sure --- .../dab_rbac/test_external_auditor.py | 57 ++++++++++++++----- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/awx/main/tests/functional/dab_rbac/test_external_auditor.py b/awx/main/tests/functional/dab_rbac/test_external_auditor.py index 8a439862a637..c3602d736627 100644 --- a/awx/main/tests/functional/dab_rbac/test_external_auditor.py +++ b/awx/main/tests/functional/dab_rbac/test_external_auditor.py @@ -6,6 +6,7 @@ from ansible_base.rbac import permission_registry from awx.main.access import check_user_access, get_user_queryset +from awx.main.models import User, AdHocCommandEvent from awx.api.versioning import reverse @@ -16,6 +17,13 @@ def ext_auditor_rd(): return rd +@pytest.fixture +def ext_auditor(ext_auditor_rd): + u = User.objects.create(username='external-auditor-user') + ext_auditor_rd.give_global_permission(u) + return u + + @pytest.fixture def obj_factory(request): def _rf(fixture_name): @@ -44,23 +52,21 @@ def test_access_qs_external_auditor(ext_auditor_rd, rando, job_template): @pytest.mark.django_db @pytest.mark.parametrize('model', sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name)) class TestExternalAuditorRoleAllModels: - def test_access_can_read_method(self, obj_factory, model, ext_auditor_rd, rando): + def test_access_can_read_method(self, obj_factory, model, ext_auditor, rando): fixture_name = model._meta.verbose_name.replace(' ', '_') obj = obj_factory(fixture_name) assert check_user_access(rando, model, 'read', obj) is False - ext_auditor_rd.give_global_permission(rando) - assert check_user_access(rando, model, 'read', obj) is True + assert check_user_access(ext_auditor, model, 'read', obj) is True - def test_access_get_queryset(self, obj_factory, model, ext_auditor_rd, rando): + def test_access_get_queryset(self, obj_factory, model, ext_auditor, rando): fixture_name = model._meta.verbose_name.replace(' ', '_') obj = obj_factory(fixture_name) assert obj not in get_user_queryset(rando, model) - ext_auditor_rd.give_global_permission(rando) - assert obj in get_user_queryset(rando, model) + assert obj in get_user_queryset(ext_auditor, model) - def test_global_list(self, obj_factory, model, ext_auditor_rd, rando, get): + def test_global_list(self, obj_factory, model, ext_auditor, rando, get): fixture_name = model._meta.verbose_name.replace(' ', '_') obj_factory(fixture_name) @@ -68,24 +74,47 @@ def test_global_list(self, obj_factory, model, ext_auditor_rd, rando, get): r = get(url, user=rando, expect=200) initial_ct = r.data['count'] - ext_auditor_rd.give_global_permission(rando) - r = get(url, user=rando, expect=200) + r = get(url, user=ext_auditor, expect=200) assert r.data['count'] == initial_ct + 1 if fixture_name in ('job_template', 'workflow_job_template'): url = reverse('api:unified_job_template_list') r = get(url, user=rando, expect=200) + initial_ct = r.data['count'] + + r = get(url, user=ext_auditor, expect=200) assert r.data['count'] == initial_ct + 1 - def test_detail_view(self, obj_factory, model, ext_auditor_rd, rando, get): + def test_detail_view(self, obj_factory, model, ext_auditor, rando, get): fixture_name = model._meta.verbose_name.replace(' ', '_') obj = obj_factory(fixture_name) url = reverse(f'api:{fixture_name}_detail', kwargs={'pk': obj.pk}) - get(url, user=rando, expect=403) # TODO: should be 401 + get(url, user=rando, expect=403) # NOTE: should be 401 + get(url, user=ext_auditor, expect=200) - ext_auditor_rd.give_global_permission(rando) - get(url, user=rando, expect=200) +@pytest.mark.django_db +class TestExternalAuditorNonRoleModels: + def test_ad_hoc_command_view(self, ad_hoc_command_factory, rando, ext_auditor, get): + """The AdHocCommandAccess class references is_system_auditor -# TODO: test non-RBAC models, jobs, ad hoc commands, etc. + this is to prove it works with other system-level view roles""" + ad_hoc_command = ad_hoc_command_factory() + url = reverse('api:ad_hoc_command_list') + r = get(url, user=rando, expect=200) + assert r.data['count'] == 0 + r = get(url, user=ext_auditor, expect=200) + assert r.data['count'] == 1 + assert r.data['results'][0]['id'] == ad_hoc_command.id + + event = AdHocCommandEvent.objects.create(ad_hoc_command=ad_hoc_command) + url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', kwargs={'pk': ad_hoc_command.id}) + r = get(url, user=rando, expect=403) + r = get(url, user=ext_auditor, expect=200) + assert r.data['count'] == 1 + + url = reverse('api:ad_hoc_command_event_detail', kwargs={'pk': event.id}) + r = get(url, user=rando, expect=403) + r = get(url, user=ext_auditor, expect=200) + assert r.data['id'] == event.id