Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TC-OPCREDS-3.2: Add #32182

Merged
merged 22 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dc17fbd
chore(TC_OPCREDS-3.2): skeleton base class
gvargas-csa Jan 31, 2024
122207f
chore(TC_OPCREDS_3.2): NOCResponse command
gvargas-csa Feb 2, 2024
8367f6e
chore(TC_OPCREDS_3.2): get currentFabric
gvargas-csa Feb 13, 2024
a01f9ad
chore(TC_OPCREDS_3.2): print some status values
gvargas-csa Feb 26, 2024
76f8533
TC-OPCREDS-3.2: Re-work test
cecille Apr 22, 2024
f24a046
Merge remote-tracking branch 'cecille/tc_opcreds_3_2' into HEAD
gvargas-csa May 15, 2024
c751f6a
Merge branch 'project-chip:master' into TC_OPCREDS_3_2
gvargas-csa May 15, 2024
aeb58bf
Merge branch 'project-chip:master' into TC_OPCREDS_3_2
gvargas-csa May 20, 2024
77ef8b2
chore(TC_OPCREDS_3.2): included new steps (1-6)
gvargas-csa May 20, 2024
314b579
chore(TC_OPCREDS_3.2): all steps are implemtented
gvargas-csa May 20, 2024
9b2f5d8
chore(TC_OPCREDS_3.2): restyled by isort
gvargas-csa May 20, 2024
f015132
chore(TC_OPCREDS_3_2): fix error from python linter (code-lints)
gvargas-csa May 20, 2024
064605f
chore(TC_OPCREDS_3.2): import restyled by isort
gvargas-csa May 20, 2024
81df904
chore(TC_OPCREDS_3.2): fixed the vendorID for prevent fail
gvargas-csa May 30, 2024
a589d37
chore(TC_OPCREDS_3.2): add test to CI Workflow
gvargas-csa May 30, 2024
b24f0b6
chore(TC_OPCREDS_3.2): off auto-format and change the return values l…
gvargas-csa May 30, 2024
05ee254
chore(TC_OPCREDS_3.2): remove if-else validation instead used assert_…
gvargas-csa May 30, 2024
64d090e
chore(TC_OPCREDS_3.2): spacing adjustment for better readability
gvargas-csa May 31, 2024
57c1dd3
chore(TC_OPCREDS_3.2): identation adjustment for better readability
gvargas-csa May 31, 2024
dd220c0
chore(TC_OPCREDS_3.2): restyled patch identation adjustment
gvargas-csa May 31, 2024
400f01f
Merge branch 'master' into HEAD
gvargas-csa Jun 4, 2024
25e23c4
chore(TC_OPCREDS_3.2): reverted addition of test
gvargas-csa Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ jobs:
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestSpecParsingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/TestTimeSyncTrustedTimeSourceRunner.py'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPCREDS_3_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_1.py" --script-args "--endpoint 1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_2.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_OPSTATE_2_3.py" --script-args "--endpoint 1 --int-arg PIXIT.WAITTIME.COUNTDOWN:5 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
Expand Down
36 changes: 21 additions & 15 deletions src/controller/python/chip/utils/CommissioningBuildingBlocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,14 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,
newNodeId (int): Node ID to use for the target node on the new fabric.

Return:
bool: True if successful, False otherwise.
tuple: (bool, Optional[nocResp], Optional[rcacResp]: True if successful, False otherwise, along with nocResp, rcacResp value.

'''
nocResp = None

resp = await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(60))
if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk:
return False
return False, nocResp

csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32)))

Expand All @@ -171,31 +173,35 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,
chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None):
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.AddTrustedRootCertificate(chainForAddNOC.rcacBytes))
resp = await commissionerDevCtrl.SendCommand(existingNodeId,
0,
opCreds.Commands.AddNOC(chainForAddNOC.nocBytes,
chainForAddNOC.icacBytes,
chainForAddNOC.ipkBytes,
newFabricDevCtrl.nodeId,
newFabricDevCtrl.fabricAdmin.vendorId))
if resp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk:
nocResp = await commissionerDevCtrl.SendCommand(existingNodeId,
0,
opCreds.Commands.AddNOC(chainForAddNOC.nocBytes,
chainForAddNOC.icacBytes,
chainForAddNOC.ipkBytes,
newFabricDevCtrl.nodeId,
newFabricDevCtrl.fabricAdmin.vendorId))

rcacResp = chainForAddNOC.rcacBytes

if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk:
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

resp = await newFabricDevCtrl.SendCommand(newNodeId, 0, generalCommissioning.Commands.CommissioningComplete())

if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk:
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId):
return False
return False, nocResp

return True
return True, nocResp, rcacResp


async def UpdateNOC(devCtrl, existingNodeId, newNodeId):
Expand Down
190 changes: 190 additions & 0 deletions src/python_testing/TC_OPCREDS_3_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#
gvargas-csa marked this conversation as resolved.
Show resolved Hide resolved
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import chip.clusters as Clusters
from chip.tlv import TLVReader
from chip.utils import CommissioningBuildingBlocks
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from mobly import asserts
from test_plan_support import (commission_from_existing, commission_if_required, read_attribute, remove_fabric,
verify_commissioning_successful, verify_success)


def verify_fabric(controller: str) -> str:
return (f"- Verify there is one entry returned. Verify FabricIndex matches `fabric_index_{controller}`.\n"
f"- Verify the RootPublicKey matches the public key for rcac_{controller}.\n"
f"- Verify the VendorID matches the vendor ID for {controller}.\n"
f"- Verify the FabricID matches the fabricID for {controller}")


class TC_OPCREDS_3_2(MatterBaseTest):
def desc_TC_OPCREDS_3_2(self):
return " Attribute-CurrentFabricIndex validation [DUTServer]"

def steps_TC_OPCREDS_3_2(self):
return [TestStep(0, commission_if_required('CR1'), is_commissioning=True),
TestStep(1, f"{commission_from_existing('CR1', 'CR2')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR2`.",
verify_commissioning_successful()),
TestStep(2, f"{commission_from_existing('CR1', 'CR3')}\n. Save the FabricIndex from the NOCResponse as `fabric_index_CR3`.",
verify_commissioning_successful()),
TestStep(3, f"CR2 {read_attribute('CurrentFabricIndex')}",
"Verify the returned value is `fabric_index_CR2`"),
TestStep(4, f"CR3 {read_attribute('CurrentFabricIndex')}",
"Verify the returned value is `fabric_index_CR3`"),
TestStep(
5, f"CR2 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR2')),
TestStep(
6, f"CR3 {read_attribute('Fabrics')} using a fabric-filtered read", verify_fabric('CR3')),
TestStep(7, remove_fabric(
'fabric_index_CR2', 'CR1'), verify_success()),
TestStep(8, remove_fabric(
'fabric_index_CR3', 'CR1'), verify_success()),
]

@async_test_body
async def test_TC_OPCREDS_3_2(self):
opcreds = Clusters.OperationalCredentials

self.step(0)

self.step(1)
dev_ctrl = self.default_controller

new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr2_vid = 0xFFF2
cr2_fabricId = 2222
cr2_new_fabric_admin = new_certificate_authority.NewFabricAdmin(
vendorId=cr2_vid, fabricId=cr2_fabricId)
cr2_nodeid = self.default_controller.nodeId+1
cr2_dut_node_id = self.dut_node_id+1

cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController(
nodeId=cr2_nodeid)
success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id
)

fabric_index_CR2 = nocResp.fabricIndex
tlvReaderRCAC_CR2 = TLVReader(rcacResp).get()["Any"]
rcac_CR2 = tlvReaderRCAC_CR2[9] # public key is field 9

self.step(2)
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr3_vid = 0xFFF3
cr3_fabricId = 3333
cr3_new_fabric_admin = new_certificate_authority.NewFabricAdmin(
vendorId=cr3_vid, fabricId=cr3_fabricId)
cr3_nodeid = self.default_controller.nodeId+2
cr3_dut_node_id = self.dut_node_id+2

cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController(
nodeId=cr3_nodeid)
success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id
)

fabric_index_CR3 = nocResp.fabricIndex
tlvReaderRCAC_CR3 = TLVReader(rcacResp).get()["Any"]
rcac_CR3 = tlvReaderRCAC_CR3[9]

self.step(3)
cr2_read_fabricIndex = await self.read_single_attribute_check_success(
dev_ctrl=cr2_new_admin_ctrl,
node_id=cr2_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.CurrentFabricIndex
)

asserts.assert_equal(fabric_index_CR2, cr2_read_fabricIndex,
"Fail fabric_index_CR2 is not equal to read fabricIndex from CR2")

self.step(4)
cr3_read_fabricIndex = await self.read_single_attribute_check_success(
dev_ctrl=cr3_new_admin_ctrl,
node_id=cr3_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.CurrentFabricIndex
)

asserts.assert_equal(fabric_index_CR3, cr3_read_fabricIndex,
"Fail fabric_index_CR3 is not equal to read fabricIndex from CR3")

self.step(5)
cr2_fabric = await self.read_single_attribute_check_success(
dev_ctrl=cr2_new_admin_ctrl,
node_id=cr2_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.Fabrics,
fabric_filtered=True
)

for fabric in cr2_fabric:
cr2_fabric_fabricIndex = fabric.fabricIndex
cr2_fabric_rootPublicKey = fabric.rootPublicKey
cr2_fabric_vendorId = fabric.vendorID
cr2_fabric_fabricId = fabric.fabricID

asserts.assert_equal(cr2_fabric_fabricIndex,
fabric_index_CR2, "Unexpected CR2 fabric index")
asserts.assert_equal(cr2_fabric_rootPublicKey, rcac_CR2,
"Unexpected RootPublicKey does not match with rcac_CR2")
asserts.assert_equal(cr2_fabric_vendorId, cr2_vid,
"Unexpected vendorId does not match with CR2 VendorID")
asserts.assert_equal(cr2_fabric_fabricId, cr2_fabricId,
"Unexpected fabricId does not match with CR2 fabricID")

self.step(6)
cr3_fabric = await self.read_single_attribute_check_success(
dev_ctrl=cr3_new_admin_ctrl,
node_id=cr3_dut_node_id,
cluster=opcreds,
attribute=opcreds.Attributes.Fabrics,
fabric_filtered=True
)

for fabric in cr3_fabric:
cr3_fabric_fabricIndex = fabric.fabricIndex
cr3_fabric_rootPublicKey = fabric.rootPublicKey
cr3_fabric_vendorId = fabric.vendorID
cr3_fabric_fabricId = fabric.fabricID

asserts.assert_equal(cr3_fabric_fabricIndex,
fabric_index_CR3, "Unexpected CR3 fabric index")
asserts.assert_equal(cr3_fabric_rootPublicKey, rcac_CR3,
"Unexpected RootPublicKey does not match with rcac_CR3")
asserts.assert_equal(cr3_fabric_vendorId, cr3_vid,
"Unexpected vendorId does not match with CR3 VendorID")
asserts.assert_equal(cr3_fabric_fabricId, cr3_fabricId,
"Unexpected fabricId does not match with CR3 fabricID")

self.step(7)
cmd = opcreds.Commands.RemoveFabric(fabric_index_CR2)
resp = await self.send_single_cmd(cmd=cmd)
asserts.assert_equal(
resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk)

self.step(8)
cmd = opcreds.Commands.RemoveFabric(fabric_index_CR3)
resp = await self.send_single_cmd(cmd=cmd)
asserts.assert_equal(
resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk)


if __name__ == "__main__":
default_matter_test_main()
78 changes: 78 additions & 0 deletions src/python_testing/test_plan_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import typing


def read_attribute(attribute: str, cluster: typing.Optional[str] = None):
attr = f'reads the {attribute} attribute'
if cluster:
return f'{attr} from {cluster}'
else:
return attr


def save_as(val: str) -> str:
return f' and saves the value as {val}'


def verify_status(status: str) -> str:
return f'Verify DUT responds w/ status {status}'


def verify_success() -> str:
return verify_status('SUCCESS')

# -----------------------
# Commissioning strings
# -----------------------


def commission_if_required(controller: typing.Optional[str] = None) -> str:
controller_str = f'to {controller} ' if controller is not None else ''
return f'Commission DUT {controller_str}if not already done'


def commission_from_existing(existing_controller_name: str, new_controller_name: str) -> str:
# NOTE to implementers: This text corresponds to the actions taken by CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting.
# This function should be used in the TestSteps description when you use that function.
# AddNOCForNewFabricFromExisting is used when the generated certificates are required for use in the test.
# It written one step so we can just use the function directly without needing to annotate the sub-steps for the TH.
return (f'Create a new controller on a new fabric called {new_controller_name}.\n'
f'Commission the new controller from {existing_controller_name} as follows:\n\n'
f'- {existing_controller_name} sends an ArmFailsafe command, followed by a CSRRequest command.\n'
f'- Generate credentials on {new_controller_name} using the returned CSR.\n'
f'- Save the RCAC as `rcac_{new_controller_name}. Save the ICAC as `icac_{new_controller_name}`. Save the NOC as `noc_{new_controller_name}`. Save the IPK as ipk_{new_controller_name}.\n'
f'- {existing_controller_name} sends the AddTrustedRootCertificate command with `rcac_{new_controller_name}`'
f'- {existing_controller_name} sends the AddNOC command with the fields set as follows:\n'
f' * NOCValue: `noc_{new_controller_name}`\n'
f' * ICACValue: `icac_{new_controller_name}`\n'
f' * IPKValue: `ipk_{new_controller_name}`\n'
f' * CaseAdminSubject: {new_controller_name} node ID\n'
f' * AdminVendorId: {new_controller_name} vendor ID\n'
f'- {new_controller_name} connects over CASE and sends the commissioning complete command')


def open_commissioning_window(controller: str = 'TH') -> str:
return f'{controller} opens a commissioning window on the DUT'


def remove_fabric(index_var: str, controller: str):
return f'{controller} sends the RemoveFabric command to the Node Operational Credentials cluster with the FabricIndex set to {index_var}.'


def verify_commissioning_successful() -> str:
return 'Verify the commissioning is successful.'
Loading