From cf624153b349a51492ae2bc683eecfe6385643ea Mon Sep 17 00:00:00 2001 From: Terence Hampson Date: Tue, 3 Jan 2023 12:14:15 -0500 Subject: [PATCH] Convert former python yaml tests parser to be a runner/adapter (#24103) * Convert former python yaml tests parser to be a runner/adapter A common intermediary python YAML test parser has been created. This removes all the YAML python parsing bits from the current chip-repl version. It then turns that former parse into a runner that converts TestStep actions from the common parsed structure to something that chip-repl can execute. The responses from the accessory are then translated back to something the common parser expectes to validate the response. * Address PR comments --- scripts/tests/yamltests/fixes.py | 2 - src/controller/python/BUILD.gn | 4 +- src/controller/python/chip/yaml/__init__.py | 2 +- .../python/chip/yaml/constraints.py | 221 --------- .../python/chip/yaml/data_model_lookup.py | 11 + src/controller/python/chip/yaml/errors.py | 2 +- .../python/chip/yaml/format_converter.py | 184 ++++--- src/controller/python/chip/yaml/parser.py | 467 ------------------ src/controller/python/chip/yaml/runner.py | 386 +++++++++++++++ .../python/chip/yaml/variable_storage.py | 37 -- .../unit_tests/test_yaml_format_converter.py | 121 ----- 11 files changed, 483 insertions(+), 954 deletions(-) delete mode 100644 src/controller/python/chip/yaml/constraints.py delete mode 100644 src/controller/python/chip/yaml/parser.py create mode 100644 src/controller/python/chip/yaml/runner.py delete mode 100644 src/controller/python/chip/yaml/variable_storage.py delete mode 100644 src/controller/python/test/unit_tests/test_yaml_format_converter.py diff --git a/scripts/tests/yamltests/fixes.py b/scripts/tests/yamltests/fixes.py index a0126b003f9a2e..676739d5ae3f6b 100644 --- a/scripts/tests/yamltests/fixes.py +++ b/scripts/tests/yamltests/fixes.py @@ -54,8 +54,6 @@ def try_apply_yaml_float_written_as_strings(value): return value -# TODO(thampson) This method is a clone of the method in -# src/controller/python/chip/yaml/format_converter.py and should eventually be removed in that file. def convert_yaml_octet_string_to_bytes(s: str) -> bytes: '''Convert YAML octet string body to bytes. diff --git a/src/controller/python/BUILD.gn b/src/controller/python/BUILD.gn index a0381f09618a5f..4f4c09cef32ad5 100644 --- a/src/controller/python/BUILD.gn +++ b/src/controller/python/BUILD.gn @@ -230,12 +230,10 @@ chip_python_wheel_action("chip-core") { "chip/utils/CommissioningBuildingBlocks.py", "chip/utils/__init__.py", "chip/yaml/__init__.py", - "chip/yaml/constraints.py", "chip/yaml/data_model_lookup.py", "chip/yaml/errors.py", "chip/yaml/format_converter.py", - "chip/yaml/parser.py", - "chip/yaml/variable_storage.py", + "chip/yaml/runner.py", ] if (chip_controller) { diff --git a/src/controller/python/chip/yaml/__init__.py b/src/controller/python/chip/yaml/__init__.py index 055bec97cafac4..08850e21feea48 100644 --- a/src/controller/python/chip/yaml/__init__.py +++ b/src/controller/python/chip/yaml/__init__.py @@ -20,4 +20,4 @@ # Provides Python APIs for Matter. """Provides yaml parser Python APIs for Matter.""" -from . import parser +from . import runner diff --git a/src/controller/python/chip/yaml/constraints.py b/src/controller/python/chip/yaml/constraints.py deleted file mode 100644 index 49ac8c99153eff..00000000000000 --- a/src/controller/python/chip/yaml/constraints.py +++ /dev/null @@ -1,221 +0,0 @@ -# -# Copyright (c) 2022 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from abc import ABC, abstractmethod -import chip.yaml.format_converter as Converter -from .variable_storage import VariableStorage - - -class ConstraintValidationError(Exception): - def __init__(self, message): - super().__init__(message) - - -class BaseConstraint(ABC): - '''Constrain Interface''' - - @abstractmethod - def is_met(self, response) -> bool: - pass - - -class _LoadableConstraint(BaseConstraint): - '''Constraints where value might be stored in VariableStorage needing runtime load.''' - - def __init__(self, value, field_type, variable_storage: VariableStorage, config_values: dict): - self._variable_storage = variable_storage - # When not none _indirect_value_key is binding a name to the constraint value, and the - # actual value can only be looked-up dynamically, which is why this is a key name. - self._indirect_value_key = None - self._value = None - - if value is None: - # Default values set above is all we need here. - return - - if isinstance(value, str) and self._variable_storage.is_key_saved(value): - self._indirect_value_key = value - else: - self._value = Converter.parse_and_convert_yaml_value( - value, field_type, config_values) - - def get_value(self): - '''Gets the current value of the constraint. - - This method accounts for getting the runtime saved value from DUT previous responses. - ''' - if self._indirect_value_key: - return self._variable_storage.load(self._indirect_value_key) - return self._value - - -class _ConstraintHasValue(BaseConstraint): - def __init__(self, has_value): - self._has_value = has_value - - def is_met(self, response) -> bool: - raise ConstraintValidationError('HasValue constraint currently not implemented') - - -class _ConstraintType(BaseConstraint): - def __init__(self, type): - self._type = type - - def is_met(self, response) -> bool: - raise ConstraintValidationError('Type constraint currently not implemented') - - -class _ConstraintStartsWith(BaseConstraint): - def __init__(self, starts_with): - self._starts_with = starts_with - - def is_met(self, response) -> bool: - return response.startswith(self._starts_with) - - -class _ConstraintEndsWith(BaseConstraint): - def __init__(self, ends_with): - self._ends_with = ends_with - - def is_met(self, response) -> bool: - return response.endswith(self._ends_with) - - -class _ConstraintIsUpperCase(BaseConstraint): - def __init__(self, is_upper_case): - self._is_upper_case = is_upper_case - - def is_met(self, response) -> bool: - return response.isupper() == self._is_upper_case - - -class _ConstraintIsLowerCase(BaseConstraint): - def __init__(self, is_lower_case): - self._is_lower_case = is_lower_case - - def is_met(self, response) -> bool: - return response.islower() == self._is_lower_case - - -class _ConstraintMinValue(_LoadableConstraint): - def __init__(self, min_value, field_type, variable_storage: VariableStorage, - config_values: dict): - super().__init__(min_value, field_type, variable_storage, config_values) - - def is_met(self, response) -> bool: - min_value = self.get_value() - return response >= min_value - - -class _ConstraintMaxValue(_LoadableConstraint): - def __init__(self, max_value, field_type, variable_storage: VariableStorage, - config_values: dict): - super().__init__(max_value, field_type, variable_storage, config_values) - - def is_met(self, response) -> bool: - max_value = self.get_value() - return response <= max_value - - -class _ConstraintContains(BaseConstraint): - def __init__(self, contains): - self._contains = contains - - def is_met(self, response) -> bool: - return set(self._contains).issubset(response) - - -class _ConstraintExcludes(BaseConstraint): - def __init__(self, excludes): - self._excludes = excludes - - def is_met(self, response) -> bool: - return set(self._excludes).isdisjoint(response) - - -class _ConstraintHasMaskSet(BaseConstraint): - def __init__(self, has_masks_set): - self._has_masks_set = has_masks_set - - def is_met(self, response) -> bool: - return all([(response & mask) == mask for mask in self._has_masks_set]) - - -class _ConstraintHasMaskClear(BaseConstraint): - def __init__(self, has_masks_clear): - self._has_masks_clear = has_masks_clear - - def is_met(self, response) -> bool: - return all([(response & mask) == 0 for mask in self._has_masks_clear]) - - -class _ConstraintNotValue(_LoadableConstraint): - def __init__(self, not_value, field_type, variable_storage: VariableStorage, - config_values: dict): - super().__init__(not_value, field_type, variable_storage, config_values) - - def is_met(self, response) -> bool: - not_value = self.get_value() - return response != not_value - - -def get_constraints(constraints, field_type, variable_storage: VariableStorage, - config_values: dict) -> list[BaseConstraint]: - _constraints = [] - if 'hasValue' in constraints: - _constraints.append(_ConstraintHasValue(constraints.get('hasValue'))) - - if 'type' in constraints: - _constraints.append(_ConstraintType(constraints.get('type'))) - - if 'startsWith' in constraints: - _constraints.append(_ConstraintStartsWith(constraints.get('startsWith'))) - - if 'endsWith' in constraints: - _constraints.append(_ConstraintEndsWith(constraints.get('endsWith'))) - - if 'isUpperCase' in constraints: - _constraints.append(_ConstraintIsUpperCase(constraints.get('isUpperCase'))) - - if 'isLowerCase' in constraints: - _constraints.append(_ConstraintIsLowerCase(constraints.get('isLowerCase'))) - - if 'minValue' in constraints: - _constraints.append(_ConstraintMinValue( - constraints.get('minValue'), field_type, variable_storage, config_values)) - - if 'maxValue' in constraints: - _constraints.append(_ConstraintMaxValue( - constraints.get('maxValue'), field_type, variable_storage, config_values)) - - if 'contains' in constraints: - _constraints.append(_ConstraintContains(constraints.get('contains'))) - - if 'excludes' in constraints: - _constraints.append(_ConstraintExcludes(constraints.get('excludes'))) - - if 'hasMasksSet' in constraints: - _constraints.append(_ConstraintHasMaskSet(constraints.get('hasMasksSet'))) - - if 'hasMasksClear' in constraints: - _constraints.append(_ConstraintHasMaskClear(constraints.get('hasMasksClear'))) - - if 'notValue' in constraints: - _constraints.append(_ConstraintNotValue( - constraints.get('notValue'), field_type, variable_storage, config_values)) - - return _constraints diff --git a/src/controller/python/chip/yaml/data_model_lookup.py b/src/controller/python/chip/yaml/data_model_lookup.py index 1e0c3ffe6f3f14..215ac8fcbd9380 100644 --- a/src/controller/python/chip/yaml/data_model_lookup.py +++ b/src/controller/python/chip/yaml/data_model_lookup.py @@ -32,6 +32,10 @@ def get_command(self, cluster: str, command: str): def get_attribute(self, cluster: str, attribute: str): pass + @abstractmethod + def get_event(self, cluster: str, event: str): + pass + class PreDefinedDataModelLookup(DataModelLookup): def get_cluster(self, cluster: str): @@ -53,3 +57,10 @@ def get_attribute(self, cluster: str, attribute: str): return getattr(attributes, attribute, None) except AttributeError: return None + + def get_event(self, cluster: str, event: str): + try: + events = getattr(Clusters, cluster, None).Events + return getattr(events, event, None) + except AttributeError: + return None diff --git a/src/controller/python/chip/yaml/errors.py b/src/controller/python/chip/yaml/errors.py index e6c90123d1baad..092128f5b8d90e 100644 --- a/src/controller/python/chip/yaml/errors.py +++ b/src/controller/python/chip/yaml/errors.py @@ -20,7 +20,7 @@ def __init__(self, message): super().__init__(message) -class UnexpectedParsingError(ParsingError): +class UnexpectedParsingError(ValueError): def __init__(self, message): super().__init__(message) diff --git a/src/controller/python/chip/yaml/format_converter.py b/src/controller/python/chip/yaml/format_converter.py index fc3c5a1b873cfc..ce2edfdc6af4eb 100644 --- a/src/controller/python/chip/yaml/format_converter.py +++ b/src/controller/python/chip/yaml/format_converter.py @@ -20,73 +20,88 @@ from chip.tlv import uint, float32 import enum from chip.yaml.errors import ValidationError -import binascii -def substitute_in_config_variables(field_value, config_values: dict): - ''' Substitutes values that are config variables. +def _case_insensitive_getattr(object, attr_name, default): + for attr in dir(object): + if attr.lower() == attr_name.lower(): + return getattr(object, attr) + return default - YAML values can contain a string of a configuration variable name. In these instances we - substitute the configuration variable name with the actual value. - For examples see unittest src/controller/python/test/unit_tests/test_yaml_format_converter.py +def _get_target_type_fields(test_spec_definition, cluster_name, target_name): + element = test_spec_definition.get_type_by_name(cluster_name, target_name) + if hasattr(element, 'fields'): + return element.fields + return None - # TODO This should also substitue any saveAs values as well as perform any required - # evaluations. + +def from_data_model_to_test_definition(test_spec_definition, cluster_name, response_definition, + response_value): + '''Converts value from data model to definitions provided in test_spec_definition. Args: - 'field_value': Value as extracted from YAML. - 'config_values': Dictionary of global configuration variables. - Returns: - Value with all global configuration variables substituted with the real value. - ''' - if isinstance(field_value, dict): - return {key: substitute_in_config_variables( - field_value[key], config_values) for key in field_value} - if isinstance(field_value, list): - return [substitute_in_config_variables(item, config_values) for item in field_value] - if isinstance(field_value, str) and field_value in config_values: - config_value = config_values[field_value] - if isinstance(config_value, dict) and 'defaultValue' in config_value: - # TODO currently we don't validate that if config_value['type'] is provided - # that the type does in fact match our expectation. - return config_value['defaultValue'] - return config_values[field_value] - - return field_value - - -def convert_yaml_octet_string_to_bytes(s: str) -> bytes: - '''Convert YAML octet string body to bytes. - - Included handling any c-style hex escapes (e.g. \x5a) and 'hex:' prefix. + 'test_spec_definition': The spec cluster definition used by the test parser. + 'cluster_name': Used when we need to look up information in 'test_spec_definition'. + 'response_definition': Type we are converting 'response_value' to. This will be one of + two types: list[idl.matter_idl_types.Field] or idl.matter_idl_types.Field + 'response_value': Response value that we want to convert to ''' - # Step 1: handle explicit "hex:" prefix - if s.startswith('hex:'): - return binascii.unhexlify(s[4:]) - - # Step 2: convert non-hex-prefixed to bytes - # TODO(#23669): This does not properly support utf8 octet strings. We mimic - # javascript codegen behavior. Behavior of javascript is: - # * Octet string character >= u+0200 errors out. - # * Any character greater than 0xFF has the upper bytes chopped off. - as_bytes = [ord(c) for c in s] - - if any([value > 0x200 for value in as_bytes]): - raise ValueError('Unsupported char in octet string %r' % as_bytes) - accumulated_hex = ''.join([f"{(v & 0xFF):02x}" for v in as_bytes]) - return binascii.unhexlify(accumulated_hex) - - -def convert_name_value_pair_to_dict(arg_values): - ''' Fix yaml command arguments. - - For some reason, instead of treating the entire data payload of a - command as a singular struct, the top-level args are specified as 'name' - and 'value' pairs, while the payload of each argument is itself - correctly encapsulated. This fixes up this oddity to create a new - key/value pair with the key being the value of the 'name' field, and - the value being 'value' field. + if response_value is None: + return response_value + + # We first check to see if response_definition is list[idl.matter_idl_types.Field]. When we + # have list[idl.matter_idl_types.Field] that means we have a structure with multiple fields + # that need to be worked through recursively to properly convert the value to the right type. + if isinstance(response_definition, list): + rv = {} + for item in response_definition: + value = _case_insensitive_getattr(response_value, item.name, None) + if item.is_optional and value is None: + continue + rv[item.name] = from_data_model_to_test_definition(test_spec_definition, cluster_name, + item, value) + return rv + + # We convert uint to python int because constraints first check that it is an expected type. + response_value_type = type(response_value) + if response_value_type == uint: + return int(response_value) + + if response_definition is None: + return response_value + + if response_value is NullValue: + return None + + # For single float values types there seems to be a floating precision issue. By using '%g' + # it naturally give 6 most significat digits for us which is the amount of prcision we are + # looking for to give parity results to what chip-tool was getting (For TestCluster.yaml it + # give value back of `0.100000`. + if response_value_type == float32 and response_definition.data_type.name.lower() == 'single': + return float('%g' % response_value) + + response_sub_definition = _get_target_type_fields(test_spec_definition, cluster_name, + response_definition.data_type.name) + + # Check below is to see if the field itself is an array, for example array of ints. + if response_definition.is_list: + return [ + from_data_model_to_test_definition(test_spec_definition, cluster_name, + response_sub_definition, item) for item in response_value + ] + + return from_data_model_to_test_definition(test_spec_definition, cluster_name, + response_sub_definition, response_value) + + +def convert_list_of_name_value_pair_to_dict(arg_values): + '''Converts list of dict with items with keys 'name' and 'value' into single dict. + + The test step contains a list of arguments that have multiple properties other than + 'name' and 'value'. For the purposes of executing a test all these other attributes are not + important. We only want a simple dictionary of a new key/value where with the key being the + value of the 'name' field, and the value being 'value' field. ''' ret_value = {} @@ -96,21 +111,16 @@ def convert_name_value_pair_to_dict(arg_values): return ret_value -def convert_yaml_type(field_value, field_type, inline_cast_dict_to_struct): - ''' Converts yaml value to provided pythonic type. +def convert_to_data_model_type(field_value, field_type): + '''Converts value to provided data model pythonic object type. - The YAML representation when converted to a dictionary does not line up to - the python type data model for the various command/attribute/event object - types. This function converts 'field_value' to the appropriate provided + The values provided by parser does not line up to the python data model for the various + command/attribute/event object types. This function converts 'field_value' to the provided 'field_type'. Args: - 'field_value': Value as extracted from yaml - 'field_type': Pythonic command/attribute/event object type that we - are converting value to. - 'inline_cast_dict_to_struct': If true, for any dictionary 'field_value' - types provided we will do a convertion to the corresponding data - model class in `field_type` by doing field_type.FromDict(...). + 'field_value': Value as extracted by YAML parser. + 'field_type': Pythonic command/attribute/event object type that we are converting value to. ''' origin = typing.get_origin(field_type) @@ -152,10 +162,8 @@ def convert_yaml_type(field_value, field_type, inline_cast_dict_to_struct): raise ValidationError( f'Did not find field "{item}" in {str(field_type)}') from None - return_field_value[field_descriptor.Label] = convert_yaml_type( - field_value[item], field_descriptor.Type, inline_cast_dict_to_struct) - if inline_cast_dict_to_struct: - return field_type.FromDict(return_field_value) + return_field_value[field_descriptor.Label] = convert_to_data_model_type( + field_value[item], field_descriptor.Type) return return_field_value elif(type(field_value) is float): return float32(field_value) @@ -165,8 +173,7 @@ def convert_yaml_type(field_value, field_type, inline_cast_dict_to_struct): # The field type passed in is the type of the list element and not list[T]. for idx, item in enumerate(field_value): - field_value[idx] = convert_yaml_type(item, list_element_type, - inline_cast_dict_to_struct) + field_value[idx] = convert_to_data_model_type(item, list_element_type) return field_value # YAML conversion treats all numbers as ints. Convert to a uint type if the schema # type indicates so. @@ -177,31 +184,6 @@ def convert_yaml_type(field_value, field_type, inline_cast_dict_to_struct): # YAML treats enums as ints. Convert to the typed enum class. elif (issubclass(field_type, enum.Enum)): return field_type(field_value) - # YAML treats bytes as strings. Convert to a byte string. - elif (field_type == bytes and type(field_value) != bytes): - return convert_yaml_octet_string_to_bytes(field_value) # By default, just return the field_value casted to field_type. else: return field_type(field_value) - - -def parse_and_convert_yaml_value(field_value, field_type, config_values: dict, - inline_cast_dict_to_struct: bool = False): - ''' Parse and converts YAML type - - Parsing the YAML value means performing required substitutions and evaluations. Parsing is - then followed by converting from the YAML type done using yaml.safe_load() to the type used in - the various command/attribute/event object data model types. - - Args: - 'field_value': Value as extracted from yaml to be parsed - 'field_type': Pythonic command/attribute/event object type that we - are converting value to. - 'config_values': Dictionary of global configuration variables. - 'inline_cast_dict_to_struct': If true, for any dictionary 'field_value' - types provided we will do an inline convertion to the corresponding - struct in `field_type` by doing field_type.FromDict(...). - ''' - field_value_with_config_variables = substitute_in_config_variables(field_value, config_values) - return convert_yaml_type(field_value_with_config_variables, field_type, - inline_cast_dict_to_struct) diff --git a/src/controller/python/chip/yaml/parser.py b/src/controller/python/chip/yaml/parser.py deleted file mode 100644 index b70a743147063d..00000000000000 --- a/src/controller/python/chip/yaml/parser.py +++ /dev/null @@ -1,467 +0,0 @@ -# -# Copyright (c) 2022 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from chip import ChipDeviceCtrl -from chip.tlv import float32 -import yaml -import stringcase -import chip.interaction_model -import asyncio as asyncio -import logging -import math -from chip.yaml.errors import ParsingError, UnexpectedParsingError -from .data_model_lookup import * -import chip.yaml.format_converter as Converter -from .variable_storage import VariableStorage -from .constraints import get_constraints - -_SUCCESS_STATUS_CODE = "SUCCESS" -_NODE_ID_DEFAULT = 0x12345 -_ENDPOINT_DETAULT = '' # TODO why is this an empty string -_CLUSTER_DEFAULT = '' -_TIMEOUT_DEFAULT = 90 -logger = logging.getLogger('YamlParser') - - -@dataclass -class _ExecutionContext: - ''' Objects that is commonly passed around this file that are vital to test execution.''' - # Data model lookup to get python attribute, cluster, command object. - data_model_lookup: DataModelLookup = None - # Where various test action response are stored and loaded from. - variable_storage: VariableStorage = None - # Top level configuration values for a yaml test. - config_values: dict = None - - -class _VariableToSave: - def __init__(self, variable_name: str, variable_storage: VariableStorage): - self._variable_name = variable_name - self._variable_storage = variable_storage - self._variable_storage.save(self._variable_name, None) - - def save_response(self, value): - self._variable_storage.save(self._variable_name, value) - - -class _ExpectedResponse: - def __init__(self, value, response_type, context: _ExecutionContext): - self._load_expected_response_in_verify = None - self._expected_response_type = response_type - self._expected_response = None - self._variable_storage = context.variable_storage - if isinstance(value, str) and self._variable_storage.is_key_saved(value): - self._load_expected_response_in_verify = value - else: - self._expected_response = Converter.parse_and_convert_yaml_value( - value, response_type, context.config_values, inline_cast_dict_to_struct=True) - - def verify(self, response): - if (self._expected_response_type is None): - return True - - if self._load_expected_response_in_verify is not None: - self._expected_response = self._variable_storage.load( - self._load_expected_response_in_verify) - - if isinstance(self._expected_response_type, float32): - if not math.isclose(self._expected_response, response, rel_tol=1e-6): - logger.error(f"Expected response {self._expected_response} didn't match " - f"actual object {response}") - return False - - if (self._expected_response != response): - logger.error(f"Expected response {self._expected_response} didn't match " - f"actual object {response}") - return False - return True - - -class BaseAction(ABC): - '''Interface for a single yaml action that is to be executed.''' - - def __init__(self, label): - self._label = label - - @property - def label(self): - return self._label - - @abstractmethod - def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int): - pass - - -class InvokeAction(BaseAction): - '''Single invoke action to be executed including validation of response.''' - - def __init__(self, item: dict, cluster: str, context: _ExecutionContext): - '''Parse cluster invoke from yaml test configuration. - - Args: - 'item': Dictionary containing single invoke to be parsed. - 'cluster': Name of cluster which to invoke action is targeting. - 'context': Contains test-wide common objects such as DataModelLookup instance, storage - for device responses and top level test configurations variable. - Raises: - ParsingError: Raised if there is a benign error, and there is currently no - action to perform for this write attribute. - UnexpectedParsingError: Raised if there is an unexpected parsing error. - ''' - super().__init__(item['label']) - self._command_name = stringcase.pascalcase(item['command']) - self._cluster = cluster - self._request_object = None - self._expected_raw_response: dict = field(default_factory=dict) - self._expected_response_object = None - - command = context.data_model_lookup.get_command( - self._cluster, self._command_name) - - if command is None: - raise ParsingError( - f'Failed to find cluster:{self._cluster} Command:{self._command_name}') - - command_object = command() - if (item.get('arguments')): - args = item['arguments']['values'] - - request_data_as_dict = Converter.convert_name_value_pair_to_dict(args) - - try: - request_data = Converter.parse_and_convert_yaml_value( - request_data_as_dict, type(command_object), context.config_values) - except ValueError: - raise ParsingError('Could not covert yaml type') - - # Create a cluster object for the request from the provided YAML data. - self._request_object = command_object.FromDict(request_data) - else: - self._request_object = command_object - - self._expected_raw_response = item.get('response') - - if (self._request_object.response_type is not None and - self._expected_raw_response is not None and - self._expected_raw_response.get('values')): - response_type = stringcase.pascalcase(self._request_object.response_type) - expected_command = context.data_model_lookup.get_command(self._cluster, - response_type) - expected_response_args = self._expected_raw_response['values'] - expected_response_data_as_dict = Converter.convert_name_value_pair_to_dict( - expected_response_args) - expected_response_data = Converter.parse_and_convert_yaml_value( - expected_response_data_as_dict, expected_command, context.config_values) - self._expected_response_object = expected_command.FromDict(expected_response_data) - - def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int): - try: - resp = asyncio.run(dev_ctrl.SendCommand(node_id, endpoint, self._request_object)) - except chip.interaction_model.InteractionModelError: - if self._expected_raw_response is None: - raise - - expected_status_code = self._expected_raw_response.get('error') - if expected_status_code is not None and expected_status_code != _SUCCESS_STATUS_CODE: - logger.debug('Got error response, but was expected') - else: - raise - - if (self._expected_response_object is not None): - if (self._expected_response_object != resp): - logger.error(f'Expected response {self._expected_response_object} did not match ' - f'actual object {resp}') - - -class ReadAttributeAction(BaseAction): - '''Single read attribute action to be executed including validation.''' - - def __init__(self, item: dict, cluster: str, context: _ExecutionContext): - '''Parse read attribute action from yaml test configuration. - - Args: - 'item': Dictionary contains single read attribute action to be parsed. - 'cluster': Name of cluster read attribute action is targeting. - 'context': Contains test-wide common objects such as DataModelLookup instance, storage - for device responses and top level test configurations variable. - Raises: - ParsingError: Raised if there is a benign error, and there is currently no - action to perform for this read attribute. - UnexpectedParsingError: Raised if there is an unexpected parsing error. - ''' - super().__init__(item['label']) - self._attribute_name = stringcase.pascalcase(item['attribute']) - self._constraints = [] - self._cluster = cluster - self._cluster_object = None - self._request_object = None - self._expected_raw_response: dict = field(default_factory=dict) - self._expected_response: _ExpectedResponse = None - self._possibly_unsupported = False - self._variable_to_save = None - - self._cluster_object = context.data_model_lookup.get_cluster(self._cluster) - if self._cluster_object is None: - raise UnexpectedParsingError( - f'ReadAttribute failed to find cluster object:{self._cluster}') - - self._request_object = context.data_model_lookup.get_attribute( - self._cluster, self._attribute_name) - if self._request_object is None: - raise ParsingError( - f'ReadAttribute failed to find cluster:{self._cluster} ' - f'Attribute:{self._attribute_name}') - - if (item.get('arguments')): - raise UnexpectedParsingError( - f'ReadAttribute should not contain arguments. {self.label}') - - if self._request_object.attribute_type is None: - raise UnexpectedParsingError( - f'ReadAttribute doesnt have valid attribute_type. {self.label}') - - if 'optional' in item: - self._possibly_unsupported = True - - self._expected_raw_response = item.get('response') - if (self._expected_raw_response is None): - # TODO actually if response is missing it typically means that we need to confirm - # that we got a successful response. This will be implemented later to consider all - # possible corner cases around that (if there are corner cases). - raise UnexpectedParsingError(f'ReadAttribute missing expected response. {self.label}') - - variable_name = self._expected_raw_response.get('saveAs') - if variable_name: - self._variable_to_save = _VariableToSave(variable_name, context.variable_storage) - - if 'value' in self._expected_raw_response: - expected_response_value = self._expected_raw_response['value'] - self._expected_response = _ExpectedResponse(expected_response_value, - self._request_object.attribute_type.Type, - context) - - constraints = self._expected_raw_response.get('constraints') - if constraints: - self._constraints = get_constraints(constraints, - self._request_object.attribute_type.Type, - context.variable_storage, - context.config_values) - - def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int): - try: - resp = asyncio.run(dev_ctrl.ReadAttribute(node_id, [(self._request_object)])) - except chip.interaction_model.InteractionModelError: - if self._expected_raw_response is None: - raise - - expected_status_code = self._expected_raw_response.get('error') - if expected_status_code is not None and expected_status_code != _SUCCESS_STATUS_CODE: - logger.debug('Got error response, but was expected') - else: - raise - - if self._possibly_unsupported and not resp: - # We have found an unsupported attribute. Parsed test did specify that it might be - # unsupported, so nothing left to validate. - return - - # TODO Currently there are no checks that this indexing won't fail. Need to add some - # initial validity checks. Coming soon an a future PR. - parsed_resp = resp[endpoint][self._cluster_object][self._request_object] - - if self._variable_to_save is not None: - self._variable_to_save.save_response(parsed_resp) - - if not all([constraint.is_met(parsed_resp) for constraint in self._constraints]): - logger.error(f'Constraints check failed') - # TODO how should we fail the test here? - - if self._expected_response is not None: - self._expected_response.verify(parsed_resp) - - -class WriteAttributeAction(BaseAction): - '''Single write attribute action to be executed including validation.''' - - def __init__(self, item: dict, cluster: str, context: _ExecutionContext): - '''Parse write attribute action from yaml test configuration. - - Args: - 'item': Dictionary contains single write attribute action to be parsed. - 'cluster': Name of cluster write attribute action is targeting. - 'context': Contains test-wide common objects such as DataModelLookup instance, storage - for device responses and top level test configurations variable. - Raises: - ParsingError: Raised if there is a benign error, and there is currently no - action to perform for this write attribute. - UnexpectedParsingError: Raised if there is an unexpected parsing error. - ''' - super().__init__(item['label']) - self._attribute_name = stringcase.pascalcase(item['attribute']) - self._cluster = cluster - self._request_object = None - - attribute = context.data_model_lookup.get_attribute( - self._cluster, self._attribute_name) - if attribute is None: - raise ParsingError( - f'WriteAttribute failed to find cluster:{self._cluster} ' - f'Attribute:{self._attribute_name}') - - if (item.get('arguments')): - args = item['arguments']['value'] - try: - request_data = Converter.parse_and_convert_yaml_value( - args, attribute.attribute_type.Type, context.config_values) - except ValueError: - raise ParsingError('Could not covert yaml type') - - # Create a cluster object for the request from the provided YAML data. - self._request_object = attribute(request_data) - else: - raise UnexpectedParsingError(f'WriteAttribute action does have arguments {self.label}') - - def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int): - try: - resp = asyncio.run( - dev_ctrl.WriteAttribute(node_id, [(endpoint, self._request_object)])) - except chip.interaction_model.InteractionModelError: - if (self.expected_raw_response is not None and - self.expected_raw_response.get('error')): - logger.debug('Got error, but was expected') - else: - raise - - # TODO: confirm resp give a Success value, although not all write action are expected - # to succeed, hence why this is a todo and not simply just done. Below is example of - # what success check might look like. - # asserts.assert_equal(resp[0].Status, StatusEnum.Success, 'label write must succeed') - - -class YamlTestParser: - '''Parses the test YAMLs and converts to a more natural Pythonic representation. - - The parser also permits execution of those tests there-after. - ''' - - def __init__(self, yaml_path: str): - '''Constructor that parser the given a path to YAML test file.''' - with open(yaml_path, 'r') as stream: - try: - self._raw_data = yaml.safe_load(stream) - except yaml.YAMLError as exc: - raise exc - - if 'name' not in self._raw_data: - raise UnexpectedParsingError("YAML expected to have 'name'") - self._name = self._raw_data['name'] - - if 'config' not in self._raw_data: - raise UnexpectedParsingError("YAML expected to have 'config'") - self._config = self._raw_data['config'] - - self._config.setdefault('nodeId', _NODE_ID_DEFAULT) - self._config.setdefault('endpoint', _ENDPOINT_DETAULT) - self._config.setdefault('cluster', _CLUSTER_DEFAULT) - # TODO timeout is currently not used - self._config.setdefault('timeout', _TIMEOUT_DEFAULT) - - self._config['cluster'] = self._config['cluster'].replace(' ', '').replace('/', '') - self._base_action_test_list = [] - self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup(), - variable_storage=VariableStorage(), - config_values=self._config) - - for item in self._raw_data['tests']: - # This currently behaves differently than the c++ version. We are evaluating if test - # is disabled before anything else, allowing for incorrectly named commands. - if item.get('disabled'): - logger.info(f"Test is disabled, skipping {item['label']}") - continue - - action = None - cluster = self._config['cluster'] - # Some of the tests contain 'cluster over-rides' that refer to a different - # cluster than that specified in 'config'. - if (item.get('cluster')): - cluster = item.get('cluster').replace(' ', '').replace('/', '') - if item['command'] == 'writeAttribute': - action = self._attribute_write_action_factory(item, cluster) - elif item['command'] == 'readAttribute': - action = self._attribute_read_action_factory(item, cluster) - else: - action = self._invoke_action_factory(item, cluster) - - if action is not None: - self._base_action_test_list.append(action) - else: - logger.warn(f"Failed to parse {item['label']}") - - def _invoke_action_factory(self, item: dict, cluster: str): - '''Parse cluster command from yaml test configuration. - - Args: - 'item': Dictionary contains single cluster action test to be parsed - 'cluster': Name of cluster action is targeting. - Returns: - InvokeAction if 'item' is a valid action to be executed. - None if 'item' was not parsed for a known reason that is not fatal. - ''' - try: - return InvokeAction(item, cluster, self._context) - except ParsingError: - return None - - def _attribute_read_action_factory(self, item: dict, cluster: str): - '''Parse read attribute action from yaml test configuration. - - Args: - 'item': Dictionary contains single read attribute action to be parsed. - 'cluster': Name of cluster read attribute action is targeting. - Returns: - ReadAttributeAction if 'item' is a valid action to be executed. - None if 'item' was not parsed for a known reason that is not fatal. - ''' - try: - return ReadAttributeAction(item, cluster, self._context) - except ParsingError: - return None - - def _attribute_write_action_factory(self, item: dict, cluster: str): - '''Parse write attribute action from yaml test configuration. - - Args: - 'item': Dictionary contains single write attribute action to be parsed. - 'cluster': Name of cluster write attribute action is targeting. - Returns: - WriteAttributeAction if 'item' is a valid action to be executed. - None if 'item' was not parsed for a known reason that is not fatal. - ''' - try: - return WriteAttributeAction(item, cluster, self._context) - except ParsingError: - return None - - def execute_tests(self, dev_ctrl: ChipDeviceCtrl): - '''Executes parsed YAML tests.''' - self._context.variable_storage.clear() - for idx, action in enumerate(self._base_action_test_list): - logger.info(f'test: {idx} -- Executing{action.label}') - - action.run_action(dev_ctrl, self._config['endpoint'], self._config['nodeId']) diff --git a/src/controller/python/chip/yaml/runner.py b/src/controller/python/chip/yaml/runner.py new file mode 100644 index 00000000000000..f55d6ccb2efce7 --- /dev/null +++ b/src/controller/python/chip/yaml/runner.py @@ -0,0 +1,386 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from chip import ChipDeviceCtrl +from enum import Enum +import stringcase +import chip.interaction_model +import asyncio as asyncio +import logging +from chip.yaml.errors import ParsingError, UnexpectedParsingError +from chip.clusters.Attribute import AttributeStatus, ValueDecodeFailure +from .data_model_lookup import * +import chip.yaml.format_converter as Converter + +logger = logging.getLogger('YamlParser') + + +class _ActionStatus(Enum): + SUCCESS = 'success', + ERROR = 'error' + + +@dataclass +class _ActionResult: + status: _ActionStatus + response: object + + +@dataclass +class _ExecutionContext: + ''' Objects that is commonly passed around this file that are vital to test execution.''' + # Data model lookup to get python attribute, cluster, command object. + data_model_lookup: DataModelLookup = None + + +class BaseAction(ABC): + '''Interface for a single YAML action that is to be executed.''' + + def __init__(self, label): + self._label = label + + @property + def label(self): + return self._label + + @abstractmethod + def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: + pass + + +class InvokeAction(BaseAction): + '''Single invoke action to be executed.''' + + def __init__(self, test_step, cluster: str, context: _ExecutionContext): + '''Converts 'test_step' to invoke command action that can execute with ChipDeviceCtrl. + + Args: + 'test_step': Step containing information required to run invoke command action. + 'cluster': Name of cluster which to invoke action is targeting. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + ParsingError: Raised if there is a benign error, and there is currently no + action to perform for this write attribute. + UnexpectedParsingError: Raised if there is an unexpected parsing error. + ''' + super().__init__(test_step.label) + self._command_name = stringcase.pascalcase(test_step.command) + self._cluster = cluster + self._request_object = None + self._expected_response_object = None + self._endpoint = test_step.endpoint + self._node_id = test_step.node_id + + command = context.data_model_lookup.get_command(self._cluster, self._command_name) + + if command is None: + raise ParsingError( + f'Failed to find cluster:{self._cluster} Command:{self._command_name}') + + command_object = command() + if (test_step.arguments): + args = test_step.arguments['values'] + request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args) + + try: + request_data = Converter.convert_to_data_model_type( + request_data_as_dict, type(command_object)) + except ValueError: + # TODO after allowing out of bounds enums to be written this should be changed to + # UnexpectedParsingError. + raise ParsingError('Could not covert yaml type') + + self._request_object = command_object.FromDict(request_data) + else: + self._request_object = command_object + + def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: + try: + resp = asyncio.run(dev_ctrl.SendCommand(self._node_id, self._endpoint, + self._request_object)) + except chip.interaction_model.InteractionModelError as error: + return _ActionResult(status=_ActionStatus.ERROR, response=error) + + # Commands with no response give a None response. In those cases we return a success + return _ActionResult(status=_ActionStatus.SUCCESS, response=resp) + + +class ReadAttributeAction(BaseAction): + '''Single read attribute action to be executed.''' + + def __init__(self, test_step, cluster: str, context: _ExecutionContext): + '''Converts 'test_step' to read attribute action that can execute with ChipDeviceCtrl. + + Args: + 'test_step': Step containing information required to run read attribute action. + 'cluster': Name of cluster read attribute action is targeting. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + ParsingError: Raised if there is a benign error, and there is currently no + action to perform for this read attribute. + UnexpectedParsingError: Raised if there is an unexpected parsing error. + ''' + super().__init__(test_step.label) + self._attribute_name = stringcase.pascalcase(test_step.attribute) + self._cluster = cluster + self._endpoint = test_step.endpoint + self._node_id = test_step.node_id + self._cluster_object = None + self._request_object = None + + self._possibly_unsupported = bool(test_step.optional) + + self._cluster_object = context.data_model_lookup.get_cluster(self._cluster) + if self._cluster_object is None: + raise UnexpectedParsingError( + f'ReadAttribute failed to find cluster object:{self._cluster}') + + self._request_object = context.data_model_lookup.get_attribute( + self._cluster, self._attribute_name) + if self._request_object is None: + raise ParsingError( + f'ReadAttribute failed to find cluster:{self._cluster} ' + f'Attribute:{self._attribute_name}') + + if test_step.arguments: + raise UnexpectedParsingError( + f'ReadAttribute should not contain arguments. {self.label}') + + if self._request_object.attribute_type is None: + raise UnexpectedParsingError( + f'ReadAttribute doesnt have valid attribute_type. {self.label}') + + def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: + try: + raw_resp = asyncio.run(dev_ctrl.ReadAttribute(self._node_id, + [(self._endpoint, self._request_object)])) + except chip.interaction_model.InteractionModelError as error: + return _ActionResult(status=_ActionStatus.ERROR, response=error) + + if self._possibly_unsupported and not raw_resp: + # We have found an unsupported attribute. TestStep provided did specify that it might be + # unsupported, so nothing left to validate. We just return a failure here. + return _ActionResult(status=_ActionStatus.ERROR, response=None) + + # TODO Currently there are no checks that this indexing won't fail. Need to add some + # initial validity checks. Coming soon in a future PR. + resp = raw_resp[self._endpoint][self._cluster_object][self._request_object] + + if isinstance(resp, ValueDecodeFailure): + # response.Reason is of type chip.interaction_model.Status. + return _ActionResult(status=_ActionStatus.ERROR, response=resp.Reason) + + # decode() is expecting to get a DataModelLookup Object type to grab certain attributes + # like cluster id. + return_val = self._request_object(resp) + return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val) + + +class WriteAttributeAction(BaseAction): + '''Single write attribute action to be executed.''' + + def __init__(self, test_step, cluster: str, context: _ExecutionContext): + '''Converts 'test_step' to write attribute action that can execute with ChipDeviceCtrl. + + Args: + 'test_step': Step containing information required to run write attribute action. + 'cluster': Name of cluster write attribute action is targeting. + 'context': Contains test-wide common objects such as DataModelLookup instance. + Raises: + ParsingError: Raised if there is a benign error, and there is currently no + action to perform for this write attribute. + UnexpectedParsingError: Raised if there is an unexpected parsing error. + ''' + super().__init__(test_step.label) + self._attribute_name = stringcase.pascalcase(test_step.attribute) + self._cluster = cluster + self._endpoint = test_step.endpoint + self._node_id = test_step.node_id + self._request_object = None + + attribute = context.data_model_lookup.get_attribute( + self._cluster, self._attribute_name) + if attribute is None: + raise ParsingError( + f'WriteAttribute failed to find cluster:{self._cluster} ' + f'Attribute:{self._attribute_name}') + + if not test_step.arguments: + raise UnexpectedParsingError(f'WriteAttribute action does have arguments {self.label}') + + args = test_step.arguments['values'] + if len(args) != 1: + raise UnexpectedParsingError(f'WriteAttribute is trying to write multiple values') + request_data_as_dict = args[0] + try: + # TODO this is an ugly hack + request_data = Converter.convert_to_data_model_type( + request_data_as_dict['value'], attribute.attribute_type.Type) + except ValueError: + raise ParsingError('Could not covert yaml type') + + # Create a cluster object for the request from the provided YAML data. + self._request_object = attribute(request_data) + + def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult: + try: + resp = asyncio.run( + dev_ctrl.WriteAttribute(self._node_id, [(self._endpoint, self._request_object)])) + except chip.interaction_model.InteractionModelError: + # TODO Should we be doing the same thing as InvokeAction on InteractionModelError? + raise + if len(resp) == 1 and isinstance(resp[0], AttributeStatus): + if resp[0].Status == chip.interaction_model.Status.Success: + return _ActionResult(status=_ActionStatus.SUCCESS, response=None) + else: + return _ActionResult(status=_ActionStatus.ERROR, response=resp[0].Status) + + # We always expecte the response to be a list of length 1, for that reason we return error + # here. + return _ActionResult(status=_ActionStatus.ERROR, response=None) + + +class ReplTestRunner: + '''Test runner to encode/decode values from YAML test Parser for executing the TestStep. + + Uses ChipDeviceCtrl from chip-repl to execute parsed YAML TestSteps. + ''' + + def __init__(self, test_spec_definition, dev_ctrl): + self._test_spec_definition = test_spec_definition + self._dev_ctrl = dev_ctrl + self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup()) + + def _invoke_action_factory(self, test_step, cluster: str): + '''Creates cluster invoke action command from TestStep. + + Args: + 'test_step': Step containing information required to run an invoke command action. + 'cluster': Name of cluster action is targeting. + Returns: + InvokeAction if 'test_step' is a valid action to be executed. + None if we were unable to use the provided 'test_step' for a known reason that is not + fatal to test execution. + ''' + try: + return InvokeAction(test_step, cluster, self._context) + except ParsingError: + return None + + def _attribute_read_action_factory(self, test_step, cluster: str): + '''Creates read attribute command TestStep. + + Args: + 'test_step': Step containing information required to run read attribute action. + 'cluster': Name of cluster read attribute action is targeting. + Returns: + ReadAttributeAction if 'test_step' is a valid read attribute to be executed. + None if we were unable to use the provided 'test_step' for a known reason that is not + fatal to test execution. + ''' + try: + return ReadAttributeAction(test_step, cluster, self._context) + except ParsingError: + return None + + def _attribute_write_action_factory(self, test_step, cluster: str): + '''Creates write attribute command TestStep. + + Args: + 'test_step': Step containing information required to run write attribute action. + 'cluster': Name of cluster write attribute action is targeting. + Returns: + WriteAttributeAction if 'test_step' is a valid write attribute to be executed. + None if we were unable to use the provided 'test_step' for a known reason that is not + fatal to test execution. + ''' + try: + return WriteAttributeAction(test_step, cluster, self._context) + except ParsingError: + return None + + def encode(self, request) -> BaseAction: + action = None + cluster = request.cluster.replace(' ', '').replace('/', '') + command = request.command + # Some of the tests contain 'cluster over-rides' that refer to a different + # cluster than that specified in 'config'. + if command == 'writeAttribute': + action = self._attribute_write_action_factory(request, cluster) + elif command == 'readAttribute': + action = self._attribute_read_action_factory(request, cluster) + elif command == 'readEvent': + action = self._event_read_action_factory(request, cluster) + else: + action = self._invoke_action_factory(request, cluster) + + if action is None: + logger.warn(f"Failed to parse {request.label}") + return action + + def decode(self, result: _ActionResult): + # If this is a generic response, there is nothing to do. + if result.response is None: + # TODO Once yamltest and idl python packages are properly packaged as a single module + # the type we are returning will be formalized. For now TestStep.post_process_response + # expects this particular case to be sent as a string. + return 'success' if result.status == _ActionStatus.SUCCESS else 'failure' + + response = result.response + + decoded_response = {} + if isinstance(response, chip.interaction_model.InteractionModelError): + decoded_response['error'] = stringcase.snakecase(response.status.name).upper() + return decoded_response + + if isinstance(response, chip.interaction_model.Status): + decoded_response['error'] = stringcase.snakecase(response.name).upper() + return decoded_response + + cluster_name = self._test_spec_definition.get_cluster_name(response.cluster_id) + decoded_response['clusterId'] = cluster_name + + if hasattr(response, 'command_id'): + decoded_response['command'] = self._test_spec_definition.get_response_name( + response.cluster_id, response.command_id) + response_definition = self._test_spec_definition.get_response_by_name( + cluster_name, decoded_response['command']) + decoded_response['value'] = Converter.from_data_model_to_test_definition( + self._test_spec_definition, cluster_name, response_definition.fields, response) + + if hasattr(response, 'attribute_id'): + decoded_response['attribute'] = self._test_spec_definition.get_attribute_name( + response.cluster_id, response.attribute_id) + attribute = self._test_spec_definition.get_attribute_by_name( + cluster_name, decoded_response['attribute']) + # TODO Once we fix the issue of not being able to find the global attribute properly + # we should be able to remove this if/else statement below. + if attribute is None: + # When we cannot find the attribute it is because it is a global attribute like + # FeatureMap. Fortunately for these types we can get away with using + # 'response.value' directly for the time being. + decoded_response['value'] = response.value + else: + decoded_response['value'] = Converter.from_data_model_to_test_definition( + self._test_spec_definition, cluster_name, attribute.definition, response.value) + + return decoded_response + + def execute(self, action: BaseAction): + return action.run_action(self._dev_ctrl) diff --git a/src/controller/python/chip/yaml/variable_storage.py b/src/controller/python/chip/yaml/variable_storage.py deleted file mode 100644 index d62a7dd82c0ced..00000000000000 --- a/src/controller/python/chip/yaml/variable_storage.py +++ /dev/null @@ -1,37 +0,0 @@ -# -# Copyright (c) 2022 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -class VariableStorage: - '''Stores key value pairs. - - This is a code readability convience object for saving/loading values. - ''' - - def __init__(self): - self._saved_list = {} - - def save(self, key, value): - self._saved_list[key] = value - - def load(self, key): - return self._saved_list.get(key) - - def is_key_saved(self, key) -> bool: - return key in self._saved_list - - def clear(self): - self._saved_list.clear() diff --git a/src/controller/python/test/unit_tests/test_yaml_format_converter.py b/src/controller/python/test/unit_tests/test_yaml_format_converter.py deleted file mode 100644 index 05f58d20b59418..00000000000000 --- a/src/controller/python/test/unit_tests/test_yaml_format_converter.py +++ /dev/null @@ -1,121 +0,0 @@ -# -# Copyright (c) 2022 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from chip.yaml.format_converter import convert_yaml_octet_string_to_bytes, substitute_in_config_variables -from binascii import unhexlify -import unittest - - -class TestOctetStringYamlDecode(unittest.TestCase): - def test_common_cases(self): - self.assertEqual(convert_yaml_octet_string_to_bytes("hex:aa55"), unhexlify("aa55")) - self.assertEqual(convert_yaml_octet_string_to_bytes("hex:"), unhexlify("")) - self.assertEqual(convert_yaml_octet_string_to_bytes("hex:AA55"), unhexlify("aa55")) - - self.assertEqual(convert_yaml_octet_string_to_bytes("0\xaa\x55"), unhexlify("30aa55")) - self.assertEqual(convert_yaml_octet_string_to_bytes("0\xAA\x55"), unhexlify("30aa55")) - self.assertEqual(convert_yaml_octet_string_to_bytes("0\xAa\x55"), unhexlify("30aa55")) - - self.assertEqual(convert_yaml_octet_string_to_bytes("0hex:"), b"0hex:") - self.assertEqual(convert_yaml_octet_string_to_bytes("0hex:A"), b"0hex:A") - self.assertEqual(convert_yaml_octet_string_to_bytes("0hex:AA55"), b"0hex:AA55") - - self.assertEqual(convert_yaml_octet_string_to_bytes("AA55"), b"AA55") - self.assertEqual(convert_yaml_octet_string_to_bytes("AA\n\r\t55"), unhexlify("41410a0d093535")) - # TODO(#23669): After utf8 is properly supported expected result is unhexlify("c3a9c3a90a0a") - self.assertEqual(convert_yaml_octet_string_to_bytes("\xC3\xA9é\n\n"), unhexlify("c3a9e90a0a")) - - # Partial hex nibble - with self.assertRaises(ValueError): - convert_yaml_octet_string_to_bytes("hex:aa5") - - -class TestSubstitueInConfigVariables(unittest.TestCase): - - def setUp(self): - self.common_config = { - 'arg1': { - 'defaultValue': 1 - }, - 'arg2': { - 'defaultValue': 2 - }, - 'no_explicit_default': 3 - } - - def test_basic_substitution(self): - self.assertEqual(substitute_in_config_variables('arg1', self.common_config), 1) - self.assertEqual(substitute_in_config_variables('arg2', self.common_config), 2) - self.assertEqual(substitute_in_config_variables('arg3', self.common_config), 'arg3') - self.assertEqual(substitute_in_config_variables('no_explicit_default', self.common_config), 3) - - def test_basis_dict_substitution(self): - basic_dict = { - 'arg1': 'arg1', - 'arg2': 'arg2', - 'arg3': 'arg3', - 'no_explicit_default': 'no_explicit_default', - } - expected_dict = { - 'arg1': 1, - 'arg2': 2, - 'arg3': 'arg3', - 'no_explicit_default': 3, - } - self.assertEqual(substitute_in_config_variables(basic_dict, self.common_config), expected_dict) - - def test_basis_list_substitution(self): - basic_list = ['arg1', 'arg2', 'arg3', 'no_explicit_default'] - expected_list = [1, 2, 'arg3', 3] - self.assertEqual(substitute_in_config_variables(basic_list, self.common_config), expected_list) - - def test_complex_nested_type(self): - complex_nested_type = { - 'arg1': ['arg1', 'arg2', 'arg3', 'no_explicit_default'], - 'arg2': 'arg22', - 'arg3': { - 'no_explicit_default': 'no_explicit_default', - 'arg2': 'arg2', - 'another_dict': { - 'arg1': ['arg1', 'arg1', 'arg1', 'no_explicit_default'], - }, - 'another_list': ['arg1', 'arg2', 'arg3', 'no_explicit_default'] - }, - 'no_explicit_default': 'no_explicit_default', - } - expected_result = { - 'arg1': [1, 2, 'arg3', 3], - 'arg2': 'arg22', - 'arg3': { - 'no_explicit_default': 3, - 'arg2': 2, - 'another_dict': { - 'arg1': [1, 1, 1, 3], - }, - 'another_list': [1, 2, 'arg3', 3] - }, - 'no_explicit_default': 3, - } - self.assertEqual(substitute_in_config_variables(complex_nested_type, self.common_config), expected_result) - - -def main(): - unittest.main() - - -if __name__ == "__main__": - main()