Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Common YAML test parser (#24066)
Browse files Browse the repository at this point in the history
* Initial commit of 3 files from Vivien's branch

These 3 files came from commit ID
e391dad

* Adding a test commit containing a TODO

* Restyle

* Small tweaks plus adding unit test

* Moving towards PEP8 naming convension
* Refactor results to be less focused string matching
* Added very basic unit test file

Test run using:
`python3 scripts/tests/test_yaml_parser.py`

* Refactor constraints

* [Yaml Parser] Add scripts/tests/yamltests/SpecDefinitions.py with some unit tests

* [Yaml Parser] Use SpecDefinitions API instead of ClusterDefinitions

* [Yaml Parser] Uses convert_yaml_octet_string_to_bytes for octet_string

* [Yaml Parser / Constraints] Move the check for None before the type checking

* [Yaml Parser] Add _convert_single_value_to_values and make it sooner such that some code can be made simpler

* Small fixes

* [Yaml Parser] Moves _convert_single_value_to_values up one block such that update_with_definition can be simplified

* Move `__update_placeholder` to TestStep

Also TestStep have internal pointer to config dictionary

* Add docstring, and initial constraint parsing

* Refactor for TestStep into two parts

As well as stype formatting to get it ready for review

* Converge to PEP8 python format standard

* Add and clean up doc strings

* Attempt at creating a python package

* Make Restyle Happy

* Clean up yaml parser unit test to be self contained

* Fix test name

* Fix test name

* Address PR comments

Co-authored-by: Andrei Litvin <[email protected]>
Co-authored-by: Vivien Nicolas <[email protected]>
3 people authored and pull[bot] committed Jan 30, 2024
1 parent 04ba12e commit 1259320
Showing 9 changed files with 1,834 additions and 0 deletions.
95 changes: 95 additions & 0 deletions scripts/tests/test_yaml_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# TODO Once yamltest is a proper self contained module we can move this file
# to a more appropriate spot. For now, having this file to do some quick checks
# is arguably better then no checks at all.

import io
import tempfile
import unittest

from yamltests.definitions import *
from yamltests.parser import TestParser

simple_test_description = '''<?xml version="1.0"?>
<configurator>
<struct name="TestStruct">
<cluster code="0x1234"/>
<item name="a" type="boolean"/>
</struct>
<cluster>
<name>Test</name>
<code>0x1234</code>
</cluster>
</configurator>
'''

simple_test_yaml = '''
name: Test Cluster Tests
config:
nodeId: 0x12344321
cluster: "Test"
endpoint: 1
tests:
- label: "Send Test Command"
command: "test"
- label: "Send Test Not Handled Command"
command: "testNotHandled"
response:
error: INVALID_COMMAND
- label: "Send Test Specific Command"
command: "testSpecific"
response:
values:
- name: "returnValue"
value: 7
'''


class TestYamlParser(unittest.TestCase):
def setUp(self):
self._definitions = SpecDefinitions(
[ParseSource(source=io.StringIO(simple_test_description), name='simple_test_description')])
self._temp_file = tempfile.NamedTemporaryFile(suffix='.yaml')
with open(self._temp_file.name, 'w') as f:
f.writelines(simple_test_yaml)
pics_file = None
self._yaml_parser = TestParser(self._temp_file.name, pics_file, self._definitions)

def test_able_to_iterate_over_all_parsed_tests(self):
# self._yaml_parser.tests implements `__next__`, which does value substitution. We are
# simply ensure there is no exceptions raise.
count = 0
for idx, test_step in enumerate(self._yaml_parser.tests):
count += 1
pass
self.assertEqual(count, 3)


def main():
unittest.main()


if __name__ == '__main__':
main()
39 changes: 39 additions & 0 deletions scripts/tests/yamltests/BUILD.gn
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import("//build_overrides/build.gni")
import("//build_overrides/chip.gni")

import("//build_overrides/pigweed.gni")
import("$dir_pw_build/python.gni")

pw_python_package("yamltests") {
setup = [ "setup.py" ]

sources = [
"__init__.py",
"constraints.py",
"definitions.py",
"fixes.py",
"parser.py",
]

python_deps = [ "${chip_root}/scripts/idl" ]

tests = [ "test_spec_definitions.py" ]

# TODO: at a future time consider enabling all (* or missing) here to get
# pylint checking these files
static_analysis = []
}
Empty file.
357 changes: 357 additions & 0 deletions scripts/tests/yamltests/constraints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,357 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from abc import ABC, abstractmethod
import string


class ConstraintParseError(Exception):
def __init__(self, message):
super().__init__(message)


class ConstraintValidationError(Exception):
def __init__(self, message):
super().__init__(message)


class BaseConstraint(ABC):
'''Constrain Interface'''

def __init__(self, types: list, is_null_allowed: bool = False):
'''An empty type list provided that indicates any type is accepted'''
self._types = types
self._is_null_allowed = is_null_allowed

def is_met(self, value):
if value is None:
return self._is_null_allowed

response_type = type(value)
if self._types and response_type not in self._types:
return False

return self.check_response(value)

@abstractmethod
def check_response(self, value) -> bool:
pass


class _ConstraintHasValue(BaseConstraint):
def __init__(self, has_value):
super().__init__(types=[])
self._has_value = has_value

def check_response(self, value) -> bool:
raise ConstraintValidationError('HasValue constraint currently not implemented')


class _ConstraintType(BaseConstraint):
def __init__(self, type):
super().__init__(types=[], is_null_allowed=True)
self._type = type

def check_response(self, value) -> bool:
success = False
if self._type == 'boolean' and type(value) is bool:
success = True
elif self._type == 'list' and type(value) is list:
success = True
elif self._type == 'char_string' and type(value) is str:
success = True
elif self._type == 'octet_string' and type(value) is bytes:
success = True
elif self._type == 'vendor_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFF
elif self._type == 'device_type_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'cluster_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'attribute_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'field_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'command_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'event_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'action_id' and type(value) is int:
success = value >= 0 and value <= 0xFF
elif self._type == 'transaction_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'node_id' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
elif self._type == 'bitmap8' and type(value) is int:
success = value >= 0 and value <= 0xFF
elif self._type == 'bitmap16' and type(value) is int:
success = value >= 0 and value <= 0xFFFF
elif self._type == 'bitmap32' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'bitmap64' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
elif self._type == 'enum8' and type(value) is int:
success = value >= 0 and value <= 0xFF
elif self._type == 'enum16' and type(value) is int:
success = value >= 0 and value <= 0xFFFF
elif self._type == 'Percent' and type(value) is int:
success = value >= 0 and value <= 0xFF
elif self._type == 'Percent100ths' and type(value) is int:
success = value >= 0 and value <= 0xFFFF
elif self._type == 'epoch_us' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
elif self._type == 'epoch_s' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'utc' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'date' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'tod' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'int8u' and type(value) is int:
success = value >= 0 and value <= 0xFF
elif self._type == 'int16u' and type(value) is int:
success = value >= 0 and value <= 0xFFFF
elif self._type == 'int24u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFF
elif self._type == 'int32u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFF
elif self._type == 'int40u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFF
elif self._type == 'int48u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFF
elif self._type == 'int56u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFF
elif self._type == 'int64u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF
elif self._type == 'nullable_int8u' and type(value) is int:
success = value >= 0 and value <= 0xFE
elif self._type == 'nullable_int16u' and type(value) is int:
success = value >= 0 and value <= 0xFFFE
elif self._type == 'nullable_int24u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFE
elif self._type == 'nullable_int32u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFE
elif self._type == 'nullable_int40u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFE
elif self._type == 'nullable_int48u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFE
elif self._type == 'nullable_int56u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFE
elif self._type == 'nullable_int64u' and type(value) is int:
success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFE
elif self._type == 'int8s' and type(value) is int:
success = value >= -128 and value <= 127
elif self._type == 'int16s' and type(value) is int:
success = value >= -32768 and value <= 32767
elif self._type == 'int24s' and type(value) is int:
success = value >= -8388608 and value <= 8388607
elif self._type == 'int32s' and type(value) is int:
success = value >= -2147483648 and value <= 2147483647
elif self._type == 'int40s' and type(value) is int:
success = value >= -549755813888 and value <= 549755813887
elif self._type == 'int48s' and type(value) is int:
success = value >= -140737488355328 and value <= 140737488355327
elif self._type == 'int56s' and type(value) is int:
success = value >= -36028797018963968 and value <= 36028797018963967
elif self._type == 'int64s' and type(value) is int:
success = value >= -9223372036854775808 and value <= 9223372036854775807
elif self._type == 'nullable_int8s' and type(value) is int:
success = value >= -127 and value <= 127
elif self._type == 'nullable_int16s' and type(value) is int:
success = value >= -32767 and value <= 32767
elif self._type == 'nullable_int24s' and type(value) is int:
success = value >= -8388607 and value <= 8388607
elif self._type == 'nullable_int32s' and type(value) is int:
success = value >= -2147483647 and value <= 2147483647
elif self._type == 'nullable_int40s' and type(value) is int:
success = value >= -549755813887 and value <= 549755813887
elif self._type == 'nullable_int48s' and type(value) is int:
success = value >= -140737488355327 and value <= 140737488355327
elif self._type == 'nullable_int56s' and type(value) is int:
success = value >= -36028797018963967 and value <= 36028797018963967
elif self._type == 'nullable_int64s' and type(value) is int:
success = value >= -9223372036854775807 and value <= 9223372036854775807
return success


class _ConstraintMinLength(BaseConstraint):
def __init__(self, min_length):
super().__init__(types=[str, bytes, list])
self._min_length = min_length

def check_response(self, value) -> bool:
return len(value) >= self._min_length


class _ConstraintMaxLength(BaseConstraint):
def __init__(self, max_length):
super().__init__(types=[str, bytes, list])
self._max_length = max_length

def check_response(self, value) -> bool:
return len(value) <= self._max_length


class _ConstraintIsHexString(BaseConstraint):
def __init__(self, is_hex_string: bool):
super().__init__(types=[str])
self._is_hex_string = is_hex_string

def check_response(self, value) -> bool:
return all(c in string.hexdigits for c in value) == self._is_hex_string


class _ConstraintStartsWith(BaseConstraint):
def __init__(self, starts_with):
super().__init__(types=[str])
self._starts_with = starts_with

def check_response(self, value) -> bool:
return value.startswith(self._starts_with)


class _ConstraintEndsWith(BaseConstraint):
def __init__(self, ends_with):
super().__init__(types=[str])
self._ends_with = ends_with

def check_response(self, value) -> bool:
return value.endswith(self._ends_with)


class _ConstraintIsUpperCase(BaseConstraint):
def __init__(self, is_upper_case):
super().__init__(types=[str])
self._is_upper_case = is_upper_case

def check_response(self, value) -> bool:
return value.isupper() == self._is_upper_case


class _ConstraintIsLowerCase(BaseConstraint):
def __init__(self, is_lower_case):
super().__init__(types=[str])
self._is_lower_case = is_lower_case

def check_response(self, value) -> bool:
return value.islower() == self._is_lower_case


class _ConstraintMinValue(BaseConstraint):
def __init__(self, min_value):
super().__init__(types=[int, float], is_null_allowed=True)
self._min_value = min_value

def check_response(self, value) -> bool:
return value >= self._min_value


class _ConstraintMaxValue(BaseConstraint):
def __init__(self, max_value):
super().__init__(types=[int, float], is_null_allowed=True)
self._max_value = max_value

def check_response(self, value) -> bool:
return value <= self._max_value


class _ConstraintContains(BaseConstraint):
def __init__(self, contains):
super().__init__(types=[list])
self._contains = contains

def check_response(self, value) -> bool:
return set(self._contains).issubset(value)


class _ConstraintExcludes(BaseConstraint):
def __init__(self, excludes):
super().__init__(types=[list])
self._excludes = excludes

def check_response(self, value) -> bool:
return set(self._excludes).isdisjoint(value)


class _ConstraintHasMaskSet(BaseConstraint):
def __init__(self, has_masks_set):
super().__init__(types=[int])
self._has_masks_set = has_masks_set

def check_response(self, value) -> bool:
return all([(value & mask) == mask for mask in self._has_masks_set])


class _ConstraintHasMaskClear(BaseConstraint):
def __init__(self, has_masks_clear):
super().__init__(types=[int])
self._has_masks_clear = has_masks_clear

def check_response(self, value) -> bool:
return all([(value & mask) == 0 for mask in self._has_masks_clear])


class _ConstraintNotValue(BaseConstraint):
def __init__(self, not_value):
super().__init__(types=[], is_null_allowed=True)
self._not_value = not_value

def check_response(self, value) -> bool:
return value != self._not_value


def get_constraints(constraints: dict) -> list[BaseConstraint]:
_constraints = []

for constraint, constraint_value in constraints.items():
if 'hasValue' == constraint:
_constraints.append(_ConstraintHasValue(constraint_value))
elif 'type' == constraint:
_constraints.append(_ConstraintType(constraint_value))
elif 'minLength' == constraint:
_constraints.append(_ConstraintMinLength(constraint_value))
elif 'maxLength' == constraint:
_constraints.append(_ConstraintMaxLength(constraint_value))
elif 'isHexString' == constraint:
_constraints.append(_ConstraintIsHexString(constraint_value))
elif 'startsWith' == constraint:
_constraints.append(_ConstraintStartsWith(constraint_value))
elif 'endsWith' == constraint:
_constraints.append(_ConstraintEndsWith(constraint_value))
elif 'isUpperCase' == constraint:
_constraints.append(_ConstraintIsUpperCase(constraint_value))
elif 'isLowerCase' == constraint:
_constraints.append(_ConstraintIsLowerCase(constraint_value))
elif 'minValue' == constraint:
_constraints.append(_ConstraintMinValue(constraint_value))
elif 'maxValue' == constraint:
_constraints.append(_ConstraintMaxValue(constraint_value))
elif 'contains' == constraint:
_constraints.append(_ConstraintContains(constraint_value))
elif 'excludes' == constraint:
_constraints.append(_ConstraintExcludes(constraint_value))
elif 'hasMasksSet' == constraint:
_constraints.append(_ConstraintHasMaskSet(constraint_value))
elif 'hasMasksClear' == constraint:
_constraints.append(_ConstraintHasMaskClear(constraint_value))
elif 'notValue' == constraint:
_constraints.append(_ConstraintNotValue(constraint_value))
else:
raise ConstraintParseError(f'Unknown constraint type:{constraint}')

return _constraints
200 changes: 200 additions & 0 deletions scripts/tests/yamltests/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
import enum

try:
from idl.zapxml import ParseSource, ParseXmls
from idl.matter_idl_types import *
except:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

from idl.zapxml import ParseSource, ParseXmls
from idl.matter_idl_types import *


class _ItemType(enum.Enum):
Cluster = 0
Request = 1
Response = 2
Attribute = 3
Event = 4
Bitmap = 5
Enum = 6
Struct = 7


class SpecDefinitions:

def __init__(self, sources: List[ParseSource]):
self.__clusters_by_id: dict[int, Cluster] = {}
self.__commands_by_id: dict[int, dict[int, Command]] = {}
self.__responses_by_id: dict[int, dict[int, Struct]] = {}
self.__attributes_by_id: dict[int, dict[int, Attribute]] = {}
self.__events_by_id: dict[int, dict[int, Event]] = {}

self.__clusters_by_name: dict[str, int] = {}
self.__commands_by_name: dict[str, int] = {}
self.__responses_by_name: dict[str, int] = {}
self.__attributes_by_name: dict[str, int] = {}
self.__events_by_name: dict[str, int] = {}

self.__bitmaps_by_name: dict[str, dict[str, Bitmap]] = {}
self.__enums_by_name: dict[str, dict[str, Enum]] = {}
self.__structs_by_name: dict[str, dict[str, Struct]] = {}

idl = ParseXmls(sources)

for cluster in idl.clusters:
code: int = cluster.code
name: str = cluster.name
self.__clusters_by_id[code] = cluster
self.__commands_by_id[code] = {c.code: c for c in cluster.commands}
self.__responses_by_id[code] = {}
self.__attributes_by_id[code] = {a.definition.code: a for a in cluster.attributes}
self.__events_by_id[code] = {e.code: e for e in cluster.events}

self.__clusters_by_name[name] = cluster.code
self.__commands_by_name[name] = {c.name.lower(): c.code for c in cluster.commands}
self.__responses_by_name[name] = {}
self.__attributes_by_name[name] = {a.definition.name.lower(): a.definition.code for a in cluster.attributes}
self.__events_by_name[name] = {e.name.lower(): e.code for e in cluster.events}

self.__bitmaps_by_name[name] = {b.name.lower(): b for b in cluster.bitmaps}
self.__enums_by_name[name] = {e.name.lower(): e for e in cluster.enums}
self.__structs_by_name[name] = {s.name.lower(): s for s in cluster.structs}

for struct in cluster.structs:
if struct.tag == StructTag.RESPONSE:
self.__responses_by_id[code][struct.code] = struct
self.__responses_by_name[name][struct.name.lower()] = struct.code

def get_cluster_name(self, cluster_id: int) -> str:
cluster = self.__clusters_by_id.get(cluster_id)
return cluster.name if cluster else None

def get_command_name(self, cluster_id: int, command_id: int) -> str:
command = self.__get_by_id(cluster_id, command_id, _ItemType.Request)
return command.name if command else None

def get_response_name(self, cluster_id: int, response_id: int) -> str:
response = self.__get_by_id(cluster_id, response_id, _ItemType.Response)
return response.name if response else None

def get_attribute_name(self, cluster_id: int, attribute_id: int) -> str:
attribute = self.__get_by_id(cluster_id, attribute_id, _ItemType.Attribute)
return attribute.definition.name if attribute else None

def get_event_name(self, cluster_id: int, event_id: int) -> str:
event = self.__get_by_id(cluster_id, event_id, _ItemType.Event)
return event.name if event else None

def get_command_by_name(self, cluster_name: str, command_name: str) -> Command:
return self.__get_by_name(cluster_name, command_name, _ItemType.Request)

def get_response_by_name(self, cluster_name: str, response_name: str) -> Struct:
return self.__get_by_name(cluster_name, response_name, _ItemType.Response)

def get_attribute_by_name(self, cluster_name: str, attribute_name: str) -> Attribute:
return self.__get_by_name(cluster_name, attribute_name, _ItemType.Attribute)

def get_event_by_name(self, cluster_name: str, event_name: str) -> Event:
return self.__get_by_name(cluster_name, event_name, _ItemType.Event)

def get_bitmap_by_name(self, cluster_name: str, bitmap_name: str) -> Bitmap:
return self.__get_by_name(cluster_name, bitmap_name, _ItemType.Bitmap)

def get_enum_by_name(self, cluster_name: str, enum_name: str) -> Bitmap:
return self.__get_by_name(cluster_name, enum_name, _ItemType.Enum)

def get_struct_by_name(self, cluster_name: str, struct_name: str) -> Struct:
return self.__get_by_name(cluster_name, struct_name, _ItemType.Struct)

def get_type_by_name(self, cluster_name: str, target_name: str):
bitmap = self.get_bitmap_by_name(cluster_name, target_name)
if bitmap:
return bitmap

enum = self.get_enum_by_name(cluster_name, target_name)
if enum:
return enum

struct = self.get_struct_by_name(cluster_name, target_name)
if struct:
return struct

return None

def is_fabric_scoped(self, target) -> bool:
if hasattr(target, 'qualities'):
return bool(target.qualities & StructQuality.FABRIC_SCOPED)
return False

def __get_by_name(self, cluster_name: str, target_name: str, target_type: _ItemType):
if not cluster_name or not target_name:
return None

# The idl parser remove spaces
cluster_name = cluster_name.replace(' ', '')
# Many YAML tests formats the name using camelCase despites that the spec mandates
# CamelCase. To be compatible with the current tests, everything is converted to lower case.
target_name = target_name.lower()

cluster_id = self.__clusters_by_name.get(cluster_name)
if cluster_id is None:
return None

target = None

if target_type == _ItemType.Request:
target_id = self.__commands_by_name.get(cluster_name).get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Response:
target_id = self.__responses_by_name.get(cluster_name).get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Event:
target_id = self.__events_by_name.get(cluster_name).get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Attribute:
target_id = self.__attributes_by_name.get(cluster_name).get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Bitmap:
target = self.__bitmaps_by_name.get(cluster_name).get(target_name)
elif target_type == _ItemType.Enum:
target = self.__enums_by_name.get(cluster_name).get(target_name)
elif target_type == _ItemType.Struct:
target = self.__structs_by_name.get(cluster_name).get(target_name)

return target

def __get_by_id(self, cluster_id: int, target_id: int, target_type: str):
targets = None

if target_type == _ItemType.Request:
targets = self.__commands_by_id.get(cluster_id)
elif target_type == _ItemType.Response:
targets = self.__responses_by_id.get(cluster_id)
elif target_type == _ItemType.Event:
targets = self.__events_by_id.get(cluster_id)
elif target_type == _ItemType.Attribute:
targets = self.__attributes_by_id.get(cluster_id)

if targets is None:
return None

return targets.get(target_id)
116 changes: 116 additions & 0 deletions scripts/tests/yamltests/fixes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import binascii

'''Fixes certain value formats known to exist in YAML tests
Some of the YAML test files contains some values that has been crafted to avoid some limitations
of the original JavaScript parser and the C++ generated bits. This file contains functions to
changes and convert those YAML values them back to something agnostic.
'''


def try_apply_yaml_cpp_longlong_limitation_fix(value):
'''Fix a known nasty hack in order that was used to work around compiler issue for C++.
When -9223372036854775808 was provided compiler would give a warning saying it is not a valid
way to write a long long in C++.
'''
if value == "-9223372036854775807LL - 1":
value = -9223372036854775808
return value


def try_apply_yaml_unrepresentable_integer_for_javascript_fixes(value):
'''Fix up large integers that are represented within a string.
JavaScript can not represent integers bigger than 9007199254740991. But some of the test may
uses values that are bigger than this. The current way to workaround this limitation has
been to write those numbers as strings by encapsulating them in "".
'''
if type(value) is str:
value = int(value)
return value


def try_apply_yaml_float_written_as_strings(value):
'''Fix up floats that are represented within a string.'''
if type(value) is str:
value = float(value)
return value


# TODO(thampson) This method is a clone of the method in
# src/controller/python/chip/yaml/format_converter.py and should eventually be removed in that file.
def convert_yaml_octet_string_to_bytes(s: str) -> bytes:
'''Convert YAML octet string body to bytes.
This handles any c-style hex escapes (e.g. \x5a) and hex: prefix
'''
# Step 1: handle explicit "hex:" prefix
if s.startswith('hex:'):
return binascii.unhexlify(s[4:])

# Step 2: convert non-hex-prefixed to bytes
# TODO(#23669): This does not properly support utf8 octet strings. We mimic
# javascript codegen behavior. Behavior of javascript is:
# * Octet string character >= u+0200 errors out.
# * Any character greater than 0xFF has the upper bytes chopped off.
as_bytes = [ord(c) for c in s]

if any([value > 0x200 for value in as_bytes]):
raise ValueError('Unsupported char in octet string %r' % as_bytes)
accumulated_hex = ''.join([f"{(v & 0xFF):02x}" for v in as_bytes])
return binascii.unhexlify(accumulated_hex)


def try_add_yaml_support_for_scientific_notation_without_dot(loader):
regular_expression = re.compile(u'''^(?:
[-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)?
|[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)
|\\.[0-9_]+(?:[eE][-+][0-9]+)?
|[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\\.[0-9_]*
|[-+]?\\.(?:inf|Inf|INF)
|\\.(?:nan|NaN|NAN))$''', re.X)

loader.add_implicit_resolver(
u'tag:yaml.org,2002:float',
regular_expression,
list(u'-+0123456789.'))
return loader


# This is a gross hack. The previous runner has a some internal states where an identity match one
# accessory. But this state may not exist in the runner (as in it prevent to have multiple node ids
# associated to a fabric...) so the 'nodeId' needs to be added back manually.
def try_update_yaml_node_id_test_runner_state(tests, config):
identities = {'alpha': None if 'nodeId' not in config else config['nodeId']}

for test in tests:
if not test.is_enabled:
continue

identity = test.identity

if test.cluster == 'CommissionerCommands':
if test.command == 'PairWithCode':
for item in test.arguments['values']:
if item['name'] == 'nodeId':
identities[identity] = item['value']
elif identity is not None and identity in identities:
nodeId = identities[identity]
test.nodeId = nodeId
708 changes: 708 additions & 0 deletions scripts/tests/yamltests/parser.py

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions scripts/tests/yamltests/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""The yamltest package."""

import setuptools # type: ignore

setuptools.setup(
name='yamltests',
version='0.0.1',
author='Project CHIP Authors',
description='Parse matter yaml test files',
packages=setuptools.find_packages(),
package_data={'yamltest': ['py.typed']},
zip_safe=False,
)
291 changes: 291 additions & 0 deletions scripts/tests/yamltests/test_spec_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
#!/usr/bin/env -S python3 -B
#
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from definitions import *

import unittest
import io

source_cluster = '''<?xml version="1.0"?>
<configurator>
<cluster>
<name>Test</name>
<code>0x1234</code>
</cluster>
</configurator>
'''

source_command = '''<?xml version="1.0"?>
<configurator>
<cluster>
<name>Test</name>
<code>0x1234</code>
<command source="client" code="0x0" name="TestCommand"></command>
</cluster>
</configurator>
'''

source_response = '''<?xml version="1.0"?>
<configurator>
<cluster>
<name>Test</name>
<code>0x1234</code>
<command source="server" code="0x0" name="TestCommandResponse">
<arg name="arg1" type="int8u"/>
</command>
</cluster>
</configurator>
'''

source_attribute = '''<?xml version="1.0"?>
<configurator>
<global>
<attribute side="server" code="0xFFFD" type="boolean">TestGlobalAttribute</attribute>
</global>
<cluster>
<name>Test</name>
<code>0x1234</code>
<globalAttribute side="server" code="0xFFFD" value="true"/>
<attribute code="0x0" type="boolean">TestAttribute</attribute>
</cluster>
</configurator>
'''

source_event = '''<?xml version="1.0"?>
<configurator>
<cluster>
<name>Test</name>
<code>0x1234</code>
<event code="0x0" name="TestEvent" priority="info" side="server"></event>
</cluster>
</configurator>
'''

source_bitmap = '''<?xml version="1.0"?>
<configurator>
<bitmap name="TestBitmap" type="bitmap8">
<cluster code="0x1234"/>
<field name="a" mask="0x1"/>
</bitmap>
<bitmap name="TestWrongBitmap" type="bitmap8">
<cluster code="0x4321"/>
<field name="a" mask="0x1"/>
</bitmap>
<cluster>
<name>Test</name>
<code>0x1234</code>
</cluster>
<cluster>
<name>TestWrong</name>
<code>0x4321</code>
</cluster>
</configurator>
'''

source_enum = '''<?xml version="1.0"?>
<configurator>
<enum name="TestEnum" type="enum8">
<cluster code="0x1234"/>
<item name="a" value="0x00"/>
</enum>
<enum name="TestWrongEnum" type="enum8">
<cluster code="0x4321"/>
<item name="a" value="0x00"/>
</enum>
<cluster>
<name>Test</name>
<code>0x1234</code>
</cluster>
<cluster>
<name>TestWrong</name>
<code>0x4321</code>
</cluster>
</configurator>
'''

source_struct = '''<?xml version="1.0"?>
<configurator>
<struct name="TestStruct">
<cluster code="0x1234"/>
<item name="a" type="boolean"/>
</struct>
<struct name="TestStructFabricScoped" isFabricScoped="true">
<cluster code="0x1234"/>
<item name="a" type="boolean"/>
</struct>
<struct name="TestWrongStruct">
<cluster code="0x4321"/>
<item name="a" type="boolean"/>
</struct>
<cluster>
<name>Test</name>
<code>0x1234</code>
</cluster>
<cluster>
<name>TestWrong</name>
<code>0x4321</code>
</cluster>
</configurator>
'''


class TestSpecDefinitions(unittest.TestCase):
def test_cluster_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_cluster), name='source_cluster')])
self.assertIsNone(definitions.get_cluster_name(0x4321))
self.assertEqual(definitions.get_cluster_name(0x1234), 'Test')

def test_command_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')])
self.assertIsNone(definitions.get_command_name(0x4321, 0x0))
self.assertIsNone(definitions.get_command_name(0x1234, 0x1))
self.assertEqual(definitions.get_command_name(0x1234, 0x0), 'TestCommand')

def test_response_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')])
self.assertIsNone(definitions.get_response_name(0x4321, 0x0))
self.assertIsNone(definitions.get_response_name(0x1234, 0x1))
self.assertEqual(definitions.get_response_name(0x1234, 0x0), 'TestCommandResponse')

def test_attribute_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')])
self.assertIsNone(definitions.get_attribute_name(0x4321, 0x0))
self.assertIsNone(definitions.get_attribute_name(0x4321, 0xFFFD))
self.assertIsNone(definitions.get_attribute_name(0x1234, 0x1))
self.assertEqual(definitions.get_attribute_name(0x1234, 0x0), 'TestAttribute')
self.assertEqual(definitions.get_attribute_name(0x1234, 0xFFFD), 'TestGlobalAttribute')

def test_event_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')])
self.assertIsNone(definitions.get_event_name(0x4321, 0x0))
self.assertIsNone(definitions.get_event_name(0x1234, 0x1))
self.assertEqual(definitions.get_event_name(0x1234, 0x0), 'TestEvent')

def test_get_command_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')])
self.assertIsNone(definitions.get_command_by_name('WrongName', 'TestCommand'))
self.assertIsNone(definitions.get_command_by_name('Test', 'TestWrongCommand'))
self.assertIsNone(definitions.get_response_by_name('Test', 'TestCommand'))
self.assertIsInstance(definitions.get_command_by_name('Test', 'TestCommand'), Command)
self.assertIsNone(definitions.get_command_by_name('test', 'TestCommand'))
self.assertIsInstance(definitions.get_command_by_name('Test', 'testcommand'), Command)

def test_get_response_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')])
self.assertIsNone(definitions.get_response_by_name('WrongName', 'TestCommandResponse'))
self.assertIsNone(definitions.get_response_by_name('Test', 'TestWrongCommandResponse'))
self.assertIsNone(definitions.get_command_by_name('Test', 'TestCommandResponse'))
self.assertIsInstance(definitions.get_response_by_name('Test', 'TestCommandResponse'), Struct)
self.assertIsNone(definitions.get_response_by_name('test', 'TestCommandResponse'))
self.assertIsInstance(definitions.get_response_by_name('Test', 'testcommandresponse'), Struct)

def test_get_attribute_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')])
self.assertIsNone(definitions.get_attribute_by_name('WrongName', 'TestAttribute'))
self.assertIsNone(definitions.get_attribute_by_name('WrongName', 'TestGlobalAttribute'))
self.assertIsNone(definitions.get_attribute_by_name('Test', 'TestWrongAttribute'))
self.assertIsInstance(definitions.get_attribute_by_name('Test', 'TestAttribute'), Attribute)
self.assertIsInstance(definitions.get_attribute_by_name('Test', 'TestGlobalAttribute'), Attribute)
self.assertIsNone(definitions.get_attribute_by_name('test', 'TestAttribute'))
self.assertIsNone(definitions.get_attribute_by_name('test', 'TestGlobalAttribute'))
self.assertIsInstance(definitions.get_attribute_by_name('Test', 'testattribute'), Attribute)
self.assertIsInstance(definitions.get_attribute_by_name('Test', 'testglobalattribute'), Attribute)

def test_get_event_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')])
self.assertIsNone(definitions.get_event_by_name('WrongName', 'TestEvent'))
self.assertIsNone(definitions.get_event_by_name('Test', 'TestWrongEvent'))
self.assertIsInstance(definitions.get_event_by_name('Test', 'TestEvent'), Event)
self.assertIsNone(definitions.get_event_by_name('test', 'TestEvent'))
self.assertIsInstance(definitions.get_event_by_name('Test', 'testevent'), Event)

def test_get_bitmap_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_bitmap), name='source_bitmap')])
self.assertIsNone(definitions.get_bitmap_by_name('WrongName', 'TestBitmap'))
self.assertIsNone(definitions.get_bitmap_by_name('Test', 'TestWrongBitmap'))
self.assertIsInstance(definitions.get_bitmap_by_name('Test', 'TestBitmap'), Bitmap)
self.assertIsNone(definitions.get_bitmap_by_name('test', 'TestBitmap'))
self.assertIsInstance(definitions.get_bitmap_by_name('Test', 'testbitmap'), Bitmap)

def test_get_enum_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_enum), name='source_enum')])
self.assertIsNone(definitions.get_enum_by_name('WrongName', 'TestEnum'))
self.assertIsNone(definitions.get_enum_by_name('Test', 'TestWrongEnum'))
self.assertIsInstance(definitions.get_enum_by_name('Test', 'TestEnum'), Enum)
self.assertIsNone(definitions.get_enum_by_name('test', 'TestEnum'))
self.assertIsInstance(definitions.get_enum_by_name('Test', 'testenum'), Enum)

def test_get_struct_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')])
self.assertIsNone(definitions.get_struct_by_name('WrongName', 'TestStruct'))
self.assertIsNone(definitions.get_struct_by_name('Test', 'TestWrongStruct'))
self.assertIsInstance(definitions.get_struct_by_name('Test', 'TestStruct'), Struct)
self.assertIsNone(definitions.get_struct_by_name('test', 'TestStruct'))
self.assertIsInstance(definitions.get_struct_by_name('Test', 'teststruct'), Struct)

def test_get_type_by_name(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_command), name='source_command')])
self.assertIsNone(definitions.get_type_by_name('Test', 'TestCommand'))

definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_response), name='source_response')])
self.assertIsInstance(definitions.get_type_by_name('Test', 'TestCommandResponse'), Struct)

definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_attribute), name='source_attribute')])
self.assertIsNone(definitions.get_type_by_name('Test', 'TestAttribute'))

definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_event), name='source_event')])
self.assertIsNone(definitions.get_type_by_name('Test', 'TestEvent'))

definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_bitmap), name='source_bitmap')])
self.assertIsInstance(definitions.get_type_by_name('Test', 'TestBitmap'), Bitmap)

definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_enum), name='source_enum')])
self.assertIsInstance(definitions.get_type_by_name('Test', 'TestEnum'), Enum)

definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')])
self.assertIsInstance(definitions.get_type_by_name('Test', 'TestStruct'), Struct)

def test_is_fabric_scoped(self):
definitions = SpecDefinitions([ParseSource(source=io.StringIO(source_struct), name='source_struct')])

struct = definitions.get_struct_by_name('Test', 'TestStruct')
self.assertFalse(definitions.is_fabric_scoped(struct))

struct = definitions.get_struct_by_name('Test', 'TestStructFabricScoped')
self.assertTrue(definitions.is_fabric_scoped(struct))


if __name__ == '__main__':
unittest.main()

0 comments on commit 1259320

Please sign in to comment.