diff --git a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py index 8fa80223b14316..1823b4a6dabe48 100644 --- a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py +++ b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py @@ -62,7 +62,8 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic ''' data = await adminCtrl.ReadAttribute(targetNodeId, [(Clusters.AccessControl.Attributes.Acl)]) if 0 not in data: - raise ValueError("Did not get back any data (possible cause: controller has no access..") + raise ValueError( + "Did not get back any data (possible cause: controller has no access..") currentAcls = data[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl] @@ -73,12 +74,14 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic targetSubjects = [grantedCtrl.nodeId] if (len(targetSubjects) > 4): - raise ValueError(f"List of target subjects of len {len(targetSubjects)} exceeeded the minima of 4!") + raise ValueError( + f"List of target subjects of len {len(targetSubjects)} exceeeded the minima of 4!") # Step 1: Wipe the subject from all existing ACLs. for acl in currentAcls: if (acl.subjects != NullValue): - acl.subjects = [subject for subject in acl.subjects if subject not in targetSubjects] + acl.subjects = [ + subject for subject in acl.subjects if subject not in targetSubjects] if (privilege): addedPrivilege = False @@ -107,7 +110,8 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic )) # Step 4: Prune ACLs which have empty subjects. - currentAcls = [acl for acl in currentAcls if acl.subjects != NullValue and len(acl.subjects) != 0] + currentAcls = [acl for acl in currentAcls if acl.subjects != + NullValue and len(acl.subjects) != 0] logger.info(f'GrantPrivilege: Writing acls: {currentAcls}') await adminCtrl.WriteAttribute(targetNodeId, [(0, Clusters.AccessControl.Attributes.Acl(currentAcls))]) @@ -137,7 +141,8 @@ async def CreateControllersOnFabric(fabricAdmin: FabricAdmin, controllerList = [] for nodeId in controllerNodeIds: - newController = fabricAdmin.NewController(nodeId=nodeId, paaTrustStorePath=paaTrustStorePath, catTags=catTags) + newController = fabricAdmin.NewController( + nodeId=nodeId, paaTrustStorePath=paaTrustStorePath, catTags=catTags) await GrantPrivilege(adminDevCtrl, newController, privilege, targetNodeId, catTags) controllerList.append(newController) @@ -156,12 +161,14 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, newNodeId (int): Node ID to use for the target node on the new fabric. Return: - bool: True if successful, False otherwise. + tuple: (bool, nocResp): True if successful, False otherwise, along with nocResp, rcacResp value. ''' + nocResp = None + resp = await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(60)) if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk: - return False + return False, nocResp csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32))) @@ -171,31 +178,35 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None): # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.AddTrustedRootCertificate(chainForAddNOC.rcacBytes)) - resp = await commissionerDevCtrl.SendCommand(existingNodeId, - 0, - opCreds.Commands.AddNOC(chainForAddNOC.nocBytes, - chainForAddNOC.icacBytes, - chainForAddNOC.ipkBytes, - newFabricDevCtrl.nodeId, - newFabricDevCtrl.fabricAdmin.vendorId)) - if resp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: + nocResp = await commissionerDevCtrl.SendCommand(existingNodeId, + 0, + opCreds.Commands.AddNOC(chainForAddNOC.nocBytes, + chainForAddNOC.icacBytes, + chainForAddNOC.ipkBytes, + newFabricDevCtrl.nodeId, + newFabricDevCtrl.fabricAdmin.vendorId)) + + rcacResp = chainForAddNOC.rcacBytes + + if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp resp = await newFabricDevCtrl.SendCommand(newNodeId, 0, generalCommissioning.Commands.CommissioningComplete()) + if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId): - return False + return False, nocResp - return True + return True, nocResp, rcacResp async def UpdateNOC(devCtrl, existingNodeId, newNodeId): diff --git a/src/python_testing/TC_OPCREDS_3_2.py b/src/python_testing/TC_OPCREDS_3_2.py index ceb04acc314c68..d736747e443151 100644 --- a/src/python_testing/TC_OPCREDS_3_2.py +++ b/src/python_testing/TC_OPCREDS_3_2.py @@ -16,8 +16,11 @@ # import chip.clusters as Clusters -from matter_testing_support import MatterBaseTest, TestStep, async_test_body -from test_plan_support import * +from chip.tlv import TLVReader +from chip.utils import CommissioningBuildingBlocks +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from test_plan_support import commission_if_required, commission_from_existing, verify_commissioning_successful, read_attribute, remove_fabric, verify_success +from mobly import asserts def verify_fabric(controller: str) -> str: @@ -54,7 +57,100 @@ def steps_TC_OPCREDS_3_2(self): @async_test_body async def test_TC_OPCREDS_3_2(self): # TODO: implement - pass + opcreds = Clusters.OperationalCredentials + + self.step(0) + + self.step(1) + # Create a new controller on a new fabric called CR2. Commission the new controller from CR1 as follows: + dev_ctrl = self.default_controller + + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + cr2_vid = 0xFFF1 + cr2_fabricId = 2222 + cr2_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=cr2_vid, fabricId=cr2_fabricId) + cr2_nodeid = self.default_controller.nodeId+1 + cr2_dut_node_id = self.dut_node_id+1 + + cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController( + nodeId=cr2_nodeid) + success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl, + existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id + ) + fabric_index_CR2 = nocResp.fabricIndex + tlvReaderRCAC = TLVReader(rcacResp).get()["Any"] + rcac_CR2 = tlvReaderRCAC[9] # public key is field 9 + + print(f"Success: {success}, nocResp: {nocResp}, rcacResp: {rcacResp}") + nocs = await self.read_single_attribute_check_success(dev_ctrl=cr2_new_admin_ctrl, node_id=cr2_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.NOCs, fabric_filtered=False) + + print(f"nocs: {nocs}") + + # rcac_CR2 = TLVReader(rcacResp).get()["Any"] + # print(rcac_CR2[9]) + + self.step(2) + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + cr3_vid = 0xFFF1 + cr3_fabricId = 3333 + cr3_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=cr3_vid, fabricId=cr3_fabricId) + cr3_nodeid = self.default_controller.nodeId+2 + cr3_dut_node_id = self.dut_node_id+2 + + cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController( + nodeId=cr3_nodeid) + success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl, + existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id + ) + fabric_index_CR3 = nocResp.fabricIndex + # rcac_CR3 = TLVReader(rcacResp).get()["Any"] + + # print(f"Success: {success}, nocResp: {nocResp}, rcacResp: {rcacResp}") + + self.step(3) + cr2_read_fabricIndex = await self.read_single_attribute_check_success(dev_ctrl=cr2_new_admin_ctrl, node_id=cr2_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.CurrentFabricIndex) + print(f"fabric_index_CR2: {cr2_read_fabricIndex}") + if fabric_index_CR2 == cr2_read_fabricIndex: + print("Success fabric_index_CR2 is equal to read fabricIndex from CR2") + else: + print("Fail fabric_index_CR2 is not equal to read fabricIndex from CR2") + + self.step(4) + cr3_read_fabricIndex = await self.read_single_attribute_check_success(dev_ctrl=cr3_new_admin_ctrl, node_id=cr3_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.CurrentFabricIndex) + print(f"fabric_index_CR3: {cr3_read_fabricIndex}") + if fabric_index_CR3 == cr3_read_fabricIndex: + print("Success fabric_index_CR3 is equal to read fabricIndex from CR3") + else: + print("Fail fabric_index_CR3 is not equal to read fabricIndex from CR3") + + self.step(5) + cr2_fabric = await self.read_single_attribute_check_success(dev_ctrl=cr2_new_admin_ctrl, node_id=cr2_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.Fabrics, fabric_filtered=True) + print(f"cr2_fabric: {cr2_fabric}") + + for fabric in cr2_fabric: + cr2_fabric_fabricIndex = fabric.fabricIndex + cr2_fabric_rootPublicKey = fabric.rootPublicKey + cr2_fabric_vendorId = fabric.vendorID + cr2_fabric_fabricId = fabric.fabricID + + asserts.assert_equal(cr2_fabric_fabricIndex, + fabric_index_CR2, "Unexpected CR2 fabric index") + asserts.assert_equal(cr2_fabric_rootPublicKey, rcac_CR2, + "Unexpected RootPublicKey does not match with rcac_CR2") + asserts.assert_equal(cr2_fabric_vendorId, cr2_vid, + "Unexpected vendorId does not match with CR2 VendorID") + asserts.assert_equal(cr2_fabric_fabricId, cr2_fabricId, + "Unexpected fabricId does not match with CR2 fabricID") + + print(f"cr2_fabric_fabricIndex: {cr2_fabric_fabricIndex}, cr2_fabric_rootPublicKey: {cr2_fabric_rootPublicKey}, cr2_fabric_vendorId: {cr2_fabric_vendorId}, cr2_fabric_fabricId: {cr2_fabric_fabricId}") + + self.step(6) + self.step(7) + self.step(8) if __name__ == "__main__":