Skip to content

Commit

Permalink
[matter_yamltests] Generates an error if a cluster definition exists …
Browse files Browse the repository at this point in the history
…but the attribute/event/command can not be found (#25109)
  • Loading branch information
vivien-apple authored and pull[bot] committed Feb 9, 2024
1 parent e3b3034 commit 1218f47
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 101 deletions.
112 changes: 64 additions & 48 deletions scripts/py_matter_yamltests/matter_yamltests/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ def get_event_name(self, cluster_id: int, event_id: int) -> str:
event = self.__get_by_id(cluster_id, event_id, _ItemType.Event)
return event.name if event else None

def get_cluster_id_by_name(self, cluster_name: str) -> int:
if cluster_name is None:
return None

# The idl parser remove spaces
cluster_name = cluster_name.replace(' ', '')
return self.__clusters_by_name.get(cluster_name)

def has_cluster_by_name(self, cluster_name: str) -> bool:
cluster_id = self.get_cluster_id_by_name(cluster_name)
return cluster_id is not None

def get_command_by_name(self, cluster_name: str, command_name: str) -> Command:
return self.__get_by_name(cluster_name, command_name, _ItemType.Request)

Expand Down Expand Up @@ -149,6 +161,21 @@ def get_type_by_name(self, cluster_name: str, target_name: str):

return None

def get_command_names(self, cluster_name: str) -> list[str]:
targets = self.__get_targets_by_cluster_name(
cluster_name, _ItemType.Request)
return [] if targets is None else [name for name in targets]

def get_event_names(self, cluster_name: str) -> list[str]:
targets = self.__get_targets_by_cluster_name(
cluster_name, _ItemType.Event)
return [] if targets is None else [name for name in targets]

def get_attribute_names(self, cluster_name: str) -> list[str]:
targets = self.__get_targets_by_cluster_name(
cluster_name, _ItemType.Attribute)
return [] if targets is None else [name for name in targets]

def is_fabric_scoped(self, target) -> bool:
if isinstance(target, Event):
return bool(target.is_fabric_sensitive)
Expand All @@ -164,82 +191,71 @@ def is_nullable(self, target) -> bool:
return False

def __get_by_name(self, cluster_name: str, target_name: str, target_type: _ItemType):
if not cluster_name or not target_name:
if target_name is None:
return None

# The idl parser remove spaces
cluster_name = cluster_name.replace(' ', '')

cluster_id = self.__clusters_by_name.get(cluster_name)
cluster_id = self.get_cluster_id_by_name(cluster_name)
if cluster_id is None:
return None

targets = self.__get_targets_by_cluster_name(cluster_name, target_type)
if targets is None:
return None

target = None

if target_type == _ItemType.Request:
self.__enforce_casing(
target_name, self.__commands_by_name.get(cluster_name))
target_id = self.__commands_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Response:
self.__enforce_casing(
target_name, self.__responses_by_name.get(cluster_name))
target_id = self.__responses_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Event:
self.__enforce_casing(
target_name, self.__events_by_name.get(cluster_name))
target_id = self.__events_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Attribute:
self.__enforce_casing(
target_name, self.__attributes_by_name.get(cluster_name))
target_id = self.__attributes_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Bitmap:
self.__enforce_casing(
target_name, self.__bitmaps_by_name.get(cluster_name))
target = self.__bitmaps_by_name.get(cluster_name).get(target_name)
target = targets.get(target_name)
elif target_type == _ItemType.Enum:
self.__enforce_casing(
target_name, self.__enums_by_name.get(cluster_name))
target = self.__enums_by_name.get(cluster_name).get(target_name)
target = targets.get(target_name)
elif target_type == _ItemType.Struct:
self.__enforce_casing(
target_name, self.__structs_by_name.get(cluster_name))
target = self.__structs_by_name.get(cluster_name).get(target_name)
target = targets.get(target_name)

return target

def __get_by_id(self, cluster_id: int, target_id: int, target_type: str):
targets = None

if target_type == _ItemType.Request:
targets = self.__commands_by_id.get(cluster_id)
elif target_type == _ItemType.Response:
targets = self.__responses_by_id.get(cluster_id)
elif target_type == _ItemType.Event:
targets = self.__events_by_id.get(cluster_id)
elif target_type == _ItemType.Attribute:
targets = self.__attributes_by_id.get(cluster_id)

target_mapping = {
_ItemType.Request: self.__commands_by_id,
_ItemType.Response: self.__responses_by_id,
_ItemType.Attribute: self.__attributes_by_id,
_ItemType.Event: self.__events_by_id,
}

targets = target_mapping[target_type].get(cluster_id)
if targets is None:
return None

return targets.get(target_id)

def __enforce_casing(self, target_name: str, targets: list):
if targets.get(target_name) is not None:
return
def __get_targets_by_cluster_name(self, cluster_name: str, target_type: _ItemType):
if not cluster_name:
return None

target_mapping = {
_ItemType.Request: self.__commands_by_name,
_ItemType.Response: self.__responses_by_name,
_ItemType.Attribute: self.__attributes_by_name,
_ItemType.Event: self.__events_by_name,
_ItemType.Bitmap: self.__bitmaps_by_name,
_ItemType.Enum: self.__enums_by_name,
_ItemType.Struct: self.__structs_by_name,
}

for name in targets:
if name.lower() == target_name.lower():
raise KeyError(
f'Unknown target {target_name}. Did you mean {name} ?')
# The idl parser remove spaces
cluster_name = cluster_name.replace(' ', '')
return target_mapping[target_type].get(cluster_name)


def SpecDefinitionsFromPaths(paths: str, pseudo_clusters: Optional[PseudoClusters] = PseudoClusters([])):
Expand Down
161 changes: 124 additions & 37 deletions scripts/py_matter_yamltests/matter_yamltests/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,45 @@
from .yaml_loader import YamlLoader


class UnknownPathQualifierError(TestStepError):
"""Raise when an attribute/command/event name is not found in the definitions."""

def __init__(self, content, target_type, target_name, candidate_names=[]):
if candidate_names:
message = f'Unknown {target_type}: "{target_name}". Candidates are: "{candidate_names}"'

for candidate_name in candidate_names:
if candidate_name.lower() == target_name.lower():
message = f'Unknown {target_type}: "{target_name}". Did you mean "{candidate_name}" ?'
break
else:
message = f'The cluster does not have any {target_type}s.'

super().__init__(message)
self.tag_key_with_error(content, target_type)


class TestStepAttributeKeyError(UnknownPathQualifierError):
"""Raise when an attribute name is not found in the definitions."""

def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'attribute', target_name, candidate_names)


class TestStepCommandKeyError(UnknownPathQualifierError):
"""Raise when a command name is not found in the definitions."""

def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'command', target_name, candidate_names)


class TestStepEventKeyError(UnknownPathQualifierError):
"""Raise when an event name is not found in the definitions."""

def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'event', target_name, candidate_names)


class PostProcessCheckStatus(Enum):
'''Indicates the post processing check step status.'''
SUCCESS = 'success',
Expand Down Expand Up @@ -177,42 +216,7 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_
self._convert_single_value_to_values(response)
self.responses_with_placeholders = responses

argument_mapping = None
response_mapping = None
response_mapping_name = None

if definitions is not None:
if self.is_attribute:
attribute = definitions.get_attribute_by_name(
self.cluster, self.attribute)
if attribute:
attribute_mapping = self._as_mapping(definitions, self.cluster,
attribute.definition.data_type.name)
argument_mapping = attribute_mapping
response_mapping = attribute_mapping
response_mapping_name = attribute.definition.data_type.name
elif self.is_event:
event = definitions.get_event_by_name(
self.cluster, self.event)
if event:
event_mapping = self._as_mapping(definitions, self.cluster,
event.name)
argument_mapping = event_mapping
response_mapping = event_mapping
response_mapping_name = event.name
else:
command = definitions.get_command_by_name(
self.cluster, self.command)
if command:
argument_mapping = self._as_mapping(
definitions, self.cluster, command.input_param)
response_mapping = self._as_mapping(
definitions, self.cluster, command.output_param)
response_mapping_name = command.output_param

self.argument_mapping = argument_mapping
self.response_mapping = response_mapping
self.response_mapping_name = response_mapping_name
self._update_mappings(test, definitions)
self.update_arguments(self.arguments_with_placeholders)
self.update_responses(self.responses_with_placeholders)

Expand All @@ -227,6 +231,89 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_
continue
get_constraints(value['constraints'])

def _update_mappings(self, test: dict, definitions: SpecDefinitions):
cluster_name = self.cluster
if definitions is None or not definitions.has_cluster_by_name(cluster_name):
self.argument_mapping = None
self.response_mapping = None
self.response_mapping_name = None
return

argument_mapping = None
response_mapping = None
response_mapping_name = None

if self.is_attribute:
attribute_name = self.attribute
attribute = definitions.get_attribute_by_name(
cluster_name,
attribute_name
)

if not attribute:
targets = definitions.get_attribute_names(cluster_name)
raise TestStepAttributeKeyError(test, attribute_name, targets)

attribute_mapping = self._as_mapping(
definitions,
cluster_name,
attribute.definition.data_type.name
)

argument_mapping = attribute_mapping
response_mapping = attribute_mapping
response_mapping_name = attribute.definition.data_type.name
elif self.is_event:
event_name = self.event
event = definitions.get_event_by_name(
cluster_name,
event_name
)

if not event:
targets = definitions.get_event_names(cluster_name)
raise TestStepEventKeyError(test, event_name, targets)

event_mapping = self._as_mapping(
definitions,
cluster_name,
event_name
)

argument_mapping = event_mapping
response_mapping = event_mapping
response_mapping_name = event.name
else:
command_name = self.command
command = definitions.get_command_by_name(
cluster_name,
command_name
)

if not command:
targets = definitions.get_command_names(cluster_name)
raise TestStepCommandKeyError(test, command_name, targets)

if command.input_param is None:
argument_mapping = {}
else:
argument_mapping = self._as_mapping(
definitions,
cluster_name,
command.input_param
)

response_mapping = self._as_mapping(
definitions,
cluster_name,
command.output_param
)
response_mapping_name = command.output_param

self.argument_mapping = argument_mapping
self.response_mapping = response_mapping
self.response_mapping_name = response_mapping_name

def _convert_single_value_to_values(self, container):
if container is None or 'values' in container:
return
Expand Down Expand Up @@ -274,7 +361,7 @@ def update_responses(self, responses_with_placeholders):
response, self.response_mapping)

def _update_with_definition(self, container: dict, mapping_type):
if not container or not mapping_type:
if not container or mapping_type is None:
return

values = container['values']
Expand Down
Loading

0 comments on commit 1218f47

Please sign in to comment.