diff --git a/app/chip_tool/chip_tool.py b/app/chip_tool/chip_tool.py index 8a96ff2e..ff88efb7 100644 --- a/app/chip_tool/chip_tool.py +++ b/app/chip_tool/chip_tool.py @@ -267,6 +267,7 @@ async def start_chip_server( prefix = CHIP_APP_EXE command = ["--interactive", "--port 9002"] elif test_type == ChipToolTestType.PYTHON_TEST: + # TODO - To be implemented pass else: raise ChipToolUnknownTestType(f"Unsupported Test Type: {test_type}") diff --git a/app/chip_tool/test_case.py b/app/chip_tool/test_case.py index d1881c11..db93970e 100644 --- a/app/chip_tool/test_case.py +++ b/app/chip_tool/test_case.py @@ -108,7 +108,7 @@ def test_start(self, filename: str, name: str, count: int) -> None: # since there is step execute outside runner context self.next_step() - def test_stop(self, duration: int) -> None: + def test_stop(self, exception: Exception, duration: int) -> None: self.current_test_step.mark_as_completed() def step_skipped(self, name: str, expression: str) -> None: diff --git a/app/tests/python_tests/test_python_folder.py b/app/tests/python_tests/test_python_folder.py index 8b113460..db690688 100644 --- a/app/tests/python_tests/test_python_folder.py +++ b/app/tests/python_tests/test_python_folder.py @@ -50,7 +50,7 @@ def test_python_folder_version_missing() -> None: def test_python_folder_filename_pattern() -> None: """Test PythonTestFolder will search for files with filename pattern.""" with mock.patch.object(target=Path, attribute="glob") as path_glob: - # Default file_name_patter: * + # Default file_name_pattern: * python_folder = PythonTestFolder(test_python_path) _ = python_folder.python_file_paths() path_glob.assert_called_once_with("*.py") diff --git a/app/tests/python_tests/test_python_test_case.py b/app/tests/python_tests/test_python_test_case.py index 1cc88f4a..0a3cbb2f 100644 --- a/app/tests/python_tests/test_python_test_case.py +++ b/app/tests/python_tests/test_python_test_case.py @@ -210,17 +210,7 @@ async def test_python_version_logging() -> None: logger_info.assert_any_call(f"Python Test Version: {test_python_version}") -def test_default_first_steps_for_python_chip_tool_test_case() -> None: - test = python_test_instance(type=PythonTestType.AUTOMATED, steps=[]) - case_class: Type[PythonTestCase] = PythonTestCase.class_factory( - test=test, python_test_version="version" - ) - instance = case_class(TestCaseExecution()) - assert len(instance.test_steps) == 1 - assert instance.test_steps[0].name == "Start Python test" - - -def test_normal_steps_for_non_manual_tests() -> None: +def test_normal_steps_for_python_tests() -> None: """Test that non-manual tests include enabled steps.""" for type in list(PythonTestType): test_step = PythonTestStep(label="Step1") @@ -234,7 +224,7 @@ def test_normal_steps_for_non_manual_tests() -> None: assert any(s.name == test_step.label for s in instance.test_steps) -def test_multiple_steps_for_non_manual() -> None: +def test_multiple_steps_for_python_tests() -> None: """Test that non-manual tests multiple enabled steps are all included.""" for type in list(PythonTestType): test_step = PythonTestStep(label="StepN") diff --git a/app/tests/yaml_tests/test_sdk_yaml_collection.py b/app/tests/yaml_tests/test_sdk_yaml_collection.py index b2c892bd..8319bbc9 100644 --- a/app/tests/yaml_tests/test_sdk_yaml_collection.py +++ b/app/tests/yaml_tests/test_sdk_yaml_collection.py @@ -53,8 +53,8 @@ def test_sdk_yaml_collection( assert len(yaml_collection.test_suites.keys()) == 3 # test version number - test_sdk_yaml_path = "/app/backend/app/tests/yaml_tests/test_yamls/.version" - with open(test_sdk_yaml_path, "r") as version_file: + test_sdk_yaml_version_path = "/app/backend/app/tests/yaml_tests/test_yamls/.version" + with open(test_sdk_yaml_version_path, "r") as version_file: assert yaml_collection.yaml_version == version_file.read().rstrip() diff --git a/test_collections/sdk_tests/support/python_testing/models/python_test_folder.py b/test_collections/sdk_tests/support/python_testing/models/python_test_folder.py index a7ec4ca4..92410538 100644 --- a/test_collections/sdk_tests/support/python_testing/models/python_test_folder.py +++ b/test_collections/sdk_tests/support/python_testing/models/python_test_folder.py @@ -15,9 +15,10 @@ # from pathlib import Path +from test_collections.sdk_tests.support.paths import SDK_CHECKOUT_PATH + UNKNOWN_version = "Unknown" VERSION_FILE_FILENAME = ".version" -VERSION_FILE_PATH = Path("/app/backend/test_collections/sdk_tests/sdk_checkout/") class PythonTestFolder: @@ -34,7 +35,7 @@ def __init__(self, path: Path, filename_pattern: str = "*") -> None: def __version(self) -> str: """Read version string from .version file in /app/backend/test_collections/sdk_tests/sdk_checkout path.""" - version_file_path = VERSION_FILE_PATH / VERSION_FILE_FILENAME + version_file_path = SDK_CHECKOUT_PATH / VERSION_FILE_FILENAME if not version_file_path.exists(): return UNKNOWN_version diff --git a/test_collections/sdk_tests/support/python_testing/models/python_test_parser.py b/test_collections/sdk_tests/support/python_testing/models/python_test_parser.py index 557fd723..e5f6d9b9 100644 --- a/test_collections/sdk_tests/support/python_testing/models/python_test_parser.py +++ b/test_collections/sdk_tests/support/python_testing/models/python_test_parser.py @@ -28,19 +28,33 @@ class PythonParserException(Exception): """Raised when an error occurs during the parser of python file.""" +class PythonTestInfo: + """This class stores all the information from a python test case that came from + python test script file.""" + + def __init__( + self, + desc: str, + pics: list, + config: dict, + steps: list[PythonTestStep], + type: PythonTestType, + ) -> None: + self.desc = desc + self.pics = pics + self.config = config + self.steps = steps + self.type = type + + def parse_python_test(path: Path) -> PythonTest: """Parse a single Python test file into PythonTest model. This will also annotate parsed python test with it's path and test type. """ - tc_steps: list[PythonTestStep] = [] - # Currently PICS and config is not configured in Python Testing - tc_pics: list = [] - tc_config: dict = {} + tc_info = __extract_tcs_info(path) - tc_desc, tc_steps = __extract_tcs_info(path) - - if not tc_desc or not tc_steps: + if not tc_info.desc or not tc_info.steps: # The file name from path tc_name = path.name.split(".")[0] raise PythonParserException( @@ -48,14 +62,20 @@ def parse_python_test(path: Path) -> PythonTest: f"or steps_{tc_name}" ) - test = PythonTest(name=tc_desc, steps=tc_steps, config=tc_config, PICS=tc_pics) + test = PythonTest( + name=tc_info.desc, steps=tc_info.steps, config=tc_info.config, PICS=tc_info.pics + ) test.path = path - test.type = PythonTestType.AUTOMATED + test.type = tc_info.type return test -def __extract_tcs_info(path: Path) -> Tuple[str, List[PythonTestStep]]: +def __extract_tcs_info(path: Path) -> PythonTestInfo: + # Currently PICS and config is not configured in Python Testing + tc_pics: list = [] + tc_config: dict = {} + with open(path, "r") as python_file: parsed_python_file = ast.parse(python_file.read()) classes = [c for c in parsed_python_file.body if isinstance(c, ast.ClassDef)] @@ -72,7 +92,13 @@ def __extract_tcs_info(path: Path) -> Tuple[str, List[PythonTestStep]]: elif "steps_" in method.name: tc_steps = __retrieve_steps(method) - return tc_desc, tc_steps + return PythonTestInfo( + desc=tc_desc, + pics=tc_pics, + config=tc_config, + steps=tc_steps, + type=PythonTestType.AUTOMATED, + ) def __retrieve_steps(method: ast.FunctionDef) -> List[PythonTestStep]: diff --git a/test_collections/sdk_tests/support/python_testing/models/python_testing_hooks_proxy.py b/test_collections/sdk_tests/support/python_testing/models/python_testing_hooks_proxy.py new file mode 100644 index 00000000..2ea3cbbb --- /dev/null +++ b/test_collections/sdk_tests/support/python_testing/models/python_testing_hooks_proxy.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from queue import Empty, Queue +from typing import Any, Union + +from matter_yamltests.hooks import TestRunnerHooks + + +class SDKPythonTestRunnerHooks(TestRunnerHooks): + is_finished = False + results: Queue + + def updates_test(self) -> Union[dict, None]: + try: + result = self.results.get(block=False) + return result + except Empty: + return None + + def finished(self) -> bool: + return SDKPythonTestRunnerHooks.is_finished + + def __init__(self) -> None: + SDKPythonTestRunnerHooks.is_finished = False + SDKPythonTestRunnerHooks.results = Queue() + + def start(self, count: int) -> None: + self.results.put({"start": {"count": count}}) + + def stop(self, duration: int) -> None: + self.results.put({"stop": {"duration": duration}}) + self.is_finished = True + + def test_start(self, filename: str, name: str, count: int) -> None: + self.results.put( + {"test_start": {"filename": filename, "name": name, "count": count}} + ) + + def test_stop(self, exception: Exception, duration: int) -> None: + self.results.put({"test_stop": {"exception": exception, "duration": duration}}) + + def step_skipped(self, name: str, expression: str) -> None: + self.results.put({"step_skipped": {"name": name, "expression": expression}}) + + def step_start(self, name: str) -> None: + self.results.put({"step_start": {"name": name}}) + + def step_success(self, logger: Any, logs: Any, duration: int, request: Any) -> None: + self.results.put( + { + "step_success": { + "logger": logger, + "logs": logs, + "duration": duration, + "request": request, + } + } + ) + + def step_failure( + self, logger: Any, logs: Any, duration: int, request: Any, received: Any + ) -> None: + self.results.put( + { + "step_failure": { + "logger": logger, + "logs": logs, + "duration": duration, + "request": request, + "received": received, + } + } + ) + + def step_unknown(self) -> None: + self.results.put({"step_unknown": {}}) + + def step_manual(self) -> None: + self.results.put({"step_manual": {}}) + + def step_start_list(self) -> None: + pass diff --git a/test_collections/sdk_tests/support/python_testing/models/test_case.py b/test_collections/sdk_tests/support/python_testing/models/test_case.py index 7861636a..75c57ddd 100644 --- a/test_collections/sdk_tests/support/python_testing/models/test_case.py +++ b/test_collections/sdk_tests/support/python_testing/models/test_case.py @@ -13,42 +13,141 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os import re +from multiprocessing import Process +from multiprocessing.managers import BaseManager from typing import Any, Type, TypeVar +# TODO check if this should be changed to SDK python test specific entries +from matter_chip_tool_adapter.decoder import MatterLog +from matter_yamltests.hooks import TestRunnerHooks + from app.chip_tool.chip_tool import ChipToolTestType from app.chip_tool.test_case import ChipToolTest -from app.test_engine.logger import test_engine_logger -from app.test_engine.models import TestCase, TestStep +from app.test_engine.logger import ( + CHIP_LOG_FORMAT, + CHIPTOOL_LEVEL, + logger, + test_engine_logger, +) +from app.test_engine.models import ManualVerificationTestStep, TestCase, TestStep from .python_test_models import PythonTest +from .python_testing_hooks_proxy import SDKPythonTestRunnerHooks # Custom type variable used to annotate the factory method in PythonTestCase. T = TypeVar("T", bound="PythonTestCase") class PythonTestCase(TestCase): - """Base class for all Python based test cases. + """Base class for all Python Test based test cases. This class provides a class factory that will dynamically declare a new sub-class based on the test-type the Python test is expressing. - The PythonTest will be stored as a class property that will be used at run-time - in all instances of such subclass. + The PythonTest will be stored as a class property that will be used at run-time in all + instances of such subclass. """ python_test: PythonTest python_test_version: str + test_finished: bool + + def reset(self) -> None: + self.start_called = False + self.stop_called = False + self.test_start_called = False + self.test_stop_called = False + self.step_success_count = 0 + self.step_failure_count = 0 + self.step_unknown_count = 0 + self.__runned = 0 + + def start(self, count: int) -> None: + pass + + def stop(self, duration: int) -> None: + pass + + def test_start(self, filename: str, name: str, count: int) -> None: + pass + # Dont know if it is necessary for python testing (came from chip_tool) + # self.next_step() + + def test_stop(self, exception: Exception, duration: int) -> None: + self.current_test_step.mark_as_completed() + + def step_skipped(self, name: str, expression: str) -> None: + self.current_test_step.mark_as_not_applicable( + f"Test step skipped: {name}. {expression} == False" + ) + self.next_step() + + def step_start(self, name: str) -> None: + pass + + def step_success(self, logger: Any, logs: str, duration: int, request: Any) -> None: + self.__handle_logs(logs) + self.next_step() + + def step_failure( + self, logger: Any, logs: str, duration: int, request: Any, received: Any + ) -> None: + self.__handle_logs(logs) + self.__report_failures(logger, request, received) + self.next_step() + + def step_unknown(self) -> None: + self.__runned += 1 + + def is_finished(self) -> bool: + return self.test_finished + + def __handle_logs(self, logs: Any) -> None: + for log_entry in logs or []: + if not isinstance(log_entry, MatterLog): + continue + + test_engine_logger.log( + CHIPTOOL_LEVEL, + CHIP_LOG_FORMAT.format(log_entry.module, log_entry.message), + ) + + def __report_failures(self, logger: Any, request: TestStep, received: Any) -> None: + """ + The logger from runner contains all logs entries for the test step, this method + seeks for the error entries. + """ + if not logger: + # It is expected the runner to return a PostProcessResponseResult, + # but in case of returning a different type + self.current_test_step.append_failure( + "Test Step Failure: \n " f"Expected: '' \n Received: ''" + ) + return + + # Iterate through the entries seeking for the errors entries + for log_entry in logger.entries or []: + if log_entry.is_error(): + # Check if the step error came from exception or not, since the message + # in exception object has more details + # TODO: There is an issue raised in SDK runner in order to improve the + # message from log_entry: + # https://github.com/project-chip/connectedhomeip/issues/28101 + if log_entry.exception: + self.current_test_step.append_failure(log_entry.exception.message) + else: + self.current_test_step.append_failure(log_entry.message) @classmethod def pics(cls) -> set[str]: - """Test Case level PICS. Read directly from parsed python test.""" + """Test Case level PICS. Read directly from parsed Python Test.""" return cls.python_test.PICS @classmethod def default_test_parameters(cls) -> dict[str, Any]: - """Python test config dict, sometimes have a nested dict with type - and default value. + """Python Testing config dict, sometimes have a nested dict with type and default value. Only defaultValue is used in this case. """ parameters = {} @@ -70,7 +169,7 @@ async def setup(self) -> None: @classmethod def class_factory(cls, test: PythonTest, python_test_version: str) -> Type[T]: - """Dynamically declares a subclass based on the type of Python test.""" + """Dynamically declares a subclass based on the type of Python Test test.""" case_class: Type[PythonTestCase] = PythonChipToolTestCase return case_class.__class_factory( @@ -102,7 +201,7 @@ def __class_factory(cls, test: PythonTest, python_test_version: str) -> Type[T]: @staticmethod def __test_identifier(name: str) -> str: - """Find TC-XX-1.1 in Python test title. + """Find TC-XX-1.1 in Python Test title. Note some have [TC-XX-1.1] and others TC-XX-1.1 """ title_pattern = re.compile(r"(?PTC-[^\s\]]*)") @@ -116,14 +215,71 @@ def __class_name(identifier: str) -> str: """Replace all non-alphanumeric characters with _ to make valid class name.""" return re.sub("[^0-9a-zA-Z]+", "_", identifier) + def run_command(self, cmd: str) -> None: + os.system(cmd) + + async def execute(self) -> None: + try: + logger.info("Running Python Test: " + self.metadata["title"]) + + # NOTE that this aproach invalidates parallel execution since test_case_instance object is shared by the class + # TODO: Same approach could work from TestCase side: create test_case_instance inside PythonTestCase to avoid using SDKPythonTestRunnerHooks + + BaseManager.register("TestRunnerHooks", SDKPythonTestRunnerHooks) + manager = BaseManager(address=("0.0.0.0", 50000), authkey=b"abc") + manager.start() + + test_runner_hooks = manager.TestRunnerHooks() # type: ignore + + command = ( + "docker run -it --network host --privileged" + " -v /var/paa-root-certs:/root/paa-root-certs" + " -v /var/run/dbus/system_bus_socket:/var/run/dbus/system_bus_socket:rw" + " -v /home/ubuntu/chip-certification-tool/backend/sdk_content/python_testing2:/root/python_testing2" + " connectedhomeip/chip-cert-bins:19771ed7101321d68b87d05201d42d00adf5368f" + " python3 python_testing2/hello_external_runner.py " + f" {self.metadata['title']}" + ) + # Start the command in a new process + p = Process(target=self.run_command, args=(command,)) + p.start() + + while ((update := test_runner_hooks.updates_test()) is not None) or ( + not test_runner_hooks.finished() + ): + if not update: + continue + + def handle_update(update: dict) -> None: + def call_function(obj, func_name: str, kwargs) -> None: # type: ignore + func = getattr(obj, func_name, None) + if not func: + raise AttributeError( + f"{func_name} is not a method of {obj}" + ) + if not callable(func): + raise TypeError(f"{func_name} is not callable") + # Call the method with the unpacked keyword arguments. + func(**kwargs) + + for func_name, kwargs in update.items(): + call_function(self, func_name, kwargs) + + handle_update(update) + + finally: + pass + + async def cleanup(self) -> None: + logger.info("Test Cleanup") + class PythonChipToolTestCase(PythonTestCase, ChipToolTest): - """Automated Python test cases.""" + """Automated Python test cases using chip-tool.""" test_type = ChipToolTestType.PYTHON_TEST def create_test_steps(self) -> None: - self.test_steps = [TestStep("Start Python test")] for step in self.python_test.steps: python_test_step = TestStep(step.label) self.test_steps.append(python_test_step) diff --git a/test_collections/sdk_tests/support/python_testing/sdk_python_test_tests.py b/test_collections/sdk_tests/support/python_testing/sdk_python_test_tests.py index c16d10d4..5945505d 100644 --- a/test_collections/sdk_tests/support/python_testing/sdk_python_test_tests.py +++ b/test_collections/sdk_tests/support/python_testing/sdk_python_test_tests.py @@ -17,6 +17,8 @@ from loguru import logger +from test_collections.sdk_tests.support.paths import SDK_CHECKOUT_PATH + from .models.python_test_folder import PythonTestFolder from .models.python_test_parser import PythonParserException, parse_python_test from .models.test_declarations import ( @@ -27,7 +29,7 @@ from .models.test_suite import SuiteType ### -# This file hosts logic load and parse Python test-cases, located in +# This file hosts logic to load and parse Python test cases, located in # `test_collections/sdk_tests/sdk_checkout/python_testing/scripts/sdk`. # The `sdk` sub-folder here is automatically maintained using the # `test_collections/sdk_tests/fetch_sdk_tests_and_runner.sh` script. @@ -36,9 +38,7 @@ # - Automated ### -SDK_PYTHON_TEST_PATH = Path( - "/app/backend/test_collections/sdk_tests/sdk_checkout/python_testing/scripts/sdk" -) +SDK_PYTHON_TEST_PATH = SDK_CHECKOUT_PATH / Path("python_testing/scripts/sdk") SDK_PYTHON_TEST_FOLDER = PythonTestFolder( path=SDK_PYTHON_TEST_PATH, filename_pattern="TC*" )