Skip to content

Commit

Permalink
chore(TC_OPCREDS_3.4): patch from restyled code
Browse files Browse the repository at this point in the history
  • Loading branch information
gvargas-csa committed Jul 15, 2024
1 parent c74e99b commit 5c4222b
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions src/python_testing/TC_OPCREDS_3_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import chip.discovery as Discovery
from chip import ChipDeviceCtrl
from chip.exceptions import ChipStackError
from chip.utils import CommissioningBuildingBlocks
from chip.interaction_model import InteractionModelError, Status
from chip.tlv import TLVReader, TLVWriter
from chip.utils import CommissioningBuildingBlocks
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts

Expand All @@ -38,7 +38,7 @@ def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid
filterType=Discovery.FilterType.LONG_DISCRIMINATOR, filter=longDiscriminator, stopOnFirst=False)
# For some reason, the devices returned here aren't filtered, so filter ourselves
device = next(filter(lambda d: d.commissioningMode ==
Discovery.FilterType.LONG_DISCRIMINATOR and d.longDiscriminator == longDiscriminator, devices))
Discovery.FilterType.LONG_DISCRIMINATOR and d.longDiscriminator == longDiscriminator, devices))
for a in device.addresses:
try:
dev_ctrl.EstablishPASESessionIP(ipaddr=a, setupPinCode=setupPinCode,
Expand Down Expand Up @@ -83,9 +83,9 @@ async def test_TC_OPCREDS_3_4(self):
dev_ctrl=th1_new_fabric_ctrl,
node_id=th1_dut_node_id, cluster=opcreds,
attribute=opcreds.Attributes.TrustedRootCertificates)
print("trusted_root_original: ", trusted_root_original)

self.print_step(4, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster with the following fields: NOCValue and ICACValue")
self.print_step(
4, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster with the following fields: NOCValue and ICACValue")
cmd = opcreds.Commands.UpdateNOC(NOCValue=noc_original, ICACValue=icac_original)
try:
await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
Expand All @@ -96,13 +96,15 @@ async def test_TC_OPCREDS_3_4(self):
self.print_step(5, "TH1 sends ArmFailSafe command to the DUT with the ExpiryLengthSeconds field set to 900")
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=900)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
print(resp)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, "Failure status returned from arm failsafe")
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
"Failure status returned from arm failsafe")

self.print_step(6, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster with the following fields: NOCValue and ICACValue")
self.print_step(
6, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster with the following fields: NOCValue and ICACValue")
cmd = opcreds.Commands.UpdateNOC(NOCValue=noc_original, ICACValue=icac_original)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kMissingCsr, "Failure status returned from UpdateNOC")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kMissingCsr,
"Failure status returned from UpdateNOC")

self.print_step(7, "TH1 Sends CSRRequest command with the IsForUpdateNOC field set to false")
cmd = opcreds.Commands.CSRRequest(CSRNonce=random.randbytes(32), isForUpdateNOC=False)
Expand All @@ -124,11 +126,13 @@ async def test_TC_OPCREDS_3_4(self):

cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, "Failure status returned from arm failsafe")
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
"Failure status returned from arm failsafe")

cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(900)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, "Failure status returned from arm failsafe")
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
"Failure status returned from arm failsafe")

self.print_step(10, "TH1 Sends CSRRequest command with the IsForUpdateNOC field set to true")
cmd = opcreds.Commands.CSRRequest(CSRNonce=random.randbytes(32), isForUpdateNOC=True)
Expand All @@ -137,7 +141,8 @@ async def test_TC_OPCREDS_3_4(self):
self.print_step(11, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster")
cmd = opcreds.Commands.UpdateNOC(NOCValue=noc_original, ICACValue=icac_original)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidPublicKey, "Unexpected error code on AddNOC with mismatched CSR")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidPublicKey,
"Unexpected error code on AddNOC with mismatched CSR")

self.print_step(12, "TH1 generates a new Trusted Root Certificate and Private Key and saves as new_root_cert and new_root_key so that TH can generate an NOC for UpdateNOC that doesn’t chain to the original root")
new_root_cert = new_noc_chain.rcacBytes
Expand All @@ -152,7 +157,8 @@ async def test_TC_OPCREDS_3_4(self):
self.print_step(14, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster")
cmd = opcreds.Commands.UpdateNOC(NOCValue=noc_update_new_root, ICACValue=icac_update_new_root)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidNOC, "NOCResponse with the StatusCode InvalidNOC")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidNOC,
"NOCResponse with the StatusCode InvalidNOC")

self.print_step(15, "TH1 generates a new NOC and ICAC")
# new NOC is generated from the NOCSR returned in csr_update with the matter-fabric-id set to a different
Expand All @@ -165,7 +171,8 @@ async def test_TC_OPCREDS_3_4(self):
self.print_step(16, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster")
cmd = opcreds.Commands.UpdateNOC(NOCValue=noc_update_bad_fabric_on_noc, ICACValue=icac_update_bad_fabric_on_noc)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidNOC, "NOCResponse with the StatusCode InvalidNOC")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidNOC,
"NOCResponse with the StatusCode InvalidNOC")

self.print_step(17, "TH1 generates a new NOC and ICAC")
noc_update_bad_fabric_on_icac = csr_update.NOCSRElements
Expand All @@ -174,7 +181,8 @@ async def test_TC_OPCREDS_3_4(self):
self.print_step(18, "TH1 sends the UpdateNOC command to the Node Operational Credentials cluster")
cmd = opcreds.Commands.UpdateNOC(NOCValue=noc_update_bad_fabric_on_icac, ICACValue=icac_update_bad_fabric_on_icac)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidNOC, "NOCResponse with the StatusCode InvalidNOC")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kInvalidNOC,
"NOCResponse with the StatusCode InvalidNOC")

self.print_step(19, "TH1 sends AddTrustedRootCertificate command to DUT again with the RootCACertificate field set to new_root_cert")
cmd = opcreds.Commands.AddTrustedRootCertificate(new_root_cert)
Expand All @@ -188,12 +196,14 @@ async def test_TC_OPCREDS_3_4(self):
# 11.17.7.9 Test Step: Verify that the DUT responds with CONSTRAINT_ERROR
# TODO: Ask why we expected that error if there is not CSRRequest command previously? Also in the step 6 received and MissingCsr error.
#asserts.assert_equal(e.status, Status.ConstraintError, "Failure status returned from UpdateNOC")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kMissingCsr, "Failure status returned from UpdateNOC")
asserts.assert_equal(resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kMissingCsr,
"Failure status returned from UpdateNOC")

self.print_step(21, "TH1 sends ArmFailSafe command to the DUT with the ExpiryLengthSeconds field set to 0")
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, "Failure status returned from arm failsafe")
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
"Failure status returned from arm failsafe")

self.print_step(22, "TH1 sends an OpenCommissioningWindow command to the DUT")
resp = self.openCommissioningWindow(th1_new_fabric_ctrl, th1_dut_node_id)
Expand All @@ -204,7 +214,8 @@ async def test_TC_OPCREDS_3_4(self):

cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(900)
resp = await self.send_single_cmd(dev_ctrl=th1_new_fabric_ctrl, node_id=th1_dut_node_id, cmd=cmd)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, "Error code status returned from arm failsafe")
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
"Error code status returned from arm failsafe")

self.print_step(24, "TH1 Sends CSRRequest command over PASE with the IsForUpdateNOC field set to true")
cmd = opcreds.Commands.CSRRequest(CSRNonce=random.randbytes(32))
Expand All @@ -223,5 +234,6 @@ async def test_TC_OPCREDS_3_4(self):
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.UnsupportedAccess, "Failure status returned from UpdateNOC")


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 5c4222b

Please sign in to comment.