From b49c3ed920f7b95fc0f95ce5e8bdb82c85e757d8 Mon Sep 17 00:00:00 2001 From: Terence Hampson Date: Thu, 15 Dec 2022 11:29:44 -0500 Subject: [PATCH] Common YAML test parser (#24066) * Initial commit of 3 files from Vivien's branch These 3 files came from commit ID e391dadb5d759ce3044e175260f028eff8efec9f * Adding a test commit containing a TODO * Restyle * Small tweaks plus adding unit test * Moving towards PEP8 naming convension * Refactor results to be less focused string matching * Added very basic unit test file Test run using: `python3 scripts/tests/test_yaml_parser.py` * Refactor constraints * [Yaml Parser] Add scripts/tests/yamltests/SpecDefinitions.py with some unit tests * [Yaml Parser] Use SpecDefinitions API instead of ClusterDefinitions * [Yaml Parser] Uses convert_yaml_octet_string_to_bytes for octet_string * [Yaml Parser / Constraints] Move the check for None before the type checking * [Yaml Parser] Add _convert_single_value_to_values and make it sooner such that some code can be made simpler * Small fixes * [Yaml Parser] Moves _convert_single_value_to_values up one block such that update_with_definition can be simplified * Move `__update_placeholder` to TestStep Also TestStep have internal pointer to config dictionary * Add docstring, and initial constraint parsing * Refactor for TestStep into two parts As well as stype formatting to get it ready for review * Converge to PEP8 python format standard * Add and clean up doc strings * Attempt at creating a python package * Make Restyle Happy * Clean up yaml parser unit test to be self contained * Fix test name * Fix test name * Address PR comments Co-authored-by: Andrei Litvin Co-authored-by: Vivien Nicolas --- scripts/tests/test_yaml_parser.py | 95 +++ scripts/tests/yamltests/BUILD.gn | 39 + scripts/tests/yamltests/__init__.py | 0 scripts/tests/yamltests/constraints.py | 357 +++++++++ scripts/tests/yamltests/definitions.py | 200 +++++ scripts/tests/yamltests/fixes.py | 116 +++ scripts/tests/yamltests/parser.py | 708 ++++++++++++++++++ scripts/tests/yamltests/setup.py | 28 + .../tests/yamltests/test_spec_definitions.py | 291 +++++++ 9 files changed, 1834 insertions(+) create mode 100644 scripts/tests/test_yaml_parser.py create mode 100644 scripts/tests/yamltests/BUILD.gn create mode 100644 scripts/tests/yamltests/__init__.py create mode 100644 scripts/tests/yamltests/constraints.py create mode 100644 scripts/tests/yamltests/definitions.py create mode 100644 scripts/tests/yamltests/fixes.py create mode 100644 scripts/tests/yamltests/parser.py create mode 100644 scripts/tests/yamltests/setup.py create mode 100644 scripts/tests/yamltests/test_spec_definitions.py diff --git a/scripts/tests/test_yaml_parser.py b/scripts/tests/test_yaml_parser.py new file mode 100644 index 00000000000000..c6a73da4f8eed9 --- /dev/null +++ b/scripts/tests/test_yaml_parser.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# TODO Once yamltest is a proper self contained module we can move this file +# to a more appropriate spot. For now, having this file to do some quick checks +# is arguably better then no checks at all. + +import io +import tempfile +import unittest + +from yamltests.definitions import * +from yamltests.parser import TestParser + +simple_test_description = ''' + + + + + + + + Test + 0x1234 + + + +''' + +simple_test_yaml = ''' +name: Test Cluster Tests + +config: + nodeId: 0x12344321 + cluster: "Test" + endpoint: 1 + +tests: + - label: "Send Test Command" + command: "test" + + - label: "Send Test Not Handled Command" + command: "testNotHandled" + response: + error: INVALID_COMMAND + + - label: "Send Test Specific Command" + command: "testSpecific" + response: + values: + - name: "returnValue" + value: 7 +''' + + +class TestYamlParser(unittest.TestCase): + def setUp(self): + self._definitions = SpecDefinitions( + [ParseSource(source=io.StringIO(simple_test_description), name='simple_test_description')]) + self._temp_file = tempfile.NamedTemporaryFile(suffix='.yaml') + with open(self._temp_file.name, 'w') as f: + f.writelines(simple_test_yaml) + pics_file = None + self._yaml_parser = TestParser(self._temp_file.name, pics_file, self._definitions) + + def test_able_to_iterate_over_all_parsed_tests(self): + # self._yaml_parser.tests implements `__next__`, which does value substitution. We are + # simply ensure there is no exceptions raise. + count = 0 + for idx, test_step in enumerate(self._yaml_parser.tests): + count += 1 + pass + self.assertEqual(count, 3) + + +def main(): + unittest.main() + + +if __name__ == '__main__': + main() diff --git a/scripts/tests/yamltests/BUILD.gn b/scripts/tests/yamltests/BUILD.gn new file mode 100644 index 00000000000000..2f9a49c0744181 --- /dev/null +++ b/scripts/tests/yamltests/BUILD.gn @@ -0,0 +1,39 @@ +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import("//build_overrides/build.gni") +import("//build_overrides/chip.gni") + +import("//build_overrides/pigweed.gni") +import("$dir_pw_build/python.gni") + +pw_python_package("yamltests") { + setup = [ "setup.py" ] + + sources = [ + "__init__.py", + "constraints.py", + "definitions.py", + "fixes.py", + "parser.py", + ] + + python_deps = [ "${chip_root}/scripts/idl" ] + + tests = [ "test_spec_definitions.py" ] + + # TODO: at a future time consider enabling all (* or missing) here to get + # pylint checking these files + static_analysis = [] +} diff --git a/scripts/tests/yamltests/__init__.py b/scripts/tests/yamltests/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/scripts/tests/yamltests/constraints.py b/scripts/tests/yamltests/constraints.py new file mode 100644 index 00000000000000..0da769fb321d83 --- /dev/null +++ b/scripts/tests/yamltests/constraints.py @@ -0,0 +1,357 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from abc import ABC, abstractmethod +import string + + +class ConstraintParseError(Exception): + def __init__(self, message): + super().__init__(message) + + +class ConstraintValidationError(Exception): + def __init__(self, message): + super().__init__(message) + + +class BaseConstraint(ABC): + '''Constrain Interface''' + + def __init__(self, types: list, is_null_allowed: bool = False): + '''An empty type list provided that indicates any type is accepted''' + self._types = types + self._is_null_allowed = is_null_allowed + + def is_met(self, value): + if value is None: + return self._is_null_allowed + + response_type = type(value) + if self._types and response_type not in self._types: + return False + + return self.check_response(value) + + @abstractmethod + def check_response(self, value) -> bool: + pass + + +class _ConstraintHasValue(BaseConstraint): + def __init__(self, has_value): + super().__init__(types=[]) + self._has_value = has_value + + def check_response(self, value) -> bool: + raise ConstraintValidationError('HasValue constraint currently not implemented') + + +class _ConstraintType(BaseConstraint): + def __init__(self, type): + super().__init__(types=[], is_null_allowed=True) + self._type = type + + def check_response(self, value) -> bool: + success = False + if self._type == 'boolean' and type(value) is bool: + success = True + elif self._type == 'list' and type(value) is list: + success = True + elif self._type == 'char_string' and type(value) is str: + success = True + elif self._type == 'octet_string' and type(value) is bytes: + success = True + elif self._type == 'vendor_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFF + elif self._type == 'device_type_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'cluster_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'attribute_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'field_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'command_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'event_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'action_id' and type(value) is int: + success = value >= 0 and value <= 0xFF + elif self._type == 'transaction_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'node_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF + elif self._type == 'bitmap8' and type(value) is int: + success = value >= 0 and value <= 0xFF + elif self._type == 'bitmap16' and type(value) is int: + success = value >= 0 and value <= 0xFFFF + elif self._type == 'bitmap32' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'bitmap64' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF + elif self._type == 'enum8' and type(value) is int: + success = value >= 0 and value <= 0xFF + elif self._type == 'enum16' and type(value) is int: + success = value >= 0 and value <= 0xFFFF + elif self._type == 'Percent' and type(value) is int: + success = value >= 0 and value <= 0xFF + elif self._type == 'Percent100ths' and type(value) is int: + success = value >= 0 and value <= 0xFFFF + elif self._type == 'epoch_us' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF + elif self._type == 'epoch_s' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'utc' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'date' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'tod' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'int8u' and type(value) is int: + success = value >= 0 and value <= 0xFF + elif self._type == 'int16u' and type(value) is int: + success = value >= 0 and value <= 0xFFFF + elif self._type == 'int24u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFF + elif self._type == 'int32u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'int40u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFF + elif self._type == 'int48u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFF + elif self._type == 'int56u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFF + elif self._type == 'int64u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF + elif self._type == 'nullable_int8u' and type(value) is int: + success = value >= 0 and value <= 0xFE + elif self._type == 'nullable_int16u' and type(value) is int: + success = value >= 0 and value <= 0xFFFE + elif self._type == 'nullable_int24u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFE + elif self._type == 'nullable_int32u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFE + elif self._type == 'nullable_int40u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFE + elif self._type == 'nullable_int48u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFE + elif self._type == 'nullable_int56u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFE + elif self._type == 'nullable_int64u' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFE + elif self._type == 'int8s' and type(value) is int: + success = value >= -128 and value <= 127 + elif self._type == 'int16s' and type(value) is int: + success = value >= -32768 and value <= 32767 + elif self._type == 'int24s' and type(value) is int: + success = value >= -8388608 and value <= 8388607 + elif self._type == 'int32s' and type(value) is int: + success = value >= -2147483648 and value <= 2147483647 + elif self._type == 'int40s' and type(value) is int: + success = value >= -549755813888 and value <= 549755813887 + elif self._type == 'int48s' and type(value) is int: + success = value >= -140737488355328 and value <= 140737488355327 + elif self._type == 'int56s' and type(value) is int: + success = value >= -36028797018963968 and value <= 36028797018963967 + elif self._type == 'int64s' and type(value) is int: + success = value >= -9223372036854775808 and value <= 9223372036854775807 + elif self._type == 'nullable_int8s' and type(value) is int: + success = value >= -127 and value <= 127 + elif self._type == 'nullable_int16s' and type(value) is int: + success = value >= -32767 and value <= 32767 + elif self._type == 'nullable_int24s' and type(value) is int: + success = value >= -8388607 and value <= 8388607 + elif self._type == 'nullable_int32s' and type(value) is int: + success = value >= -2147483647 and value <= 2147483647 + elif self._type == 'nullable_int40s' and type(value) is int: + success = value >= -549755813887 and value <= 549755813887 + elif self._type == 'nullable_int48s' and type(value) is int: + success = value >= -140737488355327 and value <= 140737488355327 + elif self._type == 'nullable_int56s' and type(value) is int: + success = value >= -36028797018963967 and value <= 36028797018963967 + elif self._type == 'nullable_int64s' and type(value) is int: + success = value >= -9223372036854775807 and value <= 9223372036854775807 + return success + + +class _ConstraintMinLength(BaseConstraint): + def __init__(self, min_length): + super().__init__(types=[str, bytes, list]) + self._min_length = min_length + + def check_response(self, value) -> bool: + return len(value) >= self._min_length + + +class _ConstraintMaxLength(BaseConstraint): + def __init__(self, max_length): + super().__init__(types=[str, bytes, list]) + self._max_length = max_length + + def check_response(self, value) -> bool: + return len(value) <= self._max_length + + +class _ConstraintIsHexString(BaseConstraint): + def __init__(self, is_hex_string: bool): + super().__init__(types=[str]) + self._is_hex_string = is_hex_string + + def check_response(self, value) -> bool: + return all(c in string.hexdigits for c in value) == self._is_hex_string + + +class _ConstraintStartsWith(BaseConstraint): + def __init__(self, starts_with): + super().__init__(types=[str]) + self._starts_with = starts_with + + def check_response(self, value) -> bool: + return value.startswith(self._starts_with) + + +class _ConstraintEndsWith(BaseConstraint): + def __init__(self, ends_with): + super().__init__(types=[str]) + self._ends_with = ends_with + + def check_response(self, value) -> bool: + return value.endswith(self._ends_with) + + +class _ConstraintIsUpperCase(BaseConstraint): + def __init__(self, is_upper_case): + super().__init__(types=[str]) + self._is_upper_case = is_upper_case + + def check_response(self, value) -> bool: + return value.isupper() == self._is_upper_case + + +class _ConstraintIsLowerCase(BaseConstraint): + def __init__(self, is_lower_case): + super().__init__(types=[str]) + self._is_lower_case = is_lower_case + + def check_response(self, value) -> bool: + return value.islower() == self._is_lower_case + + +class _ConstraintMinValue(BaseConstraint): + def __init__(self, min_value): + super().__init__(types=[int, float], is_null_allowed=True) + self._min_value = min_value + + def check_response(self, value) -> bool: + return value >= self._min_value + + +class _ConstraintMaxValue(BaseConstraint): + def __init__(self, max_value): + super().__init__(types=[int, float], is_null_allowed=True) + self._max_value = max_value + + def check_response(self, value) -> bool: + return value <= self._max_value + + +class _ConstraintContains(BaseConstraint): + def __init__(self, contains): + super().__init__(types=[list]) + self._contains = contains + + def check_response(self, value) -> bool: + return set(self._contains).issubset(value) + + +class _ConstraintExcludes(BaseConstraint): + def __init__(self, excludes): + super().__init__(types=[list]) + self._excludes = excludes + + def check_response(self, value) -> bool: + return set(self._excludes).isdisjoint(value) + + +class _ConstraintHasMaskSet(BaseConstraint): + def __init__(self, has_masks_set): + super().__init__(types=[int]) + self._has_masks_set = has_masks_set + + def check_response(self, value) -> bool: + return all([(value & mask) == mask for mask in self._has_masks_set]) + + +class _ConstraintHasMaskClear(BaseConstraint): + def __init__(self, has_masks_clear): + super().__init__(types=[int]) + self._has_masks_clear = has_masks_clear + + def check_response(self, value) -> bool: + return all([(value & mask) == 0 for mask in self._has_masks_clear]) + + +class _ConstraintNotValue(BaseConstraint): + def __init__(self, not_value): + super().__init__(types=[], is_null_allowed=True) + self._not_value = not_value + + def check_response(self, value) -> bool: + return value != self._not_value + + +def get_constraints(constraints: dict) -> list[BaseConstraint]: + _constraints = [] + + for constraint, constraint_value in constraints.items(): + if 'hasValue' == constraint: + _constraints.append(_ConstraintHasValue(constraint_value)) + elif 'type' == constraint: + _constraints.append(_ConstraintType(constraint_value)) + elif 'minLength' == constraint: + _constraints.append(_ConstraintMinLength(constraint_value)) + elif 'maxLength' == constraint: + _constraints.append(_ConstraintMaxLength(constraint_value)) + elif 'isHexString' == constraint: + _constraints.append(_ConstraintIsHexString(constraint_value)) + elif 'startsWith' == constraint: + _constraints.append(_ConstraintStartsWith(constraint_value)) + elif 'endsWith' == constraint: + _constraints.append(_ConstraintEndsWith(constraint_value)) + elif 'isUpperCase' == constraint: + _constraints.append(_ConstraintIsUpperCase(constraint_value)) + elif 'isLowerCase' == constraint: + _constraints.append(_ConstraintIsLowerCase(constraint_value)) + elif 'minValue' == constraint: + _constraints.append(_ConstraintMinValue(constraint_value)) + elif 'maxValue' == constraint: + _constraints.append(_ConstraintMaxValue(constraint_value)) + elif 'contains' == constraint: + _constraints.append(_ConstraintContains(constraint_value)) + elif 'excludes' == constraint: + _constraints.append(_ConstraintExcludes(constraint_value)) + elif 'hasMasksSet' == constraint: + _constraints.append(_ConstraintHasMaskSet(constraint_value)) + elif 'hasMasksClear' == constraint: + _constraints.append(_ConstraintHasMaskClear(constraint_value)) + elif 'notValue' == constraint: + _constraints.append(_ConstraintNotValue(constraint_value)) + else: + raise ConstraintParseError(f'Unknown constraint type:{constraint}') + + return _constraints diff --git a/scripts/tests/yamltests/definitions.py b/scripts/tests/yamltests/definitions.py new file mode 100644 index 00000000000000..006704993d7bab --- /dev/null +++ b/scripts/tests/yamltests/definitions.py @@ -0,0 +1,200 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List +import enum + +try: + from idl.zapxml import ParseSource, ParseXmls + from idl.matter_idl_types import * +except: + import os + import sys + sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) + + from idl.zapxml import ParseSource, ParseXmls + from idl.matter_idl_types import * + + +class _ItemType(enum.Enum): + Cluster = 0 + Request = 1 + Response = 2 + Attribute = 3 + Event = 4 + Bitmap = 5 + Enum = 6 + Struct = 7 + + +class SpecDefinitions: + + def __init__(self, sources: List[ParseSource]): + self.__clusters_by_id: dict[int, Cluster] = {} + self.__commands_by_id: dict[int, dict[int, Command]] = {} + self.__responses_by_id: dict[int, dict[int, Struct]] = {} + self.__attributes_by_id: dict[int, dict[int, Attribute]] = {} + self.__events_by_id: dict[int, dict[int, Event]] = {} + + self.__clusters_by_name: dict[str, int] = {} + self.__commands_by_name: dict[str, int] = {} + self.__responses_by_name: dict[str, int] = {} + self.__attributes_by_name: dict[str, int] = {} + self.__events_by_name: dict[str, int] = {} + + self.__bitmaps_by_name: dict[str, dict[str, Bitmap]] = {} + self.__enums_by_name: dict[str, dict[str, Enum]] = {} + self.__structs_by_name: dict[str, dict[str, Struct]] = {} + + idl = ParseXmls(sources) + + for cluster in idl.clusters: + code: int = cluster.code + name: str = cluster.name + self.__clusters_by_id[code] = cluster + self.__commands_by_id[code] = {c.code: c for c in cluster.commands} + self.__responses_by_id[code] = {} + self.__attributes_by_id[code] = {a.definition.code: a for a in cluster.attributes} + self.__events_by_id[code] = {e.code: e for e in cluster.events} + + self.__clusters_by_name[name] = cluster.code + self.__commands_by_name[name] = {c.name.lower(): c.code for c in cluster.commands} + self.__responses_by_name[name] = {} + self.__attributes_by_name[name] = {a.definition.name.lower(): a.definition.code for a in cluster.attributes} + self.__events_by_name[name] = {e.name.lower(): e.code for e in cluster.events} + + self.__bitmaps_by_name[name] = {b.name.lower(): b for b in cluster.bitmaps} + self.__enums_by_name[name] = {e.name.lower(): e for e in cluster.enums} + self.__structs_by_name[name] = {s.name.lower(): s for s in cluster.structs} + + for struct in cluster.structs: + if struct.tag == StructTag.RESPONSE: + self.__responses_by_id[code][struct.code] = struct + self.__responses_by_name[name][struct.name.lower()] = struct.code + + def get_cluster_name(self, cluster_id: int) -> str: + cluster = self.__clusters_by_id.get(cluster_id) + return cluster.name if cluster else None + + def get_command_name(self, cluster_id: int, command_id: int) -> str: + command = self.__get_by_id(cluster_id, command_id, _ItemType.Request) + return command.name if command else None + + def get_response_name(self, cluster_id: int, response_id: int) -> str: + response = self.__get_by_id(cluster_id, response_id, _ItemType.Response) + return response.name if response else None + + def get_attribute_name(self, cluster_id: int, attribute_id: int) -> str: + attribute = self.__get_by_id(cluster_id, attribute_id, _ItemType.Attribute) + return attribute.definition.name if attribute else None + + def get_event_name(self, cluster_id: int, event_id: int) -> str: + event = self.__get_by_id(cluster_id, event_id, _ItemType.Event) + return event.name if event else None + + def get_command_by_name(self, cluster_name: str, command_name: str) -> Command: + return self.__get_by_name(cluster_name, command_name, _ItemType.Request) + + def get_response_by_name(self, cluster_name: str, response_name: str) -> Struct: + return self.__get_by_name(cluster_name, response_name, _ItemType.Response) + + def get_attribute_by_name(self, cluster_name: str, attribute_name: str) -> Attribute: + return self.__get_by_name(cluster_name, attribute_name, _ItemType.Attribute) + + def get_event_by_name(self, cluster_name: str, event_name: str) -> Event: + return self.__get_by_name(cluster_name, event_name, _ItemType.Event) + + def get_bitmap_by_name(self, cluster_name: str, bitmap_name: str) -> Bitmap: + return self.__get_by_name(cluster_name, bitmap_name, _ItemType.Bitmap) + + def get_enum_by_name(self, cluster_name: str, enum_name: str) -> Bitmap: + return self.__get_by_name(cluster_name, enum_name, _ItemType.Enum) + + def get_struct_by_name(self, cluster_name: str, struct_name: str) -> Struct: + return self.__get_by_name(cluster_name, struct_name, _ItemType.Struct) + + def get_type_by_name(self, cluster_name: str, target_name: str): + bitmap = self.get_bitmap_by_name(cluster_name, target_name) + if bitmap: + return bitmap + + enum = self.get_enum_by_name(cluster_name, target_name) + if enum: + return enum + + struct = self.get_struct_by_name(cluster_name, target_name) + if struct: + return struct + + return None + + def is_fabric_scoped(self, target) -> bool: + if hasattr(target, 'qualities'): + return bool(target.qualities & StructQuality.FABRIC_SCOPED) + return False + + def __get_by_name(self, cluster_name: str, target_name: str, target_type: _ItemType): + if not cluster_name or not target_name: + return None + + # The idl parser remove spaces + cluster_name = cluster_name.replace(' ', '') + # Many YAML tests formats the name using camelCase despites that the spec mandates + # CamelCase. To be compatible with the current tests, everything is converted to lower case. + target_name = target_name.lower() + + cluster_id = self.__clusters_by_name.get(cluster_name) + if cluster_id is None: + return None + + target = None + + if target_type == _ItemType.Request: + target_id = self.__commands_by_name.get(cluster_name).get(target_name) + target = self.__get_by_id(cluster_id, target_id, target_type) + elif target_type == _ItemType.Response: + target_id = self.__responses_by_name.get(cluster_name).get(target_name) + target = self.__get_by_id(cluster_id, target_id, target_type) + elif target_type == _ItemType.Event: + target_id = self.__events_by_name.get(cluster_name).get(target_name) + target = self.__get_by_id(cluster_id, target_id, target_type) + elif target_type == _ItemType.Attribute: + target_id = self.__attributes_by_name.get(cluster_name).get(target_name) + target = self.__get_by_id(cluster_id, target_id, target_type) + elif target_type == _ItemType.Bitmap: + target = self.__bitmaps_by_name.get(cluster_name).get(target_name) + elif target_type == _ItemType.Enum: + target = self.__enums_by_name.get(cluster_name).get(target_name) + elif target_type == _ItemType.Struct: + target = self.__structs_by_name.get(cluster_name).get(target_name) + + return target + + def __get_by_id(self, cluster_id: int, target_id: int, target_type: str): + targets = None + + if target_type == _ItemType.Request: + targets = self.__commands_by_id.get(cluster_id) + elif target_type == _ItemType.Response: + targets = self.__responses_by_id.get(cluster_id) + elif target_type == _ItemType.Event: + targets = self.__events_by_id.get(cluster_id) + elif target_type == _ItemType.Attribute: + targets = self.__attributes_by_id.get(cluster_id) + + if targets is None: + return None + + return targets.get(target_id) diff --git a/scripts/tests/yamltests/fixes.py b/scripts/tests/yamltests/fixes.py new file mode 100644 index 00000000000000..a0126b003f9a2e --- /dev/null +++ b/scripts/tests/yamltests/fixes.py @@ -0,0 +1,116 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +import binascii + +'''Fixes certain value formats known to exist in YAML tests + +Some of the YAML test files contains some values that has been crafted to avoid some limitations +of the original JavaScript parser and the C++ generated bits. This file contains functions to +changes and convert those YAML values them back to something agnostic. +''' + + +def try_apply_yaml_cpp_longlong_limitation_fix(value): + '''Fix a known nasty hack in order that was used to work around compiler issue for C++. + + When -9223372036854775808 was provided compiler would give a warning saying it is not a valid + way to write a long long in C++. + ''' + if value == "-9223372036854775807LL - 1": + value = -9223372036854775808 + return value + + +def try_apply_yaml_unrepresentable_integer_for_javascript_fixes(value): + '''Fix up large integers that are represented within a string. + + JavaScript can not represent integers bigger than 9007199254740991. But some of the test may + uses values that are bigger than this. The current way to workaround this limitation has + been to write those numbers as strings by encapsulating them in "". + ''' + if type(value) is str: + value = int(value) + return value + + +def try_apply_yaml_float_written_as_strings(value): + '''Fix up floats that are represented within a string.''' + if type(value) is str: + value = float(value) + return value + + +# TODO(thampson) This method is a clone of the method in +# src/controller/python/chip/yaml/format_converter.py and should eventually be removed in that file. +def convert_yaml_octet_string_to_bytes(s: str) -> bytes: + '''Convert YAML octet string body to bytes. + + This handles any c-style hex escapes (e.g. \x5a) and hex: prefix + ''' + # Step 1: handle explicit "hex:" prefix + if s.startswith('hex:'): + return binascii.unhexlify(s[4:]) + + # Step 2: convert non-hex-prefixed to bytes + # TODO(#23669): This does not properly support utf8 octet strings. We mimic + # javascript codegen behavior. Behavior of javascript is: + # * Octet string character >= u+0200 errors out. + # * Any character greater than 0xFF has the upper bytes chopped off. + as_bytes = [ord(c) for c in s] + + if any([value > 0x200 for value in as_bytes]): + raise ValueError('Unsupported char in octet string %r' % as_bytes) + accumulated_hex = ''.join([f"{(v & 0xFF):02x}" for v in as_bytes]) + return binascii.unhexlify(accumulated_hex) + + +def try_add_yaml_support_for_scientific_notation_without_dot(loader): + regular_expression = re.compile(u'''^(?: + [-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)? + |[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+) + |\\.[0-9_]+(?:[eE][-+][0-9]+)? + |[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\\.[0-9_]* + |[-+]?\\.(?:inf|Inf|INF) + |\\.(?:nan|NaN|NAN))$''', re.X) + + loader.add_implicit_resolver( + u'tag:yaml.org,2002:float', + regular_expression, + list(u'-+0123456789.')) + return loader + + +# This is a gross hack. The previous runner has a some internal states where an identity match one +# accessory. But this state may not exist in the runner (as in it prevent to have multiple node ids +# associated to a fabric...) so the 'nodeId' needs to be added back manually. +def try_update_yaml_node_id_test_runner_state(tests, config): + identities = {'alpha': None if 'nodeId' not in config else config['nodeId']} + + for test in tests: + if not test.is_enabled: + continue + + identity = test.identity + + if test.cluster == 'CommissionerCommands': + if test.command == 'PairWithCode': + for item in test.arguments['values']: + if item['name'] == 'nodeId': + identities[identity] = item['value'] + elif identity is not None and identity in identities: + nodeId = identities[identity] + test.nodeId = nodeId diff --git a/scripts/tests/yamltests/parser.py b/scripts/tests/yamltests/parser.py new file mode 100644 index 00000000000000..329d17cf124856 --- /dev/null +++ b/scripts/tests/yamltests/parser.py @@ -0,0 +1,708 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import yaml +from enum import Enum + +from .constraints import get_constraints +from . import fixes + +_TESTS_SECTION = [ + 'name', + 'config', + 'tests', + 'PICS', +] + +_TEST_SECTION = [ + 'label', + 'cluster', + 'command', + 'disabled', + 'endpoint', + 'identity', + 'fabricFiltered', + 'verification', + 'nodeId', + 'attribute', + 'optional', + 'PICS', + 'arguments', + 'response', + 'minInterval', + 'maxInterval', + 'timedInteractionTimeoutMs', + 'busyWaitMs', +] + +_TEST_ARGUMENTS_SECTION = [ + 'values', + 'value', +] + +_TEST_RESPONSE_SECTION = [ + 'value', + 'values', + 'error', + 'clusterError', + 'constraints', + 'type', + 'hasMasksSet', + 'contains', + 'saveAs' +] + +_ATTRIBUTE_COMMANDS = [ + 'readAttribute', + 'writeAttribute', + 'subscribeAttribute', + 'waitForReport', +] + +_EVENT_COMMANDS = [ + 'readEvent', + 'subscribeEvent', +] + + +class PostProcessCheckStatus(Enum): + '''Indicates the post processing check step status.''' + SUCCESS = 'success', + WARNING = 'warning', + ERROR = 'error' + + +class PostProcessCheckType(Enum): + '''Indicates the post processing check step type.''' + IM_STATUS = 'IMStatus', + CLUSTER_STATUS = 'ClusterStatus', + RESPONSE_VALIDATION = 'Response', + CONSTRAINT_VALIDATION = 'Constraints', + SAVE_AS_VARIABLE = 'SaveAs' + + +class PostProcessCheck: + '''Information about a single post processing operation that was performed. + + Each check has a helpful message, indicating what the post processing operation did and whether + it was successful or not. + ''' + + def __init__(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str): + self.state = state + self.category = category + self.message = message + + def is_success(self) -> bool: + return self.state == PostProcessCheckStatus.SUCCESS + + def is_warning(self) -> bool: + return self.state == PostProcessCheckStatus.WARNING + + def is_error(self) -> bool: + return self.state == PostProcessCheckStatus.ERROR + + +class PostProcessResponseResult: + '''Post processing response result information. + + There are multiple operations that occur when post processing a response. This contains all the + results for each operation performed. Note that the number and types of steps performed is + dependant on test step itself. + ''' + + def __init__(self): + self.entries = [] + self.successes = 0 + self.warnings = 0 + self.errors = 0 + + def success(self, category: PostProcessCheckType, message: str): + '''Adds a success entry that occured when post processing response to results.''' + self._insert(PostProcessCheckStatus.SUCCESS, category, message) + self.successes += 1 + + def warning(self, category: PostProcessCheckType, message: str): + '''Adds a warning entry that occured when post processing response to results.''' + self._insert(PostProcessCheckStatus.WARNING, category, message) + self.warnings += 1 + + def error(self, category: PostProcessCheckType, message: str): + '''Adds an error entry that occured when post processing response to results.''' + self._insert(PostProcessCheckStatus.ERROR, category, message) + self.errors += 1 + + def is_success(self): + # It is possible that post processing a response doesn't have any success entires added + # that is why we explicitly only search for if an error occurred. + return self.errors == 0 + + def is_failure(self): + return self.errors != 0 + + def _insert(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str): + log = PostProcessCheck(state, category, message) + self.entries.append(log) + + +def _check_valid_keys(section, valid_keys_dict): + if section: + for key in section: + if key not in valid_keys_dict: + print(f'Unknown key: {key}') + raise KeyError + + +def _value_or_none(data, key): + return data[key] if key in data else None + + +def _value_or_config(data, key, config): + return data[key] if key in data else config[key] + + +class _TestStepWithPlaceholders: + '''A single YAML test parsed, as is, from YAML. + + Some YAML test steps contain placeholders for variable subsitution. The value of the variable + is only known after an earlier test step's has executed and the result successfully post + processed. + ''' + + def __init__(self, test: dict, config: dict, definitions): + # Disabled tests are not parsed in order to allow the test to be added to the test + # suite even if the feature is not implemented yet. + self.is_enabled = not ('disabled' in test and test['disabled']) + if not self.is_enabled: + return + + self._parsing_config_variable_storage = config + + _check_valid_keys(test, _TEST_SECTION) + + self.label = _value_or_none(test, 'label') + self.optional = _value_or_none(test, 'optional') + self.node_id = _value_or_config(test, 'nodeId', config) + self.cluster = _value_or_config(test, 'cluster', config) + self.command = _value_or_config(test, 'command', config) + self.attribute = _value_or_none(test, 'attribute') + self.endpoint = _value_or_config(test, 'endpoint', config) + + self.identity = _value_or_none(test, 'identity') + self.fabric_filtered = _value_or_none(test, 'fabricFiltered') + self.min_interval = _value_or_none(test, 'minInterval') + self.max_interval = _value_or_none(test, 'maxInterval') + self.timed_interaction_timeout_ms = _value_or_none(test, 'timedInteractionTimeoutMs') + self.busy_wait_ms = _value_or_none(test, 'busyWaitMs') + + self.is_attribute = self.command in _ATTRIBUTE_COMMANDS + self.is_event = self.command in _EVENT_COMMANDS + + self.arguments_with_placeholders = _value_or_none(test, 'arguments') + self.response_with_placeholders = _value_or_none(test, 'response') + + _check_valid_keys(self.arguments_with_placeholders, _TEST_ARGUMENTS_SECTION) + _check_valid_keys(self.response_with_placeholders, _TEST_RESPONSE_SECTION) + + self._convert_single_value_to_values(self.arguments_with_placeholders) + self._convert_single_value_to_values(self.response_with_placeholders) + + argument_mapping = None + response_mapping = None + + if self.is_attribute: + attribute = definitions.get_attribute_by_name(self.cluster, self.attribute) + if attribute: + attribute_mapping = self._as_mapping(definitions, self.cluster, + attribute.definition.data_type.name) + argument_mapping = attribute_mapping + response_mapping = attribute_mapping + else: + command = definitions.get_command_by_name(self.cluster, self.command) + if command: + argument_mapping = self._as_mapping(definitions, self.cluster, command.input_param) + response_mapping = self._as_mapping(definitions, self.cluster, command.output_param) + + self._update_with_definition(self.arguments_with_placeholders, argument_mapping) + self._update_with_definition(self.response_with_placeholders, response_mapping) + + # This performs a very basic sanity parse time check of constrains. This parsing happens + # again inside post processing response since at that time we will have required variables + # to substitute in. This parsing check here has value since some test can take a really + # long time to run so knowing earlier on that the test step would have failed at parsing + # time before the test step run occurs save developer time that building yaml tests. + if self.response_with_placeholders: + for value in self.response_with_placeholders['values']: + if 'constraints' not in value: + continue + get_constraints(value['constraints']) + + def _convert_single_value_to_values(self, container): + if container is None or 'values' in container: + return + + # Attribute tests pass a single value argument that does not carry a name but + # instead uses a generic 'value' keyword. Convert to keyword to be the single + # members of the 'values' array which is what is used for other tests. + value = {} + + known_keys_to_copy = ['value', 'constraints', 'saveAs'] + known_keys_to_allow = ['error', 'clusterError'] + + for key, item in list(container.items()): + if key in known_keys_to_copy: + value[key] = item + del container[key] + elif key in known_keys_to_allow: + # Nothing to do for those keys. + pass + else: + raise KeyError(f'Unknown key: {key}') + + container['values'] = [value] + + def _as_mapping(self, definitions, cluster_name, target_name): + element = definitions.get_type_by_name(cluster_name, target_name) + + if hasattr(element, 'base_type'): + target_name = element.base_type.lower() + elif hasattr(element, 'fields'): + target_name = {f.name: self._as_mapping(definitions, cluster_name, f.data_type.name) for f in element.fields} + elif target_name: + target_name = target_name.lower() + + return target_name + + def _update_with_definition(self, container: dict, mapping_type): + if not container or not mapping_type: + return + + for value in list(container['values']): + for key, item_value in list(value.items()): + mapping = mapping_type if self.is_attribute else mapping_type[value['name']] + + if key == 'value': + value[key] = self._update_value_with_definition(item_value, mapping) + elif key == 'saveAs' and type(item_value) is str and item_value not in self._parsing_config_variable_storage: + self._parsing_config_variable_storage[item_value] = None + elif key == 'constraints': + for constraint, constraint_value in item_value.items(): + value[key][constraint] = self._update_value_with_definition( + constraint_value, mapping_type) + else: + # This key, value pair does not rely on cluster specifications. + pass + + def _update_value_with_definition(self, value, mapping_type): + if not mapping_type: + return value + + if type(value) is dict: + rv = {} + for key in value: + # FabricIndex is a special case where the framework requires it to be passed even + # if it is not part of the requested arguments per spec and not part of the XML + # definition. + if key == 'FabricIndex' or key == 'fabricIndex': + rv[key] = value[key] # int64u + else: + mapping = mapping_type[key] + rv[key] = self._update_value_with_definition(value[key], mapping) + return rv + if type(value) is list: + return [self._update_value_with_definition(entry, mapping_type) for entry in value] + # TODO currently unsure if the check of `value not in config` is sufficant. For + # example let's say value = 'foo + 1' and map type is 'int64u', we would arguably do + # the wrong thing below. + if value is not None and value not in self._parsing_config_variable_storage: + if mapping_type == 'int64u' or mapping_type == 'int64s' or mapping_type == 'bitmap64' or mapping_type == 'epoch_us': + value = fixes.try_apply_yaml_cpp_longlong_limitation_fix(value) + value = fixes.try_apply_yaml_unrepresentable_integer_for_javascript_fixes(value) + elif mapping_type == 'single' or mapping_type == 'double': + value = fixes.try_apply_yaml_float_written_as_strings(value) + elif mapping_type == 'octet_string' or mapping_type == 'long_octet_string': + value = fixes.convert_yaml_octet_string_to_bytes(value) + elif mapping_type == 'boolean': + value = bool(value) + + return value + + +class TestStep: + '''A single YAML test action parsed from YAML. + + This object contains all the information required for a test runner to execute the test step. + It also provide a function that is expected to be called by the test runner to post process + the recieved response from the accessory. Post processing both validates recieved response + and saves any variables that might be required but test step that have yet to be executed. + ''' + + def __init__(self, test: _TestStepWithPlaceholders, runtime_config_variable_storage: dict): + self._test = test + self._runtime_config_variable_storage = runtime_config_variable_storage + self.arguments = copy.deepcopy(test.arguments_with_placeholders) + self.response = copy.deepcopy(test.response_with_placeholders) + self._update_placeholder_values(self.arguments) + self._update_placeholder_values(self.response) + + @property + def is_enabled(self): + return self._test.is_enabled + + @property + def is_attribute(self): + return self._test.is_attribute + + @property + def is_event(self): + return self._test.is_event + + @property + def label(self): + return self._test.label + + @property + def optional(self): + return self._test.optional + + @property + def node_id(self): + return self._test.node_id + + @property + def cluster(self): + return self._test.cluster + + @property + def command(self): + return self._test.command + + @property + def attribute(self): + return self._test.attribute + + @property + def endpoint(self): + return self._test.endpoint + + @property + def identity(self): + return self._test.identity + + @property + def fabric_filtered(self): + return self._test.fabric_filtered + + @property + def min_interval(self): + return self._test.min_interval + + @property + def max_interval(self): + return self._test.max_interval + + @property + def timed_interaction_timeout_ms(self): + return self._test.timed_interaction_timeout_ms + + @property + def busy_wait_ms(self): + return self._test.busy_wait_ms + + def post_process_response(self, response: dict): + result = PostProcessResponseResult() + + if self._skip_post_processing(response, result): + return result + + self._response_error_validation(response, result) + if self.response: + self._response_cluster_error_validation(response, result) + self._response_values_validation(response, result) + self._response_constraints_validation(response, result) + self._maybe_save_as(response, result) + + return result + + def _skip_post_processing(self, response: dict, result) -> bool: + '''Should we skip perform post processing. + + Currently we only skip post processing if the test step indicates that sent test step + invokation was expected to be optionally supported. We confirm that it is optional + supported by either validating we got the expected error only then indicate that all + other post processing should be skipped. + ''' + if not self.optional: + return False + + received_error = response.get('error', None) + if received_error is None: + return False + + if received_error == 'UNSUPPORTED_ATTRIBUTE' or received_error == 'UNSUPPORTED_COMMAND': + # result.warning(PostProcessCheckType.Optional, f'The response contains the error: "{error}".') + return True + + return False + + def _response_error_validation(self, response, result): + check_type = PostProcessCheckType.IM_STATUS + error_success = 'The test expects the "{error}" error which occured successfully.' + error_success_no_error = 'The test expects no error and no error occurred.' + error_wrong_error = 'The test expects the "{error}" error but the "{value}" error occured.' + error_unexpected_error = 'The test expects no error but the "{value}" error occured.' + error_unexpected_success = 'The test expects the "{error}" error but no error occured.' + + expected_error = self.response.get('error') if self.response else None + + # Handle generic success/error + if type(response) is str and response == 'failure': + received_error = response + elif type(response) is str and response == 'success': + received_error = None + else: + received_error = response.get('error') + + if expected_error and received_error and expected_error == received_error: + result.success(check_type, error_success.format(error=expected_error)) + elif expected_error and received_error: + result.error(check_type, error_wrong_error.format( + error=expected_error, value=received_error)) + elif expected_error and not received_error: + result.error(check_type, error_unexpected_success.format(error=expected_error)) + elif not expected_error and received_error: + result.error(check_type, error_unexpected_error.format(error=received_error)) + elif not expected_error and not received_error: + result.success(check_type, error_success_no_error) + else: + # This should not happens + raise AssertionError('This should not happens.') + + def _response_cluster_error_validation(self, response, result): + check_type = PostProcessCheckType.CLUSTER_STATUS + error_success = 'The test expects the "{error}" error which occured successfully.' + error_unexpected_success = 'The test expects the "{error}" error but no error occured.' + error_wrong_error = 'The test expects the "{error}" error but the "{value}" error occured.' + + expected_error = self.response.get('clusterError') + received_error = response.get('clusterError') + + if expected_error: + if received_error and expected_error == received_error: + result.success(check_type, error_success.format(error=expected_error)) + elif received_error: + result.error(check_type, error_wrong_error.format( + error=expected_error, value=received_error)) + else: + result.error(check_type, error_unexpected_success.format(error=expected_error)) + else: + # Nothing is logged here to not be redundant with the generic error checking code. + pass + + def _response_values_validation(self, response, result): + check_type = PostProcessCheckType.RESPONSE_VALIDATION + error_success = 'The test expectation "{name} == {value}" is true' + error_failure = 'The test expectation "{name} == {value}" is false' + error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."' + + for value in self.response['values']: + if 'value' not in value: + continue + + expected_name = 'value' + received_value = response.get('value') + if not self.is_attribute: + expected_name = value.get('name') + if expected_name not in received_value: + result.error(check_type, error_name_does_not_exist.format(name=expected_name)) + continue + + received_value = received_value.get(expected_name) if received_value else None + + # TODO Supports Array/List. See an exemple of failure in TestArmFailSafe.yaml + expected_value = value.get('value') + if expected_value == received_value: + result.success( + check_type, error_success.format(name=expected_name, value=received_value)) + else: + result.error( + check_type, error_failure.format(name=expected_name, value=expected_value)) + + def _response_constraints_validation(self, response, result): + check_type = PostProcessCheckType.CONSTRAINT_VALIDATION + error_success = 'Constraints check passed' + error_failure = 'Constraints check failed' + error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."' + + for value in self.response['values']: + if 'constraints' not in value: + continue + + expected_name = 'value' + received_value = response.get('value') + if not self.is_attribute: + expected_name = value.get('name') + if expected_name not in received_value: + result.error(check_type, error_name_does_not_exist.format(name=expected_name)) + continue + + received_value = received_value.get(expected_name) if received_value else None + + constraints = get_constraints(value['constraints']) + if all([constraint.is_met(received_value) for constraint in constraints]): + result.success(check_type, error_success) + else: + # TODO would be helpful to be more verbose here + result.error(check_type, error_failure) + + def _maybe_save_as(self, response, result): + check_type = PostProcessCheckType.SAVE_AS_VARIABLE + error_success = 'The test save the value "{value}" as {name}.' + error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."' + + for value in self.response['values']: + if 'saveAs' not in value: + continue + + expected_name = 'value' + received_value = response.get('value') + if not self.is_attribute: + expected_name = value.get('name') + if expected_name not in received_value: + result.error(check_type, error_name_does_not_exist.format(name=expected_name)) + continue + + received_value = received_value.get(expected_name) if received_value else None + + save_as = value.get('saveAs') + self._runtime_config_variable_storage[save_as] = received_value + result.success(check_type, error_success.format(value=received_value, name=save_as)) + + def _update_placeholder_values(self, container): + if not container: + return + + values = container['values'] + + for idx, item in enumerate(values): + if 'value' in item: + values[idx]['value'] = self._config_variable_substitution(item['value']) + + if 'constraints' in item: + for constraint, constraint_value in item['constraints'].items(): + values[idx]['constraints'][constraint] = self._config_variable_substitution( + constraint_value) + + container['values'] = values + + def _config_variable_substitution(self, value): + if type(value) is list: + return [self._config_variable_substitution(entry) for entry in value] + elif type(value) is dict: + mapped_value = {} + for key in value: + mapped_value[key] = self._config_variable_substitution(value[key]) + return mapped_value + elif type(value) is str: + # For most tests, a single config variable is used and it can be replaced as in. + # But some other tests were relying on the fact that the expression was put 'as if' in + # the generated code and was resolved before being sent over the wire. For such + # expressions (e.g 'myVar + 1') we need to compute it before sending it over the wire. + tokens = value.split() + if len(tokens) == 0: + return value + + substitution_occured = False + for idx, token in enumerate(tokens): + if token in self._runtime_config_variable_storage: + variable_info = self._runtime_config_variable_storage[token] + if type(variable_info) is dict and 'defaultValue' in variable_info: + variable_info = variable_info['defaultValue'] + tokens[idx] = variable_info + substitution_occured = True + + if len(tokens) == 1: + return tokens[0] + + tokens = [str(token) for token in tokens] + value = ' '.join(tokens) + # TODO we should move away from eval. That will mean that we will need to do extra + # parsing, but it would be safer then just blindly running eval. + return value if not substitution_occured else eval(value) + else: + return value + + +class YamlTests: + '''Parses YAML tests and becomes an iterator to provide 'TestStep's + + The provided TestStep is expected to be used by a runner/adapter to run the test step and + provide the response from the device to the TestStep object. + + Currently this is a one time use object. Eventually this should be refactored to take a + runner/adapter as an argument and run through all test steps and should be reusable for + multiple runs. + ''' + + def __init__(self, parsing_config_variable_storage: dict, definitions, tests: dict): + self._parsing_config_variable_storage = parsing_config_variable_storage + enabled_tests = [] + for test in tests: + test_with_placeholders = _TestStepWithPlaceholders( + test, self._parsing_config_variable_storage, definitions) + if test_with_placeholders.is_enabled: + enabled_tests.append(test_with_placeholders) + fixes.try_update_yaml_node_id_test_runner_state( + enabled_tests, self._parsing_config_variable_storage) + + self._runtime_config_variable_storage = copy.deepcopy(parsing_config_variable_storage) + self._tests = enabled_tests + self._index = 0 + self.count = len(self._tests) + + def __iter__(self): + return self + + def __next__(self) -> TestStep: + if self._index < self.count: + test = self._tests[self._index] + test_step = TestStep(test, self._runtime_config_variable_storage) + self._index += 1 + return test_step + + raise StopIteration + + +class TestParser: + def __init__(self, test_file, pics_file, definitions): + # TODO Needs supports for PICS file + with open(test_file) as f: + loader = yaml.FullLoader + loader = fixes.try_add_yaml_support_for_scientific_notation_without_dot(loader) + + data = yaml.load(f, Loader=loader) + _check_valid_keys(data, _TESTS_SECTION) + + self.name = _value_or_none(data, 'name') + self.PICS = _value_or_none(data, 'PICS') + + self._parsing_config_variable_storage = _value_or_none(data, 'config') + + tests = _value_or_none(data, 'tests') + self.tests = YamlTests(self._parsing_config_variable_storage, definitions, tests) + + def update_config(self, key, value): + self._parsing_config_variable_storage[key] = value diff --git a/scripts/tests/yamltests/setup.py b/scripts/tests/yamltests/setup.py new file mode 100644 index 00000000000000..2bcdfbe8c61d46 --- /dev/null +++ b/scripts/tests/yamltests/setup.py @@ -0,0 +1,28 @@ +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""The yamltest package.""" + +import setuptools # type: ignore + +setuptools.setup( + name='yamltests', + version='0.0.1', + author='Project CHIP Authors', + description='Parse matter yaml test files', + packages=setuptools.find_packages(), + package_data={'yamltest': ['py.typed']}, + zip_safe=False, +) diff --git a/scripts/tests/yamltests/test_spec_definitions.py b/scripts/tests/yamltests/test_spec_definitions.py new file mode 100644 index 00000000000000..00a15da9940d0c --- /dev/null +++ b/scripts/tests/yamltests/test_spec_definitions.py @@ -0,0 +1,291 @@ +#!/usr/bin/env -S python3 -B +# +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from definitions import * + +import unittest +import io + +source_cluster = ''' + + + Test + 0x1234 + + +''' + +source_command = ''' + + + Test + 0x1234 + + + + + +''' + +source_response = ''' + + + Test + 0x1234 + + + + + + + +''' + +source_attribute = ''' + + + TestGlobalAttribute + + + + Test + 0x1234 + + + TestAttribute + + + +''' + +source_event = ''' + + + Test + 0x1234 + + + + + +''' + +source_bitmap = ''' + + + + + + + + + + + + + Test + 0x1234 + + + + TestWrong + 0x4321 + + +''' + +source_enum = ''' + + + + + + + + + + + + + Test + 0x1234 + + + + TestWrong + 0x4321 + + +''' + +source_struct = ''' + + + + + + + + + + + + + + + + + + Test + 0x1234 + + + + TestWrong + 0x4321 + + +''' + + +class TestSpecDefinitions(unittest.TestCase): + def test_cluster_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_cluster), name='source_cluster')]) + self.assertIsNone(definitions.get_cluster_name(0x4321)) + self.assertEqual(definitions.get_cluster_name(0x1234), 'Test') + + def test_command_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')]) + self.assertIsNone(definitions.get_command_name(0x4321, 0x0)) + self.assertIsNone(definitions.get_command_name(0x1234, 0x1)) + self.assertEqual(definitions.get_command_name(0x1234, 0x0), 'TestCommand') + + def test_response_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')]) + self.assertIsNone(definitions.get_response_name(0x4321, 0x0)) + self.assertIsNone(definitions.get_response_name(0x1234, 0x1)) + self.assertEqual(definitions.get_response_name(0x1234, 0x0), 'TestCommandResponse') + + def test_attribute_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')]) + self.assertIsNone(definitions.get_attribute_name(0x4321, 0x0)) + self.assertIsNone(definitions.get_attribute_name(0x4321, 0xFFFD)) + self.assertIsNone(definitions.get_attribute_name(0x1234, 0x1)) + self.assertEqual(definitions.get_attribute_name(0x1234, 0x0), 'TestAttribute') + self.assertEqual(definitions.get_attribute_name(0x1234, 0xFFFD), 'TestGlobalAttribute') + + def test_event_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')]) + self.assertIsNone(definitions.get_event_name(0x4321, 0x0)) + self.assertIsNone(definitions.get_event_name(0x1234, 0x1)) + self.assertEqual(definitions.get_event_name(0x1234, 0x0), 'TestEvent') + + def test_get_command_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')]) + self.assertIsNone(definitions.get_command_by_name('WrongName', 'TestCommand')) + self.assertIsNone(definitions.get_command_by_name('Test', 'TestWrongCommand')) + self.assertIsNone(definitions.get_response_by_name('Test', 'TestCommand')) + self.assertIsInstance(definitions.get_command_by_name('Test', 'TestCommand'), Command) + self.assertIsNone(definitions.get_command_by_name('test', 'TestCommand')) + self.assertIsInstance(definitions.get_command_by_name('Test', 'testcommand'), Command) + + def test_get_response_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')]) + self.assertIsNone(definitions.get_response_by_name('WrongName', 'TestCommandResponse')) + self.assertIsNone(definitions.get_response_by_name('Test', 'TestWrongCommandResponse')) + self.assertIsNone(definitions.get_command_by_name('Test', 'TestCommandResponse')) + self.assertIsInstance(definitions.get_response_by_name('Test', 'TestCommandResponse'), Struct) + self.assertIsNone(definitions.get_response_by_name('test', 'TestCommandResponse')) + self.assertIsInstance(definitions.get_response_by_name('Test', 'testcommandresponse'), Struct) + + def test_get_attribute_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')]) + self.assertIsNone(definitions.get_attribute_by_name('WrongName', 'TestAttribute')) + self.assertIsNone(definitions.get_attribute_by_name('WrongName', 'TestGlobalAttribute')) + self.assertIsNone(definitions.get_attribute_by_name('Test', 'TestWrongAttribute')) + self.assertIsInstance(definitions.get_attribute_by_name('Test', 'TestAttribute'), Attribute) + self.assertIsInstance(definitions.get_attribute_by_name('Test', 'TestGlobalAttribute'), Attribute) + self.assertIsNone(definitions.get_attribute_by_name('test', 'TestAttribute')) + self.assertIsNone(definitions.get_attribute_by_name('test', 'TestGlobalAttribute')) + self.assertIsInstance(definitions.get_attribute_by_name('Test', 'testattribute'), Attribute) + self.assertIsInstance(definitions.get_attribute_by_name('Test', 'testglobalattribute'), Attribute) + + def test_get_event_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')]) + self.assertIsNone(definitions.get_event_by_name('WrongName', 'TestEvent')) + self.assertIsNone(definitions.get_event_by_name('Test', 'TestWrongEvent')) + self.assertIsInstance(definitions.get_event_by_name('Test', 'TestEvent'), Event) + self.assertIsNone(definitions.get_event_by_name('test', 'TestEvent')) + self.assertIsInstance(definitions.get_event_by_name('Test', 'testevent'), Event) + + def test_get_bitmap_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_bitmap), name='source_bitmap')]) + self.assertIsNone(definitions.get_bitmap_by_name('WrongName', 'TestBitmap')) + self.assertIsNone(definitions.get_bitmap_by_name('Test', 'TestWrongBitmap')) + self.assertIsInstance(definitions.get_bitmap_by_name('Test', 'TestBitmap'), Bitmap) + self.assertIsNone(definitions.get_bitmap_by_name('test', 'TestBitmap')) + self.assertIsInstance(definitions.get_bitmap_by_name('Test', 'testbitmap'), Bitmap) + + def test_get_enum_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_enum), name='source_enum')]) + self.assertIsNone(definitions.get_enum_by_name('WrongName', 'TestEnum')) + self.assertIsNone(definitions.get_enum_by_name('Test', 'TestWrongEnum')) + self.assertIsInstance(definitions.get_enum_by_name('Test', 'TestEnum'), Enum) + self.assertIsNone(definitions.get_enum_by_name('test', 'TestEnum')) + self.assertIsInstance(definitions.get_enum_by_name('Test', 'testenum'), Enum) + + def test_get_struct_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')]) + self.assertIsNone(definitions.get_struct_by_name('WrongName', 'TestStruct')) + self.assertIsNone(definitions.get_struct_by_name('Test', 'TestWrongStruct')) + self.assertIsInstance(definitions.get_struct_by_name('Test', 'TestStruct'), Struct) + self.assertIsNone(definitions.get_struct_by_name('test', 'TestStruct')) + self.assertIsInstance(definitions.get_struct_by_name('Test', 'teststruct'), Struct) + + def test_get_type_by_name(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')]) + self.assertIsNone(definitions.get_type_by_name('Test', 'TestCommand')) + + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')]) + self.assertIsInstance(definitions.get_type_by_name('Test', 'TestCommandResponse'), Struct) + + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')]) + self.assertIsNone(definitions.get_type_by_name('Test', 'TestAttribute')) + + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')]) + self.assertIsNone(definitions.get_type_by_name('Test', 'TestEvent')) + + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_bitmap), name='source_bitmap')]) + self.assertIsInstance(definitions.get_type_by_name('Test', 'TestBitmap'), Bitmap) + + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_enum), name='source_enum')]) + self.assertIsInstance(definitions.get_type_by_name('Test', 'TestEnum'), Enum) + + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')]) + self.assertIsInstance(definitions.get_type_by_name('Test', 'TestStruct'), Struct) + + def test_is_fabric_scoped(self): + definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')]) + + struct = definitions.get_struct_by_name('Test', 'TestStruct') + self.assertFalse(definitions.is_fabric_scoped(struct)) + + struct = definitions.get_struct_by_name('Test', 'TestStructFabricScoped') + self.assertTrue(definitions.is_fabric_scoped(struct)) + + +if __name__ == '__main__': + unittest.main()