diff --git a/scripts/tests/test_yaml_parser.py b/scripts/tests/test_yaml_parser.py
new file mode 100644
index 00000000000000..c6a73da4f8eed9
--- /dev/null
+++ b/scripts/tests/test_yaml_parser.py
@@ -0,0 +1,95 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# TODO Once yamltest is a proper self contained module we can move this file
+# to a more appropriate spot. For now, having this file to do some quick checks
+# is arguably better then no checks at all.
+
+import io
+import tempfile
+import unittest
+
+from yamltests.definitions import *
+from yamltests.parser import TestParser
+
+simple_test_description = '''
+
+
+
+
+
+
+
+ Test
+ 0x1234
+
+
+
+'''
+
+simple_test_yaml = '''
+name: Test Cluster Tests
+
+config:
+ nodeId: 0x12344321
+ cluster: "Test"
+ endpoint: 1
+
+tests:
+ - label: "Send Test Command"
+ command: "test"
+
+ - label: "Send Test Not Handled Command"
+ command: "testNotHandled"
+ response:
+ error: INVALID_COMMAND
+
+ - label: "Send Test Specific Command"
+ command: "testSpecific"
+ response:
+ values:
+ - name: "returnValue"
+ value: 7
+'''
+
+
+class TestYamlParser(unittest.TestCase):
+ def setUp(self):
+ self._definitions = SpecDefinitions(
+ [ParseSource(source=io.StringIO(simple_test_description), name='simple_test_description')])
+ self._temp_file = tempfile.NamedTemporaryFile(suffix='.yaml')
+ with open(self._temp_file.name, 'w') as f:
+ f.writelines(simple_test_yaml)
+ pics_file = None
+ self._yaml_parser = TestParser(self._temp_file.name, pics_file, self._definitions)
+
+ def test_able_to_iterate_over_all_parsed_tests(self):
+ # self._yaml_parser.tests implements `__next__`, which does value substitution. We are
+ # simply ensure there is no exceptions raise.
+ count = 0
+ for idx, test_step in enumerate(self._yaml_parser.tests):
+ count += 1
+ pass
+ self.assertEqual(count, 3)
+
+
+def main():
+ unittest.main()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/scripts/tests/yamltests/BUILD.gn b/scripts/tests/yamltests/BUILD.gn
new file mode 100644
index 00000000000000..2f9a49c0744181
--- /dev/null
+++ b/scripts/tests/yamltests/BUILD.gn
@@ -0,0 +1,39 @@
+# Copyright (c) 2022 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("//build_overrides/build.gni")
+import("//build_overrides/chip.gni")
+
+import("//build_overrides/pigweed.gni")
+import("$dir_pw_build/python.gni")
+
+pw_python_package("yamltests") {
+ setup = [ "setup.py" ]
+
+ sources = [
+ "__init__.py",
+ "constraints.py",
+ "definitions.py",
+ "fixes.py",
+ "parser.py",
+ ]
+
+ python_deps = [ "${chip_root}/scripts/idl" ]
+
+ tests = [ "test_spec_definitions.py" ]
+
+ # TODO: at a future time consider enabling all (* or missing) here to get
+ # pylint checking these files
+ static_analysis = []
+}
diff --git a/scripts/tests/yamltests/__init__.py b/scripts/tests/yamltests/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/scripts/tests/yamltests/constraints.py b/scripts/tests/yamltests/constraints.py
new file mode 100644
index 00000000000000..0da769fb321d83
--- /dev/null
+++ b/scripts/tests/yamltests/constraints.py
@@ -0,0 +1,357 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+import string
+
+
+class ConstraintParseError(Exception):
+ def __init__(self, message):
+ super().__init__(message)
+
+
+class ConstraintValidationError(Exception):
+ def __init__(self, message):
+ super().__init__(message)
+
+
+class BaseConstraint(ABC):
+ '''Constrain Interface'''
+
+ def __init__(self, types: list, is_null_allowed: bool = False):
+ '''An empty type list provided that indicates any type is accepted'''
+ self._types = types
+ self._is_null_allowed = is_null_allowed
+
+ def is_met(self, value):
+ if value is None:
+ return self._is_null_allowed
+
+ response_type = type(value)
+ if self._types and response_type not in self._types:
+ return False
+
+ return self.check_response(value)
+
+ @abstractmethod
+ def check_response(self, value) -> bool:
+ pass
+
+
+class _ConstraintHasValue(BaseConstraint):
+ def __init__(self, has_value):
+ super().__init__(types=[])
+ self._has_value = has_value
+
+ def check_response(self, value) -> bool:
+ raise ConstraintValidationError('HasValue constraint currently not implemented')
+
+
+class _ConstraintType(BaseConstraint):
+ def __init__(self, type):
+ super().__init__(types=[], is_null_allowed=True)
+ self._type = type
+
+ def check_response(self, value) -> bool:
+ success = False
+ if self._type == 'boolean' and type(value) is bool:
+ success = True
+ elif self._type == 'list' and type(value) is list:
+ success = True
+ elif self._type == 'char_string' and type(value) is str:
+ success = True
+ elif self._type == 'octet_string' and type(value) is bytes:
+ success = True
+ elif self._type == 'vendor_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFF
+ elif self._type == 'device_type_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'cluster_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'attribute_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'field_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'command_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'event_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'action_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFF
+ elif self._type == 'transaction_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'node_id' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
+ elif self._type == 'bitmap8' and type(value) is int:
+ success = value >= 0 and value <= 0xFF
+ elif self._type == 'bitmap16' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFF
+ elif self._type == 'bitmap32' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'bitmap64' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
+ elif self._type == 'enum8' and type(value) is int:
+ success = value >= 0 and value <= 0xFF
+ elif self._type == 'enum16' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFF
+ elif self._type == 'Percent' and type(value) is int:
+ success = value >= 0 and value <= 0xFF
+ elif self._type == 'Percent100ths' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFF
+ elif self._type == 'epoch_us' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
+ elif self._type == 'epoch_s' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'utc' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'date' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'tod' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'int8u' and type(value) is int:
+ success = value >= 0 and value <= 0xFF
+ elif self._type == 'int16u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFF
+ elif self._type == 'int24u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFF
+ elif self._type == 'int32u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFF
+ elif self._type == 'int40u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFF
+ elif self._type == 'int48u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFF
+ elif self._type == 'int56u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFF
+ elif self._type == 'int64u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
+ elif self._type == 'nullable_int8u' and type(value) is int:
+ success = value >= 0 and value <= 0xFE
+ elif self._type == 'nullable_int16u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFE
+ elif self._type == 'nullable_int24u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFE
+ elif self._type == 'nullable_int32u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFE
+ elif self._type == 'nullable_int40u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFE
+ elif self._type == 'nullable_int48u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFE
+ elif self._type == 'nullable_int56u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFE
+ elif self._type == 'nullable_int64u' and type(value) is int:
+ success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFE
+ elif self._type == 'int8s' and type(value) is int:
+ success = value >= -128 and value <= 127
+ elif self._type == 'int16s' and type(value) is int:
+ success = value >= -32768 and value <= 32767
+ elif self._type == 'int24s' and type(value) is int:
+ success = value >= -8388608 and value <= 8388607
+ elif self._type == 'int32s' and type(value) is int:
+ success = value >= -2147483648 and value <= 2147483647
+ elif self._type == 'int40s' and type(value) is int:
+ success = value >= -549755813888 and value <= 549755813887
+ elif self._type == 'int48s' and type(value) is int:
+ success = value >= -140737488355328 and value <= 140737488355327
+ elif self._type == 'int56s' and type(value) is int:
+ success = value >= -36028797018963968 and value <= 36028797018963967
+ elif self._type == 'int64s' and type(value) is int:
+ success = value >= -9223372036854775808 and value <= 9223372036854775807
+ elif self._type == 'nullable_int8s' and type(value) is int:
+ success = value >= -127 and value <= 127
+ elif self._type == 'nullable_int16s' and type(value) is int:
+ success = value >= -32767 and value <= 32767
+ elif self._type == 'nullable_int24s' and type(value) is int:
+ success = value >= -8388607 and value <= 8388607
+ elif self._type == 'nullable_int32s' and type(value) is int:
+ success = value >= -2147483647 and value <= 2147483647
+ elif self._type == 'nullable_int40s' and type(value) is int:
+ success = value >= -549755813887 and value <= 549755813887
+ elif self._type == 'nullable_int48s' and type(value) is int:
+ success = value >= -140737488355327 and value <= 140737488355327
+ elif self._type == 'nullable_int56s' and type(value) is int:
+ success = value >= -36028797018963967 and value <= 36028797018963967
+ elif self._type == 'nullable_int64s' and type(value) is int:
+ success = value >= -9223372036854775807 and value <= 9223372036854775807
+ return success
+
+
+class _ConstraintMinLength(BaseConstraint):
+ def __init__(self, min_length):
+ super().__init__(types=[str, bytes, list])
+ self._min_length = min_length
+
+ def check_response(self, value) -> bool:
+ return len(value) >= self._min_length
+
+
+class _ConstraintMaxLength(BaseConstraint):
+ def __init__(self, max_length):
+ super().__init__(types=[str, bytes, list])
+ self._max_length = max_length
+
+ def check_response(self, value) -> bool:
+ return len(value) <= self._max_length
+
+
+class _ConstraintIsHexString(BaseConstraint):
+ def __init__(self, is_hex_string: bool):
+ super().__init__(types=[str])
+ self._is_hex_string = is_hex_string
+
+ def check_response(self, value) -> bool:
+ return all(c in string.hexdigits for c in value) == self._is_hex_string
+
+
+class _ConstraintStartsWith(BaseConstraint):
+ def __init__(self, starts_with):
+ super().__init__(types=[str])
+ self._starts_with = starts_with
+
+ def check_response(self, value) -> bool:
+ return value.startswith(self._starts_with)
+
+
+class _ConstraintEndsWith(BaseConstraint):
+ def __init__(self, ends_with):
+ super().__init__(types=[str])
+ self._ends_with = ends_with
+
+ def check_response(self, value) -> bool:
+ return value.endswith(self._ends_with)
+
+
+class _ConstraintIsUpperCase(BaseConstraint):
+ def __init__(self, is_upper_case):
+ super().__init__(types=[str])
+ self._is_upper_case = is_upper_case
+
+ def check_response(self, value) -> bool:
+ return value.isupper() == self._is_upper_case
+
+
+class _ConstraintIsLowerCase(BaseConstraint):
+ def __init__(self, is_lower_case):
+ super().__init__(types=[str])
+ self._is_lower_case = is_lower_case
+
+ def check_response(self, value) -> bool:
+ return value.islower() == self._is_lower_case
+
+
+class _ConstraintMinValue(BaseConstraint):
+ def __init__(self, min_value):
+ super().__init__(types=[int, float], is_null_allowed=True)
+ self._min_value = min_value
+
+ def check_response(self, value) -> bool:
+ return value >= self._min_value
+
+
+class _ConstraintMaxValue(BaseConstraint):
+ def __init__(self, max_value):
+ super().__init__(types=[int, float], is_null_allowed=True)
+ self._max_value = max_value
+
+ def check_response(self, value) -> bool:
+ return value <= self._max_value
+
+
+class _ConstraintContains(BaseConstraint):
+ def __init__(self, contains):
+ super().__init__(types=[list])
+ self._contains = contains
+
+ def check_response(self, value) -> bool:
+ return set(self._contains).issubset(value)
+
+
+class _ConstraintExcludes(BaseConstraint):
+ def __init__(self, excludes):
+ super().__init__(types=[list])
+ self._excludes = excludes
+
+ def check_response(self, value) -> bool:
+ return set(self._excludes).isdisjoint(value)
+
+
+class _ConstraintHasMaskSet(BaseConstraint):
+ def __init__(self, has_masks_set):
+ super().__init__(types=[int])
+ self._has_masks_set = has_masks_set
+
+ def check_response(self, value) -> bool:
+ return all([(value & mask) == mask for mask in self._has_masks_set])
+
+
+class _ConstraintHasMaskClear(BaseConstraint):
+ def __init__(self, has_masks_clear):
+ super().__init__(types=[int])
+ self._has_masks_clear = has_masks_clear
+
+ def check_response(self, value) -> bool:
+ return all([(value & mask) == 0 for mask in self._has_masks_clear])
+
+
+class _ConstraintNotValue(BaseConstraint):
+ def __init__(self, not_value):
+ super().__init__(types=[], is_null_allowed=True)
+ self._not_value = not_value
+
+ def check_response(self, value) -> bool:
+ return value != self._not_value
+
+
+def get_constraints(constraints: dict) -> list[BaseConstraint]:
+ _constraints = []
+
+ for constraint, constraint_value in constraints.items():
+ if 'hasValue' == constraint:
+ _constraints.append(_ConstraintHasValue(constraint_value))
+ elif 'type' == constraint:
+ _constraints.append(_ConstraintType(constraint_value))
+ elif 'minLength' == constraint:
+ _constraints.append(_ConstraintMinLength(constraint_value))
+ elif 'maxLength' == constraint:
+ _constraints.append(_ConstraintMaxLength(constraint_value))
+ elif 'isHexString' == constraint:
+ _constraints.append(_ConstraintIsHexString(constraint_value))
+ elif 'startsWith' == constraint:
+ _constraints.append(_ConstraintStartsWith(constraint_value))
+ elif 'endsWith' == constraint:
+ _constraints.append(_ConstraintEndsWith(constraint_value))
+ elif 'isUpperCase' == constraint:
+ _constraints.append(_ConstraintIsUpperCase(constraint_value))
+ elif 'isLowerCase' == constraint:
+ _constraints.append(_ConstraintIsLowerCase(constraint_value))
+ elif 'minValue' == constraint:
+ _constraints.append(_ConstraintMinValue(constraint_value))
+ elif 'maxValue' == constraint:
+ _constraints.append(_ConstraintMaxValue(constraint_value))
+ elif 'contains' == constraint:
+ _constraints.append(_ConstraintContains(constraint_value))
+ elif 'excludes' == constraint:
+ _constraints.append(_ConstraintExcludes(constraint_value))
+ elif 'hasMasksSet' == constraint:
+ _constraints.append(_ConstraintHasMaskSet(constraint_value))
+ elif 'hasMasksClear' == constraint:
+ _constraints.append(_ConstraintHasMaskClear(constraint_value))
+ elif 'notValue' == constraint:
+ _constraints.append(_ConstraintNotValue(constraint_value))
+ else:
+ raise ConstraintParseError(f'Unknown constraint type:{constraint}')
+
+ return _constraints
diff --git a/scripts/tests/yamltests/definitions.py b/scripts/tests/yamltests/definitions.py
new file mode 100644
index 00000000000000..006704993d7bab
--- /dev/null
+++ b/scripts/tests/yamltests/definitions.py
@@ -0,0 +1,200 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+import enum
+
+try:
+ from idl.zapxml import ParseSource, ParseXmls
+ from idl.matter_idl_types import *
+except:
+ import os
+ import sys
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
+
+ from idl.zapxml import ParseSource, ParseXmls
+ from idl.matter_idl_types import *
+
+
+class _ItemType(enum.Enum):
+ Cluster = 0
+ Request = 1
+ Response = 2
+ Attribute = 3
+ Event = 4
+ Bitmap = 5
+ Enum = 6
+ Struct = 7
+
+
+class SpecDefinitions:
+
+ def __init__(self, sources: List[ParseSource]):
+ self.__clusters_by_id: dict[int, Cluster] = {}
+ self.__commands_by_id: dict[int, dict[int, Command]] = {}
+ self.__responses_by_id: dict[int, dict[int, Struct]] = {}
+ self.__attributes_by_id: dict[int, dict[int, Attribute]] = {}
+ self.__events_by_id: dict[int, dict[int, Event]] = {}
+
+ self.__clusters_by_name: dict[str, int] = {}
+ self.__commands_by_name: dict[str, int] = {}
+ self.__responses_by_name: dict[str, int] = {}
+ self.__attributes_by_name: dict[str, int] = {}
+ self.__events_by_name: dict[str, int] = {}
+
+ self.__bitmaps_by_name: dict[str, dict[str, Bitmap]] = {}
+ self.__enums_by_name: dict[str, dict[str, Enum]] = {}
+ self.__structs_by_name: dict[str, dict[str, Struct]] = {}
+
+ idl = ParseXmls(sources)
+
+ for cluster in idl.clusters:
+ code: int = cluster.code
+ name: str = cluster.name
+ self.__clusters_by_id[code] = cluster
+ self.__commands_by_id[code] = {c.code: c for c in cluster.commands}
+ self.__responses_by_id[code] = {}
+ self.__attributes_by_id[code] = {a.definition.code: a for a in cluster.attributes}
+ self.__events_by_id[code] = {e.code: e for e in cluster.events}
+
+ self.__clusters_by_name[name] = cluster.code
+ self.__commands_by_name[name] = {c.name.lower(): c.code for c in cluster.commands}
+ self.__responses_by_name[name] = {}
+ self.__attributes_by_name[name] = {a.definition.name.lower(): a.definition.code for a in cluster.attributes}
+ self.__events_by_name[name] = {e.name.lower(): e.code for e in cluster.events}
+
+ self.__bitmaps_by_name[name] = {b.name.lower(): b for b in cluster.bitmaps}
+ self.__enums_by_name[name] = {e.name.lower(): e for e in cluster.enums}
+ self.__structs_by_name[name] = {s.name.lower(): s for s in cluster.structs}
+
+ for struct in cluster.structs:
+ if struct.tag == StructTag.RESPONSE:
+ self.__responses_by_id[code][struct.code] = struct
+ self.__responses_by_name[name][struct.name.lower()] = struct.code
+
+ def get_cluster_name(self, cluster_id: int) -> str:
+ cluster = self.__clusters_by_id.get(cluster_id)
+ return cluster.name if cluster else None
+
+ def get_command_name(self, cluster_id: int, command_id: int) -> str:
+ command = self.__get_by_id(cluster_id, command_id, _ItemType.Request)
+ return command.name if command else None
+
+ def get_response_name(self, cluster_id: int, response_id: int) -> str:
+ response = self.__get_by_id(cluster_id, response_id, _ItemType.Response)
+ return response.name if response else None
+
+ def get_attribute_name(self, cluster_id: int, attribute_id: int) -> str:
+ attribute = self.__get_by_id(cluster_id, attribute_id, _ItemType.Attribute)
+ return attribute.definition.name if attribute else None
+
+ def get_event_name(self, cluster_id: int, event_id: int) -> str:
+ event = self.__get_by_id(cluster_id, event_id, _ItemType.Event)
+ return event.name if event else None
+
+ def get_command_by_name(self, cluster_name: str, command_name: str) -> Command:
+ return self.__get_by_name(cluster_name, command_name, _ItemType.Request)
+
+ def get_response_by_name(self, cluster_name: str, response_name: str) -> Struct:
+ return self.__get_by_name(cluster_name, response_name, _ItemType.Response)
+
+ def get_attribute_by_name(self, cluster_name: str, attribute_name: str) -> Attribute:
+ return self.__get_by_name(cluster_name, attribute_name, _ItemType.Attribute)
+
+ def get_event_by_name(self, cluster_name: str, event_name: str) -> Event:
+ return self.__get_by_name(cluster_name, event_name, _ItemType.Event)
+
+ def get_bitmap_by_name(self, cluster_name: str, bitmap_name: str) -> Bitmap:
+ return self.__get_by_name(cluster_name, bitmap_name, _ItemType.Bitmap)
+
+ def get_enum_by_name(self, cluster_name: str, enum_name: str) -> Bitmap:
+ return self.__get_by_name(cluster_name, enum_name, _ItemType.Enum)
+
+ def get_struct_by_name(self, cluster_name: str, struct_name: str) -> Struct:
+ return self.__get_by_name(cluster_name, struct_name, _ItemType.Struct)
+
+ def get_type_by_name(self, cluster_name: str, target_name: str):
+ bitmap = self.get_bitmap_by_name(cluster_name, target_name)
+ if bitmap:
+ return bitmap
+
+ enum = self.get_enum_by_name(cluster_name, target_name)
+ if enum:
+ return enum
+
+ struct = self.get_struct_by_name(cluster_name, target_name)
+ if struct:
+ return struct
+
+ return None
+
+ def is_fabric_scoped(self, target) -> bool:
+ if hasattr(target, 'qualities'):
+ return bool(target.qualities & StructQuality.FABRIC_SCOPED)
+ return False
+
+ def __get_by_name(self, cluster_name: str, target_name: str, target_type: _ItemType):
+ if not cluster_name or not target_name:
+ return None
+
+ # The idl parser remove spaces
+ cluster_name = cluster_name.replace(' ', '')
+ # Many YAML tests formats the name using camelCase despites that the spec mandates
+ # CamelCase. To be compatible with the current tests, everything is converted to lower case.
+ target_name = target_name.lower()
+
+ cluster_id = self.__clusters_by_name.get(cluster_name)
+ if cluster_id is None:
+ return None
+
+ target = None
+
+ if target_type == _ItemType.Request:
+ target_id = self.__commands_by_name.get(cluster_name).get(target_name)
+ target = self.__get_by_id(cluster_id, target_id, target_type)
+ elif target_type == _ItemType.Response:
+ target_id = self.__responses_by_name.get(cluster_name).get(target_name)
+ target = self.__get_by_id(cluster_id, target_id, target_type)
+ elif target_type == _ItemType.Event:
+ target_id = self.__events_by_name.get(cluster_name).get(target_name)
+ target = self.__get_by_id(cluster_id, target_id, target_type)
+ elif target_type == _ItemType.Attribute:
+ target_id = self.__attributes_by_name.get(cluster_name).get(target_name)
+ target = self.__get_by_id(cluster_id, target_id, target_type)
+ elif target_type == _ItemType.Bitmap:
+ target = self.__bitmaps_by_name.get(cluster_name).get(target_name)
+ elif target_type == _ItemType.Enum:
+ target = self.__enums_by_name.get(cluster_name).get(target_name)
+ elif target_type == _ItemType.Struct:
+ target = self.__structs_by_name.get(cluster_name).get(target_name)
+
+ return target
+
+ def __get_by_id(self, cluster_id: int, target_id: int, target_type: str):
+ targets = None
+
+ if target_type == _ItemType.Request:
+ targets = self.__commands_by_id.get(cluster_id)
+ elif target_type == _ItemType.Response:
+ targets = self.__responses_by_id.get(cluster_id)
+ elif target_type == _ItemType.Event:
+ targets = self.__events_by_id.get(cluster_id)
+ elif target_type == _ItemType.Attribute:
+ targets = self.__attributes_by_id.get(cluster_id)
+
+ if targets is None:
+ return None
+
+ return targets.get(target_id)
diff --git a/scripts/tests/yamltests/fixes.py b/scripts/tests/yamltests/fixes.py
new file mode 100644
index 00000000000000..a0126b003f9a2e
--- /dev/null
+++ b/scripts/tests/yamltests/fixes.py
@@ -0,0 +1,116 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import binascii
+
+'''Fixes certain value formats known to exist in YAML tests
+
+Some of the YAML test files contains some values that has been crafted to avoid some limitations
+of the original JavaScript parser and the C++ generated bits. This file contains functions to
+changes and convert those YAML values them back to something agnostic.
+'''
+
+
+def try_apply_yaml_cpp_longlong_limitation_fix(value):
+ '''Fix a known nasty hack in order that was used to work around compiler issue for C++.
+
+ When -9223372036854775808 was provided compiler would give a warning saying it is not a valid
+ way to write a long long in C++.
+ '''
+ if value == "-9223372036854775807LL - 1":
+ value = -9223372036854775808
+ return value
+
+
+def try_apply_yaml_unrepresentable_integer_for_javascript_fixes(value):
+ '''Fix up large integers that are represented within a string.
+
+ JavaScript can not represent integers bigger than 9007199254740991. But some of the test may
+ uses values that are bigger than this. The current way to workaround this limitation has
+ been to write those numbers as strings by encapsulating them in "".
+ '''
+ if type(value) is str:
+ value = int(value)
+ return value
+
+
+def try_apply_yaml_float_written_as_strings(value):
+ '''Fix up floats that are represented within a string.'''
+ if type(value) is str:
+ value = float(value)
+ return value
+
+
+# TODO(thampson) This method is a clone of the method in
+# src/controller/python/chip/yaml/format_converter.py and should eventually be removed in that file.
+def convert_yaml_octet_string_to_bytes(s: str) -> bytes:
+ '''Convert YAML octet string body to bytes.
+
+ This handles any c-style hex escapes (e.g. \x5a) and hex: prefix
+ '''
+ # Step 1: handle explicit "hex:" prefix
+ if s.startswith('hex:'):
+ return binascii.unhexlify(s[4:])
+
+ # Step 2: convert non-hex-prefixed to bytes
+ # TODO(#23669): This does not properly support utf8 octet strings. We mimic
+ # javascript codegen behavior. Behavior of javascript is:
+ # * Octet string character >= u+0200 errors out.
+ # * Any character greater than 0xFF has the upper bytes chopped off.
+ as_bytes = [ord(c) for c in s]
+
+ if any([value > 0x200 for value in as_bytes]):
+ raise ValueError('Unsupported char in octet string %r' % as_bytes)
+ accumulated_hex = ''.join([f"{(v & 0xFF):02x}" for v in as_bytes])
+ return binascii.unhexlify(accumulated_hex)
+
+
+def try_add_yaml_support_for_scientific_notation_without_dot(loader):
+ regular_expression = re.compile(u'''^(?:
+ [-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)?
+ |[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)
+ |\\.[0-9_]+(?:[eE][-+][0-9]+)?
+ |[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\\.[0-9_]*
+ |[-+]?\\.(?:inf|Inf|INF)
+ |\\.(?:nan|NaN|NAN))$''', re.X)
+
+ loader.add_implicit_resolver(
+ u'tag:yaml.org,2002:float',
+ regular_expression,
+ list(u'-+0123456789.'))
+ return loader
+
+
+# This is a gross hack. The previous runner has a some internal states where an identity match one
+# accessory. But this state may not exist in the runner (as in it prevent to have multiple node ids
+# associated to a fabric...) so the 'nodeId' needs to be added back manually.
+def try_update_yaml_node_id_test_runner_state(tests, config):
+ identities = {'alpha': None if 'nodeId' not in config else config['nodeId']}
+
+ for test in tests:
+ if not test.is_enabled:
+ continue
+
+ identity = test.identity
+
+ if test.cluster == 'CommissionerCommands':
+ if test.command == 'PairWithCode':
+ for item in test.arguments['values']:
+ if item['name'] == 'nodeId':
+ identities[identity] = item['value']
+ elif identity is not None and identity in identities:
+ nodeId = identities[identity]
+ test.nodeId = nodeId
diff --git a/scripts/tests/yamltests/parser.py b/scripts/tests/yamltests/parser.py
new file mode 100644
index 00000000000000..329d17cf124856
--- /dev/null
+++ b/scripts/tests/yamltests/parser.py
@@ -0,0 +1,708 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import yaml
+from enum import Enum
+
+from .constraints import get_constraints
+from . import fixes
+
+_TESTS_SECTION = [
+ 'name',
+ 'config',
+ 'tests',
+ 'PICS',
+]
+
+_TEST_SECTION = [
+ 'label',
+ 'cluster',
+ 'command',
+ 'disabled',
+ 'endpoint',
+ 'identity',
+ 'fabricFiltered',
+ 'verification',
+ 'nodeId',
+ 'attribute',
+ 'optional',
+ 'PICS',
+ 'arguments',
+ 'response',
+ 'minInterval',
+ 'maxInterval',
+ 'timedInteractionTimeoutMs',
+ 'busyWaitMs',
+]
+
+_TEST_ARGUMENTS_SECTION = [
+ 'values',
+ 'value',
+]
+
+_TEST_RESPONSE_SECTION = [
+ 'value',
+ 'values',
+ 'error',
+ 'clusterError',
+ 'constraints',
+ 'type',
+ 'hasMasksSet',
+ 'contains',
+ 'saveAs'
+]
+
+_ATTRIBUTE_COMMANDS = [
+ 'readAttribute',
+ 'writeAttribute',
+ 'subscribeAttribute',
+ 'waitForReport',
+]
+
+_EVENT_COMMANDS = [
+ 'readEvent',
+ 'subscribeEvent',
+]
+
+
+class PostProcessCheckStatus(Enum):
+ '''Indicates the post processing check step status.'''
+ SUCCESS = 'success',
+ WARNING = 'warning',
+ ERROR = 'error'
+
+
+class PostProcessCheckType(Enum):
+ '''Indicates the post processing check step type.'''
+ IM_STATUS = 'IMStatus',
+ CLUSTER_STATUS = 'ClusterStatus',
+ RESPONSE_VALIDATION = 'Response',
+ CONSTRAINT_VALIDATION = 'Constraints',
+ SAVE_AS_VARIABLE = 'SaveAs'
+
+
+class PostProcessCheck:
+ '''Information about a single post processing operation that was performed.
+
+ Each check has a helpful message, indicating what the post processing operation did and whether
+ it was successful or not.
+ '''
+
+ def __init__(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str):
+ self.state = state
+ self.category = category
+ self.message = message
+
+ def is_success(self) -> bool:
+ return self.state == PostProcessCheckStatus.SUCCESS
+
+ def is_warning(self) -> bool:
+ return self.state == PostProcessCheckStatus.WARNING
+
+ def is_error(self) -> bool:
+ return self.state == PostProcessCheckStatus.ERROR
+
+
+class PostProcessResponseResult:
+ '''Post processing response result information.
+
+ There are multiple operations that occur when post processing a response. This contains all the
+ results for each operation performed. Note that the number and types of steps performed is
+ dependant on test step itself.
+ '''
+
+ def __init__(self):
+ self.entries = []
+ self.successes = 0
+ self.warnings = 0
+ self.errors = 0
+
+ def success(self, category: PostProcessCheckType, message: str):
+ '''Adds a success entry that occured when post processing response to results.'''
+ self._insert(PostProcessCheckStatus.SUCCESS, category, message)
+ self.successes += 1
+
+ def warning(self, category: PostProcessCheckType, message: str):
+ '''Adds a warning entry that occured when post processing response to results.'''
+ self._insert(PostProcessCheckStatus.WARNING, category, message)
+ self.warnings += 1
+
+ def error(self, category: PostProcessCheckType, message: str):
+ '''Adds an error entry that occured when post processing response to results.'''
+ self._insert(PostProcessCheckStatus.ERROR, category, message)
+ self.errors += 1
+
+ def is_success(self):
+ # It is possible that post processing a response doesn't have any success entires added
+ # that is why we explicitly only search for if an error occurred.
+ return self.errors == 0
+
+ def is_failure(self):
+ return self.errors != 0
+
+ def _insert(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str):
+ log = PostProcessCheck(state, category, message)
+ self.entries.append(log)
+
+
+def _check_valid_keys(section, valid_keys_dict):
+ if section:
+ for key in section:
+ if key not in valid_keys_dict:
+ print(f'Unknown key: {key}')
+ raise KeyError
+
+
+def _value_or_none(data, key):
+ return data[key] if key in data else None
+
+
+def _value_or_config(data, key, config):
+ return data[key] if key in data else config[key]
+
+
+class _TestStepWithPlaceholders:
+ '''A single YAML test parsed, as is, from YAML.
+
+ Some YAML test steps contain placeholders for variable subsitution. The value of the variable
+ is only known after an earlier test step's has executed and the result successfully post
+ processed.
+ '''
+
+ def __init__(self, test: dict, config: dict, definitions):
+ # Disabled tests are not parsed in order to allow the test to be added to the test
+ # suite even if the feature is not implemented yet.
+ self.is_enabled = not ('disabled' in test and test['disabled'])
+ if not self.is_enabled:
+ return
+
+ self._parsing_config_variable_storage = config
+
+ _check_valid_keys(test, _TEST_SECTION)
+
+ self.label = _value_or_none(test, 'label')
+ self.optional = _value_or_none(test, 'optional')
+ self.node_id = _value_or_config(test, 'nodeId', config)
+ self.cluster = _value_or_config(test, 'cluster', config)
+ self.command = _value_or_config(test, 'command', config)
+ self.attribute = _value_or_none(test, 'attribute')
+ self.endpoint = _value_or_config(test, 'endpoint', config)
+
+ self.identity = _value_or_none(test, 'identity')
+ self.fabric_filtered = _value_or_none(test, 'fabricFiltered')
+ self.min_interval = _value_or_none(test, 'minInterval')
+ self.max_interval = _value_or_none(test, 'maxInterval')
+ self.timed_interaction_timeout_ms = _value_or_none(test, 'timedInteractionTimeoutMs')
+ self.busy_wait_ms = _value_or_none(test, 'busyWaitMs')
+
+ self.is_attribute = self.command in _ATTRIBUTE_COMMANDS
+ self.is_event = self.command in _EVENT_COMMANDS
+
+ self.arguments_with_placeholders = _value_or_none(test, 'arguments')
+ self.response_with_placeholders = _value_or_none(test, 'response')
+
+ _check_valid_keys(self.arguments_with_placeholders, _TEST_ARGUMENTS_SECTION)
+ _check_valid_keys(self.response_with_placeholders, _TEST_RESPONSE_SECTION)
+
+ self._convert_single_value_to_values(self.arguments_with_placeholders)
+ self._convert_single_value_to_values(self.response_with_placeholders)
+
+ argument_mapping = None
+ response_mapping = None
+
+ if self.is_attribute:
+ attribute = definitions.get_attribute_by_name(self.cluster, self.attribute)
+ if attribute:
+ attribute_mapping = self._as_mapping(definitions, self.cluster,
+ attribute.definition.data_type.name)
+ argument_mapping = attribute_mapping
+ response_mapping = attribute_mapping
+ else:
+ command = definitions.get_command_by_name(self.cluster, self.command)
+ if command:
+ argument_mapping = self._as_mapping(definitions, self.cluster, command.input_param)
+ response_mapping = self._as_mapping(definitions, self.cluster, command.output_param)
+
+ self._update_with_definition(self.arguments_with_placeholders, argument_mapping)
+ self._update_with_definition(self.response_with_placeholders, response_mapping)
+
+ # This performs a very basic sanity parse time check of constrains. This parsing happens
+ # again inside post processing response since at that time we will have required variables
+ # to substitute in. This parsing check here has value since some test can take a really
+ # long time to run so knowing earlier on that the test step would have failed at parsing
+ # time before the test step run occurs save developer time that building yaml tests.
+ if self.response_with_placeholders:
+ for value in self.response_with_placeholders['values']:
+ if 'constraints' not in value:
+ continue
+ get_constraints(value['constraints'])
+
+ def _convert_single_value_to_values(self, container):
+ if container is None or 'values' in container:
+ return
+
+ # Attribute tests pass a single value argument that does not carry a name but
+ # instead uses a generic 'value' keyword. Convert to keyword to be the single
+ # members of the 'values' array which is what is used for other tests.
+ value = {}
+
+ known_keys_to_copy = ['value', 'constraints', 'saveAs']
+ known_keys_to_allow = ['error', 'clusterError']
+
+ for key, item in list(container.items()):
+ if key in known_keys_to_copy:
+ value[key] = item
+ del container[key]
+ elif key in known_keys_to_allow:
+ # Nothing to do for those keys.
+ pass
+ else:
+ raise KeyError(f'Unknown key: {key}')
+
+ container['values'] = [value]
+
+ def _as_mapping(self, definitions, cluster_name, target_name):
+ element = definitions.get_type_by_name(cluster_name, target_name)
+
+ if hasattr(element, 'base_type'):
+ target_name = element.base_type.lower()
+ elif hasattr(element, 'fields'):
+ target_name = {f.name: self._as_mapping(definitions, cluster_name, f.data_type.name) for f in element.fields}
+ elif target_name:
+ target_name = target_name.lower()
+
+ return target_name
+
+ def _update_with_definition(self, container: dict, mapping_type):
+ if not container or not mapping_type:
+ return
+
+ for value in list(container['values']):
+ for key, item_value in list(value.items()):
+ mapping = mapping_type if self.is_attribute else mapping_type[value['name']]
+
+ if key == 'value':
+ value[key] = self._update_value_with_definition(item_value, mapping)
+ elif key == 'saveAs' and type(item_value) is str and item_value not in self._parsing_config_variable_storage:
+ self._parsing_config_variable_storage[item_value] = None
+ elif key == 'constraints':
+ for constraint, constraint_value in item_value.items():
+ value[key][constraint] = self._update_value_with_definition(
+ constraint_value, mapping_type)
+ else:
+ # This key, value pair does not rely on cluster specifications.
+ pass
+
+ def _update_value_with_definition(self, value, mapping_type):
+ if not mapping_type:
+ return value
+
+ if type(value) is dict:
+ rv = {}
+ for key in value:
+ # FabricIndex is a special case where the framework requires it to be passed even
+ # if it is not part of the requested arguments per spec and not part of the XML
+ # definition.
+ if key == 'FabricIndex' or key == 'fabricIndex':
+ rv[key] = value[key] # int64u
+ else:
+ mapping = mapping_type[key]
+ rv[key] = self._update_value_with_definition(value[key], mapping)
+ return rv
+ if type(value) is list:
+ return [self._update_value_with_definition(entry, mapping_type) for entry in value]
+ # TODO currently unsure if the check of `value not in config` is sufficant. For
+ # example let's say value = 'foo + 1' and map type is 'int64u', we would arguably do
+ # the wrong thing below.
+ if value is not None and value not in self._parsing_config_variable_storage:
+ if mapping_type == 'int64u' or mapping_type == 'int64s' or mapping_type == 'bitmap64' or mapping_type == 'epoch_us':
+ value = fixes.try_apply_yaml_cpp_longlong_limitation_fix(value)
+ value = fixes.try_apply_yaml_unrepresentable_integer_for_javascript_fixes(value)
+ elif mapping_type == 'single' or mapping_type == 'double':
+ value = fixes.try_apply_yaml_float_written_as_strings(value)
+ elif mapping_type == 'octet_string' or mapping_type == 'long_octet_string':
+ value = fixes.convert_yaml_octet_string_to_bytes(value)
+ elif mapping_type == 'boolean':
+ value = bool(value)
+
+ return value
+
+
+class TestStep:
+ '''A single YAML test action parsed from YAML.
+
+ This object contains all the information required for a test runner to execute the test step.
+ It also provide a function that is expected to be called by the test runner to post process
+ the recieved response from the accessory. Post processing both validates recieved response
+ and saves any variables that might be required but test step that have yet to be executed.
+ '''
+
+ def __init__(self, test: _TestStepWithPlaceholders, runtime_config_variable_storage: dict):
+ self._test = test
+ self._runtime_config_variable_storage = runtime_config_variable_storage
+ self.arguments = copy.deepcopy(test.arguments_with_placeholders)
+ self.response = copy.deepcopy(test.response_with_placeholders)
+ self._update_placeholder_values(self.arguments)
+ self._update_placeholder_values(self.response)
+
+ @property
+ def is_enabled(self):
+ return self._test.is_enabled
+
+ @property
+ def is_attribute(self):
+ return self._test.is_attribute
+
+ @property
+ def is_event(self):
+ return self._test.is_event
+
+ @property
+ def label(self):
+ return self._test.label
+
+ @property
+ def optional(self):
+ return self._test.optional
+
+ @property
+ def node_id(self):
+ return self._test.node_id
+
+ @property
+ def cluster(self):
+ return self._test.cluster
+
+ @property
+ def command(self):
+ return self._test.command
+
+ @property
+ def attribute(self):
+ return self._test.attribute
+
+ @property
+ def endpoint(self):
+ return self._test.endpoint
+
+ @property
+ def identity(self):
+ return self._test.identity
+
+ @property
+ def fabric_filtered(self):
+ return self._test.fabric_filtered
+
+ @property
+ def min_interval(self):
+ return self._test.min_interval
+
+ @property
+ def max_interval(self):
+ return self._test.max_interval
+
+ @property
+ def timed_interaction_timeout_ms(self):
+ return self._test.timed_interaction_timeout_ms
+
+ @property
+ def busy_wait_ms(self):
+ return self._test.busy_wait_ms
+
+ def post_process_response(self, response: dict):
+ result = PostProcessResponseResult()
+
+ if self._skip_post_processing(response, result):
+ return result
+
+ self._response_error_validation(response, result)
+ if self.response:
+ self._response_cluster_error_validation(response, result)
+ self._response_values_validation(response, result)
+ self._response_constraints_validation(response, result)
+ self._maybe_save_as(response, result)
+
+ return result
+
+ def _skip_post_processing(self, response: dict, result) -> bool:
+ '''Should we skip perform post processing.
+
+ Currently we only skip post processing if the test step indicates that sent test step
+ invokation was expected to be optionally supported. We confirm that it is optional
+ supported by either validating we got the expected error only then indicate that all
+ other post processing should be skipped.
+ '''
+ if not self.optional:
+ return False
+
+ received_error = response.get('error', None)
+ if received_error is None:
+ return False
+
+ if received_error == 'UNSUPPORTED_ATTRIBUTE' or received_error == 'UNSUPPORTED_COMMAND':
+ # result.warning(PostProcessCheckType.Optional, f'The response contains the error: "{error}".')
+ return True
+
+ return False
+
+ def _response_error_validation(self, response, result):
+ check_type = PostProcessCheckType.IM_STATUS
+ error_success = 'The test expects the "{error}" error which occured successfully.'
+ error_success_no_error = 'The test expects no error and no error occurred.'
+ error_wrong_error = 'The test expects the "{error}" error but the "{value}" error occured.'
+ error_unexpected_error = 'The test expects no error but the "{value}" error occured.'
+ error_unexpected_success = 'The test expects the "{error}" error but no error occured.'
+
+ expected_error = self.response.get('error') if self.response else None
+
+ # Handle generic success/error
+ if type(response) is str and response == 'failure':
+ received_error = response
+ elif type(response) is str and response == 'success':
+ received_error = None
+ else:
+ received_error = response.get('error')
+
+ if expected_error and received_error and expected_error == received_error:
+ result.success(check_type, error_success.format(error=expected_error))
+ elif expected_error and received_error:
+ result.error(check_type, error_wrong_error.format(
+ error=expected_error, value=received_error))
+ elif expected_error and not received_error:
+ result.error(check_type, error_unexpected_success.format(error=expected_error))
+ elif not expected_error and received_error:
+ result.error(check_type, error_unexpected_error.format(error=received_error))
+ elif not expected_error and not received_error:
+ result.success(check_type, error_success_no_error)
+ else:
+ # This should not happens
+ raise AssertionError('This should not happens.')
+
+ def _response_cluster_error_validation(self, response, result):
+ check_type = PostProcessCheckType.CLUSTER_STATUS
+ error_success = 'The test expects the "{error}" error which occured successfully.'
+ error_unexpected_success = 'The test expects the "{error}" error but no error occured.'
+ error_wrong_error = 'The test expects the "{error}" error but the "{value}" error occured.'
+
+ expected_error = self.response.get('clusterError')
+ received_error = response.get('clusterError')
+
+ if expected_error:
+ if received_error and expected_error == received_error:
+ result.success(check_type, error_success.format(error=expected_error))
+ elif received_error:
+ result.error(check_type, error_wrong_error.format(
+ error=expected_error, value=received_error))
+ else:
+ result.error(check_type, error_unexpected_success.format(error=expected_error))
+ else:
+ # Nothing is logged here to not be redundant with the generic error checking code.
+ pass
+
+ def _response_values_validation(self, response, result):
+ check_type = PostProcessCheckType.RESPONSE_VALIDATION
+ error_success = 'The test expectation "{name} == {value}" is true'
+ error_failure = 'The test expectation "{name} == {value}" is false'
+ error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."'
+
+ for value in self.response['values']:
+ if 'value' not in value:
+ continue
+
+ expected_name = 'value'
+ received_value = response.get('value')
+ if not self.is_attribute:
+ expected_name = value.get('name')
+ if expected_name not in received_value:
+ result.error(check_type, error_name_does_not_exist.format(name=expected_name))
+ continue
+
+ received_value = received_value.get(expected_name) if received_value else None
+
+ # TODO Supports Array/List. See an exemple of failure in TestArmFailSafe.yaml
+ expected_value = value.get('value')
+ if expected_value == received_value:
+ result.success(
+ check_type, error_success.format(name=expected_name, value=received_value))
+ else:
+ result.error(
+ check_type, error_failure.format(name=expected_name, value=expected_value))
+
+ def _response_constraints_validation(self, response, result):
+ check_type = PostProcessCheckType.CONSTRAINT_VALIDATION
+ error_success = 'Constraints check passed'
+ error_failure = 'Constraints check failed'
+ error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."'
+
+ for value in self.response['values']:
+ if 'constraints' not in value:
+ continue
+
+ expected_name = 'value'
+ received_value = response.get('value')
+ if not self.is_attribute:
+ expected_name = value.get('name')
+ if expected_name not in received_value:
+ result.error(check_type, error_name_does_not_exist.format(name=expected_name))
+ continue
+
+ received_value = received_value.get(expected_name) if received_value else None
+
+ constraints = get_constraints(value['constraints'])
+ if all([constraint.is_met(received_value) for constraint in constraints]):
+ result.success(check_type, error_success)
+ else:
+ # TODO would be helpful to be more verbose here
+ result.error(check_type, error_failure)
+
+ def _maybe_save_as(self, response, result):
+ check_type = PostProcessCheckType.SAVE_AS_VARIABLE
+ error_success = 'The test save the value "{value}" as {name}.'
+ error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."'
+
+ for value in self.response['values']:
+ if 'saveAs' not in value:
+ continue
+
+ expected_name = 'value'
+ received_value = response.get('value')
+ if not self.is_attribute:
+ expected_name = value.get('name')
+ if expected_name not in received_value:
+ result.error(check_type, error_name_does_not_exist.format(name=expected_name))
+ continue
+
+ received_value = received_value.get(expected_name) if received_value else None
+
+ save_as = value.get('saveAs')
+ self._runtime_config_variable_storage[save_as] = received_value
+ result.success(check_type, error_success.format(value=received_value, name=save_as))
+
+ def _update_placeholder_values(self, container):
+ if not container:
+ return
+
+ values = container['values']
+
+ for idx, item in enumerate(values):
+ if 'value' in item:
+ values[idx]['value'] = self._config_variable_substitution(item['value'])
+
+ if 'constraints' in item:
+ for constraint, constraint_value in item['constraints'].items():
+ values[idx]['constraints'][constraint] = self._config_variable_substitution(
+ constraint_value)
+
+ container['values'] = values
+
+ def _config_variable_substitution(self, value):
+ if type(value) is list:
+ return [self._config_variable_substitution(entry) for entry in value]
+ elif type(value) is dict:
+ mapped_value = {}
+ for key in value:
+ mapped_value[key] = self._config_variable_substitution(value[key])
+ return mapped_value
+ elif type(value) is str:
+ # For most tests, a single config variable is used and it can be replaced as in.
+ # But some other tests were relying on the fact that the expression was put 'as if' in
+ # the generated code and was resolved before being sent over the wire. For such
+ # expressions (e.g 'myVar + 1') we need to compute it before sending it over the wire.
+ tokens = value.split()
+ if len(tokens) == 0:
+ return value
+
+ substitution_occured = False
+ for idx, token in enumerate(tokens):
+ if token in self._runtime_config_variable_storage:
+ variable_info = self._runtime_config_variable_storage[token]
+ if type(variable_info) is dict and 'defaultValue' in variable_info:
+ variable_info = variable_info['defaultValue']
+ tokens[idx] = variable_info
+ substitution_occured = True
+
+ if len(tokens) == 1:
+ return tokens[0]
+
+ tokens = [str(token) for token in tokens]
+ value = ' '.join(tokens)
+ # TODO we should move away from eval. That will mean that we will need to do extra
+ # parsing, but it would be safer then just blindly running eval.
+ return value if not substitution_occured else eval(value)
+ else:
+ return value
+
+
+class YamlTests:
+ '''Parses YAML tests and becomes an iterator to provide 'TestStep's
+
+ The provided TestStep is expected to be used by a runner/adapter to run the test step and
+ provide the response from the device to the TestStep object.
+
+ Currently this is a one time use object. Eventually this should be refactored to take a
+ runner/adapter as an argument and run through all test steps and should be reusable for
+ multiple runs.
+ '''
+
+ def __init__(self, parsing_config_variable_storage: dict, definitions, tests: dict):
+ self._parsing_config_variable_storage = parsing_config_variable_storage
+ enabled_tests = []
+ for test in tests:
+ test_with_placeholders = _TestStepWithPlaceholders(
+ test, self._parsing_config_variable_storage, definitions)
+ if test_with_placeholders.is_enabled:
+ enabled_tests.append(test_with_placeholders)
+ fixes.try_update_yaml_node_id_test_runner_state(
+ enabled_tests, self._parsing_config_variable_storage)
+
+ self._runtime_config_variable_storage = copy.deepcopy(parsing_config_variable_storage)
+ self._tests = enabled_tests
+ self._index = 0
+ self.count = len(self._tests)
+
+ def __iter__(self):
+ return self
+
+ def __next__(self) -> TestStep:
+ if self._index < self.count:
+ test = self._tests[self._index]
+ test_step = TestStep(test, self._runtime_config_variable_storage)
+ self._index += 1
+ return test_step
+
+ raise StopIteration
+
+
+class TestParser:
+ def __init__(self, test_file, pics_file, definitions):
+ # TODO Needs supports for PICS file
+ with open(test_file) as f:
+ loader = yaml.FullLoader
+ loader = fixes.try_add_yaml_support_for_scientific_notation_without_dot(loader)
+
+ data = yaml.load(f, Loader=loader)
+ _check_valid_keys(data, _TESTS_SECTION)
+
+ self.name = _value_or_none(data, 'name')
+ self.PICS = _value_or_none(data, 'PICS')
+
+ self._parsing_config_variable_storage = _value_or_none(data, 'config')
+
+ tests = _value_or_none(data, 'tests')
+ self.tests = YamlTests(self._parsing_config_variable_storage, definitions, tests)
+
+ def update_config(self, key, value):
+ self._parsing_config_variable_storage[key] = value
diff --git a/scripts/tests/yamltests/setup.py b/scripts/tests/yamltests/setup.py
new file mode 100644
index 00000000000000..2bcdfbe8c61d46
--- /dev/null
+++ b/scripts/tests/yamltests/setup.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2022 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""The yamltest package."""
+
+import setuptools # type: ignore
+
+setuptools.setup(
+ name='yamltests',
+ version='0.0.1',
+ author='Project CHIP Authors',
+ description='Parse matter yaml test files',
+ packages=setuptools.find_packages(),
+ package_data={'yamltest': ['py.typed']},
+ zip_safe=False,
+)
diff --git a/scripts/tests/yamltests/test_spec_definitions.py b/scripts/tests/yamltests/test_spec_definitions.py
new file mode 100644
index 00000000000000..00a15da9940d0c
--- /dev/null
+++ b/scripts/tests/yamltests/test_spec_definitions.py
@@ -0,0 +1,291 @@
+#!/usr/bin/env -S python3 -B
+#
+# Copyright (c) 2022 Project CHIP Authors
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from definitions import *
+
+import unittest
+import io
+
+source_cluster = '''
+
+
+ Test
+ 0x1234
+
+
+'''
+
+source_command = '''
+
+
+ Test
+ 0x1234
+
+
+
+
+
+'''
+
+source_response = '''
+
+
+ Test
+ 0x1234
+
+
+
+
+
+
+
+'''
+
+source_attribute = '''
+
+
+ TestGlobalAttribute
+
+
+
+ Test
+ 0x1234
+
+
+ TestAttribute
+
+
+
+'''
+
+source_event = '''
+
+
+ Test
+ 0x1234
+
+
+
+
+
+'''
+
+source_bitmap = '''
+
+
+
+
+
+
+
+
+
+
+
+
+ Test
+ 0x1234
+
+
+
+ TestWrong
+ 0x4321
+
+
+'''
+
+source_enum = '''
+
+
+
+
+
+
+
+
+
+
+
+
+ Test
+ 0x1234
+
+
+
+ TestWrong
+ 0x4321
+
+
+'''
+
+source_struct = '''
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Test
+ 0x1234
+
+
+
+ TestWrong
+ 0x4321
+
+
+'''
+
+
+class TestSpecDefinitions(unittest.TestCase):
+ def test_cluster_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_cluster), name='source_cluster')])
+ self.assertIsNone(definitions.get_cluster_name(0x4321))
+ self.assertEqual(definitions.get_cluster_name(0x1234), 'Test')
+
+ def test_command_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')])
+ self.assertIsNone(definitions.get_command_name(0x4321, 0x0))
+ self.assertIsNone(definitions.get_command_name(0x1234, 0x1))
+ self.assertEqual(definitions.get_command_name(0x1234, 0x0), 'TestCommand')
+
+ def test_response_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')])
+ self.assertIsNone(definitions.get_response_name(0x4321, 0x0))
+ self.assertIsNone(definitions.get_response_name(0x1234, 0x1))
+ self.assertEqual(definitions.get_response_name(0x1234, 0x0), 'TestCommandResponse')
+
+ def test_attribute_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')])
+ self.assertIsNone(definitions.get_attribute_name(0x4321, 0x0))
+ self.assertIsNone(definitions.get_attribute_name(0x4321, 0xFFFD))
+ self.assertIsNone(definitions.get_attribute_name(0x1234, 0x1))
+ self.assertEqual(definitions.get_attribute_name(0x1234, 0x0), 'TestAttribute')
+ self.assertEqual(definitions.get_attribute_name(0x1234, 0xFFFD), 'TestGlobalAttribute')
+
+ def test_event_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')])
+ self.assertIsNone(definitions.get_event_name(0x4321, 0x0))
+ self.assertIsNone(definitions.get_event_name(0x1234, 0x1))
+ self.assertEqual(definitions.get_event_name(0x1234, 0x0), 'TestEvent')
+
+ def test_get_command_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')])
+ self.assertIsNone(definitions.get_command_by_name('WrongName', 'TestCommand'))
+ self.assertIsNone(definitions.get_command_by_name('Test', 'TestWrongCommand'))
+ self.assertIsNone(definitions.get_response_by_name('Test', 'TestCommand'))
+ self.assertIsInstance(definitions.get_command_by_name('Test', 'TestCommand'), Command)
+ self.assertIsNone(definitions.get_command_by_name('test', 'TestCommand'))
+ self.assertIsInstance(definitions.get_command_by_name('Test', 'testcommand'), Command)
+
+ def test_get_response_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')])
+ self.assertIsNone(definitions.get_response_by_name('WrongName', 'TestCommandResponse'))
+ self.assertIsNone(definitions.get_response_by_name('Test', 'TestWrongCommandResponse'))
+ self.assertIsNone(definitions.get_command_by_name('Test', 'TestCommandResponse'))
+ self.assertIsInstance(definitions.get_response_by_name('Test', 'TestCommandResponse'), Struct)
+ self.assertIsNone(definitions.get_response_by_name('test', 'TestCommandResponse'))
+ self.assertIsInstance(definitions.get_response_by_name('Test', 'testcommandresponse'), Struct)
+
+ def test_get_attribute_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')])
+ self.assertIsNone(definitions.get_attribute_by_name('WrongName', 'TestAttribute'))
+ self.assertIsNone(definitions.get_attribute_by_name('WrongName', 'TestGlobalAttribute'))
+ self.assertIsNone(definitions.get_attribute_by_name('Test', 'TestWrongAttribute'))
+ self.assertIsInstance(definitions.get_attribute_by_name('Test', 'TestAttribute'), Attribute)
+ self.assertIsInstance(definitions.get_attribute_by_name('Test', 'TestGlobalAttribute'), Attribute)
+ self.assertIsNone(definitions.get_attribute_by_name('test', 'TestAttribute'))
+ self.assertIsNone(definitions.get_attribute_by_name('test', 'TestGlobalAttribute'))
+ self.assertIsInstance(definitions.get_attribute_by_name('Test', 'testattribute'), Attribute)
+ self.assertIsInstance(definitions.get_attribute_by_name('Test', 'testglobalattribute'), Attribute)
+
+ def test_get_event_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')])
+ self.assertIsNone(definitions.get_event_by_name('WrongName', 'TestEvent'))
+ self.assertIsNone(definitions.get_event_by_name('Test', 'TestWrongEvent'))
+ self.assertIsInstance(definitions.get_event_by_name('Test', 'TestEvent'), Event)
+ self.assertIsNone(definitions.get_event_by_name('test', 'TestEvent'))
+ self.assertIsInstance(definitions.get_event_by_name('Test', 'testevent'), Event)
+
+ def test_get_bitmap_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_bitmap), name='source_bitmap')])
+ self.assertIsNone(definitions.get_bitmap_by_name('WrongName', 'TestBitmap'))
+ self.assertIsNone(definitions.get_bitmap_by_name('Test', 'TestWrongBitmap'))
+ self.assertIsInstance(definitions.get_bitmap_by_name('Test', 'TestBitmap'), Bitmap)
+ self.assertIsNone(definitions.get_bitmap_by_name('test', 'TestBitmap'))
+ self.assertIsInstance(definitions.get_bitmap_by_name('Test', 'testbitmap'), Bitmap)
+
+ def test_get_enum_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_enum), name='source_enum')])
+ self.assertIsNone(definitions.get_enum_by_name('WrongName', 'TestEnum'))
+ self.assertIsNone(definitions.get_enum_by_name('Test', 'TestWrongEnum'))
+ self.assertIsInstance(definitions.get_enum_by_name('Test', 'TestEnum'), Enum)
+ self.assertIsNone(definitions.get_enum_by_name('test', 'TestEnum'))
+ self.assertIsInstance(definitions.get_enum_by_name('Test', 'testenum'), Enum)
+
+ def test_get_struct_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')])
+ self.assertIsNone(definitions.get_struct_by_name('WrongName', 'TestStruct'))
+ self.assertIsNone(definitions.get_struct_by_name('Test', 'TestWrongStruct'))
+ self.assertIsInstance(definitions.get_struct_by_name('Test', 'TestStruct'), Struct)
+ self.assertIsNone(definitions.get_struct_by_name('test', 'TestStruct'))
+ self.assertIsInstance(definitions.get_struct_by_name('Test', 'teststruct'), Struct)
+
+ def test_get_type_by_name(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')])
+ self.assertIsNone(definitions.get_type_by_name('Test', 'TestCommand'))
+
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')])
+ self.assertIsInstance(definitions.get_type_by_name('Test', 'TestCommandResponse'), Struct)
+
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')])
+ self.assertIsNone(definitions.get_type_by_name('Test', 'TestAttribute'))
+
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')])
+ self.assertIsNone(definitions.get_type_by_name('Test', 'TestEvent'))
+
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_bitmap), name='source_bitmap')])
+ self.assertIsInstance(definitions.get_type_by_name('Test', 'TestBitmap'), Bitmap)
+
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_enum), name='source_enum')])
+ self.assertIsInstance(definitions.get_type_by_name('Test', 'TestEnum'), Enum)
+
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')])
+ self.assertIsInstance(definitions.get_type_by_name('Test', 'TestStruct'), Struct)
+
+ def test_is_fabric_scoped(self):
+ definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')])
+
+ struct = definitions.get_struct_by_name('Test', 'TestStruct')
+ self.assertFalse(definitions.is_fabric_scoped(struct))
+
+ struct = definitions.get_struct_by_name('Test', 'TestStructFabricScoped')
+ self.assertTrue(definitions.is_fabric_scoped(struct))
+
+
+if __name__ == '__main__':
+ unittest.main()