diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 678b7b18195aa0..24895bf5f07e01 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -536,6 +536,7 @@ jobs: scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestSpecParsingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/TestTimeSyncTrustedTimeSourceRunner.py' + scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPCREDS_3_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_1.py" --script-args "--endpoint 1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_2.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_3.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' diff --git a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py index 8fa80223b14316..4c3826fd6914ee 100644 --- a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py +++ b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py @@ -156,12 +156,14 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, newNodeId (int): Node ID to use for the target node on the new fabric. Return: - bool: True if successful, False otherwise. + tuple: (bool, Optional[nocResp], Optional[rcacResp]: True if successful, False otherwise, along with nocResp, rcacResp value. ''' + nocResp = None + resp = await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(60)) if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk: - return False + return False, nocResp csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32))) @@ -171,31 +173,35 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None): # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.AddTrustedRootCertificate(chainForAddNOC.rcacBytes)) - resp = await commissionerDevCtrl.SendCommand(existingNodeId, - 0, - opCreds.Commands.AddNOC(chainForAddNOC.nocBytes, - chainForAddNOC.icacBytes, - chainForAddNOC.ipkBytes, - newFabricDevCtrl.nodeId, - newFabricDevCtrl.fabricAdmin.vendorId)) - if resp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: + nocResp = await commissionerDevCtrl.SendCommand(existingNodeId, + 0, + opCreds.Commands.AddNOC(chainForAddNOC.nocBytes, + chainForAddNOC.icacBytes, + chainForAddNOC.ipkBytes, + newFabricDevCtrl.nodeId, + newFabricDevCtrl.fabricAdmin.vendorId)) + + rcacResp = chainForAddNOC.rcacBytes + + if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp resp = await newFabricDevCtrl.SendCommand(newNodeId, 0, generalCommissioning.Commands.CommissioningComplete()) + if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId): - return False + return False, nocResp - return True + return True, nocResp, rcacResp async def UpdateNOC(devCtrl, existingNodeId, newNodeId): diff --git a/src/python_testing/TC_OPCREDS_3_2.py b/src/python_testing/TC_OPCREDS_3_2.py new file mode 100644 index 00000000000000..5b7cd5bd59480e --- /dev/null +++ b/src/python_testing/TC_OPCREDS_3_2.py @@ -0,0 +1,190 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import chip.clusters as Clusters +from chip.tlv import TLVReader +from chip.utils import CommissioningBuildingBlocks +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts +from test_plan_support import (commission_from_existing, commission_if_required, read_attribute, remove_fabric, + verify_commissioning_successful, verify_success) + + +def verify_fabric(controller: str) -> str: + return (f"- Verify there is one entry returned. Verify FabricIndex matches `fabric_index_{controller}`.\n" + f"- Verify the RootPublicKey matches the public key for rcac_{controller}.\n" + f"- Verify the VendorID matches the vendor ID for {controller}.\n" + f"- Verify the FabricID matches the fabricID for {controller}") + + +class TC_OPCREDS_3_2(MatterBaseTest): + def desc_TC_OPCREDS_3_2(self): + return " Attribute-CurrentFabricIndex validation [DUTServer]" + + def steps_TC_OPCREDS_3_2(self): + return [TestStep(0, commission_if_required('CR1'), is_commissioning=True), + TestStep(1, f"{commission_from_existing('CR1', 'CR2')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR2`.", + verify_commissioning_successful()), + TestStep(2, f"{commission_from_existing('CR1', 'CR3')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR3`.", + verify_commissioning_successful()), + TestStep(3, f"CR2 {read_attribute('CurrentFabricIndex')}", + "Verify the returned value is `fabric_index_CR2`"), + TestStep(4, f"CR3 {read_attribute('CurrentFabricIndex')}", + "Verify the returned value is `fabric_index_CR3`"), + TestStep( + 5, f"CR2 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR2')), + TestStep( + 6, f"CR3 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR3')), + TestStep(7, remove_fabric( + 'fabric_index_CR2', 'CR1'), verify_success()), + TestStep(8, remove_fabric( + 'fabric_index_CR3', 'CR1'), verify_success()), + ] + + @async_test_body + async def test_TC_OPCREDS_3_2(self): + opcreds = Clusters.OperationalCredentials + + self.step(0) + + self.step(1) + dev_ctrl = self.default_controller + + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + cr2_vid = 0xFFF2 + cr2_fabricId = 2222 + cr2_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=cr2_vid, fabricId=cr2_fabricId) + cr2_nodeid = self.default_controller.nodeId+1 + cr2_dut_node_id = self.dut_node_id+1 + + cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController( + nodeId=cr2_nodeid) + success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl, + existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id + ) + + fabric_index_CR2 = nocResp.fabricIndex + tlvReaderRCAC_CR2 = TLVReader(rcacResp).get()["Any"] + rcac_CR2 = tlvReaderRCAC_CR2[9] # public key is field 9 + + self.step(2) + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + cr3_vid = 0xFFF3 + cr3_fabricId = 3333 + cr3_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=cr3_vid, fabricId=cr3_fabricId) + cr3_nodeid = self.default_controller.nodeId+2 + cr3_dut_node_id = self.dut_node_id+2 + + cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController( + nodeId=cr3_nodeid) + success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl, + existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id + ) + + fabric_index_CR3 = nocResp.fabricIndex + tlvReaderRCAC_CR3 = TLVReader(rcacResp).get()["Any"] + rcac_CR3 = tlvReaderRCAC_CR3[9] + + self.step(3) + cr2_read_fabricIndex = await self.read_single_attribute_check_success( + dev_ctrl=cr2_new_admin_ctrl, + node_id=cr2_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.CurrentFabricIndex + ) + + asserts.assert_equal(fabric_index_CR2, cr2_read_fabricIndex, + "Fail fabric_index_CR2 is not equal to read fabricIndex from CR2") + + self.step(4) + cr3_read_fabricIndex = await self.read_single_attribute_check_success( + dev_ctrl=cr3_new_admin_ctrl, + node_id=cr3_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.CurrentFabricIndex + ) + + asserts.assert_equal(fabric_index_CR3, cr3_read_fabricIndex, + "Fail fabric_index_CR3 is not equal to read fabricIndex from CR3") + + self.step(5) + cr2_fabric = await self.read_single_attribute_check_success( + dev_ctrl=cr2_new_admin_ctrl, + node_id=cr2_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.Fabrics, + fabric_filtered=True + ) + + for fabric in cr2_fabric: + cr2_fabric_fabricIndex = fabric.fabricIndex + cr2_fabric_rootPublicKey = fabric.rootPublicKey + cr2_fabric_vendorId = fabric.vendorID + cr2_fabric_fabricId = fabric.fabricID + + asserts.assert_equal(cr2_fabric_fabricIndex, + fabric_index_CR2, "Unexpected CR2 fabric index") + asserts.assert_equal(cr2_fabric_rootPublicKey, rcac_CR2, + "Unexpected RootPublicKey does not match with rcac_CR2") + asserts.assert_equal(cr2_fabric_vendorId, cr2_vid, + "Unexpected vendorId does not match with CR2 VendorID") + asserts.assert_equal(cr2_fabric_fabricId, cr2_fabricId, + "Unexpected fabricId does not match with CR2 fabricID") + + self.step(6) + cr3_fabric = await self.read_single_attribute_check_success( + dev_ctrl=cr3_new_admin_ctrl, + node_id=cr3_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.Fabrics, + fabric_filtered=True + ) + + for fabric in cr3_fabric: + cr3_fabric_fabricIndex = fabric.fabricIndex + cr3_fabric_rootPublicKey = fabric.rootPublicKey + cr3_fabric_vendorId = fabric.vendorID + cr3_fabric_fabricId = fabric.fabricID + + asserts.assert_equal(cr3_fabric_fabricIndex, + fabric_index_CR3, "Unexpected CR3 fabric index") + asserts.assert_equal(cr3_fabric_rootPublicKey, rcac_CR3, + "Unexpected RootPublicKey does not match with rcac_CR3") + asserts.assert_equal(cr3_fabric_vendorId, cr3_vid, + "Unexpected vendorId does not match with CR3 VendorID") + asserts.assert_equal(cr3_fabric_fabricId, cr3_fabricId, + "Unexpected fabricId does not match with CR3 fabricID") + + self.step(7) + cmd = opcreds.Commands.RemoveFabric(fabric_index_CR2) + resp = await self.send_single_cmd(cmd=cmd) + asserts.assert_equal( + resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk) + + self.step(8) + cmd = opcreds.Commands.RemoveFabric(fabric_index_CR3) + resp = await self.send_single_cmd(cmd=cmd) + asserts.assert_equal( + resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/test_plan_support.py b/src/python_testing/test_plan_support.py new file mode 100644 index 00000000000000..1acb8576bdb15c --- /dev/null +++ b/src/python_testing/test_plan_support.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import typing + + +def read_attribute(attribute: str, cluster: typing.Optional[str] = None): + attr = f'reads the {attribute} attribute' + if cluster: + return f'{attr} from {cluster}' + else: + return attr + + +def save_as(val: str) -> str: + return f' and saves the value as {val}' + + +def verify_status(status: str) -> str: + return f'Verify DUT responds w/ status {status}' + + +def verify_success() -> str: + return verify_status('SUCCESS') + +# ----------------------- +# Commissioning strings +# ----------------------- + + +def commission_if_required(controller: typing.Optional[str] = None) -> str: + controller_str = f'to {controller} ' if controller is not None else '' + return f'Commission DUT {controller_str}if not already done' + + +def commission_from_existing(existing_controller_name: str, new_controller_name: str) -> str: + # NOTE to implementers: This text corresponds to the actions taken by CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting. + # This function should be used in the TestSteps description when you use that function. + # AddNOCForNewFabricFromExisting is used when the generated certificates are required for use in the test. + # It written one step so we can just use the function directly without needing to annotate the sub-steps for the TH. + return (f'Create a new controller on a new fabric called {new_controller_name}.\n' + f'Commission the new controller from {existing_controller_name} as follows:\n\n' + f'- {existing_controller_name} sends an ArmFailsafe command, followed by a CSRRequest command.\n' + f'- Generate credentials on {new_controller_name} using the returned CSR.\n' + f'- Save the RCAC as `rcac_{new_controller_name}. Save the ICAC as `icac_{new_controller_name}`. Save the NOC as `noc_{new_controller_name}`. Save the IPK as ipk_{new_controller_name}.\n' + f'- {existing_controller_name} sends the AddTrustedRootCertificate command with `rcac_{new_controller_name}`' + f'- {existing_controller_name} sends the AddNOC command with the fields set as follows:\n' + f' * NOCValue: `noc_{new_controller_name}`\n' + f' * ICACValue: `icac_{new_controller_name}`\n' + f' * IPKValue: `ipk_{new_controller_name}`\n' + f' * CaseAdminSubject: {new_controller_name} node ID\n' + f' * AdminVendorId: {new_controller_name} vendor ID\n' + f'- {new_controller_name} connects over CASE and sends the commissioning complete command') + + +def open_commissioning_window(controller: str = 'TH') -> str: + return f'{controller} opens a commissioning window on the DUT' + + +def remove_fabric(index_var: str, controller: str): + return f'{controller} sends the RemoveFabric command to the Node Operational Credentials cluster with the FabricIndex set to {index_var}.' + + +def verify_commissioning_successful() -> str: + return 'Verify the commissioning is successful.'