From 5c4c6e42f9e26ac8c8d33aa6de3173b1a63b7d9e Mon Sep 17 00:00:00 2001 From: austina-csa <aangelus@csa-iot.org> Date: Thu, 5 Dec 2024 08:18:11 -0800 Subject: [PATCH] Fixed linting issues --- src/python_testing/TC_IDM_2_2.py | 52 ++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 26df3acac46a2b..f6b9f272893e4e 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -249,9 +249,12 @@ async def test_TC_IDM_2_2(self): for endpoint in read_request: for cluster in read_request[endpoint]: - if endpoint != 1 or cluster != Clusters.ModeSelect: # Endpoint 1 seems an issue with ModeSelect (ServerList has an extra 4293984257) - returned_attrs = sorted([x.attribute_id for x in read_request[endpoint][cluster].keys() if x != Clusters.Attribute.DataVersion]) - attr_list = sorted([x for x in read_request[endpoint][cluster][cluster.Attributes.AttributeList] if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id]) + # Endpoint 1 seems an issue with ModeSelect (ServerList has an extra 4293984257) + if endpoint != 1 or cluster != Clusters.ModeSelect: + returned_attrs = sorted([x.attribute_id for x in read_request[endpoint] + [cluster].keys() if x != Clusters.Attribute.DataVersion]) + attr_list = sorted([x for x in read_request[endpoint][cluster][cluster.Attributes.AttributeList] + if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id]) asserts.assert_equal(returned_attrs, attr_list, f"Mismatch for {cluster} at endpoint {endpoint}") # Step 6 @@ -299,14 +302,14 @@ async def test_TC_IDM_2_2(self): asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList, - read_request[0][Clusters.Objects.Descriptor], "ServerList not in output") + read_request[0][Clusters.Objects.Descriptor], "ServerList not in output") for cluster in read_request[0]: attribute_ids = [a.attribute_id for a in read_request[0] - [cluster].keys() if a != Clusters.Attribute.DataVersion] + [cluster].keys() if a != Clusters.Attribute.DataVersion] asserts.assert_equal(sorted(attribute_ids), - sorted(read_request[0][cluster][cluster.Attributes.AttributeList]), - "Expected attribute list does not match actual list for cluster {cluster} on endpoint 0") + sorted(read_request[0][cluster][cluster.Attributes.AttributeList]), + "Expected attribute list does not match actual list for cluster {cluster} on endpoint 0") self.verify_attribute_list_cluster(read_request, 0, Clusters.Descriptor) # Step 9 @@ -407,7 +410,8 @@ async def test_TC_IDM_2_2(self): all_clusters = set(list(ClusterObjects.ALL_CLUSTERS.keys())) for endpoint_id, endpoint in self.endpoints.items(): - dut_standard_clusters = set([x.id for x in endpoint.keys() if global_attribute_ids.cluster_id_type(x.id) == global_attribute_ids.ClusterIdType.kStandard]) + dut_standard_clusters = set([x.id for x in endpoint.keys() if global_attribute_ids.cluster_id_type( + x.id) == global_attribute_ids.ClusterIdType.kStandard]) unsupported = [id for id in list(all_clusters - dut_standard_clusters) if global_attribute_ids.attribute_id_type(id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal] if unsupported: @@ -569,7 +573,7 @@ async def test_TC_IDM_2_2(self): EndpointId=0, ClusterId=None, AttributeId=Clusters.Descriptor.Attributes.ServerList.attribute_id) - # AttributeId=global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) + # AttributeId=global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) try: read_request = await self.default_controller.ReadAttribute( self.dut_node_id, @@ -577,7 +581,7 @@ async def test_TC_IDM_2_2(self): ) except ChipStackError as e: asserts.assert_equal(e.err, 0x580, - "Incorrect error response for reading non-global attribute on all clusters at endpoint 0") + "Incorrect error response for reading non-global attribute on all clusters at endpoint 0") # Step 30 @@ -586,11 +590,11 @@ async def test_TC_IDM_2_2(self): # On the TH verify that the DUT sends an error message and not the value of the attribute. self.print_step(30, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints") - + endpoint = 0 read_request = await self.default_controller.Read(self.dut_node_id, - [(endpoint, Clusters.AccessControl.Attributes.Acl)], - ) + [(endpoint, Clusters.AccessControl.Attributes.Acl)], + ) dut_acl_original = read_request.attributes[endpoint][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl] @@ -606,19 +610,22 @@ async def test_TC_IDM_2_2(self): result = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, Clusters.AccessControl.Attributes.Acl(dut_acl))]) read_request = await self.default_controller.Read(self.dut_node_id, - [(endpoint, Clusters.AccessControl.Attributes.Acl)], - ) + [(endpoint, Clusters.AccessControl.Attributes.Acl)], + ) + asserts.assert_equal(len(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]), 2) - asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable) - asserts.assert_not_equal(type(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][1].targets), Clusters.Types.Nullable) + asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl] + [Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable) + asserts.assert_not_equal(type(read_request.attributes[0][Clusters.AccessControl] + [Clusters.AccessControl.Attributes.Acl][1].targets), Clusters.Types.Nullable) asserts.assert_equal(len(read_request.tlvAttributes[0][31][0]), 2) # attribute_path = AttributePath( # EndpointId=0, # ClusterId=None, # AttributeId=Clusters.Descriptor.Attributes.ServerList.attribute_id) - + # read_request = await self.default_controller.ReadAttribute( # self.dut_node_id, # [attribute_path] @@ -627,12 +634,13 @@ async def test_TC_IDM_2_2(self): # Cleanup await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, Clusters.AccessControl.Attributes.Acl(dut_acl_original))]) read_request = await self.default_controller.Read(self.dut_node_id, - [(endpoint, Clusters.AccessControl.Attributes.Acl)], - ) + [(endpoint, Clusters.AccessControl.Attributes.Acl)], + ) asserts.assert_equal(len(read_request.tlvAttributes[0][31][0]), 1) asserts.assert_equal(len(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]), 1) - asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable) - + asserts.assert_equal(type(read_request.attributes[0][Clusters.AccessControl] + [Clusters.AccessControl.Attributes.Acl][0].targets), Clusters.Types.Nullable) + # Step 31 # TH should have access to only a single cluster at one Endpoint1.