Skip to content

Commit

Permalink
TC-OPCREDS-3.2: Add (#32182)
Browse files Browse the repository at this point in the history
* chore(TC_OPCREDS-3.2): skeleton base class

* chore(TC_OPCREDS_3.2): NOCResponse command

* chore(TC_OPCREDS_3.2): get currentFabric

* chore(TC_OPCREDS_3.2): print some status values

* TC-OPCREDS-3.2: Re-work test

This test was difficult to automate and included a factory reset.
This automation start is used to generate the test plans.

The intent of this test is to confirm that the returned fabric
index matches the fabric index returned from the AddNOC command,
and that the read of fabric-filtered attributes matches to the
calling fabric. The rest of the test is covered by OPCREDS-3.1
so was removed to simplify.

NOTE: test not yet implemented, uploaded for test plan generation
      purposes only.

- Properly use Test Step and Expected outcome columns
- Remove checks for NOCs matching - this is checked in OPCREDS-3.1
- Remove factory reset and commission two new fabrics
- Use commissioning over CASE so we can more easily return the
  fabric index

* chore(TC_OPCREDS_3.2): included new steps (1-6)

Just need to add validation for CR3 and remove all fabrics. Also
it need clean up (remove prints, unused variables, etc)

* chore(TC_OPCREDS_3.2): all steps are implemtented

* chore(TC_OPCREDS_3.2): restyled by isort

* chore(TC_OPCREDS_3_2): fix error from python linter (code-lints)

* chore(TC_OPCREDS_3.2): import restyled by isort

* chore(TC_OPCREDS_3.2): fixed the vendorID for prevent fail

* chore(TC_OPCREDS_3.2): add test to CI Workflow

* chore(TC_OPCREDS_3.2): off auto-format and change the return values label

* chore(TC_OPCREDS_3.2): remove if-else validation instead used assert_equal

* chore(TC_OPCREDS_3.2): spacing adjustment for better readability

* chore(TC_OPCREDS_3.2): identation adjustment for better readability

* chore(TC_OPCREDS_3.2): restyled patch identation adjustment

* chore(TC_OPCREDS_3.2): reverted addition of test

---------

Co-authored-by: cecille <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Jun 12, 2024
1 parent df67aa4 commit 92ae515
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 15 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ jobs:
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestSpecParsingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/TestTimeSyncTrustedTimeSourceRunner.py'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPCREDS_3_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_1.py" --script-args "--endpoint 1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_2.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_3.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
Expand Down
36 changes: 21 additions & 15 deletions src/controller/python/chip/utils/CommissioningBuildingBlocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,14 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,
newNodeId (int): Node ID to use for the target node on the new fabric.
Return:
bool: True if successful, False otherwise.
tuple: (bool, Optional[nocResp], Optional[rcacResp]: True if successful, False otherwise, along with nocResp, rcacResp value.
'''
nocResp = None

resp = await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(60))
if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk:
return False
return False, nocResp

csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32)))

Expand All @@ -171,31 +173,35 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,
chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None):
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.AddTrustedRootCertificate(chainForAddNOC.rcacBytes))
resp = await commissionerDevCtrl.SendCommand(existingNodeId,
0,
opCreds.Commands.AddNOC(chainForAddNOC.nocBytes,
chainForAddNOC.icacBytes,
chainForAddNOC.ipkBytes,
newFabricDevCtrl.nodeId,
newFabricDevCtrl.fabricAdmin.vendorId))
if resp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk:
nocResp = await commissionerDevCtrl.SendCommand(existingNodeId,
0,
opCreds.Commands.AddNOC(chainForAddNOC.nocBytes,
chainForAddNOC.icacBytes,
chainForAddNOC.ipkBytes,
newFabricDevCtrl.nodeId,
newFabricDevCtrl.fabricAdmin.vendorId))

rcacResp = chainForAddNOC.rcacBytes

if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk:
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

resp = await newFabricDevCtrl.SendCommand(newNodeId, 0, generalCommissioning.Commands.CommissioningComplete())

if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk:
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId):
return False
return False, nocResp

return True
return True, nocResp, rcacResp


async def UpdateNOC(devCtrl, existingNodeId, newNodeId):
Expand Down
190 changes: 190 additions & 0 deletions src/python_testing/TC_OPCREDS_3_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import chip.clusters as Clusters
from chip.tlv import TLVReader
from chip.utils import CommissioningBuildingBlocks
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from mobly import asserts
from test_plan_support import (commission_from_existing, commission_if_required, read_attribute, remove_fabric,
verify_commissioning_successful, verify_success)


def verify_fabric(controller: str) -> str:
return (f"- Verify there is one entry returned. Verify FabricIndex matches `fabric_index_{controller}`.\n"
f"- Verify the RootPublicKey matches the public key for rcac_{controller}.\n"
f"- Verify the VendorID matches the vendor ID for {controller}.\n"
f"- Verify the FabricID matches the fabricID for {controller}")


class TC_OPCREDS_3_2(MatterBaseTest):
def desc_TC_OPCREDS_3_2(self):
return " Attribute-CurrentFabricIndex validation [DUTServer]"

def steps_TC_OPCREDS_3_2(self):
return [TestStep(0, commission_if_required('CR1'), is_commissioning=True),
TestStep(1, f"{commission_from_existing('CR1', 'CR2')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR2`.",
verify_commissioning_successful()),
TestStep(2, f"{commission_from_existing('CR1', 'CR3')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR3`.",
verify_commissioning_successful()),
TestStep(3, f"CR2 {read_attribute('CurrentFabricIndex')}",
"Verify the returned value is `fabric_index_CR2`"),
TestStep(4, f"CR3 {read_attribute('CurrentFabricIndex')}",
"Verify the returned value is `fabric_index_CR3`"),
TestStep(
5, f"CR2 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR2')),
TestStep(
6, f"CR3 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR3')),
TestStep(7, remove_fabric(
'fabric_index_CR2', 'CR1'), verify_success()),
TestStep(8, remove_fabric(
'fabric_index_CR3', 'CR1'), verify_success()),
]

@async_test_body
async def test_TC_OPCREDS_3_2(self):
opcreds = Clusters.OperationalCredentials

self.step(0)

self.step(1)
dev_ctrl = self.default_controller

new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr2_vid = 0xFFF2
cr2_fabricId = 2222
cr2_new_fabric_admin = new_certificate_authority.NewFabricAdmin(
vendorId=cr2_vid, fabricId=cr2_fabricId)
cr2_nodeid = self.default_controller.nodeId+1
cr2_dut_node_id = self.dut_node_id+1

cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController(
nodeId=cr2_nodeid)
success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id
)

fabric_index_CR2 = nocResp.fabricIndex
tlvReaderRCAC_CR2 = TLVReader(rcacResp).get()["Any"]
rcac_CR2 = tlvReaderRCAC_CR2[9] # public key is field 9

self.step(2)
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr3_vid = 0xFFF3
cr3_fabricId = 3333
cr3_new_fabric_admin = new_certificate_authority.NewFabricAdmin(
vendorId=cr3_vid, fabricId=cr3_fabricId)
cr3_nodeid = self.default_controller.nodeId+2
cr3_dut_node_id = self.dut_node_id+2

cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController(
nodeId=cr3_nodeid)
success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id
)

fabric_index_CR3 = nocResp.fabricIndex
tlvReaderRCAC_CR3 = TLVReader(rcacResp).get()["Any"]
rcac_CR3 = tlvReaderRCAC_CR3[9]

self.step(3)
cr2_read_fabricIndex = await self.read_single_attribute_check_success(
dev_ctrl=cr2_new_admin_ctrl,
node_id=cr2_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.CurrentFabricIndex
)

asserts.assert_equal(fabric_index_CR2, cr2_read_fabricIndex,
"Fail fabric_index_CR2 is not equal to read fabricIndex from CR2")

self.step(4)
cr3_read_fabricIndex = await self.read_single_attribute_check_success(
dev_ctrl=cr3_new_admin_ctrl,
node_id=cr3_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.CurrentFabricIndex
)

asserts.assert_equal(fabric_index_CR3, cr3_read_fabricIndex,
"Fail fabric_index_CR3 is not equal to read fabricIndex from CR3")

self.step(5)
cr2_fabric = await self.read_single_attribute_check_success(
dev_ctrl=cr2_new_admin_ctrl,
node_id=cr2_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.Fabrics,
fabric_filtered=True
)

for fabric in cr2_fabric:
cr2_fabric_fabricIndex = fabric.fabricIndex
cr2_fabric_rootPublicKey = fabric.rootPublicKey
cr2_fabric_vendorId = fabric.vendorID
cr2_fabric_fabricId = fabric.fabricID

asserts.assert_equal(cr2_fabric_fabricIndex,
fabric_index_CR2, "Unexpected CR2 fabric index")
asserts.assert_equal(cr2_fabric_rootPublicKey, rcac_CR2,
"Unexpected RootPublicKey does not match with rcac_CR2")
asserts.assert_equal(cr2_fabric_vendorId, cr2_vid,
"Unexpected vendorId does not match with CR2 VendorID")
asserts.assert_equal(cr2_fabric_fabricId, cr2_fabricId,
"Unexpected fabricId does not match with CR2 fabricID")

self.step(6)
cr3_fabric = await self.read_single_attribute_check_success(
dev_ctrl=cr3_new_admin_ctrl,
node_id=cr3_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.Fabrics,
fabric_filtered=True
)

for fabric in cr3_fabric:
cr3_fabric_fabricIndex = fabric.fabricIndex
cr3_fabric_rootPublicKey = fabric.rootPublicKey
cr3_fabric_vendorId = fabric.vendorID
cr3_fabric_fabricId = fabric.fabricID

asserts.assert_equal(cr3_fabric_fabricIndex,
fabric_index_CR3, "Unexpected CR3 fabric index")
asserts.assert_equal(cr3_fabric_rootPublicKey, rcac_CR3,
"Unexpected RootPublicKey does not match with rcac_CR3")
asserts.assert_equal(cr3_fabric_vendorId, cr3_vid,
"Unexpected vendorId does not match with CR3 VendorID")
asserts.assert_equal(cr3_fabric_fabricId, cr3_fabricId,
"Unexpected fabricId does not match with CR3 fabricID")

self.step(7)
cmd = opcreds.Commands.RemoveFabric(fabric_index_CR2)
resp = await self.send_single_cmd(cmd=cmd)
asserts.assert_equal(
resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk)

self.step(8)
cmd = opcreds.Commands.RemoveFabric(fabric_index_CR3)
resp = await self.send_single_cmd(cmd=cmd)
asserts.assert_equal(
resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk)


if __name__ == "__main__":
default_matter_test_main()
78 changes: 78 additions & 0 deletions src/python_testing/test_plan_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import typing


def read_attribute(attribute: str, cluster: typing.Optional[str] = None):
attr = f'reads the {attribute} attribute'
if cluster:
return f'{attr} from {cluster}'
else:
return attr


def save_as(val: str) -> str:
return f' and saves the value as {val}'


def verify_status(status: str) -> str:
return f'Verify DUT responds w/ status {status}'


def verify_success() -> str:
return verify_status('SUCCESS')

# -----------------------
# Commissioning strings
# -----------------------


def commission_if_required(controller: typing.Optional[str] = None) -> str:
controller_str = f'to {controller} ' if controller is not None else ''
return f'Commission DUT {controller_str}if not already done'


def commission_from_existing(existing_controller_name: str, new_controller_name: str) -> str:
# NOTE to implementers: This text corresponds to the actions taken by CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting.
# This function should be used in the TestSteps description when you use that function.
# AddNOCForNewFabricFromExisting is used when the generated certificates are required for use in the test.
# It written one step so we can just use the function directly without needing to annotate the sub-steps for the TH.
return (f'Create a new controller on a new fabric called {new_controller_name}.\n'
f'Commission the new controller from {existing_controller_name} as follows:\n\n'
f'- {existing_controller_name} sends an ArmFailsafe command, followed by a CSRRequest command.\n'
f'- Generate credentials on {new_controller_name} using the returned CSR.\n'
f'- Save the RCAC as `rcac_{new_controller_name}. Save the ICAC as `icac_{new_controller_name}`. Save the NOC as `noc_{new_controller_name}`. Save the IPK as ipk_{new_controller_name}.\n'
f'- {existing_controller_name} sends the AddTrustedRootCertificate command with `rcac_{new_controller_name}`'
f'- {existing_controller_name} sends the AddNOC command with the fields set as follows:\n'
f' * NOCValue: `noc_{new_controller_name}`\n'
f' * ICACValue: `icac_{new_controller_name}`\n'
f' * IPKValue: `ipk_{new_controller_name}`\n'
f' * CaseAdminSubject: {new_controller_name} node ID\n'
f' * AdminVendorId: {new_controller_name} vendor ID\n'
f'- {new_controller_name} connects over CASE and sends the commissioning complete command')


def open_commissioning_window(controller: str = 'TH') -> str:
return f'{controller} opens a commissioning window on the DUT'


def remove_fabric(index_var: str, controller: str):
return f'{controller} sends the RemoveFabric command to the Node Operational Credentials cluster with the FabricIndex set to {index_var}.'


def verify_commissioning_successful() -> str:
return 'Verify the commissioning is successful.'

0 comments on commit 92ae515

Please sign in to comment.