Skip to content

Commit

Permalink
chore(TC_OPCREDS_3.2): included new steps (1-6)
Browse files Browse the repository at this point in the history
Just need to add validation for CR3 and remove all fabrics. Also
it need clean up (remove prints, unused variables, etc)
  • Loading branch information
gvargas-csa committed May 20, 2024
1 parent aeb58bf commit 77ef8b2
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 23 deletions.
51 changes: 31 additions & 20 deletions src/controller/python/chip/utils/CommissioningBuildingBlocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic
'''
data = await adminCtrl.ReadAttribute(targetNodeId, [(Clusters.AccessControl.Attributes.Acl)])
if 0 not in data:
raise ValueError("Did not get back any data (possible cause: controller has no access..")
raise ValueError(
"Did not get back any data (possible cause: controller has no access..")

currentAcls = data[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]

Expand All @@ -73,12 +74,14 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic
targetSubjects = [grantedCtrl.nodeId]

if (len(targetSubjects) > 4):
raise ValueError(f"List of target subjects of len {len(targetSubjects)} exceeeded the minima of 4!")
raise ValueError(
f"List of target subjects of len {len(targetSubjects)} exceeeded the minima of 4!")

# Step 1: Wipe the subject from all existing ACLs.
for acl in currentAcls:
if (acl.subjects != NullValue):
acl.subjects = [subject for subject in acl.subjects if subject not in targetSubjects]
acl.subjects = [
subject for subject in acl.subjects if subject not in targetSubjects]

if (privilege):
addedPrivilege = False
Expand Down Expand Up @@ -107,7 +110,8 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic
))

# Step 4: Prune ACLs which have empty subjects.
currentAcls = [acl for acl in currentAcls if acl.subjects != NullValue and len(acl.subjects) != 0]
currentAcls = [acl for acl in currentAcls if acl.subjects !=
NullValue and len(acl.subjects) != 0]

logger.info(f'GrantPrivilege: Writing acls: {currentAcls}')
await adminCtrl.WriteAttribute(targetNodeId, [(0, Clusters.AccessControl.Attributes.Acl(currentAcls))])
Expand Down Expand Up @@ -137,7 +141,8 @@ async def CreateControllersOnFabric(fabricAdmin: FabricAdmin,
controllerList = []

for nodeId in controllerNodeIds:
newController = fabricAdmin.NewController(nodeId=nodeId, paaTrustStorePath=paaTrustStorePath, catTags=catTags)
newController = fabricAdmin.NewController(
nodeId=nodeId, paaTrustStorePath=paaTrustStorePath, catTags=catTags)
await GrantPrivilege(adminDevCtrl, newController, privilege, targetNodeId, catTags)
controllerList.append(newController)

Expand All @@ -156,12 +161,14 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,
newNodeId (int): Node ID to use for the target node on the new fabric.
Return:
bool: True if successful, False otherwise.
tuple: (bool, nocResp): True if successful, False otherwise, along with nocResp, rcacResp value.
'''
nocResp = None

resp = await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(60))
if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk:
return False
return False, nocResp

csrForAddNOC = await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.CSRRequest(CSRNonce=os.urandom(32)))

Expand All @@ -171,31 +178,35 @@ async def AddNOCForNewFabricFromExisting(commissionerDevCtrl, newFabricDevCtrl,
chainForAddNOC.nocBytes is None or chainForAddNOC.ipkBytes is None):
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

await commissionerDevCtrl.SendCommand(existingNodeId, 0, opCreds.Commands.AddTrustedRootCertificate(chainForAddNOC.rcacBytes))
resp = await commissionerDevCtrl.SendCommand(existingNodeId,
0,
opCreds.Commands.AddNOC(chainForAddNOC.nocBytes,
chainForAddNOC.icacBytes,
chainForAddNOC.ipkBytes,
newFabricDevCtrl.nodeId,
newFabricDevCtrl.fabricAdmin.vendorId))
if resp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk:
nocResp = await commissionerDevCtrl.SendCommand(existingNodeId,
0,
opCreds.Commands.AddNOC(chainForAddNOC.nocBytes,
chainForAddNOC.icacBytes,
chainForAddNOC.ipkBytes,
newFabricDevCtrl.nodeId,
newFabricDevCtrl.fabricAdmin.vendorId))

rcacResp = chainForAddNOC.rcacBytes

if nocResp.statusCode is not opCreds.Enums.NodeOperationalCertStatusEnum.kOk:
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

resp = await newFabricDevCtrl.SendCommand(newNodeId, 0, generalCommissioning.Commands.CommissioningComplete())

if resp.errorCode is not generalCommissioning.Enums.CommissioningErrorEnum.kOk:
# Expiring the failsafe timer in an attempt to clean up.
await commissionerDevCtrl.SendCommand(existingNodeId, 0, generalCommissioning.Commands.ArmFailSafe(0))
return False
return False, nocResp

if not await _IsNodeInFabricList(newFabricDevCtrl, newNodeId):
return False
return False, nocResp

return True
return True, nocResp, rcacResp


async def UpdateNOC(devCtrl, existingNodeId, newNodeId):
Expand Down
102 changes: 99 additions & 3 deletions src/python_testing/TC_OPCREDS_3_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
#

import chip.clusters as Clusters
from matter_testing_support import MatterBaseTest, TestStep, async_test_body
from test_plan_support import *
from chip.tlv import TLVReader
from chip.utils import CommissioningBuildingBlocks
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from test_plan_support import commission_if_required, commission_from_existing, verify_commissioning_successful, read_attribute, remove_fabric, verify_success
from mobly import asserts


def verify_fabric(controller: str) -> str:
Expand Down Expand Up @@ -54,7 +57,100 @@ def steps_TC_OPCREDS_3_2(self):
@async_test_body
async def test_TC_OPCREDS_3_2(self):
# TODO: implement
pass
opcreds = Clusters.OperationalCredentials

self.step(0)

self.step(1)
# Create a new controller on a new fabric called CR2. Commission the new controller from CR1 as follows:
dev_ctrl = self.default_controller

new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr2_vid = 0xFFF1
cr2_fabricId = 2222
cr2_new_fabric_admin = new_certificate_authority.NewFabricAdmin(
vendorId=cr2_vid, fabricId=cr2_fabricId)
cr2_nodeid = self.default_controller.nodeId+1
cr2_dut_node_id = self.dut_node_id+1

cr2_new_admin_ctrl = cr2_new_fabric_admin.NewController(
nodeId=cr2_nodeid)
success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr2_new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=cr2_dut_node_id
)
fabric_index_CR2 = nocResp.fabricIndex
tlvReaderRCAC = TLVReader(rcacResp).get()["Any"]
rcac_CR2 = tlvReaderRCAC[9] # public key is field 9

print(f"Success: {success}, nocResp: {nocResp}, rcacResp: {rcacResp}")
nocs = await self.read_single_attribute_check_success(dev_ctrl=cr2_new_admin_ctrl, node_id=cr2_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.NOCs, fabric_filtered=False)

print(f"nocs: {nocs}")

# rcac_CR2 = TLVReader(rcacResp).get()["Any"]
# print(rcac_CR2[9])

self.step(2)
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
cr3_vid = 0xFFF1
cr3_fabricId = 3333
cr3_new_fabric_admin = new_certificate_authority.NewFabricAdmin(
vendorId=cr3_vid, fabricId=cr3_fabricId)
cr3_nodeid = self.default_controller.nodeId+2
cr3_dut_node_id = self.dut_node_id+2

cr3_new_admin_ctrl = cr3_new_fabric_admin.NewController(
nodeId=cr3_nodeid)
success, nocResp, rcacResp = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(
commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=cr3_new_admin_ctrl,
existingNodeId=self.dut_node_id, newNodeId=cr3_dut_node_id
)
fabric_index_CR3 = nocResp.fabricIndex
# rcac_CR3 = TLVReader(rcacResp).get()["Any"]

# print(f"Success: {success}, nocResp: {nocResp}, rcacResp: {rcacResp}")

self.step(3)
cr2_read_fabricIndex = await self.read_single_attribute_check_success(dev_ctrl=cr2_new_admin_ctrl, node_id=cr2_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.CurrentFabricIndex)
print(f"fabric_index_CR2: {cr2_read_fabricIndex}")
if fabric_index_CR2 == cr2_read_fabricIndex:
print("Success fabric_index_CR2 is equal to read fabricIndex from CR2")
else:
print("Fail fabric_index_CR2 is not equal to read fabricIndex from CR2")

self.step(4)
cr3_read_fabricIndex = await self.read_single_attribute_check_success(dev_ctrl=cr3_new_admin_ctrl, node_id=cr3_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.CurrentFabricIndex)
print(f"fabric_index_CR3: {cr3_read_fabricIndex}")
if fabric_index_CR3 == cr3_read_fabricIndex:
print("Success fabric_index_CR3 is equal to read fabricIndex from CR3")
else:
print("Fail fabric_index_CR3 is not equal to read fabricIndex from CR3")

self.step(5)
cr2_fabric = await self.read_single_attribute_check_success(dev_ctrl=cr2_new_admin_ctrl, node_id=cr2_dut_node_id, cluster=opcreds, attribute=opcreds.Attributes.Fabrics, fabric_filtered=True)
print(f"cr2_fabric: {cr2_fabric}")

for fabric in cr2_fabric:
cr2_fabric_fabricIndex = fabric.fabricIndex
cr2_fabric_rootPublicKey = fabric.rootPublicKey
cr2_fabric_vendorId = fabric.vendorID
cr2_fabric_fabricId = fabric.fabricID

asserts.assert_equal(cr2_fabric_fabricIndex,
fabric_index_CR2, "Unexpected CR2 fabric index")
asserts.assert_equal(cr2_fabric_rootPublicKey, rcac_CR2,
"Unexpected RootPublicKey does not match with rcac_CR2")
asserts.assert_equal(cr2_fabric_vendorId, cr2_vid,
"Unexpected vendorId does not match with CR2 VendorID")
asserts.assert_equal(cr2_fabric_fabricId, cr2_fabricId,
"Unexpected fabricId does not match with CR2 fabricID")

print(f"cr2_fabric_fabricIndex: {cr2_fabric_fabricIndex}, cr2_fabric_rootPublicKey: {cr2_fabric_rootPublicKey}, cr2_fabric_vendorId: {cr2_fabric_vendorId}, cr2_fabric_fabricId: {cr2_fabric_fabricId}")

self.step(6)
self.step(7)
self.step(8)


if __name__ == "__main__":
Expand Down

0 comments on commit 77ef8b2

Please sign in to comment.