Skip to content

Commit

Permalink
PICS checker test implementation (#30970)
Browse files Browse the repository at this point in the history
* PICS checker test implementation

* Address review comments

* Address review comments

* Remove tests for case-insensitive pics - we don't want this

* Fix case-insensitive pics and add test

* Fix pics case in test
  • Loading branch information
cecille authored Jan 17, 2024
1 parent 642aa4e commit 84c0cf9
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/python_testing/TC_TIMESYNC_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ async def test_TC_TIMESYNC_2_1(self):
asserts.assert_true(False, "NTPServerAvailable is mandatory if the NTPS (TIMESYNC.S.F02) feature is supported")

self.print_step(12, "Read TimeZoneListMaxSize")
if self.check_pics("TIMESYNC.S.A000A"):
if self.check_pics("TIMESYNC.S.A000a"):
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZoneListMaxSize)
asserts.assert_greater_equal(size, 1, "TimeZoneListMaxSize must be at least 1")
asserts.assert_less_equal(size, 2, "TimeZoneListMaxSize must be max 2")
elif self.check_pics("TIMESYNC.S.F00"):
asserts.assert_true(False, "TimeZoneListMaxSize is mandatory if the TZ (TIMESYNC.S.F00) feature is supported")

self.print_step(13, "Read DSTOffsetListMaxSize")
if self.check_pics("TIMESYNC.S.A000B"):
if self.check_pics("TIMESYNC.S.A000b"):
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DSTOffsetListMaxSize)
asserts.assert_greater_equal(size, 1, "DSTOffsetListMaxSize must be at least 1")
elif self.check_pics("TIMESYNC.S.F00"):
Expand Down
160 changes: 160 additions & 0 deletions src/python_testing/TC_pics_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math

import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from global_attribute_ids import GlobalAttributeIds
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, FeaturePathLocation,
MatterBaseTest, async_test_body, default_matter_test_main)
from mobly import asserts
from spec_parsing_support import build_xml_clusters


def attribute_pics(pics_base: str, id: int) -> str:
return f'{pics_base}.S.A{id:04x}'


def accepted_cmd_pics(pics_base: str, id: int) -> str:
return f'{pics_base}.S.C{id:02x}.Rsp'


def generated_cmd_pics(pics_base: str, id: int) -> str:
return f'{pics_base}.S.C{id:02x}.Tx'


def feature_pics(pics_base: str, bit: int) -> str:
return f'{pics_base}.S.F{bit:02x}'


class TC_PICS_Checker(MatterBaseTest, BasicCompositionTests):
@async_test_body
async def setup_class(self):
super().setup_class()
await self.setup_class_helper(False)
# build_xml_cluster returns a list of issues found when paring the XML
# Problems in the XML shouldn't cause test failure, but we want them recorded
# so they are added to the list of problems that get output when the test set completes.
self.xml_clusters, self.problems = build_xml_clusters()

def _check_and_record_errors(self, location, required, pics):
if required and not self.check_pics(pics):
self.record_error("PICS check", location=location,
problem=f"An element found on the device, but the corresponding PICS {pics} was not found in pics list")
self.success = False
elif not required and self.check_pics(pics):
self.record_error("PICS check", location=location, problem=f"PICS {pics} found in PICS list, but not on device")
self.success = False

def _add_pics_for_lists(self, cluster_id: int, attribute_id_of_element_list: GlobalAttributeIds) -> None:
try:
if attribute_id_of_element_list == GlobalAttributeIds.ATTRIBUTE_LIST_ID:
all_spec_elements_to_check = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]
pics_mapper = attribute_pics
elif attribute_id_of_element_list == GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID:
all_spec_elements_to_check = Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]
pics_mapper = accepted_cmd_pics

elif attribute_id_of_element_list == GlobalAttributeIds.GENERATED_COMMAND_LIST_ID:
all_spec_elements_to_check = Clusters.ClusterObjects.ALL_GENERATED_COMMANDS[cluster_id]
pics_mapper = generated_cmd_pics
else:
asserts.fail("add_pics_for_list function called for non-list attribute")
except KeyError:
# This cluster does not have any of this element type
return

for element_id in all_spec_elements_to_check:
if element_id > 0xF000:
# No pics for global elements
continue
pics = pics_mapper(self.xml_clusters[cluster_id].pics, element_id)

if cluster_id not in self.endpoint.keys():
# This cluster is not on this endpoint
required = False
elif element_id in self.endpoint[cluster_id][attribute_id_of_element_list]:
# Cluster and element are on the endpoint
required = True
else:
# Cluster is on the endpoint but the element is not in the list
required = False

if attribute_id_of_element_list == GlobalAttributeIds.ATTRIBUTE_LIST_ID:
location = AttributePathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id, attribute_id=element_id)
else:
location = CommandPathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id, command_id=element_id)

self._check_and_record_errors(location, required, pics)

def test_TC_pics_checker(self):
self.endpoint_id = self.matter_test_config.endpoint
self.endpoint = self.endpoints_tlv[self.endpoint_id]
self.success = True

for cluster_id, cluster in Clusters.ClusterObjects.ALL_CLUSTERS.items():
# Data model XML is used to get the PICS code for this cluster. If we don't know the PICS
# code, we can't evaluate the PICS list. Clusters that are present on the device but are
# not present in the spec are checked in the IDM tests.
if cluster_id not in self.xml_clusters or self.xml_clusters[cluster_id].pics is None:
continue

# Ensure the PICS.S code is correctly marked
pics_cluster = f'{self.xml_clusters[cluster_id].pics}.S'
location = ClusterPathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id)
self._check_and_record_errors(location, cluster_id in self.endpoint, pics_cluster)

self._add_pics_for_lists(cluster_id, GlobalAttributeIds.ATTRIBUTE_LIST_ID)
self._add_pics_for_lists(cluster_id, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID)
self._add_pics_for_lists(cluster_id, GlobalAttributeIds.GENERATED_COMMAND_LIST_ID)

try:
cluster_features = cluster.Bitmaps.Feature
except AttributeError:
# cluster has no features
continue

pics_base = self.xml_clusters[cluster_id].pics
try:
feature_map = self.endpoint[cluster_id][GlobalAttributeIds.FEATURE_MAP_ID]
except KeyError:
feature_map = 0

for feature_mask in cluster_features:
# Codegen in python uses feature masks (0x01, 0x02, 0x04 etc.)
# PICS uses the mask bit number (1, 2, 3)
# Convert the mask to a bit number so we can check the PICS.
feature_bit = int(math.log2(feature_mask))
pics = feature_pics(pics_base, feature_bit)
if feature_mask & feature_map:
required = True
else:
required = False

try:
location = FeaturePathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id,
feature_code=self.xml_clusters[cluster_id].features[feature_mask].code)
except KeyError:
location = ClusterPathLocation(endpoint_id=self.endpoint_id, cluster_id=cluster_id)
self._check_and_record_errors(location, required, pics)

if not self.success:
self.fail_current_test("At least one PICS error was found for this endpoint")


if __name__ == "__main__":
default_matter_test_main()
5 changes: 2 additions & 3 deletions src/python_testing/TestMatterTestingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def test_type_checking(self):
async def test_pics_support(self):
pics_list = ['TEST.S.A0000=1',
'TEST.S.A0001=0',
'lower.s.a0000=1',
'TEST.S.A000a=1'
'',
' ',
'# comment',
Expand All @@ -148,10 +148,9 @@ async def test_pics_support(self):

asserts.assert_true(self.check_pics("TEST.S.A0000"), "PICS parsed incorrectly for TEST.S.A0000")
asserts.assert_false(self.check_pics("TEST.S.A0001"), "PICS parsed incorrectly for TEST.S.A0001")
asserts.assert_true(self.check_pics("LOWER.S.A0000"), "PICS pased incorrectly for LOWER.S.A0000")
asserts.assert_true(self.check_pics("TEST.S.A000a"), "PICS parsed incorrectly for TEST.S.A000a")
asserts.assert_true(self.check_pics("SPACE.S.A0000"), "PICS parsed incorrectly for SPACE.S.A0000")
asserts.assert_false(self.check_pics("NOT.S.A0000"), "PICS parsed incorrectly for NOT.S.A0000")
asserts.assert_true(self.check_pics(" test.s.a0000"), "PICS checker lowercase handled incorrectly")

# invalid pics file should throw a value error
pics_list.append("BAD.S.A000=5")
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/basic_composition_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ def ConvertValue(value) -> Any:


class BasicCompositionTests:
async def setup_class_helper(self):
async def setup_class_helper(self, default_to_pase: bool = True):
dev_ctrl = self.default_controller
self.problems = []

do_test_over_pase = self.user_params.get("use_pase_only", True)
do_test_over_pase = self.user_params.get("use_pase_only", default_to_pase)
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
Expand Down
2 changes: 2 additions & 0 deletions src/python_testing/drlk_2_x_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ async def run_drlk_test_common(self, lockUnlockCommand, lockUnlockCmdRspPICS, lo
self.print_step("1", "TH writes the RequirePINforRemoteOperation attribute value as false on the DUT")
attribute = attributes.RequirePINforRemoteOperation(False)
if self.check_pics("DRLK.S.M.RequirePINForRemoteOperationAttributeWritable"):
print("---------------------- PICS is true")
await self.write_drlk_attribute_expect_success(attribute=attribute)
else:
print("---------------------- PICS is false")
await self.write_drlk_attribute_expect_error(attribute=attribute, error=Status.UnsupportedWrite)

if self.check_pics("DRLK.S.A0033"):
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def parse_pics(lines=typing.List[str]) -> dict[str, bool]:
if val not in ["1", "0"]:
raise ValueError('PICS {} must have a value of 0 or 1'.format(key))

pics[key.strip().upper()] = (val == "1")
pics[key.strip()] = (val == "1")
return pics


Expand Down Expand Up @@ -725,7 +725,7 @@ def teardown_class(self):

def check_pics(self, pics_key: str) -> bool:
picsd = self.matter_test_config.pics
pics_key = pics_key.strip().upper()
pics_key = pics_key.strip()
return pics_key in picsd and picsd[pics_key]

async def read_single_attribute(
Expand Down
42 changes: 24 additions & 18 deletions src/python_testing/spec_parsing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class XmlCluster:
accepted_commands: dict[uint, XmlCommand]
generated_commands: dict[uint, XmlCommand]
events: dict[uint, XmlEvent]
pics: str


class CommandType(Enum):
Expand All @@ -124,6 +125,12 @@ def __init__(self, cluster, cluster_id, name, is_alias):
except (KeyError, StopIteration):
self._derived = None

try:
classification = next(cluster.iter('classification'))
self._pics = classification.attrib['picsCode']
except (KeyError, StopIteration):
self._pics = None

self.feature_elements = self.get_all_feature_elements()
self.attribute_elements = self.get_all_attribute_elements()
self.command_elements = self.get_all_command_elements()
Expand Down Expand Up @@ -348,31 +355,29 @@ def create_cluster(self) -> XmlCluster:
attributes=self.parse_attributes(),
accepted_commands=self.parse_commands(CommandType.ACCEPTED),
generated_commands=self.parse_commands(CommandType.GENERATED),
events=self.parse_events())
events=self.parse_events(), pics=self._pics)

def get_problems(self) -> list[ProblemNotice]:
return self._problems


def build_xml_clusters() -> tuple[list[XmlCluster], list[ProblemNotice]]:
# workaround for aliased clusters not appearing in the xml. Remove this once https://github.com/csa-data-model/projects/issues/373 is addressed
conc_clusters = {0x040C: 'Carbon Monoxide Concentration Measurement',
0x040D: 'Carbon Dioxide Concentration Measurement',
0x0413: 'Nitrogen Dioxide Concentration Measurement',
0x0415: 'Ozone Concentration Measurement',
0x042A: 'PM2.5 Concentration Measurement',
0x042B: 'Formaldehyde Concentration Measurement',
0x042C: 'PM1 Concentration Measurement',
0x042D: 'PM10 Concentration Measurement',
0x042E: 'Total Volatile Organic Compounds Concentration Measurement',
0x042F: 'Radon Concentration Measurement'}
conc_clusters = {0x040C: ('Carbon Monoxide Concentration Measurement', 'CMOCONC'),
0x040D: ('Carbon Dioxide Concentration Measurement', 'CDOCONC'),
0x0413: ('Nitrogen Dioxide Concentration Measurement', 'NDOCONC'),
0x0415: ('Ozone Concentration Measurement', 'OZCONC'),
0x042A: ('PM2.5 Concentration Measurement', 'PMICONC'),
0x042B: ('Formaldehyde Concentration Measurement', 'FLDCONC'),
0x042C: ('PM1 Concentration Measurement', 'PMHCONC'),
0x042D: ('PM10 Concentration Measurement', 'PMKCONC'),
0x042E: ('Total Volatile Organic Compounds Concentration Measurement', 'TVOCCONC'),
0x042F: ('Radon Concentration Measurement', 'RNCONC')}
conc_base_name = 'Concentration Measurement Clusters'
resource_clusters = {0x0071: 'HEPA Filter Monitoring',
0x0072: 'Activated Carbon Filter Monitoring'}
resource_clusters = {0x0071: ('HEPA Filter Monitoring', 'HEPAFREMON'),
0x0072: ('Activated Carbon Filter Monitoring', 'ACFREMON')}
resource_base_name = 'Resource Monitoring Clusters'
water_clusters = {0x0405: 'Relative Humidity Measurement',
0x0407: 'Leaf Wetness Measurement',
0x0408: 'Soil Moisture Measurement'}
water_clusters = {0x0405: ('Relative Humidity Measurement', 'RH')}
water_base_name = 'Water Content Measurement Clusters'
aliases = {conc_base_name: conc_clusters, resource_base_name: resource_clusters, water_base_name: water_clusters}

Expand Down Expand Up @@ -482,15 +487,16 @@ def remove_problem(location: typing.Union[CommandPathLocation, FeaturePathLocati
new = XmlCluster(revision=c.revision, derived=c.derived, name=c.name,
feature_map=feature_map, attribute_map=attribute_map, command_map=command_map,
features=features, attributes=attributes, accepted_commands=accepted_commands,
generated_commands=generated_commands, events=events)
generated_commands=generated_commands, events=events, pics=c.pics)
clusters[id] = new

for alias_base_name, aliased_clusters in aliases.items():
for id, alias_name in aliased_clusters.items():
for id, (alias_name, pics) in aliased_clusters.items():
base = derived_clusters[alias_base_name]
new = deepcopy(base)
new.derived = alias_base_name
new.name = alias_name
new.pics = pics
clusters[id] = new

# TODO: All these fixups should be removed BEFORE SVE if at all possible
Expand Down

0 comments on commit 84c0cf9

Please sign in to comment.