From 5b4f7ef4621355dd899dc567abea6d131dee86cf Mon Sep 17 00:00:00 2001 From: Vivien Nicolas Date: Wed, 15 Feb 2023 18:21:58 +0100 Subject: [PATCH] [matter_yamltests] Add the reason of the constraint failure to matter_yamltests/constraints.py --- .../matter_yamltests/constraints.py | 542 ++++++++++++++++-- .../matter_yamltests/errors.py | 25 +- .../matter_yamltests/parser.py | 36 +- 3 files changed, 535 insertions(+), 68 deletions(-) diff --git a/scripts/py_matter_yamltests/matter_yamltests/constraints.py b/scripts/py_matter_yamltests/matter_yamltests/constraints.py index b186fb1bef6282..3d6d9a20e57103 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/constraints.py +++ b/scripts/py_matter_yamltests/matter_yamltests/constraints.py @@ -15,60 +15,217 @@ # limitations under the License. # +import re import string from abc import ABC, abstractmethod +from .errors import TestStepError + class ConstraintParseError(Exception): def __init__(self, message): super().__init__(message) +class ConstraintCheckError(TestStepError): + def __init__(self, context, key, reason): + super().__init__(reason) + self.untag_keys_with_error(context) + self.tag_key_with_error(context, key) + + +class ConstraintTypeError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'type', reason) + + +class ConstraintContainsError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'contains', reason) + + +class ConstraintExcludesError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'excludes', reason) + + +class ConstraintHasMaskClearError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'hasMasksClear', reason) + + +class ConstraintHasMaskSetError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'hasMasksSet', reason) + + +class ConstraintError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, '', reason) + + +class ConstraintHasValueError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'hasValue', reason) + + +class ConstraintMinLengthError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'minLength', reason) + + +class ConstraintMaxLengthError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'maxLength', reason) + + +class ConstraintIsHexStringError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'isHexString', reason) + + +class ConstraintStartsWithError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'startsWith', reason) + + +class ConstraintEndsWithError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'endsWith', reason) + + +class ConstraintIsUpperCaseError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'isUpperCase', reason) + + +class ConstraintIsLowerCaseError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'isLowerCase', reason) + + +class ConstraintMinValueError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'minValue', reason) + + +class ConstraintMaxValueError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'maxValue', reason) + + +class ConstraintNotValueError(ConstraintCheckError): + def __init__(self, context, reason): + super().__init__(context, 'notValue', reason) + + class BaseConstraint(ABC): '''Constraint Interface''' - def __init__(self, types: list, is_null_allowed: bool = False): + def __init__(self, context, types: list, is_null_allowed: bool = False): '''An empty type list provided that indicates any type is accepted''' self._types = types self._is_null_allowed = is_null_allowed + self._context = context def is_met(self, value, value_type_name): - if value is None: - return self._is_null_allowed + if value is None and self._is_null_allowed: + return True response_type = type(value) if self._types: found_type_match = any( [issubclass(response_type, expected) for expected in self._types]) if not found_type_match: - return False + if len(self._types) == 1: + expected_str = f'type "{self._types[0].__name__}"' + else: + expected_str = f'one of those types: {[x.__name__ for x in self._types]}' + reason = f'This constraint can only be used with a value of {expected_str} but the value is of type "{response_type.__name__}".' + self._raise_error(reason) - return self.check_response(value, value_type_name) + if self.check_response(value, value_type_name): + return True + + reason = self.get_reason(value, value_type_name) + self._raise_error(reason) @abstractmethod def check_response(self, value, value_type_name) -> bool: pass + @abstractmethod + def get_reason(self, value, value_type_name) -> str: + """Get the a human readable explanation about the failure.""" + pass + + def _raise_error(self, reason): + if isinstance(self, _ConstraintType): + raise ConstraintTypeError(self._context, reason) + elif isinstance(self, _ConstraintContains): + raise ConstraintContainsError(self._context, reason) + elif isinstance(self, _ConstraintExcludes): + raise ConstraintExcludesError(self._context, reason) + elif isinstance(self, _ConstraintHasMaskClear): + raise ConstraintHasMaskClearError(self._context, reason) + elif isinstance(self, _ConstraintHasMaskSet): + raise ConstraintHasMaskSetError(self._context, reason) + elif isinstance(self, _ConstraintMinLength): + raise ConstraintMinLengthError(self._context, reason) + elif isinstance(self, _ConstraintMaxLength): + raise ConstraintMaxLengthError(self._context, reason) + elif isinstance(self, _ConstraintIsHexString): + raise ConstraintIsHexStringError(self._context, reason) + elif isinstance(self, _ConstraintStartsWith): + raise ConstraintStartsWithError(self._context, reason) + elif isinstance(self, _ConstraintEndsWith): + raise ConstraintEndsWithError(self._context, reason) + elif isinstance(self, _ConstraintIsUpperCase): + raise ConstraintIsUpperCaseError(self._context, reason) + elif isinstance(self, _ConstraintIsLowerCase): + raise ConstraintIsLowerCaseError(self._context, reason) + elif isinstance(self, _ConstraintMinValue): + raise ConstraintMinValueError(self._context, reason) + elif isinstance(self, _ConstraintMaxValue): + raise ConstraintMaxValueError(self._context, reason) + elif isinstance(self, _ConstraintNotValue): + raise ConstraintNotValueError(self._context, reason) + else: + # This should not happens. + raise ConstraintParseError(f'Unknown constraint instance.') + class _ConstraintHasValue(BaseConstraint): - def __init__(self, has_value): - super().__init__(types=[]) + def __init__(self, context, has_value): + super().__init__(context, types=[]) self._has_value = has_value def is_met(self, value, value_type_name): # We are overriding the BaseConstraint of is_met since has value is a special case where # we might not be expecting a value at all, but the basic null check in BaseConstraint # is not what we want. - return self.check_response(value, value_type_name) + if self.check_response(value, value_type_name): + return True + + reason = self.get_reason(value, value_type_name) + raise ConstraintHasValueError(self._context, reason) def check_response(self, value, value_type_name) -> bool: has_value = value is not None return self._has_value == has_value + def get_reason(self, value, value_type_name) -> str: + if self._has_value: + reason = f"The constraint expects a value but there isn't one." + else: + reason = f'The response contains the value "{value}".' + + return reason + class _ConstraintType(BaseConstraint): - def __init__(self, type): - super().__init__(types=[], is_null_allowed=True) + def __init__(self, context, type): + super().__init__(context, types=[], is_null_allowed=True) self._type = type def check_response(self, value, value_type_name) -> bool: @@ -85,10 +242,14 @@ def check_response(self, value, value_type_name) -> bool: success = True elif self._type == 'long_octet_string' and type(value) is bytes: success = True + elif self._type == 'group_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFF elif self._type == 'vendor_id' and type(value) is int: success = value >= 0 and value <= 0xFFFF elif self._type == 'device_type_id' and type(value) is int: success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'nullable_cluster_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFE elif self._type == 'cluster_id' and type(value) is int: success = value >= 0 and value <= 0xFFFFFFFF elif self._type == 'attribute_id' and type(value) is int: @@ -103,6 +264,8 @@ def check_response(self, value, value_type_name) -> bool: success = value >= 0 and value <= 0xFF elif self._type == 'transaction_id' and type(value) is int: success = value >= 0 and value <= 0xFFFFFFFF + elif self._type == 'nullable_node_id' and type(value) is int: + success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFE elif self._type == 'node_id' and type(value) is int: success = value >= 0 and value <= 0xFFFFFFFFFFFFFFFF elif self._type == 'bitmap8' and type(value) is int: @@ -199,169 +362,440 @@ def check_response(self, value, value_type_name) -> bool: success = self._type == value_type_name return success + def get_reason(self, value, value_type_name) -> str: + types = [] + + if type(value) is bool: + types.append('boolean') + elif type(value) is list: + types.append('list') + elif type(value) is str: + types.append('char_string') + types.append('long_char_string') + elif type(value) is bytes: + types.append('octet_string') + types.append('long_octet_string') + elif type(value) is int: + if value >= 0 and value <= 0xFE: + types.append('nullable_int8u') + + if value >= 0 and value <= 0xFF: + types.append('action_id') + types.append('bitmap8') + types.append('enum8') + types.append('Percent') + types.append('int8u') + + if value >= 0 and value <= 0xFFFE: + types.append('nullable_int16u') + + if value >= 0 and value <= 0xFFFF: + types.append('vendor_id') + types.append('group_id') + types.append('bitmap16') + types.append('enum16') + types.append('Percent100ths') + types.append('int16u') + + if value >= 0 and value <= 0xFFFFFE: + types.append('nullable_int24u') + + if value >= 0 and value <= 0xFFFFFF: + types.append('int24u') + + if value >= 0 and value <= 0xFFFFFFFE: + types.append('nullable_int32u') + types.append('nullable_cluster_id') + + if value >= 0 and value <= 0xFFFFFFFF: + types.append('device_type_id') + types.append('cluster_id') + types.append('attribute_id') + types.append('field_id') + types.append('command_id') + types.append('event_id') + types.append('transaction_id') + types.append('bitmap32') + types.append('epoch_s') + types.append('utc') + types.append('date') + types.append('tod') + types.append('int32u') + + if value >= 0 and value <= 0xFFFFFFFFFE: + types.append('nullable_int40u') + + if value >= 0 and value <= 0xFFFFFFFFFF: + types.append('int40u') + + if value >= 0 and value <= 0xFFFFFFFFFFFE: + types.append('nullable_int48u') + + if value >= 0 and value <= 0xFFFFFFFFFFFF: + types.append('int48u') + + if value >= 0 and value <= 0xFFFFFFFFFFFFFE: + types.append('nullable_int56u') + + if value >= 0 and value <= 0xFFFFFFFFFFFFFF: + types.append('int56u') + + if value >= 0 and value <= 0xFFFFFFFFFFFFFFFE: + types.append('nullable_int64u') + types.append('nullable_node_id') + + if value >= 0 and value <= 0xFFFFFFFFFFFFFFFF: + types.append('node_id') + types.append('bitmap64') + types.append('epoch_us') + types.append('int64u') + + if value >= -128 and value <= 127: + types.append('int8s') + + if value >= -32768 and value <= 32767: + types.append('int16s') + + if value >= -8388608 and value <= 8388607: + types.append('int24s') + + if value >= -2147483648 and value <= 2147483647: + types.append('int32s') + + if value >= -549755813888 and value <= 549755813887: + types.append('int40s') + + if value >= -140737488355328 and value <= 140737488355327: + types.append('int48s') + + if value >= -36028797018963968 and value <= 36028797018963967: + types.append('int56s') + + if value >= -9223372036854775808 and value <= 9223372036854775807: + types.append('int64s') + + if value >= -127 and value <= 127: + types.append('nullable_int8s') + + if value >= -32767 and value <= 32767: + types.append('nullable_int16s') + + if value >= -8388607 and value <= 8388607: + types.append('nullable_int24s') + + if value >= -2147483647 and value <= 2147483647: + types.append('nullable_int32s') + + if value >= -549755813887 and value <= 549755813887: + types.append('nullable_int40s') + + if value >= -140737488355327 and value <= 140737488355327: + types.append('nullable_int48s') + + if value >= -36028797018963967 and value <= 36028797018963967: + types.append('nullable_int56s') + + if value >= -9223372036854775807 and value <= 9223372036854775807: + types.append('nullable_int64s') + + types.sort(key=lambda input_type: [int(c) if c.isdigit( + ) else c for c in re.split('([0-9]+)', input_type)]) + + if value_type_name not in types: + types.append(value_type_name) + + if len(types) == 1: + reason = f'The response type {types[0]}) does not match the constraint.' + else: + reason = f'The response value ({value}) is of one of those types: {types}.' + return reason + class _ConstraintMinLength(BaseConstraint): - def __init__(self, min_length): - super().__init__(types=[str, bytes, list]) + def __init__(self, context, min_length): + super().__init__(context, types=[str, bytes, list]) self._min_length = min_length def check_response(self, value, value_type_name) -> bool: return len(value) >= self._min_length + def get_reason(self, value, value_type_name) -> str: + return f'The response length ({len(value)}) should be greater or equal to the constraint but {len(value)} < {self._min_length}.' + class _ConstraintMaxLength(BaseConstraint): - def __init__(self, max_length): - super().__init__(types=[str, bytes, list]) + def __init__(self, context, max_length): + super().__init__(context, types=[str, bytes, list]) self._max_length = max_length def check_response(self, value, value_type_name) -> bool: return len(value) <= self._max_length + def get_reason(self, value, value_type_name) -> str: + return f'The response length ({len(value)}) should be lower or equal to the constraint but {len(value)} > {self._max_length}.' + class _ConstraintIsHexString(BaseConstraint): - def __init__(self, is_hex_string: bool): - super().__init__(types=[str]) + def __init__(self, context, is_hex_string: bool): + super().__init__(context, types=[str]) self._is_hex_string = is_hex_string def check_response(self, value, value_type_name) -> bool: return all(c in string.hexdigits for c in value) == self._is_hex_string + def get_reason(self, value, value_type_name) -> str: + if self._is_hex_string: + chars = [] + + for char in value: + if not char in string.hexdigits: + chars.append(char) + + if len(chars) == 1: + reason = f'The response "{value}" contains an invalid hexadecimal character: "{chars[0]}".' + else: + reason = f'The response "{value}" contains invalid hexadecimal characters: {chars}.' + else: + reason = f'The response "{value}" is an hexadecimal string.' + return reason + class _ConstraintStartsWith(BaseConstraint): - def __init__(self, starts_with): - super().__init__(types=[str]) + def __init__(self, context, starts_with): + super().__init__(context, types=[str]) self._starts_with = starts_with def check_response(self, value, value_type_name) -> bool: return value.startswith(self._starts_with) + def get_reason(self, value, value_type_name) -> str: + return f'The response "{value}" starts with "{value[:len(self._starts_with)]}" which does not match the constraint.' + class _ConstraintEndsWith(BaseConstraint): - def __init__(self, ends_with): - super().__init__(types=[str]) + def __init__(self, context, ends_with): + super().__init__(context, types=[str]) self._ends_with = ends_with def check_response(self, value, value_type_name) -> bool: return value.endswith(self._ends_with) + def get_reason(self, value, value_type_name) -> str: + return f'The response "{value}" ends with "{value[-len(self._ends_with):]}" which does not match the constraint.' + class _ConstraintIsUpperCase(BaseConstraint): - def __init__(self, is_upper_case): - super().__init__(types=[str]) + def __init__(self, context, is_upper_case): + super().__init__(context, types=[str]) self._is_upper_case = is_upper_case def check_response(self, value, value_type_name) -> bool: return value.isupper() == self._is_upper_case + def get_reason(self, value, value_type_name) -> str: + if self._is_upper_case: + chars = [] + + for char in value: + if not char.upper() == char: + chars.append(char) + + if len(chars) == 1: + reason = f'The response "{value}" contains a lowercase character: "{chars[0]}".' + else: + reason = f'The response "{value}" contains lowercase characters: {chars}.' + else: + reason = f'The response "{value}" is uppercased.' + + return reason + class _ConstraintIsLowerCase(BaseConstraint): - def __init__(self, is_lower_case): - super().__init__(types=[str]) + def __init__(self, context, is_lower_case): + super().__init__(context, types=[str]) self._is_lower_case = is_lower_case def check_response(self, value, value_type_name) -> bool: return value.islower() == self._is_lower_case + def get_reason(self, value, value_type_name) -> str: + if self._is_lower_case: + chars = [] + + for char in value: + if not char.lower() == char: + chars.append(char) + + if len(chars) == 1: + reason = f'The response "{value}" contains a uppercase character: "{chars[0]}".' + else: + reason = f'The response "{value}" contains uppercase characters: {chars}.' + else: + reason = f'The response "{value}" is lowercased.' + + return reason + class _ConstraintMinValue(BaseConstraint): - def __init__(self, min_value): - super().__init__(types=[int, float], is_null_allowed=True) + def __init__(self, context, min_value): + super().__init__(context, types=[int, float], is_null_allowed=True) self._min_value = min_value def check_response(self, value, value_type_name) -> bool: return value >= self._min_value + def get_reason(self, value, value_type_name) -> str: + return f'The response value ({value}) should be greater or equal to the constraint but {value} < {self._min_value}.' + class _ConstraintMaxValue(BaseConstraint): - def __init__(self, max_value): - super().__init__(types=[int, float], is_null_allowed=True) + def __init__(self, context, max_value): + super().__init__(context, types=[int, float], is_null_allowed=True) self._max_value = max_value def check_response(self, value, value_type_name) -> bool: return value <= self._max_value + def get_reason(self, value, value_type_name) -> str: + return f'The response value ({value}) should be lower or equal to the constraint but {value} > {self._max_value}.' + class _ConstraintContains(BaseConstraint): - def __init__(self, contains): - super().__init__(types=[list]) + def __init__(self, context, contains): + super().__init__(context, types=[list]) self._contains = contains def check_response(self, value, value_type_name) -> bool: return set(self._contains).issubset(value) + def get_reason(self, value, value_type_name) -> str: + expected_values = [] + + for expected_value in self._contains: + if expected_value not in value: + expected_values.append(expected_value) + + return f'The response ({value}) is missing {expected_values}.' + class _ConstraintExcludes(BaseConstraint): - def __init__(self, excludes): - super().__init__(types=[list]) + def __init__(self, context, excludes): + super().__init__(context, types=[list]) self._excludes = excludes def check_response(self, value, value_type_name) -> bool: return set(self._excludes).isdisjoint(value) + def get_reason(self, value, value_type_name) -> str: + unexpected_values = [] + + for unexpected_value in self._excludes: + if unexpected_value in value: + unexpected_values.append(unexpected_value) + + return f'The response ({value}) contains {unexpected_values}.' + class _ConstraintHasMaskSet(BaseConstraint): - def __init__(self, has_masks_set): - super().__init__(types=[int]) + def __init__(self, context, has_masks_set): + super().__init__(context, types=[int]) self._has_masks_set = has_masks_set def check_response(self, value, value_type_name) -> bool: return all([(value & mask) == mask for mask in self._has_masks_set]) + def get_reason(self, value, value_type_name) -> str: + expected_masks = [] + + for expected_mask in self._has_masks_set: + if (value & expected_mask) != expected_mask: + expected_masks.append(hex(expected_mask)) + + return f'The response ({hex(value)}) does not match the masks: {expected_masks}.' + class _ConstraintHasMaskClear(BaseConstraint): - def __init__(self, has_masks_clear): - super().__init__(types=[int]) + def __init__(self, context, has_masks_clear): + super().__init__(context, types=[int]) self._has_masks_clear = has_masks_clear def check_response(self, value, value_type_name) -> bool: return all([(value & mask) == 0 for mask in self._has_masks_clear]) + def get_reason(self, value, value_type_name) -> str: + unexpected_masks = [] + + for unexpected_mask in self._has_masks_clear: + if (value & unexpected_mask) == unexpected_mask: + unexpected_masks.append(hex(unexpected_mask)) + + return f'The response ({hex(value)}) match the masks: {unexpected_masks}.' + class _ConstraintNotValue(BaseConstraint): - def __init__(self, not_value): - super().__init__(types=[], is_null_allowed=True) + def __init__(self, context, not_value): + super().__init__(context, types=[], is_null_allowed=True) self._not_value = not_value def check_response(self, value, value_type_name) -> bool: return value != self._not_value + def get_reason(self, value, value_type_name) -> str: + return f'The response value "{value}" should differs from the constraint.' + def get_constraints(constraints: dict) -> list[BaseConstraint]: _constraints = [] + context = constraints for constraint, constraint_value in constraints.items(): if 'hasValue' == constraint: - _constraints.append(_ConstraintHasValue(constraint_value)) + _constraints.append(_ConstraintHasValue( + context, constraint_value)) elif 'type' == constraint: - _constraints.append(_ConstraintType(constraint_value)) + _constraints.append(_ConstraintType(context, constraint_value)) elif 'minLength' == constraint: - _constraints.append(_ConstraintMinLength(constraint_value)) + _constraints.append(_ConstraintMinLength( + context, constraint_value)) elif 'maxLength' == constraint: - _constraints.append(_ConstraintMaxLength(constraint_value)) + _constraints.append(_ConstraintMaxLength( + context, constraint_value)) elif 'isHexString' == constraint: - _constraints.append(_ConstraintIsHexString(constraint_value)) + _constraints.append(_ConstraintIsHexString( + context, constraint_value)) elif 'startsWith' == constraint: - _constraints.append(_ConstraintStartsWith(constraint_value)) + _constraints.append(_ConstraintStartsWith( + context, constraint_value)) elif 'endsWith' == constraint: - _constraints.append(_ConstraintEndsWith(constraint_value)) + _constraints.append(_ConstraintEndsWith( + context, constraint_value)) elif 'isUpperCase' == constraint: - _constraints.append(_ConstraintIsUpperCase(constraint_value)) + _constraints.append(_ConstraintIsUpperCase( + context, constraint_value)) elif 'isLowerCase' == constraint: - _constraints.append(_ConstraintIsLowerCase(constraint_value)) + _constraints.append(_ConstraintIsLowerCase( + context, constraint_value)) elif 'minValue' == constraint: - _constraints.append(_ConstraintMinValue(constraint_value)) + _constraints.append(_ConstraintMinValue( + context, constraint_value)) elif 'maxValue' == constraint: - _constraints.append(_ConstraintMaxValue(constraint_value)) + _constraints.append(_ConstraintMaxValue( + context, constraint_value)) elif 'contains' == constraint: - _constraints.append(_ConstraintContains(constraint_value)) + _constraints.append(_ConstraintContains( + context, constraint_value)) elif 'excludes' == constraint: - _constraints.append(_ConstraintExcludes(constraint_value)) + _constraints.append(_ConstraintExcludes( + context, constraint_value)) elif 'hasMasksSet' == constraint: - _constraints.append(_ConstraintHasMaskSet(constraint_value)) + _constraints.append(_ConstraintHasMaskSet( + context, constraint_value)) elif 'hasMasksClear' == constraint: - _constraints.append(_ConstraintHasMaskClear(constraint_value)) + _constraints.append(_ConstraintHasMaskClear( + context, constraint_value)) elif 'notValue' == constraint: - _constraints.append(_ConstraintNotValue(constraint_value)) + _constraints.append(_ConstraintNotValue( + context, constraint_value)) else: raise ConstraintParseError(f'Unknown constraint type:{constraint}') diff --git a/scripts/py_matter_yamltests/matter_yamltests/errors.py b/scripts/py_matter_yamltests/matter_yamltests/errors.py index 04ca6d7c049eb0..97a71a24133106 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/errors.py +++ b/scripts/py_matter_yamltests/matter_yamltests/errors.py @@ -41,8 +41,11 @@ def update_context(self, context, step_index): def tag_key_with_error(self, content, target_key): self.__tag_key(content, target_key, _ERROR_START_TAG, _ERROR_END_TAG) + def untag_keys_with_error(self, content): + self.__untag_keys(content, _ERROR_START_TAG, _ERROR_END_TAG) + def __tag_key(self, content, target_key, tag_start, tag_end): - # This method replace the key for the dictionary with the tag provided while preserving the order of the dictionary + """This method replaces the key for the dictionary with the tag provided while preserving the order of the dictionary.""" reversed_dictionary = {} # Build a reversed dictionary, tagging the target key. @@ -53,7 +56,25 @@ def __tag_key(self, content, target_key, tag_start, tag_end): else: reversed_dictionary[key] = value - # Revert back the the dictionary to the original order. + # Revert back the dictionary to the original order. + for _ in range(len(reversed_dictionary)): + key, value = reversed_dictionary.popitem() + content[key] = value + + def __untag_keys(self, content, tag_start, tag_end): + """This method replaces the tagged key for the dictionary with the original key while preserving the order of the dictionary.""" + reversed_dictionary = {} + + # Build a reversed dictionary, untagging the tagged key. + for _ in range(len(content)): + key, value = content.popitem() + if key.startswith(tag_start) and key.endswith(tag_end): + reversed_dictionary[key.replace( + tag_start, '').replace(tag_end, '')] = value + else: + reversed_dictionary[key] = value + + # Revert back the dictionary to the original order. for _ in range(len(reversed_dictionary)): key, value = reversed_dictionary.popitem() content[key] = value diff --git a/scripts/py_matter_yamltests/matter_yamltests/parser.py b/scripts/py_matter_yamltests/matter_yamltests/parser.py index 5cef3f773405ef..bbef4205f98b48 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/parser.py +++ b/scripts/py_matter_yamltests/matter_yamltests/parser.py @@ -49,10 +49,11 @@ class PostProcessCheck: it was successful or not. ''' - def __init__(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str): + def __init__(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str, exception=None): self.state = state self.category = category self.message = message + self.exception = exception def is_success(self) -> bool: return self.state == PostProcessCheckStatus.SUCCESS @@ -88,9 +89,10 @@ def warning(self, category: PostProcessCheckType, message: str): self._insert(PostProcessCheckStatus.WARNING, category, message) self.warnings += 1 - def error(self, category: PostProcessCheckType, message: str): + def error(self, category: PostProcessCheckType, message: str, exception: TestStepError = None): '''Adds an error entry that occured when post processing response to results.''' - self._insert(PostProcessCheckStatus.ERROR, category, message) + self._insert(PostProcessCheckStatus.ERROR, + category, message, exception) self.errors += 1 def is_success(self): @@ -101,8 +103,8 @@ def is_success(self): def is_failure(self): return self.errors != 0 - def _insert(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str): - log = PostProcessCheck(state, category, message) + def _insert(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str, exception: Exception = None): + log = PostProcessCheck(state, category, message, exception) self.entries.append(log) @@ -376,8 +378,9 @@ class TestStep: and saves any variables that might be required but test step that have yet to be executed. ''' - def __init__(self, test: _TestStepWithPlaceholders, runtime_config_variable_storage: dict): + def __init__(self, test: _TestStepWithPlaceholders, step_index: int, runtime_config_variable_storage: dict): self._test = test + self._step_index = step_index self._runtime_config_variable_storage = runtime_config_variable_storage self.arguments = copy.deepcopy(test.arguments_with_placeholders) self.responses = copy.deepcopy(test.responses_with_placeholders) @@ -391,6 +394,10 @@ def __init__(self, test: _TestStepWithPlaceholders, runtime_config_variable_stor test.update_arguments(self.arguments) test.update_responses(self.responses) + @property + def step_index(self): + return self._step_index + @property def is_enabled(self): return self._test.is_enabled @@ -711,11 +718,15 @@ def _response_constraints_validation(self, expected_response, received_response, constraints = get_constraints(value['constraints']) - if all([constraint.is_met(received_value, response_type_name) for constraint in constraints]): - result.success(check_type, error_success) - else: - # TODO would be helpful to be more verbose here - result.error(check_type, error_failure) + for constraint in constraints: + try: + if constraint.is_met(received_value, response_type_name): + result.success(check_type, error_success) + else: + result.error(check_type, error_failure) + except TestStepError as e: + e.update_context(expected_response, self.step_index) + result.error(check_type, error_failure, e) def _maybe_save_as(self, expected_response, received_response, result): check_type = PostProcessCheckType.SAVE_AS_VARIABLE @@ -841,7 +852,8 @@ def __iter__(self): def __next__(self) -> TestStep: if self._index < self.count: test = self._tests[self._index] - test_step = TestStep(test, self._runtime_config_variable_storage) + test_step = TestStep(test, self._index + 1, + self._runtime_config_variable_storage) self._index += 1 return test_step