From 5654f85b40c2e2891b93de65d5b1db2a7e4a131e Mon Sep 17 00:00:00 2001 From: C Freeman Date: Fri, 26 Jul 2024 10:35:25 -0400 Subject: [PATCH] TC-IDM-10.5: Device type conformance - Add (#34424) * TC-IDM-10.5: Device type conformance - Add Initial test only looks at clusters. Remaining - revisions - feature conformance - cluster elements * Restyled by autopep8 * Restyled by isort * Add OTA requestor device type to door lock Then I can use it in the example. * add test to CI * linter * OTA requestor isn't hooked up, remove Let's go the other way and remove the cluster and the device type. * Revert "OTA requestor isn't hooked up, remove" This reverts commit b4bde38166ceff777ba22ebcd126692e00f5d56f. * Revert "Add OTA requestor device type to door lock" This reverts commit 51edb7925df17448085feb2adbc5bb9596430548. * Remove from CI until door lock OTA is sorted. --------- Co-authored-by: Restyled.io --- src/python_testing/TC_DeviceConformance.py | 99 +++++++++++- .../TestSpecParsingDeviceType.py | 149 +++++++++++++++++- src/python_testing/conformance_support.py | 4 +- 3 files changed, 242 insertions(+), 10 deletions(-) diff --git a/src/python_testing/TC_DeviceConformance.py b/src/python_testing/TC_DeviceConformance.py index c7fa19c45cec2c..c64a3470d19ed1 100644 --- a/src/python_testing/TC_DeviceConformance.py +++ b/src/python_testing/TC_DeviceConformance.py @@ -27,6 +27,7 @@ # test-runner-run/run1/script-args: --storage-path admin_storage.json --manual-code 10054912339 --bool-arg ignore_in_progress:True allow_provisional:True --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --tests test_TC_IDM_10_2 # === END CI TEST ARGUMENTS === +# TODO: Enable 10.5 in CI once the door lock OTA requestor problem is sorted. from typing import Callable import chip.clusters as Clusters @@ -35,16 +36,19 @@ from choice_conformance_support import (evaluate_attribute_choice_conformance, evaluate_command_choice_conformance, evaluate_feature_choice_conformance) from conformance_support import ConformanceDecision, conformance_allowed -from global_attribute_ids import GlobalAttributeIds -from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, ProblemNotice, - ProblemSeverity, async_test_body, default_matter_test_main) -from spec_parsing_support import CommandType, build_xml_clusters +from global_attribute_ids import (ClusterIdType, DeviceTypeIdType, GlobalAttributeIds, cluster_id_type, device_type_id_type, + is_valid_device_type_id) +from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, DeviceTypePathLocation, + MatterBaseTest, ProblemNotice, ProblemSeverity, async_test_body, default_matter_test_main) +from spec_parsing_support import CommandType, build_xml_clusters, build_xml_device_types class DeviceConformanceTests(BasicCompositionTests): async def setup_class_helper(self): await super().setup_class_helper() self.xml_clusters, self.problems = build_xml_clusters() + self.xml_device_types, problems = build_xml_device_types() + self.problems.extend(problems) def check_conformance(self, ignore_in_progress: bool, is_ci: bool): problems = [] @@ -245,6 +249,86 @@ def record_warning(location, problem): return success, problems + def check_device_type(self, fail_on_extra_clusters: bool = True, allow_provisional: bool = False) -> tuple[bool, list[ProblemNotice]]: + success = True + problems = [] + + def record_problem(location, problem, severity): + problems.append(ProblemNotice("IDM-10.5", location, severity, problem, "")) + + def record_error(location, problem): + nonlocal success + record_problem(location, problem, ProblemSeverity.ERROR) + success = False + + def record_warning(location, problem): + record_problem(location, problem, ProblemSeverity.WARNING) + + for endpoint_id, endpoint in self.endpoints.items(): + if Clusters.Descriptor not in endpoint: + location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=Clusters.Descriptor.id) + record_error(location=location, problem='No descriptor cluster found on endpoint') + continue + + device_type_list = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList] + invalid_device_types = [x for x in device_type_list if not is_valid_device_type_id(device_type_id_type(x.deviceType))] + standard_device_types = [x for x in endpoint[Clusters.Descriptor] + [Clusters.Descriptor.Attributes.DeviceTypeList] if device_type_id_type(x.deviceType) == DeviceTypeIdType.kStandard] + endpoint_clusters = [] + server_clusters = [] + for device_type in invalid_device_types: + location = DeviceTypePathLocation(device_type_id=device_type.deviceType) + record_error(location=location, problem='Invalid device type ID (out of valid range)') + + for device_type in standard_device_types: + device_type_id = device_type.deviceType + location = DeviceTypePathLocation(device_type_id=device_type_id) + if device_type_id not in self.xml_device_types.keys(): + record_error(location=location, problem='Unknown device type ID in standard range') + continue + + if device_type_id not in self.xml_device_types.keys(): + location = DeviceTypePathLocation(device_type_id=device_type_id) + record_error(location=location, problem='Unknown device type') + continue + + # TODO: check revision. Possibly in another test? + + xml_device = self.xml_device_types[device_type_id] + # IDM 10.1 checks individual clusters for validity, + # so here we can ignore checks for invalid and manufacturer clusters. + server_clusters = [x for x in endpoint[Clusters.Descriptor] + [Clusters.Descriptor.Attributes.ServerList] if cluster_id_type(x) == ClusterIdType.kStandard] + + # As a start, we are only checking server clusters + # TODO: check client clusters too? + for cluster_id, cluster_requirement in xml_device.server_clusters.items(): + # Device type cluster conformances do not include any conformances based on cluster elements + conformance_decision_with_choice = cluster_requirement.conformance(0, [], []) + location = DeviceTypePathLocation(device_type_id=device_type_id, cluster_id=cluster_id) + if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and cluster_id not in server_clusters: + record_error(location=location, + problem=f"Mandatory cluster {cluster_requirement.name} for device type {xml_device.name} is not present in the server list") + success = False + + if cluster_id in server_clusters and not conformance_allowed(conformance_decision_with_choice, allow_provisional): + record_error(location=location, + problem=f"Disallowed cluster {cluster_requirement.name} found in server list for device type {xml_device.name}") + success = False + # If we want to check for extra clusters on the endpoint, we need to know the entire set of clusters in all the device type + # lists across all the device types on the endpoint. + endpoint_clusters += xml_device.server_clusters.keys() + if fail_on_extra_clusters: + fn = record_error + else: + fn = record_warning + extra_clusters = set(server_clusters) - set(endpoint_clusters) + for extra in extra_clusters: + location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=extra) + fn(location=location, problem=f"Extra cluster found on endpoint with device types {device_type_list}") + + return success, problems + class TC_DeviceConformance(MatterBaseTest, DeviceConformanceTests): @async_test_body @@ -267,6 +351,13 @@ def test_TC_IDM_10_3(self): if not success: self.fail_current_test("Problems with cluster revision on at least one cluster") + def test_TC_IDM_10_5(self): + fail_on_extra_clusters = self.user_params.get("fail_on_extra_clusters", True) + success, problems = self.check_device_type(fail_on_extra_clusters) + self.problems.extend(problems) + if not success: + self.fail_current_test("Problems with Device type conformance on one or more endpoints") + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/TestSpecParsingDeviceType.py b/src/python_testing/TestSpecParsingDeviceType.py index bc199872f8f178..7729ccee8af24d 100644 --- a/src/python_testing/TestSpecParsingDeviceType.py +++ b/src/python_testing/TestSpecParsingDeviceType.py @@ -16,22 +16,27 @@ # import xml.etree.ElementTree as ElementTree +import chip.clusters as Clusters +from chip.clusters import Attribute +from chip.tlv import uint +from conformance_support import conformance_allowed from jinja2 import Template from matter_testing_support import MatterBaseTest, default_matter_test_main from mobly import asserts -from spec_parsing_support import build_xml_device_types, parse_single_device_type +from spec_parsing_support import build_xml_clusters, build_xml_device_types, parse_single_device_type +from TC_DeviceConformance import DeviceConformanceTests class TestSpecParsingDeviceType(MatterBaseTest): - # This just tests that the current spec can be parsed without failures def test_spec_device_parsing(self): - device_types, problems = build_xml_device_types() - self.problems += problems - for id, d in device_types.items(): + for id, d in self.xml_device_types.items(): print(str(d)) def setup_class(self): + self.xml_clusters, self.xml_cluster_problems = build_xml_clusters() + self.xml_device_types, self.xml_device_types_problems = build_xml_device_types() + self.device_type_id = 0xBBEF self.revision = 2 self.classification_class = "simple" @@ -106,6 +111,140 @@ def test_bad_scope(self): device_type, problems = parse_single_device_type(et) asserts.assert_equal(len(problems), 1, "Device with no scope did not generate a problem notice") + # All these tests are based on the temp sensor device type because it is very simple + # it requires temperature measurement, identify and the base devices. + # Right now I'm not testing for binding condition. + # The test is entirely based on the descriptor cluster so that's all I'm populating here + # because it makes the test less complex to write. + def create_test(self, server_list: list[uint], no_descriptor: bool = False, bad_device_id: bool = False) -> DeviceConformanceTests: + self.test = DeviceConformanceTests() + self.test.xml_device_types = self.xml_device_types + self.test.xml_clusters = self.xml_clusters + + if bad_device_id: + known_ids = list(self.test.xml_device_types.keys()) + device_type_id = [a for a in range(min(known_ids), max(known_ids)) if a not in known_ids][0] + else: + device_type_id = 0x0302 + + resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {}) + if no_descriptor: + resp.attributes = {1: {}} + else: + desc = Clusters.Descriptor + server_list_attr = Clusters.Descriptor.Attributes.ServerList + device_type_list_attr = Clusters.Descriptor.Attributes.DeviceTypeList + device_type_list = [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type_id, revision=2)] + resp.attributes = {1: {desc: {device_type_list_attr: device_type_list, server_list_attr: server_list}}} + self.test.endpoints = resp.attributes + + def create_good_device(self, device_type_id: int) -> DeviceConformanceTests: + self.test = DeviceConformanceTests() + self.test.xml_device_types = self.xml_device_types + self.test.xml_clusters = self.xml_clusters + + resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {}) + desc = Clusters.Descriptor + server_list_attr = Clusters.Descriptor.Attributes.ServerList + device_type_list_attr = Clusters.Descriptor.Attributes.DeviceTypeList + device_type_list = [Clusters.Descriptor.Structs.DeviceTypeStruct( + deviceType=device_type_id, revision=self.xml_device_types[device_type_id].revision)] + server_list = [k for k, v in self.xml_device_types[device_type_id].server_clusters.items( + ) if conformance_allowed(v.conformance(0, [], []), False)] + resp.attributes = {1: {desc: {device_type_list_attr: device_type_list, server_list_attr: server_list}}} + + self.test.endpoints = resp.attributes + + # Test with temp sensor with temp sensor, identify and descriptor + def test_ts_minimal_clusters(self): + self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Descriptor.id]) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_true(success, "Failure on Temperature Sensor device type test") + + # Temp sensor with temp sensor, identify, descriptor, binding + def test_ts_minimal_with_binding(self): + self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Binding.id, Clusters.Descriptor.id]) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_true(success, "Failure on Temperature Sensor device type test") + asserts.assert_false(problems, "Found problems on Temperature sensor device type test") + + # Temp sensor with temp sensor, identify, descriptor, fixed label + def test_ts_minimal_with_label(self): + self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.FixedLabel.id, Clusters.Descriptor.id]) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_true(success, "Failure on Temperature Sensor device type test") + asserts.assert_false(problems, "Found problems on Temperature sensor device type test") + + # Temp sensor with temp sensor, descriptor + def test_ts_missing_identify(self): + self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Descriptor.id]) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_equal(len(problems), 1, "Unexpected number of problems") + asserts.assert_false(success, "Unexpected success running test that should fail") + + # endpoint 1 empty + def test_endpoint_missing_descriptor(self): + self.create_test([], no_descriptor=True) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_equal(len(problems), 1, "Unexpected number of problems") + asserts.assert_false(success, "Unexpected success running test that should fail") + + # Temp sensor with temp sensor, descriptor, identify, onoff + def test_ts_extra_cluster(self): + self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Descriptor.id, Clusters.OnOff.id]) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_equal(len(problems), 1, "Unexpected number of problems") + asserts.assert_false(success, "Unexpected success running test that should fail") + + success, problems = self.test.check_device_type(fail_on_extra_clusters=False) + asserts.assert_equal(len(problems), 1, "Did not receive expected warning for extra clusters") + asserts.assert_true(success, "Unexpected failure") + + def test_bad_device_type_id_device_type_test(self): + self.create_test([], bad_device_id=True) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_equal(len(problems), 1, "Unexpected number of problems") + asserts.assert_false(success, "Unexpected success running test that should fail") + + def test_all_device_types(self): + for id in self.xml_device_types.keys(): + self.create_good_device(id) + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_false(problems, f"Unexpected problems on device type {id}") + asserts.assert_true(success, f"Unexpected failure on device type {id}") + + def test_disallowed_cluster(self): + for id, dt in self.xml_device_types.items(): + expected_problems = 0 + self.create_good_device(id) + for cluster_id, cluster in dt.server_clusters.items(): + if not conformance_allowed(cluster.conformance(0, [], []), False): + self.test.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList].append(cluster_id) + expected_problems += 1 + if expected_problems == 0: + continue + success, problems = self.test.check_device_type(fail_on_extra_clusters=True) + if problems: + print(problems) + asserts.assert_equal(len(problems), expected_problems, "Unexpected number of problems") + asserts.assert_false(success, "Unexpected success running test that should fail") + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/conformance_support.py b/src/python_testing/conformance_support.py index 6e439f1deb2b4d..90b6a4c3dd105c 100644 --- a/src/python_testing/conformance_support.py +++ b/src/python_testing/conformance_support.py @@ -331,7 +331,9 @@ def __call__(self, feature_map: uint, attribute_list: list[uint], all_command_li for op in self.op_list: decision_with_choice = op(feature_map, attribute_list, all_command_list) # and operations can't happen on optional or disallowed - if decision_with_choice.decision in [ConformanceDecision.OPTIONAL, ConformanceDecision.DISALLOWED, ConformanceDecision.PROVISIONAL]: + if decision_with_choice.decision == ConformanceDecision.OPTIONAL and all([type(op) == device_feature for op in self.op_list]): + return decision_with_choice + elif decision_with_choice.decision in [ConformanceDecision.OPTIONAL, ConformanceDecision.DISALLOWED, ConformanceDecision.PROVISIONAL]: raise ConformanceException('AND operation on optional or disallowed item') elif decision_with_choice.decision == ConformanceDecision.NOT_APPLICABLE: return decision_with_choice