diff --git a/tests/api2/test_audit_audit.py b/tests/api2/test_audit_audit.py index 9447d5f8bf40d..db9f80aa6a93d 100644 --- a/tests/api2/test_audit_audit.py +++ b/tests/api2/test_audit_audit.py @@ -1,18 +1,15 @@ import os -import sys -import operator -import pytest import requests import time +import operator +import pytest + from middlewared.service_exception import ValidationErrors from middlewared.test.integration.utils import call, url from middlewared.test.integration.utils.audit import expect_audit_log, expect_audit_method_calls from unittest.mock import ANY -sys.path.append(os.getcwd()) -from functions import POST, PUT - # ===================================================================== # Fixtures and utilities @@ -31,66 +28,51 @@ def report_exists(request): ({'retention': 20}, True), ({'retention': 0}, False) ]) -@pytest.mark.parametrize('api', ['ws', 'rest']) -def test_audit_config_audit(api, payload, success): +def test_audit_config_audit(payload, success): ''' Test the auditing of Audit configuration changes ''' initial_audit_config = call('audit.config') - protocol = 'WEBSOCKET' if api == 'ws' else 'REST' rest_operator = operator.eq if success else operator.ne expected_log_template = { - "service_data": { - "vers": { - "major": 0, - "minor": 1, + 'service_data': { + 'vers': { + 'major': 0, + 'minor': 1, }, - "origin": ANY, - "protocol": protocol, - "credentials": { - "credentials": "LOGIN_PASSWORD", - "credentials_data": {"username": "root"}, + 'origin': ANY, + 'protocol': 'WEBSOCKET', + 'credentials': { + 'credentials': 'LOGIN_PASSWORD', + 'credentials_data': {'username': 'root'}, }, }, - "event": "METHOD_CALL", - "event_data": { - "authenticated": True, - "authorized": True, - "method": "audit.update", - "params": [payload], - "description": "Update Audit Configuration", + 'event': 'METHOD_CALL', + 'event_data': { + 'authenticated': True, + 'authorized': True, + 'method': 'audit.update', + 'params': [payload], + 'description': 'Update Audit Configuration', }, - "success": success + 'success': success } try: with expect_audit_log([expected_log_template]): - if api == 'ws': - if success: - call('audit.update', payload) - else: - with pytest.raises(ValidationErrors): - call('audit.update', payload) - elif api == 'rest': - result = PUT('/audit/', payload) - assert rest_operator(result.status_code, 200), result.text + if success: + call('audit.update', payload) else: - raise ValueError(api) + with pytest.raises(ValidationErrors): + call('audit.update', payload) finally: # Restore initial state restore_payload = { 'retention': initial_audit_config['retention'], } - if api == 'ws': - call('audit.update', restore_payload) - elif api == 'rest': - result = PUT('/audit/', restore_payload) - assert result.status_code == 200, result.text - else: - raise ValueError(api) + call('audit.update', restore_payload) -@pytest.mark.parametrize('api', ['ws', 'rest']) -def test_audit_export_audit(request, api): +def test_audit_export_audit(request): ''' Test the auditing of the audit export function ''' @@ -102,31 +84,24 @@ def test_audit_export_audit(request, api): 'params': [payload], 'description': 'Export Audit Data', }]): - if api == 'ws': - report_pathname = call('audit.export', payload, job=True) - request.config.cache.set('report_pathname', report_pathname) - elif api == 'rest': - results = POST("/audit/export/", payload) - assert results.status_code == 200, results.text - else: - raise ValueError(api) + report_pathname = call('audit.export', payload, job=True) + request.config.cache.set('report_pathname', report_pathname) class TestAuditDownload: - """ + ''' Wrap these tests in a class for the 'report_exists' fixture - """ - @pytest.mark.parametrize('api', ['ws', 'rest']) - def test_audit_download_audit(self, report_exists, api): + ''' + def test_audit_download_audit(self, report_exists): ''' Test the auditing of the audit download function ''' report_pathname = report_exists st = call('filesystem.stat', report_pathname) - init_audit_query = call("audit.query", { - "query-filters": [["event_data.method", "=", "audit.download_report"]], - "query-options": {"select": ["event_data", "success"]} + init_audit_query = call('audit.query', { + 'query-filters': [['event_data.method', '=', 'audit.download_report']], + 'query-options': {'select': ['event_data', 'success']} }) init_len = len(init_audit_query) @@ -134,22 +109,16 @@ def test_audit_download_audit(self, report_exists, api): payload = { 'report_name': report_name } - if api == 'ws': - job_id, download_data = call( - 'core.download', 'audit.download_report', [payload], 'report.csv' - ) - r = requests.get(f"{url()}{download_data}") - r.raise_for_status() - assert len(r.content) == st['size'] - elif api == 'rest': - results = POST("/audit/download_report/", payload) - assert results.status_code == 200, results.text - else: - raise ValueError(api) - - post_audit_query = call("audit.query", { - "query-filters": [["event_data.method", "=", "audit.download_report"]], - "query-options": {"select": ["event_data", "success"]} + job_id, download_data = call( + 'core.download', 'audit.download_report', [payload], 'report.csv' + ) + r = requests.get(f'{url()}{download_data}') + r.raise_for_status() + assert len(r.content) == st['size'] + + post_audit_query = call('audit.query', { + 'query-filters': [['event_data.method', '=', 'audit.download_report']], + 'query-options': {'select': ['event_data', 'success']} }) post_len = len(post_audit_query) @@ -158,13 +127,13 @@ def test_audit_download_audit(self, report_exists, api): while count_down > 0 and post_len == init_len: time.sleep(1) count_down -= 1 - post_audit_query = call("audit.query", { - "query-filters": [["event_data.method", "=", "audit.download_report"]], - "query-options": {"select": ["event_data", "success"]} + post_audit_query = call('audit.query', { + 'query-filters': [['event_data.method', '=', 'audit.download_report']], + 'query-options': {'select': ['event_data', 'success']} }) post_len = len(post_audit_query) - assert count_down > 0, "Timed out waiting for the audit entry" + assert count_down > 0, 'Timed out waiting for the audit entry' assert post_len > init_len # Confirm this download is recorded