Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[matter_yamltests] Add wait keyword supports #24592

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 65 additions & 8 deletions scripts/py_matter_yamltests/matter_yamltests/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import copy
from enum import Enum
from enum import Enum, auto

import yaml

Expand Down Expand Up @@ -49,6 +49,7 @@
'maxInterval',
'timedInteractionTimeoutMs',
'busyWaitMs',
'wait',
]

_TEST_ARGUMENTS_SECTION = [
Expand Down Expand Up @@ -90,11 +91,12 @@ class PostProcessCheckStatus(Enum):

class PostProcessCheckType(Enum):
'''Indicates the post processing check step type.'''
IM_STATUS = 'IMStatus',
CLUSTER_STATUS = 'ClusterStatus',
RESPONSE_VALIDATION = 'Response',
CONSTRAINT_VALIDATION = 'Constraints',
SAVE_AS_VARIABLE = 'SaveAs'
IM_STATUS = auto()
CLUSTER_STATUS = auto()
RESPONSE_VALIDATION = auto()
CONSTRAINT_VALIDATION = auto()
SAVE_AS_VARIABLE = auto()
WAIT_VALIDATION = auto()


class PostProcessCheck:
Expand Down Expand Up @@ -211,9 +213,10 @@ def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_
self.timed_interaction_timeout_ms = _value_or_none(
test, 'timedInteractionTimeoutMs')
self.busy_wait_ms = _value_or_none(test, 'busyWaitMs')
self.wait_for = _value_or_none(test, 'wait')

self.is_attribute = self.command in _ATTRIBUTE_COMMANDS
self.is_event = self.command in _EVENT_COMMANDS
self.is_attribute = self.command in _ATTRIBUTE_COMMANDS or self.wait_for in _ATTRIBUTE_COMMANDS
self.is_event = self.command in _EVENT_COMMANDS or self.wait_for in _EVENT_COMMANDS

self.arguments_with_placeholders = _value_or_none(test, 'arguments')
self.response_with_placeholders = _value_or_none(test, 'response')
Expand Down Expand Up @@ -466,9 +469,17 @@ def timed_interaction_timeout_ms(self):
def busy_wait_ms(self):
return self._test.busy_wait_ms

@property
def wait_for(self):
return self._test.wait_for

def post_process_response(self, response: dict):
result = PostProcessResponseResult()

if self.wait_for is not None:
self._response_cluster_wait_validation(response, result)
return result

if self._skip_post_processing(response, result):
return result

Expand All @@ -481,6 +492,52 @@ def post_process_response(self, response: dict):

return result

def _response_cluster_wait_validation(self, response, result):
vivien-apple marked this conversation as resolved.
Show resolved Hide resolved
"""Check if the response concrete path matches the configuration of the test step
and validate that the response type (e.g readAttribute/writeAttribute/...) matches
the expectation from the test step."""
check_type = PostProcessCheckType.WAIT_VALIDATION
error_success = 'The test expectation "{wait_for}" for "{cluster}.{wait_type}" on endpoint {endpoint} is true'
error_failure = 'The test expectation "{expected} == {received}" is false'

if self.is_attribute:
expected_wait_type = self.attribute
received_wait_type = response.get('attribute')
elif self.is_event:
expected_wait_type = self.event
received_wait_type = response.get('event')
else:
expected_wait_type = self.command
received_wait_type = response.get('command')

expected_values = [
self.wait_for,
self.endpoint,
# TODO The name in tests does not always use spaces
self.cluster.replace(' ', ''),
expected_wait_type
]

received_values = [
response.get('wait_for'),
response.get('endpoint'),
response.get('cluster'),
received_wait_type
]

success = True
for expected_value in expected_values:
received_value = received_values.pop(0)

if expected_value != received_value:
result.error(check_type, error_failure.format(
expected=expected_value, received=received_value))
success = False

if success:
result.success(check_type, error_success.format(
wait_for=self.wait_for, cluster=self.cluster, wait_type=expected_wait_type, endpoint=self.endpoint))

def _skip_post_processing(self, response: dict, result) -> bool:
'''Should we skip perform post processing.

Expand Down