Skip to content

Commit

Permalink
Update TC_IDM_3_2.py
Browse files Browse the repository at this point in the history
Implemented step 16 (as a WIP) and refactored for generating writable attributes
  • Loading branch information
austina-csa authored Dec 16, 2024
1 parent e3df699 commit 8af5d7f
Showing 1 changed file with 69 additions and 16 deletions.
85 changes: 69 additions & 16 deletions src/python_testing/TC_IDM_3_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
from enum import IntFlag
# from typing import Union, get_args, get_origin
from typing import get_args
from typing import Generator

import chip.clusters as Clusters
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import AttributePath
from chip.clusters.ClusterObjects import ClusterObject
from chip.clusters.enum import MatterIntEnum
from chip.clusters.Types import Nullable
from chip.interaction_model import InteractionModelError, Status
from chip.testing import global_attribute_ids
from chip.tlv import uint
Expand All @@ -22,8 +25,6 @@ async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster,
all_attributes = [attribute for attribute in cluster.Attributes.__dict__.values() if inspect.isclass(
attribute) and issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor)]

# Hackish way to get enums to return properly -- the default behavior (under else block) returns a BLANK LIST without this workaround
# If type(attribute.attribute_type.Type) or type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type are enums, they return <class 'aenum._enum.EnumType'>, which are equal!
if desired_type == MatterIntEnum:
all_attributes_of_type = [attribute for attribute in all_attributes if type(
attribute.attribute_type.Type) == type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type)]
Expand Down Expand Up @@ -97,8 +98,7 @@ async def check_attribute_write_for_type(self, desired_attribute_type: type) ->
print(f'attributes_of_type_on_device: {attributes_of_type_on_device}, attributes_of_type: {attributes_of_type}')
chosen_attribute = next(iter(attributes_of_type_on_device))
value = self.pick_writable_value(chosen_attribute)
# output = await self.write_single_attribute(
await self.write_single_attribute(
output = await self.write_single_attribute(
attribute_value=chosen_attribute(value=value),
endpoint_id=endpoint,
)
Expand All @@ -107,17 +107,55 @@ async def check_attribute_write_for_type(self, desired_attribute_type: type) ->
return True
return False

def generate_nullable_attributes(self) -> None:
self.nullable_attributes = [a for a in self.writable_attributes if isinstance(a.value, Nullable)]
self.nullable_attributes_iter = iter(self.nullable_attributes)
return

def generate_writable_attributes(self):
writable_attributes = []
command_map = {}
for cluster_id in self.xml_clusters:
xml_cluster = self.xml_clusters[cluster_id]
attributes = xml_cluster.attributes
for attribute_id, xml_attribute in attributes.items():
write_access = xml_attribute.write_access
if not write_access:
continue
if write_access is not None and write_access < Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate:
continue
if cluster_id in Clusters.ClusterObjects.ALL_ATTRIBUTES and attribute_id in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]:
attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
if hasattr(attribute, 'value') and write_access >= Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate:

writable_attributes.append(attribute)
if cluster_id not in command_map:
command_map[cluster_id] = self.xml_clusters[cluster_id].command_map

writable_clusters = []

for writable_attribute in writable_attributes:
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[writable_attribute.cluster_id]
if cluster not in writable_clusters:
writable_clusters.append(cluster)
self.writable_attributes = writable_attributes
self.command_map = command_map
self.writable_attributes_iter = iter(self.writable_attributes)
return

@async_test_body
async def test_TC_IDM_3_2(self):
super().setup_class()
await self.setup_class_helper()
self.xml_clusters, self.problems = build_xml_clusters()
all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES]
# expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[Clusters.Objects.Descriptor.id]
self.generate_writable_attributes()
self.generate_nullable_attributes()
expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[Clusters.Objects.Descriptor.id]

# read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)])
# returned_attributes = [a for a in read_request[0][Clusters.Objects.Descriptor].keys() if a !=
# Clusters.Attribute.DataVersion]
read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)])
returned_attributes = [a for a in read_request[0][Clusters.Objects.Descriptor].keys() if a !=
Clusters.Attribute.DataVersion]

self.device_clusters = self.all_device_clusters()
self.device_attributes = self.all_device_attributes()
Expand Down Expand Up @@ -161,6 +199,7 @@ async def test_TC_IDM_3_2(self):
writable_attributes_iter = iter(writable_attributes)

chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)

chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
if chosen_writable_cluster in self.device_clusters and chosen_writable_attribute in self.device_attributes:
Expand Down Expand Up @@ -196,14 +235,12 @@ async def test_TC_IDM_3_2(self):

# Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`.

chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
if chosen_writable_cluster in self.device_clusters and chosen_writable_attribute in self.device_attributes:
output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
if output_1:
endpoint = next(iter(output_1.attributes))
# import pdb;pdb.set_trace()
# print(f"endpoint: {endpoint}")
value = self.pick_writable_value(chosen_writable_attribute)
await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, chosen_writable_attribute(value=value))])
output_2 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
Expand Down Expand Up @@ -311,7 +348,7 @@ async def test_TC_IDM_3_2(self):
unsupported = list(all_endpoints - supported_endpoints)
result = await self.read_single_attribute_expect_error(endpoint=unsupported[0], cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.FeatureMap, error=Status.UnsupportedEndpoint)
await self.read_single_attribute_expect_error(endpoint=unsupported[0], cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.FeatureMap, error=Status.UnsupportedEndpoint)
chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)
value = self.pick_writable_value(chosen_writable_attribute)
await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint)
result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint)
Expand All @@ -334,7 +371,7 @@ async def test_TC_IDM_3_2(self):

if unsupported:
unsupported_attribute = (ClusterObjects.ALL_ATTRIBUTES[unsupported[0]])[0]
chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
value = self.pick_writable_value(chosen_writable_attribute)
if value is not None:
Expand Down Expand Up @@ -365,7 +402,7 @@ async def test_TC_IDM_3_2(self):
== global_attribute_ids.AttributeIdType.kStandardNonGlobal]

if unsupported:
chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)
value = self.pick_writable_value(chosen_writable_attribute)
if value is not None:
output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
Expand All @@ -386,6 +423,22 @@ async def test_TC_IDM_3_2(self):
# TH reads the attribute value and saves as original.
# TH sends a WriteRequestMessage to the DUT to write to the attribute to null.

chosen_nullable_attribute = next(self.nullable_attributes_iter)
# This picks, e.g. Clusters.ThreadNetworkDirectory.Attributes.PreferredExtendedPanID on linux-x64-lock/chip-lock-app,
# which contains an entry for value=Null
read_request = await self.default_controller.ReadAttribute(
self.dut_node_id,
[chosen_nullable_attribute]
)
# read_request returns {}?
await self.default_controller.WriteAttribute(self.dut_node_id, [(0, chosen_nullable_attribute(value=Nullable))])
# Why is this exception happening?: ValueError: Field . expected <class 'bytes'>, but got <class 'type'>

read_request_2 = await self.default_controller.ReadAttribute(
self.dut_node_id,
[chosen_nullable_attribute]
)

# Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`.

# Step 17
Expand All @@ -397,7 +450,7 @@ async def test_TC_IDM_3_2(self):

# Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`.

chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
endpoint = next(iter(output_1.attributes))
Expand All @@ -417,7 +470,7 @@ async def test_TC_IDM_3_2(self):
# TH sends a second WriteRequestMessage to the DUT to modify the value of an attribute with the dataversion field set to the value received earlier.
# Verify that the DUT sends a Write Response message with the error DATA_VERSION_MISMATCH for the second Write request.

chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_attribute = next(self.writable_attributes_iter)
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
endpoint = next(iter(output_1.attributes))
Expand Down

0 comments on commit 8af5d7f

Please sign in to comment.