diff --git a/src/python_testing/TC_RR_1_1.py b/src/python_testing/TC_RR_1_1.py index 557a58464f265b..d47e3bd698b895 100644 --- a/src/python_testing/TC_RR_1_1.py +++ b/src/python_testing/TC_RR_1_1.py @@ -17,15 +17,18 @@ import asyncio import logging +import math import queue import random import time from binascii import hexlify from threading import Event +from typing import Any, Dict, List import chip.CertificateAuthority import chip.clusters as Clusters import chip.FabricAdmin +from chip import ChipDeviceCtrl from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath from chip.interaction_model import Status as StatusEnum from chip.utils import CommissioningBuildingBlocks @@ -351,6 +354,59 @@ async def test_TC_RR_1_1(self): else: logging.info("Step 9: Skipped due to no UserLabel cluster instances") + # Step 10: Count all group cluster instances + # and ensure MaxGroupsPerFabric >= 4 * counted_groups_clusters. + logging.info("Step 10: Validating groups support minimums") + groups_cluster_endpoints: Dict[int, Any] = await dev_ctrl.ReadAttribute(self.dut_node_id, [Clusters.Groups]) + counted_groups_clusters: int = len(groups_cluster_endpoints) + + # The test for Step 10 and all of Steps 11 to 14 are only performed if Groups cluster instances are found. + if counted_groups_clusters > 0: + indicated_max_groups_per_fabric: int = await self.read_single_attribute(dev_ctrl, + node_id=self.dut_node_id, + endpoint=0, + attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupsPerFabric) + if indicated_max_groups_per_fabric < 4 * counted_groups_clusters: + asserts.fail(f"Failed Step 10: MaxGroupsPerFabric < 4 * counted_groups_clusters") + + # Step 11: Confirm MaxGroupKeysPerFabric meets the minimum requirement of 3. + indicated_max_group_keys_per_fabric: int = await self.read_single_attribute(dev_ctrl, + node_id=self.dut_node_id, + endpoint=0, + attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupKeysPerFabric) + if indicated_max_group_keys_per_fabric < 3: + asserts.fail(f"Failed Step 11: MaxGroupKeysPerFabric < 3") + + # Create a list of per-fabric clients to use for filling group resources accross all fabrics. + fabric_unique_clients: List[Any] = [] + for fabric_idx in range(num_fabrics_to_commission): + fabric_number: int = fabric_idx + 1 + # Client is client A for each fabric to set the Label field + client_name: str = "RD%dA" % fabric_number + fabric_unique_clients.append(client_by_name[client_name]) + + # Step 12: Write and verify indicated_max_group_keys_per_fabric group keys to all fabrics. + group_keys: List[Dict[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]] = await self.fill_and_validate_group_key_sets( + num_fabrics_to_commission, fabric_unique_clients, indicated_max_group_keys_per_fabric) + + # Step 13: Write and verify indicated_max_groups_per_fabric group/key mappings for all fabrics. + # First, Generate list of unique group/key mappings + group_key_map: List[Dict[int, int]] = [[]]*num_fabrics_to_commission + for fabric_idx in range(num_fabrics_to_commission): + group_key_map[fabric_idx] = {} + for group_idx in range(indicated_max_groups_per_fabric): + group_id: int = fabric_idx * indicated_max_groups_per_fabric + group_idx + 1 + group_key_idx: int = group_idx % len(group_keys[fabric_idx]) + group_key_map[fabric_idx][group_id] = group_keys[fabric_idx][group_key_idx].groupKeySetID + + group_key_map: List[Dict[int, int]] = await self.fill_and_validate_group_key_map( + num_fabrics_to_commission, fabric_unique_clients, group_key_map) + + # Step 14: Add all the groups to the discovered groups-supporting endpoints and verify GroupTable + group_table_written: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]] = await self.add_all_groups( + num_fabrics_to_commission, fabric_unique_clients, group_key_map, groups_cluster_endpoints, indicated_max_groups_per_fabric) + await self.validate_group_table(num_fabrics_to_commission, fabric_unique_clients, group_table_written) + def random_string(self, length) -> str: rnd = self._pseudo_random_generator return "".join([rnd.choice("abcdef0123456789") for _ in range(length)])[:length] @@ -378,6 +434,158 @@ async def fill_user_label_list(self, dev_ctrl, target_node_id): asserts.assert_equal(read_back_labels, labels, "LabelList attribute must match what was written") + async def fill_and_validate_group_key_sets(self, + fabrics: int, + clients: List[Any], + keys_per_fabric: int) -> List[List[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]]: + # Step 12: Write indicated_max_group_keys_per_fabric group keys to all fabrics. + group_keys: List[List[Clusters.GroupKeyManagement.Structs.GroupKeySetStruct]] = [[]]*fabrics + for fabric_idx in range(fabrics): + client: Any = clients[fabric_idx] + + # Write, skip the IPK key set. + for group_key_cluster_idx in range(1, keys_per_fabric): + group_key_list_idx: int = group_key_cluster_idx - 1 + + logging.info("Step 12: Setting group key on fabric %d at index '%d'" % (fabric_idx+1, group_key_cluster_idx)) + group_keys[fabric_idx].append(self.build_group_key(fabric_idx, group_key_cluster_idx, keys_per_fabric)) + await client.SendCommand(self.dut_node_id, 0, Clusters.GroupKeyManagement.Commands.KeySetWrite(group_keys[fabric_idx][group_key_list_idx])) + + # Step 12 verification: After all the key sets were written, read all the information back. + for fabric_idx in range(fabrics): + client: Any = clients[fabric_idx] + + # Read, skip the IPK key set. + for group_key_cluster_idx in range(1, keys_per_fabric): + group_key_list_idx: int = group_key_cluster_idx - 1 + + logging.info("Step 12: Reading back group key on fabric %d at index ''%d'" % (fabric_idx+1, group_key_cluster_idx)) + key_set = await client.SendCommand(self.dut_node_id, 0, + Clusters.GroupKeyManagement.Commands.KeySetRead( + group_keys[fabric_idx][group_key_list_idx].groupKeySetID), + responseType=Clusters.GroupKeyManagement.Commands.KeySetReadResponse) + print(key_set) + asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].groupKeySetID, + key_set.groupKeySetID, "Received incorrect key set.") + asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].groupKeySecurityPolicy, + key_set.groupKeySecurityPolicy) + asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime0, key_set.epochStartTime0) + asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime1, key_set.epochStartTime1) + asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime2, key_set.epochStartTime2) + asserts.assert_equal(chip.NullValue, key_set.epochKey0, "Unexpected key response from read.") + asserts.assert_equal(chip.NullValue, key_set.epochKey1, "Unexpected key response from read.") + asserts.assert_equal(chip.NullValue, key_set.epochKey2, "Unexpected key response from read.") + + return group_keys + + async def fill_and_validate_group_key_map(self, + fabrics: int, + clients: List[Any], + group_key_map: List[Dict[int, int]]) -> None: + # Step 13: Write and verify indicated_max_groups_per_fabric group/key mappings for all fabrics. + mapping_structs: List[List[Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct]] = [[]]*fabrics + for fabric_idx in range(fabrics): + client: Any = clients[fabric_idx] + + for group in group_key_map[fabric_idx]: + mapping_structs[fabric_idx].append(Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct(groupId=group, + groupKeySetID=group_key_map[group], + fabricIndex=fabric_idx)) + + logging.info("Step 13: Setting group key map on fabric %d" % (fabric_idx+1)) + await client.WriteAttribute(self.dut_node_id, [(0, Clusters.GroupKeyManagement.Attributes.GroupKeyMap(mapping_structs[fabric_idx]))]) + + # Step 13 verification: After all the group key maps were written, read all the information back. + for fabric_idx in range(fabrics): + client: Any = clients[fabric_idx] + + logging.info("Step 13: Reading group key map on fabric %d" % (fabric_idx+1)) + group_key_map_readback = await self.read_single_attribute(client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.GroupKeyMap) + + found_entry: int = 0 + for read_entry in group_key_map_readback: + if read_entry.fabricIndex != fabric_idx: + continue + + written_entry = next(entry for entry in mapping_structs[fabric_idx] if entry.groupId == read_entry.groupId) + found_entry += 1 + asserts.assert_equal(written_entry.groupId, read_entry.groupId) + asserts.assert_equal(written_entry.groupKeySetID, read_entry.groupKeySetID) + asserts.assert_equal(written_entry.fabricIndex, read_entry.fabricIndex) + + asserts.assert_equal(found_entry, len(mapping_structs[fabric_idx]), + "GroupKeyMap does not match the length of written data.") + + async def add_all_groups(self, + fabrics: int, + clients: List[Any], + group_key_map: List[Dict[int, int]], + group_endpoints: Dict[int, Any], + groups_per_fabric: int) -> List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]]: + # Step 14: Add indicated_max_groups_per_fabric to each fabric through the Groups clusters on supporting endpoints. + written_group_table_map: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]] = [[]]*fabrics + for fabric_idx in range(fabrics): + client: Any = clients[fabric_idx] + + base_groups_per_endpoint: int = math.floor(groups_per_fabric / len(group_endpoints)) + groups_remainder: int = groups_per_fabric % len(group_endpoints) + fabric_group_index: int = 0 + + for endpoint_id in group_endpoints: + groups_to_add: int = base_groups_per_endpoint + groups_remainder + groups_remainder -= 1 + + feature_map: int = await self.read_single_attribute(client, + node_id=self.dut_node_id, + endpoint=endpoint_id, + attribute=Clusters.Groups.Attributes.FeatureMap) + name_featrure_bit: int = 0 + name_supported: bool = (feature_map & (1 << name_featrure_bit)) != 0 + + # Write groups to cluster + for group_idx in range(fabric_group_index, fabric_group_index + groups_to_add): + group_name: str = self.random_string(16) if name_supported else "" + group_id: int = group_key_map[fabric_idx].keys()[group_idx] + command: Clusters.GroupKeyManagement.Commands.AddGroup = Clusters.GroupKeyManagement.Commands.AddGroup( + groupID=group_id, groupName=group_name) + written_group_table_map[fabric_idx][group_id] = Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct(groupID=group_id, + groupName=group_name, + fabricIndex=fabric_idx, + endpoints=[endpoint_id]) + add_response: Clusters.Groups.Commands.AddGroupResponse = await client.SendCommand(self.dut_node_id, endpoint_id, command, + responseType=Clusters.Groups.Commands.AddGroupResponse) + asserts.assert_equal(StatusEnum.Success, add_response.status) + asserts.assert_equal(group_id, add_response.groupID) + + # for endpoint_id in group_endpoints + fabric_group_index += groups_to_add + + async def validate_group_table(self, + fabrics: int, + clients: List[Any], + group_table_written: List[Dict[int, Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct]]) -> None: + for fabric_idx in range(fabrics): + client: Any = clients[fabric_idx] + group_table_read: List[Clusters.GroupKeyManagement.Attributes.FeatGroupTableureMap] = await self.read_single_attribute( + client, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.FeatGroupTableureMap) + + found_groups: int = 0 + for read_entry in group_table_read: + if read_entry.fabricIndex != fabric_idx: + continue + + found_groups += 1 + asserts.assert_in(group_table_written[fabric_idx], read_entry.groupId, "Group missing from group map") + written_entry: Clusters.GroupKeyManagement.Structs.GroupInfoMapStruct = group_table_written[ + fabric_idx][read_entry.groupId] + asserts.assert_equal(written_entry.groupId, read_entry.groupId) + asserts.assert_equal(written_entry.endpoints, read_entry.endpoints) + asserts.assert_equal(written_entry.groupName, read_entry.groupName) + asserts.assert_equal(written_entry.fabricIndex, read_entry.fabricIndex) + + asserts.assert_equal(found_groups, len(group_table_written[fabric_idx]), + "Found group count does not match written value.") + def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric): acl = [] @@ -462,6 +670,25 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric): return acl + def build_group_key(self, fabric_index: int, group_key_index: int, keys_per_fabric: int) -> Clusters.GroupKeyManagement.Structs.GroupKeySetStruct: + asserts.assert_not_equal(group_key_index, 0, "TH Internal Error: IPK key set index (0) should not be re-generated.") + + # groupKeySetID is definted as uint16 in the Matter specification. + # To easily test that the stored values are unique, unique values are created accross all fabrics. + # However, it is only required that values be unique within a fabric according to the specifiction. + # If a device ever provides over 65535 total key sets, then this will need to be updated. + set_id: int = fabric_index*keys_per_fabric + group_key_index + asserts.assert_less_equal( + set_id, 0xFFFF, "Invalid Key Set ID. This may be a limitation of the test harness, not the device under test.") + return Clusters.GroupKeyManagement.Structs.GroupKeySetStruct(groupKeySetID=set_id, + groupKeySecurityPolicy=Clusters.GroupKeyManagement.Enums.GroupKeySecurityPolicy.kTrustFirst, + epochKey0=self.random_string(16).encode(), + epochStartTime0=(set_id * 4), + epochKey1=self.random_string(16).encode(), + epochStartTime1=(set_id * 4 + 1), + epochKey2=self.random_string(16).encode(), + epochStartTime2=(set_id * 4 + 2)) + if __name__ == "__main__": default_matter_test_main(maximize_cert_chains=True, controller_cat_tags=[0x0001_0001])