Skip to content

Commit

Permalink
Access checker: Improve write check support (#31452)
Browse files Browse the repository at this point in the history
* check

* Non-writeable attribute checks

* Fixes to the spec parsing

* support optinoally writeable attributes in test

* add issue for global attr writes

* Fix the optional write step, don't test unknown enum

* Remove workaround for global attrs

* address review comments
  • Loading branch information
cecille authored Jan 19, 2024
1 parent 45aa5b7 commit 039cbab
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 17 deletions.
33 changes: 27 additions & 6 deletions src/python_testing/TC_AccessChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,35 @@ async def _run_read_access_test_for_cluster_privilege(self, endpoint_id, cluster
async def _run_write_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, cluster, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum, wildcard_read):
for attribute_id in checkable_attributes(cluster_id, cluster, xml_cluster):
spec_requires = xml_cluster.attributes[attribute_id].write_access
if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue:
# not writeable
continue
is_optional_write = xml_cluster.attributes[attribute_id].write_optional

attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
test_name = f'Write access checker - {privilege}'
logging.info(f"Testing attribute {attribute} on endpoint {endpoint_id}")
if attribute == Clusters.AccessControl.Attributes.Acl and privilege == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister:
logging.info("Skipping ACL attribute check for admin privilege as this is known to be writeable and is being used for this test")
continue

print(f"Testing attribute {attribute} on endpoint {endpoint_id}")
# Because we read everything with admin, we should have this in the wildcard read
# This will only not work if we end up with write-only attributes. We do not currently have any of these.
val = wildcard_read.attributes[endpoint_id][cluster_class][attribute]
if isinstance(val, list):
# Use an empty list for writes in case the list is large and does not fit
val = []

resp = await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])
if operation_allowed(spec_requires, privilege):
if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue:
# not writeable - expect an unsupported write response
if resp[0].Status != Status.UnsupportedWrite:
self.record_error(test_name=test_name, location=location,
problem=f"Unexpected error writing non-writeable attribute - expected Unsupported Write, got {resp[0].Status}")
self.success = False
elif is_optional_write and resp[0].Status == Status.UnsupportedWrite:
# unsupported optional writeable attribute - this is fine, no error
continue
elif operation_allowed(spec_requires, privilege):
# Write the default attribute. We don't care if this fails, as long as it fails with a DIFFERENT error than the access
# This is OK because access is required to be checked BEFORE any other thing to avoid leaking device information.
# For example, because we don't have any range information, we might be writing an out of range value, but this will
Expand All @@ -166,13 +181,19 @@ async def _run_write_access_test_for_cluster_privilege(self, endpoint_id, cluste
problem=f"Unexpected error writing attribute - expected Unsupported Access, got {resp[0].Status}")
self.success = False

if resp[0].Status == Status.Success and isinstance(val, list):
# Reset the value to the original if we managed to write an empty list
val = wildcard_read.attributes[endpoint_id][cluster_class][attribute]
await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])

async def run_access_test(self, test_type: AccessTestType):
# Read all the attributes on TH2 using admin access
if test_type == AccessTestType.WRITE:
await self._setup_acl(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister)
wildcard_read = await self.TH2.Read(self.dut_node_id, [()])

privilege_enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for privilege in privilege_enum:
logging.info(f"Testing for {privilege}")
await self._setup_acl(privilege=privilege)
Expand Down
18 changes: 15 additions & 3 deletions src/python_testing/TestSpecParsingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ATTRIBUTE_ID = 0x0000


def single_attribute_cluster_xml(read_access: str, write_access: str):
def single_attribute_cluster_xml(read_access: str, write_access: str, write_supported: str):
xml_cluster = f'<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="{CLUSTER_ID}" name="{CLUSTER_NAME}" revision="3">'
revision_table = ('<revisionHistory>'
'<revision revision="1" summary="Initial Release"/>'
Expand All @@ -41,7 +41,7 @@ def single_attribute_cluster_xml(read_access: str, write_access: str):
'</revisionHistory>')
classification = '<classification hierarchy="base" role="utility" picsCode="TEST" scope="Node"/>'
read_access_str = f'read="true" readPrivilege="{read_access}"' if read_access is not None else ""
write_access_str = f'write="true" writePrivilege="{write_access}"' if write_access is not None else ""
write_access_str = f'write="{write_supported}" writePrivilege="{write_access}"' if write_access is not None else ""
attribute = ('<attributes>'
f'<attribute id="{ATTRIBUTE_ID}" name="{ATTRIBUTE_NAME}" type="uint16" default="MS">'
f'<access {read_access_str} {write_access_str}/>'
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_spec_parsing_access(self):
strs = [None, 'view', 'operate', 'manage', 'admin']
for read in strs:
for write in strs:
xml = single_attribute_cluster_xml(read, write)
xml = single_attribute_cluster_xml(read, write, "true")
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster")
asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster")
Expand All @@ -94,6 +94,18 @@ def test_spec_parsing_access(self):
asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_access,
get_access_enum_from_string(write), "Unexpected write access")

def test_write_optional(self):
for write_support in ['true', 'optional']:
xml = single_attribute_cluster_xml('view', 'view', write_support)
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster")
asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster")
asserts.assert_equal(len(xml_cluster.attributes), len(GlobalAttributeIds) + 1, "Unexpected number of attributes")
asserts.assert_true(ATTRIBUTE_ID in xml_cluster.attributes.keys(),
"Did not find test attribute in XmlCluster.attributes")
asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_optional,
write_support == 'optional', "Unexpected write_optional value")


if __name__ == "__main__":
default_matter_test_main()
23 changes: 15 additions & 8 deletions src/python_testing/spec_parsing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class XmlAttribute:
conformance: Callable[[uint], ConformanceDecision]
read_access: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
write_access: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
write_optional: bool

def access_string(self):
read_marker = "R" if self.read_access is not Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue else ""
Expand Down Expand Up @@ -228,6 +229,9 @@ def parse_conformance(self, conformance_xml: ElementTree.Element) -> Callable:
severity=ProblemSeverity.WARNING, problem=str(ex)))
return None

def parse_write_optional(self, element_xml: ElementTree.Element, access_xml: ElementTree.Element) -> bool:
return access_xml.attrib['write'] == 'optional'

def parse_access(self, element_xml: ElementTree.Element, access_xml: ElementTree.Element, conformance: Callable) -> tuple[Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum], Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum], Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]]:
''' Returns a tuple of access types for read / write / invoke'''
def str_to_access_type(privilege_str: str) -> Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum:
Expand Down Expand Up @@ -301,13 +305,16 @@ def parse_attributes(self) -> dict[uint, XmlAttribute]:
# I don't have a good way to relate the ranges to the conformance, but they're both acceptable, so let's just or them.
conformance = or_operation([conformance, attributes[code].conformance])
read_access, write_access, _ = self.parse_access(element, access_xml, conformance)
write_optional = False
if write_access not in [None, Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue]:
write_optional = self.parse_write_optional(element, access_xml)
attributes[code] = XmlAttribute(name=element.attrib['name'], datatype=datatype,
conformance=conformance, read_access=read_access, write_access=write_access)
conformance=conformance, read_access=read_access, write_access=write_access, write_optional=write_optional)
# Add in the global attributes for the base class
for id in GlobalAttributeIds:
# TODO: Add data type here. Right now it's unused. We should parse this from the spec.
attributes[id] = XmlAttribute(name=id.to_name(), datatype="", conformance=mandatory(
), read_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, write_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue)
), read_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, write_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue, write_optional=False)
return attributes

def parse_commands(self, command_type: CommandType) -> dict[uint, XmlAttribute]:
Expand Down Expand Up @@ -508,12 +515,12 @@ def remove_problem(location: typing.Union[CommandPathLocation, FeaturePathLocati
view = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView
none = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue
clusters[temp_control_id].attributes = {
0x00: XmlAttribute(name='TemperatureSetpoint', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none),
0x01: XmlAttribute(name='MinTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none),
0x02: XmlAttribute(name='MaxTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none),
0x03: XmlAttribute(name='Step', datatype='temperature', conformance=feature(0x04, 'STEP'), read_access=view, write_access=none),
0x04: XmlAttribute(name='SelectedTemperatureLevel', datatype='uint8', conformance=feature(0x02, 'TL'), read_access=view, write_access=none),
0x05: XmlAttribute(name='SupportedTemperatureLevels', datatype='list', conformance=feature(0x02, 'TL'), read_access=view, write_access=none),
0x00: XmlAttribute(name='TemperatureSetpoint', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none, write_optional=False),
0x01: XmlAttribute(name='MinTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none, write_optional=False),
0x02: XmlAttribute(name='MaxTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none, write_optional=False),
0x03: XmlAttribute(name='Step', datatype='temperature', conformance=feature(0x04, 'STEP'), read_access=view, write_access=none, write_optional=False),
0x04: XmlAttribute(name='SelectedTemperatureLevel', datatype='uint8', conformance=feature(0x02, 'TL'), read_access=view, write_access=none, write_optional=False),
0x05: XmlAttribute(name='SupportedTemperatureLevels', datatype='list', conformance=feature(0x02, 'TL'), read_access=view, write_access=none, write_optional=False),
}

# Workaround for incorrect parsing of access control cluster.
Expand Down

0 comments on commit 039cbab

Please sign in to comment.