From 92ae5151cd53b8460fa3c720ecf446a4f60a80d8 Mon Sep 17 00:00:00 2001 From: Gibran Vargas <131407127+gvargas-csa@users.noreply.github.com> Date: Wed, 5 Jun 2024 10:53:19 -0700 Subject: [PATCH] TC-OPCREDS-3.2: Add (#32182) * chore(TC_OPCREDS-3.2): skeleton base class * chore(TC_OPCREDS_3.2): NOCResponse command * chore(TC_OPCREDS_3.2): get currentFabric * chore(TC_OPCREDS_3.2): print some status values * TC-OPCREDS-3.2: Re-work test This test was difficult to automate and included a factory reset. This automation start is used to generate the test plans. The intent of this test is to confirm that the returned fabric index matches the fabric index returned from the AddNOC command, and that the read of fabric-filtered attributes matches to the calling fabric. The rest of the test is covered by OPCREDS-3.1 so was removed to simplify. NOTE: test not yet implemented, uploaded for test plan generation purposes only. - Properly use Test Step and Expected outcome columns - Remove checks for NOCs matching - this is checked in OPCREDS-3.1 - Remove factory reset and commission two new fabrics - Use commissioning over CASE so we can more easily return the fabric index * chore(TC_OPCREDS_3.2): included new steps (1-6) Just need to add validation for CR3 and remove all fabrics. Also it need clean up (remove prints, unused variables, etc) * chore(TC_OPCREDS_3.2): all steps are implemtented * chore(TC_OPCREDS_3.2): restyled by isort * chore(TC_OPCREDS_3_2): fix error from python linter (code-lints) * chore(TC_OPCREDS_3.2): import restyled by isort * chore(TC_OPCREDS_3.2): fixed the vendorID for prevent fail * chore(TC_OPCREDS_3.2): add test to CI Workflow * chore(TC_OPCREDS_3.2): off auto-format and change the return values label * chore(TC_OPCREDS_3.2): remove if-else validation instead used assert_equal * chore(TC_OPCREDS_3.2): spacing adjustment for better readability * chore(TC_OPCREDS_3.2): identation adjustment for better readability * chore(TC_OPCREDS_3.2): restyled patch identation adjustment * chore(TC_OPCREDS_3.2): reverted addition of test --------- Co-authored-by: cecille --- .github/workflows/tests.yaml | 1 + .../chip/utils/CommissioningBuildingBlocks.py | 36 ++-- src/python_testing/TC_OPCREDS_3_2.py | 190 ++++++++++++++++++ src/python_testing/test_plan_support.py | 78 +++++++ 4 files changed, 290 insertions(+), 15 deletions(-) create mode 100644 src/python_testing/TC_OPCREDS_3_2.py create mode 100644 src/python_testing/test_plan_support.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 678b7b18195aa0..24895bf5f07e01 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -536,6 +536,7 @@ jobs: scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestSpecParsingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/TestTimeSyncTrustedTimeSourceRunner.py' + scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPCREDS_3_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_1.py" --script-args "--endpoint 1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_2.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_3.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' diff --git a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py index 8fa80223b14316..4c3826fd6914ee 100644 --- a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py +++ b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py @@ -156,12 +156,14 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, newNodeId (int): Node ID to use for the target node on the new fabric. Return: - bool: True if successful, False otherwise. + tuple: (bool, Optional[nocResp], Optional[rcacResp]: True if successful, False otherwise, along with nocResp, rcacResp value. ''' + nocResp = None + resp = await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(60)) if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk: - return False + return False, nocResp csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32))) @@ -171,31 +173,35 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl, chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None): # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.AddTrustedRootCertificate(chainForAddNOC.rcacBytes)) - resp = await commissionerDevCtrl.SendCommand(existingNodeId, - 0, - opCreds.Commands.AddNOC(chainForAddNOC.nocBytes, - chainForAddNOC.icacBytes, - chainForAddNOC.ipkBytes, - newFabricDevCtrl.nodeId, - newFabricDevCtrl.fabricAdmin.vendorId)) - if resp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: + nocResp = await commissionerDevCtrl.SendCommand(existingNodeId, + 0, + opCreds.Commands.AddNOC(chainForAddNOC.nocBytes, + chainForAddNOC.icacBytes, + chainForAddNOC.ipkBytes, + newFabricDevCtrl.nodeId, + newFabricDevCtrl.fabricAdmin.vendorId)) + + rcacResp = chainForAddNOC.rcacBytes + + if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp resp = await newFabricDevCtrl.SendCommand(newNodeId, 0, generalCommissioning.Commands.CommissioningComplete()) + if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk: # Expiring the failsafe timer in an attempt to clean up. await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0)) - return False + return False, nocResp if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId): - return False + return False, nocResp - return True + return True, nocResp, rcacResp async def UpdateNOC(devCtrl, existingNodeId, newNodeId): diff --git a/src/python_testing/TC_OPCREDS_3_2.py b/src/python_testing/TC_OPCREDS_3_2.py new file mode 100644 index 00000000000000..5b7cd5bd59480e --- /dev/null +++ b/src/python_testing/TC_OPCREDS_3_2.py @@ -0,0 +1,190 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import chip.clusters as Clusters +from chip.tlv import TLVReader +from chip.utils import CommissioningBuildingBlocks +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts +from test_plan_support import (commission_from_existing, commission_if_required, read_attribute, remove_fabric, + verify_commissioning_successful, verify_success) + + +def verify_fabric(controller: str) -> str: + return (f"- Verify there is one entry returned. Verify FabricIndex matches `fabric_index_{controller}`.\n" + f"- Verify the RootPublicKey matches the public key for rcac_{controller}.\n" + f"- Verify the VendorID matches the vendor ID for {controller}.\n" + f"- Verify the FabricID matches the fabricID for {controller}") + + +class TC_OPCREDS_3_2(MatterBaseTest): + def desc_TC_OPCREDS_3_2(self): + return " Attribute-CurrentFabricIndex validation [DUTServer]" + + def steps_TC_OPCREDS_3_2(self): + return [TestStep(0, commission_if_required('CR1'), is_commissioning=True), + TestStep(1, f"{commission_from_existing('CR1', 'CR2')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR2`.", + verify_commissioning_successful()), + TestStep(2, f"{commission_from_existing('CR1', 'CR3')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR3`.", + verify_commissioning_successful()), + TestStep(3, f"CR2 {read_attribute('CurrentFabricIndex')}", + "Verify the returned value is `fabric_index_CR2`"), + TestStep(4, f"CR3 {read_attribute('CurrentFabricIndex')}", + "Verify the returned value is `fabric_index_CR3`"), + TestStep( + 5, f"CR2 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR2')), + TestStep( + 6, f"CR3 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR3')), + TestStep(7, remove_fabric( + 'fabric_index_CR2', 'CR1'), verify_success()), + TestStep(8, remove_fabric( + 'fabric_index_CR3', 'CR1'), verify_success()), + ] + + @async_test_body + async def test_TC_OPCREDS_3_2(self): + opcreds = Clusters.OperationalCredentials + + self.step(0) + + self.step(1) + dev_ctrl = self.default_controller + + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + cr2_vid = 0xFFF2 + cr2_fabricId = 2222 + cr2_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=cr2_vid, fabricId=cr2_fabricId) + cr2_nodeid = self.default_controller.nodeId+1 + cr2_dut_node_id = self.dut_node_id+1 + + cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController( + nodeId=cr2_nodeid) + success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl, + existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id + ) + + fabric_index_CR2 = nocResp.fabricIndex + tlvReaderRCAC_CR2 = TLVReader(rcacResp).get()["Any"] + rcac_CR2 = tlvReaderRCAC_CR2[9] # public key is field 9 + + self.step(2) + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + cr3_vid = 0xFFF3 + cr3_fabricId = 3333 + cr3_new_fabric_admin = new_certificate_authority.NewFabricAdmin( + vendorId=cr3_vid, fabricId=cr3_fabricId) + cr3_nodeid = self.default_controller.nodeId+2 + cr3_dut_node_id = self.dut_node_id+2 + + cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController( + nodeId=cr3_nodeid) + success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting( + commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl, + existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id + ) + + fabric_index_CR3 = nocResp.fabricIndex + tlvReaderRCAC_CR3 = TLVReader(rcacResp).get()["Any"] + rcac_CR3 = tlvReaderRCAC_CR3[9] + + self.step(3) + cr2_read_fabricIndex = await self.read_single_attribute_check_success( + dev_ctrl=cr2_new_admin_ctrl, + node_id=cr2_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.CurrentFabricIndex + ) + + asserts.assert_equal(fabric_index_CR2, cr2_read_fabricIndex, + "Fail fabric_index_CR2 is not equal to read fabricIndex from CR2") + + self.step(4) + cr3_read_fabricIndex = await self.read_single_attribute_check_success( + dev_ctrl=cr3_new_admin_ctrl, + node_id=cr3_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.CurrentFabricIndex + ) + + asserts.assert_equal(fabric_index_CR3, cr3_read_fabricIndex, + "Fail fabric_index_CR3 is not equal to read fabricIndex from CR3") + + self.step(5) + cr2_fabric = await self.read_single_attribute_check_success( + dev_ctrl=cr2_new_admin_ctrl, + node_id=cr2_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.Fabrics, + fabric_filtered=True + ) + + for fabric in cr2_fabric: + cr2_fabric_fabricIndex = fabric.fabricIndex + cr2_fabric_rootPublicKey = fabric.rootPublicKey + cr2_fabric_vendorId = fabric.vendorID + cr2_fabric_fabricId = fabric.fabricID + + asserts.assert_equal(cr2_fabric_fabricIndex, + fabric_index_CR2, "Unexpected CR2 fabric index") + asserts.assert_equal(cr2_fabric_rootPublicKey, rcac_CR2, + "Unexpected RootPublicKey does not match with rcac_CR2") + asserts.assert_equal(cr2_fabric_vendorId, cr2_vid, + "Unexpected vendorId does not match with CR2 VendorID") + asserts.assert_equal(cr2_fabric_fabricId, cr2_fabricId, + "Unexpected fabricId does not match with CR2 fabricID") + + self.step(6) + cr3_fabric = await self.read_single_attribute_check_success( + dev_ctrl=cr3_new_admin_ctrl, + node_id=cr3_dut_node_id, + cluster=opcreds, + attribute=opcreds.Attributes.Fabrics, + fabric_filtered=True + ) + + for fabric in cr3_fabric: + cr3_fabric_fabricIndex = fabric.fabricIndex + cr3_fabric_rootPublicKey = fabric.rootPublicKey + cr3_fabric_vendorId = fabric.vendorID + cr3_fabric_fabricId = fabric.fabricID + + asserts.assert_equal(cr3_fabric_fabricIndex, + fabric_index_CR3, "Unexpected CR3 fabric index") + asserts.assert_equal(cr3_fabric_rootPublicKey, rcac_CR3, + "Unexpected RootPublicKey does not match with rcac_CR3") + asserts.assert_equal(cr3_fabric_vendorId, cr3_vid, + "Unexpected vendorId does not match with CR3 VendorID") + asserts.assert_equal(cr3_fabric_fabricId, cr3_fabricId, + "Unexpected fabricId does not match with CR3 fabricID") + + self.step(7) + cmd = opcreds.Commands.RemoveFabric(fabric_index_CR2) + resp = await self.send_single_cmd(cmd=cmd) + asserts.assert_equal( + resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk) + + self.step(8) + cmd = opcreds.Commands.RemoveFabric(fabric_index_CR3) + resp = await self.send_single_cmd(cmd=cmd) + asserts.assert_equal( + resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/test_plan_support.py b/src/python_testing/test_plan_support.py new file mode 100644 index 00000000000000..1acb8576bdb15c --- /dev/null +++ b/src/python_testing/test_plan_support.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import typing + + +def read_attribute(attribute: str, cluster: typing.Optional[str] = None): + attr = f'reads the {attribute} attribute' + if cluster: + return f'{attr} from {cluster}' + else: + return attr + + +def save_as(val: str) -> str: + return f' and saves the value as {val}' + + +def verify_status(status: str) -> str: + return f'Verify DUT responds w/ status {status}' + + +def verify_success() -> str: + return verify_status('SUCCESS') + +# ----------------------- +# Commissioning strings +# ----------------------- + + +def commission_if_required(controller: typing.Optional[str] = None) -> str: + controller_str = f'to {controller} ' if controller is not None else '' + return f'Commission DUT {controller_str}if not already done' + + +def commission_from_existing(existing_controller_name: str, new_controller_name: str) -> str: + # NOTE to implementers: This text corresponds to the actions taken by CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting. + # This function should be used in the TestSteps description when you use that function. + # AddNOCForNewFabricFromExisting is used when the generated certificates are required for use in the test. + # It written one step so we can just use the function directly without needing to annotate the sub-steps for the TH. + return (f'Create a new controller on a new fabric called {new_controller_name}.\n' + f'Commission the new controller from {existing_controller_name} as follows:\n\n' + f'- {existing_controller_name} sends an ArmFailsafe command, followed by a CSRRequest command.\n' + f'- Generate credentials on {new_controller_name} using the returned CSR.\n' + f'- Save the RCAC as `rcac_{new_controller_name}. Save the ICAC as `icac_{new_controller_name}`. Save the NOC as `noc_{new_controller_name}`. Save the IPK as ipk_{new_controller_name}.\n' + f'- {existing_controller_name} sends the AddTrustedRootCertificate command with `rcac_{new_controller_name}`' + f'- {existing_controller_name} sends the AddNOC command with the fields set as follows:\n' + f' * NOCValue: `noc_{new_controller_name}`\n' + f' * ICACValue: `icac_{new_controller_name}`\n' + f' * IPKValue: `ipk_{new_controller_name}`\n' + f' * CaseAdminSubject: {new_controller_name} node ID\n' + f' * AdminVendorId: {new_controller_name} vendor ID\n' + f'- {new_controller_name} connects over CASE and sends the commissioning complete command') + + +def open_commissioning_window(controller: str = 'TH') -> str: + return f'{controller} opens a commissioning window on the DUT' + + +def remove_fabric(index_var: str, controller: str): + return f'{controller} sends the RemoveFabric command to the Node Operational Credentials cluster with the FabricIndex set to {index_var}.' + + +def verify_commissioning_successful() -> str: + return 'Verify the commissioning is successful.'