Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[matter_yamltests] Generate an error if a cluster definition exists b… #25109

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 64 additions & 48 deletions scripts/py_matter_yamltests/matter_yamltests/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ def get_event_name(self, cluster_id: int, event_id: int) -> str:
event = self.__get_by_id(cluster_id, event_id, _ItemType.Event)
return event.name if event else None

def get_cluster_id_by_name(self, cluster_name: str) -> int:
if cluster_name is None:
return None

# The idl parser remove spaces
vivien-apple marked this conversation as resolved.
Show resolved Hide resolved
cluster_name = cluster_name.replace(' ', '')
return self.__clusters_by_name.get(cluster_name)

def has_cluster_by_name(self, cluster_name: str) -> bool:
cluster_id = self.get_cluster_id_by_name(cluster_name)
return cluster_id is not None

def get_command_by_name(self, cluster_name: str, command_name: str) -> Command:
return self.__get_by_name(cluster_name, command_name, _ItemType.Request)

Expand Down Expand Up @@ -149,6 +161,21 @@ def get_type_by_name(self, cluster_name: str, target_name: str):

return None

def get_command_names(self, cluster_name: str) -> list[str]:
vivien-apple marked this conversation as resolved.
Show resolved Hide resolved
targets = self.__get_targets_by_cluster_name(
cluster_name, _ItemType.Request)
return [] if targets is None else [name for name in targets]

def get_event_names(self, cluster_name: str) -> list[str]:
targets = self.__get_targets_by_cluster_name(
cluster_name, _ItemType.Event)
return [] if targets is None else [name for name in targets]

def get_attribute_names(self, cluster_name: str) -> list[str]:
targets = self.__get_targets_by_cluster_name(
cluster_name, _ItemType.Attribute)
return [] if targets is None else [name for name in targets]

def is_fabric_scoped(self, target) -> bool:
if isinstance(target, Event):
return bool(target.is_fabric_sensitive)
Expand All @@ -164,82 +191,71 @@ def is_nullable(self, target) -> bool:
return False

def __get_by_name(self, cluster_name: str, target_name: str, target_type: _ItemType):
if not cluster_name or not target_name:
if target_name is None:
return None

# The idl parser remove spaces
cluster_name = cluster_name.replace(' ', '')

cluster_id = self.__clusters_by_name.get(cluster_name)
cluster_id = self.get_cluster_id_by_name(cluster_name)
if cluster_id is None:
return None

targets = self.__get_targets_by_cluster_name(cluster_name, target_type)
if targets is None:
return None

target = None

if target_type == _ItemType.Request:
self.__enforce_casing(
target_name, self.__commands_by_name.get(cluster_name))
target_id = self.__commands_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Response:
self.__enforce_casing(
target_name, self.__responses_by_name.get(cluster_name))
target_id = self.__responses_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Event:
self.__enforce_casing(
target_name, self.__events_by_name.get(cluster_name))
target_id = self.__events_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Attribute:
self.__enforce_casing(
target_name, self.__attributes_by_name.get(cluster_name))
target_id = self.__attributes_by_name.get(
cluster_name).get(target_name)
target_id = targets.get(target_name)
target = self.__get_by_id(cluster_id, target_id, target_type)
elif target_type == _ItemType.Bitmap:
self.__enforce_casing(
target_name, self.__bitmaps_by_name.get(cluster_name))
target = self.__bitmaps_by_name.get(cluster_name).get(target_name)
target = targets.get(target_name)
elif target_type == _ItemType.Enum:
self.__enforce_casing(
target_name, self.__enums_by_name.get(cluster_name))
target = self.__enums_by_name.get(cluster_name).get(target_name)
target = targets.get(target_name)
elif target_type == _ItemType.Struct:
self.__enforce_casing(
target_name, self.__structs_by_name.get(cluster_name))
target = self.__structs_by_name.get(cluster_name).get(target_name)
target = targets.get(target_name)

return target

def __get_by_id(self, cluster_id: int, target_id: int, target_type: str):
targets = None

if target_type == _ItemType.Request:
targets = self.__commands_by_id.get(cluster_id)
elif target_type == _ItemType.Response:
targets = self.__responses_by_id.get(cluster_id)
elif target_type == _ItemType.Event:
targets = self.__events_by_id.get(cluster_id)
elif target_type == _ItemType.Attribute:
targets = self.__attributes_by_id.get(cluster_id)

target_mapping = {
_ItemType.Request: self.__commands_by_id,
_ItemType.Response: self.__responses_by_id,
_ItemType.Attribute: self.__attributes_by_id,
_ItemType.Event: self.__events_by_id,
}

targets = target_mapping[target_type].get(cluster_id)
if targets is None:
return None

return targets.get(target_id)

def __enforce_casing(self, target_name: str, targets: list):
if targets.get(target_name) is not None:
return
def __get_targets_by_cluster_name(self, cluster_name: str, target_type: _ItemType):
if not cluster_name:
return None

target_mapping = {
_ItemType.Request: self.__commands_by_name,
_ItemType.Response: self.__responses_by_name,
_ItemType.Attribute: self.__attributes_by_name,
_ItemType.Event: self.__events_by_name,
_ItemType.Bitmap: self.__bitmaps_by_name,
_ItemType.Enum: self.__enums_by_name,
_ItemType.Struct: self.__structs_by_name,
}

for name in targets:
if name.lower() == target_name.lower():
raise KeyError(
f'Unknown target {target_name}. Did you mean {name} ?')
# The idl parser remove spaces
vivien-apple marked this conversation as resolved.
Show resolved Hide resolved
cluster_name = cluster_name.replace(' ', '')
return target_mapping[target_type].get(cluster_name)


def SpecDefinitionsFromPaths(paths: str, pseudo_clusters: Optional[PseudoClusters] = PseudoClusters([])):
Expand Down
161 changes: 124 additions & 37 deletions scripts/py_matter_yamltests/matter_yamltests/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,45 @@
from .yaml_loader import YamlLoader


class UnknownPathQualifierError(TestStepError):
"""Raise when an attribute/command/event name is not found in the definitions."""

def __init__(self, content, target_type, target_name, candidate_names=[]):
if candidate_names:
message = f'Unknown {target_type}: "{target_name}". Candidates are: "{candidate_names}"'

for candidate_name in candidate_names:
if candidate_name.lower() == target_name.lower():
message = f'Unknown {target_type}: "{target_name}". Did you mean "{candidate_name}" ?'
break
else:
message = f'The cluster does not have any {target_type}s.'

super().__init__(message)
self.tag_key_with_error(content, target_type)


class TestStepAttributeKeyError(UnknownPathQualifierError):
"""Raise when an attribute name is not found in the definitions."""

def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'attribute', target_name, candidate_names)


class TestStepCommandKeyError(UnknownPathQualifierError):
"""Raise when a command name is not found in the definitions."""

def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'command', target_name, candidate_names)


class TestStepEventKeyError(UnknownPathQualifierError):
"""Raise when an event name is not found in the definitions."""

def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'event', target_name, candidate_names)


class PostProcessCheckStatus(Enum):
'''Indicates the post processing check step status.'''
SUCCESS = 'success',
Expand Down Expand Up @@ -175,42 +214,7 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_
self._convert_single_value_to_values(response)
self.responses_with_placeholders = responses

argument_mapping = None
response_mapping = None
response_mapping_name = None

if definitions is not None:
if self.is_attribute:
attribute = definitions.get_attribute_by_name(
self.cluster, self.attribute)
if attribute:
attribute_mapping = self._as_mapping(definitions, self.cluster,
attribute.definition.data_type.name)
argument_mapping = attribute_mapping
response_mapping = attribute_mapping
response_mapping_name = attribute.definition.data_type.name
elif self.is_event:
event = definitions.get_event_by_name(
self.cluster, self.event)
if event:
event_mapping = self._as_mapping(definitions, self.cluster,
event.name)
argument_mapping = event_mapping
response_mapping = event_mapping
response_mapping_name = event.name
else:
command = definitions.get_command_by_name(
self.cluster, self.command)
if command:
argument_mapping = self._as_mapping(
definitions, self.cluster, command.input_param)
response_mapping = self._as_mapping(
definitions, self.cluster, command.output_param)
response_mapping_name = command.output_param

self.argument_mapping = argument_mapping
self.response_mapping = response_mapping
self.response_mapping_name = response_mapping_name
self._update_mappings(test, definitions)
self.update_arguments(self.arguments_with_placeholders)
self.update_responses(self.responses_with_placeholders)

Expand All @@ -225,6 +229,89 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_
continue
get_constraints(value['constraints'])

def _update_mappings(self, test: dict, definitions: SpecDefinitions):
cluster_name = self.cluster
if definitions is None or not definitions.has_cluster_by_name(cluster_name):
self.argument_mapping = None
self.response_mapping = None
self.response_mapping_name = None
return

argument_mapping = None
vivien-apple marked this conversation as resolved.
Show resolved Hide resolved
response_mapping = None
response_mapping_name = None

if self.is_attribute:
attribute_name = self.attribute
attribute = definitions.get_attribute_by_name(
cluster_name,
attribute_name
)

if not attribute:
targets = definitions.get_attribute_names(cluster_name)
raise TestStepAttributeKeyError(test, attribute_name, targets)

attribute_mapping = self._as_mapping(
definitions,
cluster_name,
attribute.definition.data_type.name
)

argument_mapping = attribute_mapping
response_mapping = attribute_mapping
response_mapping_name = attribute.definition.data_type.name
elif self.is_event:
event_name = self.event
event = definitions.get_event_by_name(
cluster_name,
event_name
)

if not event:
targets = definitions.get_event_names(cluster_name)
raise TestStepEventKeyError(test, event_name, targets)

event_mapping = self._as_mapping(
definitions,
cluster_name,
event_name
)

argument_mapping = event_mapping
response_mapping = event_mapping
response_mapping_name = event.name
else:
command_name = self.command
command = definitions.get_command_by_name(
cluster_name,
command_name
)

if not command:
targets = definitions.get_command_names(cluster_name)
raise TestStepCommandKeyError(test, command_name, targets)

if command.input_param is None:
argument_mapping = {}
else:
argument_mapping = self._as_mapping(
definitions,
cluster_name,
command.input_param
)

response_mapping = self._as_mapping(
definitions,
cluster_name,
command.output_param
)
response_mapping_name = command.output_param

self.argument_mapping = argument_mapping
self.response_mapping = response_mapping
self.response_mapping_name = response_mapping_name

def _convert_single_value_to_values(self, container):
if container is None or 'values' in container:
return
Expand Down Expand Up @@ -272,7 +359,7 @@ def update_responses(self, responses_with_placeholders):
response, self.response_mapping)

def _update_with_definition(self, container: dict, mapping_type):
if not container or not mapping_type:
if not container or mapping_type is None:
return

values = container['values']
Expand Down
Loading