Update MCORE_FS_1_2 to latest testplan (project-chip#35097)
tehampson authored Aug 21, 2024
1 parent 329c965 commit 68f0407
Showing 1 changed file with 163 additions and 90 deletions.
253 changes: 163 additions & 90 deletions src/python_testing/TC_MCORE_FS_1_2.py
@@ -19,27 +19,102 @@
# for details about the block below.
#

import base64
import hashlib
import logging
import os
import queue
import secrets
import signal
import struct
import subprocess
import time
import uuid
from dataclasses import dataclass

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from ecdsa.curves import NIST256p
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
from mobly import asserts
from TC_SC_3_6 import AttributeChangeAccumulator

# Length of `w0s` and `w1s` elements
WS_LENGTH = NIST256p.baselen + 8


def _generate_verifier(passcode: int, salt: bytes, iterations: int) -> bytes:
ws = hashlib.pbkdf2_hmac('sha256', struct.pack('<I', passcode), salt, iterations, WS_LENGTH * 2)
w0 = int.from_bytes(ws[:WS_LENGTH], byteorder='big') % NIST256p.order
w1 = int.from_bytes(ws[WS_LENGTH:], byteorder='big') % NIST256p.order
L = NIST256p.generator * w1

return w0.to_bytes(NIST256p.baselen, byteorder='big') + L.to_bytes('uncompressed')
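
# A minimal, hypothetical usage sketch (the passcode, salt and iteration count
# below are illustrative, not taken from this test): the returned verifier is
# w0 (NIST256p.baselen bytes) followed by the uncompressed point L
# (2 * NIST256p.baselen + 1 bytes), i.e. 3 * NIST256p.baselen + 1 == 97 bytes
# for P-256.
#
#   example_salt = secrets.token_bytes(16)
#   example_verifier = _generate_verifier(20202021, example_salt, 1000)
#   assert len(example_verifier) == 3 * NIST256p.baselen + 1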


@dataclass
class _SetupParameters:
setup_qr_code: str
manual_code: int
discriminator: int
passcode: int


class TC_MCORE_FS_1_2(MatterBaseTest):
@async_test_body
async def setup_class(self):
super().setup_class()
self._app_th_server_process = None
self._th_server_kvs = None

def teardown_class(self):
if self._app_th_server_process is not None:
logging.warning("Stopping app with SIGTERM")
self._app_th_server_process.send_signal(signal.SIGTERM.value)
self._app_th_server_process.wait()

if self._th_server_kvs is not None:
os.remove(self._th_server_kvs)
super().teardown_class()

async def _create_th_server(self, port):
# These are default testing values
setup_params = _SetupParameters(setup_qr_code="MT:-24J0AFN00KA0648G00",
manual_code=34970112332, discriminator=3840, passcode=20202021)
kvs = f'kvs_{str(uuid.uuid4())}'

cmd = [self._th_server_app_path]
cmd.extend(['--secured-device-port', str(port)])
cmd.extend(['--discriminator', str(setup_params.discriminator)])
cmd.extend(['--passcode', str(setup_params.passcode)])
cmd.extend(['--KVS', kvs])
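# For reference, the resulting command line looks roughly like the following
# (the binary name is hypothetical; it is whatever --string-arg
# th_server_app_path points at):
#   <th_server_app> --secured-device-port 5543 --discriminator 3840 \
#       --passcode 20202021 --KVS kvs_<uuid>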

# TODO: Determine if we want these logs cooked or pushed to somewhere else
logging.info("Starting TH_SERVER")
self._app_th_server_process = subprocess.Popen(cmd)
self._th_server_kvs = kvs
logging.info("Started TH_SERVER")
time.sleep(3)
return setup_params

def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParameters):
self.wait_for_user_input(
prompt_msg=f"Using the DUT vendor's provided interface, commission the ICD device using the following parameters:\n"
f"- discriminator: {setup_params.discriminator}\n"
f"- setupPinCode: {setup_params.passcode}\n"
f"- setupQRCode: {setup_params.setup_qr_code}\n"
f"- setupManualcode: {setup_params.manual_code}\n"
f"If using FabricSync Admin test app, you may type:\n"
f">>> pairing onnetwork 111 {setup_params.passcode}")

def steps_TC_MCORE_FS_1_2(self) -> list[TestStep]:
steps = [TestStep(1, "TH_FSA subscribes to all the Bridged Device Basic Information clusters provided by DUT_FSA to identify the presence of a Bridged Node endpoint with a UniqueID matching the UniqueID provided by the BasicInformationCluster of the TH_SED_DUT."),
TestStep(2, "TH_FSA initiates commissioning of TH_SED_DUT by sending the OpenCommissioningWindow command to the Administrator Commissioning Cluster on the endpoint with the uniqueID matching that of TH_SED_DUT."),
TestStep(3, "TH_FSA completes commissioning of TH_SED_DUT using the Enhanced Commissioning Method."),
TestStep(4, "Commission TH_SED_L onto DUT_FSA’s fabric using the manufacturer specified mechanism."),
TestStep(5, "TH_FSA waits for subscription report from a the Bridged Device Basic Information clusters provided by DUT_FSA to identify the presence of a Bridged Node endpoint with a UniqueID matching the UniqueID provided by the BasicInformationCluster of the TH_SED_L."),
TestStep(6, "TH_FSA initiates commissions of TH_SED_L by sending the OpenCommissioningWindow command to the Administrator Commissioning Cluster on the endpoint with the uniqueID matching that of TH_SED_L."),
TestStep(7, "TH_FSA completes commissioning of TH_SED_L using the Enhanced Commissioning Method.")]
steps = [TestStep(1, "TH subscribes to PartsList attribute of the Descriptor cluster of DUT_FSA endpoint 0."),
TestStep(2, "Follow manufacturer provided instructions to have DUT_FSA commission TH_SERVER"),
TestStep(3, "TH waits up to 30 seconds for subscription report from the PartsList attribute of the Descriptor to contain new endpoint"),

TestStep(4, "TH uses DUT to open commissioning window to TH_SERVER"),
TestStep(5, "TH commissions TH_SERVER"),
TestStep(6, "TH reads all attributes in Basic Information cluster from TH_SERVER directly"),
TestStep(7, "TH reads all attributes in the Bridged Device Basic Information cluster on new endpoint identified in step 3 from the DUT_FSA")]
return steps

@property
@@ -49,99 +124,60 @@ def default_timeout(self) -> int:
@async_test_body
async def test_TC_MCORE_FS_1_2(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')

min_report_interval_sec = self.user_params.get("min_report_interval_sec", 0)
max_report_interval_sec = self.user_params.get("max_report_interval_sec", 2)
report_waiting_timeout_delay_sec = self.user_params.get("report_waiting_timeout_delay_sec", 10)
max_report_interval_sec = self.user_params.get("max_report_interval_sec", 30)
th_server_port = self.user_params.get("th_server_port", 5543)
self._th_server_app_path = self.user_params.get("th_server_app_path", None)
if not self._th_server_app_path:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
if not os.path.exists(self._th_server_app_path):
asserts.fail(f'The path {self._th_server_app_path} does not exist')

self.step(1)

# Subscribe to the UniqueIDs
unique_id_queue = queue.Queue()
# Subscribe to the PartsList
root_endpoint = 0
subscription_contents = [
(Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID) # On all endpoints
(root_endpoint, Clusters.Descriptor.Attributes.PartsList)
]
sub = await self.default_controller.ReadAttribute(
nodeid=self.dut_node_id,
attributes=subscription_contents,
reportInterval=(min_report_interval_sec, max_report_interval_sec),
keepSubscriptions=False
)

parts_list_queue = queue.Queue()
attribute_handler = AttributeChangeAccumulator(
name=self.default_controller.name, expected_attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, output=unique_id_queue)
name=self.default_controller.name, expected_attribute=Clusters.Descriptor.Attributes.PartsList, output=parts_list_queue)
sub.SetAttributeUpdateCallback(attribute_handler)
cached_attributes = sub.GetAttributes()
step_1_dut_parts_list = cached_attributes[root_endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]

logging.info("Waiting for First BridgedDeviceBasicInformation.")
start_time = time.time()
elapsed = 0
time_remaining = report_waiting_timeout_delay_sec

th_sed_dut_bdbi_endpoint = -1
th_sed_dut_unique_id = -1

while time_remaining > 0 and th_sed_dut_bdbi_endpoint < 0:
try:
item = unique_id_queue.get(block=True, timeout=time_remaining)
endpoint, attribute, value = item['endpoint'], item['attribute'], item['value']

# Record arrival of an expected subscription change when seen
if attribute == Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID:
th_sed_dut_bdbi_endpoint = endpoint
th_sed_dut_unique_id = value

except queue.Empty:
# No error, we update timeouts and keep going
pass

elapsed = time.time() - start_time
time_remaining = report_waiting_timeout_delay_sec - elapsed

asserts.assert_greater(th_sed_dut_bdbi_endpoint, 0, "Failed to find any BDBI instances with UniqueID present.")
logging.info("Found BDBI with UniqueID (%d) on endpoint %d." % th_sed_dut_unique_id, th_sed_dut_bdbi_endpoint)
asserts.assert_true(type_matches(step_1_dut_parts_list, list), "PartsList is expected to be a list")

self.step(2)

self.sync_passcode = 20202024
self.th_sed_dut_discriminator = 2222
cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
discriminator=self.th_sed_dut_discriminator,
iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
await self.send_single_cmd(cmd, endpoint=th_sed_dut_bdbi_endpoint, timedRequestTimeoutMs=5000)

logging.info("Commissioning Window open for TH_SED_DUT.")
setup_params = await self._create_th_server(th_server_port)
self._ask_for_vendor_commissioning_ux_operation(setup_params)

self.step(3)

self.th_sed_dut_nodeid = 1111
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.th_sed_dut_nodeid, setupPinCode=self.sync_passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.th_sed_dut_discriminator)
logging.info("Commissioning TH_SED_DUT complete")

self.step(4)
if not self.is_ci:
self.wait_for_user_input(
"Commission TH_SED_DUT onto DUT_FSA’s fabric using the manufacturer specified mechanism. (ensure Synchronization is enabled.)")
else:
logging.info("Stopping after step 3 while running in CI to avoid manual steps.")
return

self.step(5)

th_sed_later_bdbi_endpoint = -1
th_sed_later_unique_id = -1
logging.info("Waiting for Second BridgedDeviceBasicInformation.")
report_waiting_timeout_delay_sec = 30
logging.info("Waiting for update to PartsList.")
start_time = time.time()
elapsed = 0
time_remaining = report_waiting_timeout_delay_sec

while time_remaining > 0 and th_sed_later_bdbi_endpoint < 0:
parts_list_endpoint_count_from_step_1 = len(step_1_dut_parts_list)
step_3_dut_parts_list = None
while time_remaining > 0:
try:
item = unique_id_queue.get(block=True, timeout=time_remaining)
item = parts_list_queue.get(block=True, timeout=time_remaining)
endpoint, attribute, value = item['endpoint'], item['attribute'], item['value']

# Record arrival of an expected subscription change when seen
if attribute == Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID and endpoint != th_sed_dut_bdbi_endpoint and th_sed_later_unique_id != th_sed_dut_unique_id:
th_sed_later_bdbi_endpoint = endpoint
th_sed_later_unique_id = value
if endpoint == root_endpoint and attribute == Clusters.Descriptor.Attributes.PartsList and len(value) > parts_list_endpoint_count_from_step_1:
step_3_dut_parts_list = value
break

except queue.Empty:
# No error, we update timeouts and keep going
@@ -150,26 +186,63 @@ async def test_TC_MCORE_FS_1_2(self):
elapsed = time.time() - start_time
time_remaining = report_waiting_timeout_delay_sec - elapsed

asserts.assert_greater(th_sed_later_bdbi_endpoint, 0, "Failed to find any BDBI instances with UniqueID present.")
logging.info("Found another BDBI with UniqueID (%d) on endpoint %d." % th_sed_later_unique_id, th_sed_later_bdbi_endpoint)
asserts.assert_not_equal(step_3_dut_parts_list, None, "Timed out getting updated PartsList with new endpoint")
set_of_step_1_parts_list_endpoint = set(step_1_dut_parts_list)
set_of_step_3_parts_list_endpoint = set(step_3_dut_parts_list)
unique_endpoints_set = set_of_step_3_parts_list_endpoint - set_of_step_1_parts_list_endpoint
asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
newly_added_endpoint = list(unique_endpoints_set)[0]
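# Illustrative example of the set difference above (endpoint numbers are
# hypothetical): if the step 1 PartsList was [1, 2] and the updated report
# contains [1, 2, 3], then unique_endpoints_set == {3} and
# newly_added_endpoint == 3.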

self.step(6)
self.step(4)

self.th_sed_later_discriminator = 3333
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
discriminator = 3840
passcode = 20202021
salt = secrets.token_bytes(16)
iterations = 2000
verifier = _generate_verifier(passcode, salt, iterations)

# min commissioning timeout is 3*60 seconds
cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
discriminator=self.th_sed_later_discriminator,
iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
await self.send_single_cmd(cmd, endpoint=th_sed_later_bdbi_endpoint, timedRequestTimeoutMs=5000)
PAKEPasscodeVerifier=verifier,
discriminator=discriminator,
iterations=iterations,
salt=salt)
await self.send_single_cmd(cmd, dev_ctrl=self.default_controller, node_id=self.dut_node_id, endpoint=newly_added_endpoint, timedRequestTimeoutMs=5000)

logging.info("Commissioning Window open for TH_SED_L.")
self.step(5)
self.th_server_local_nodeid = 1111
await self.default_controller.CommissionOnNetwork(nodeId=self.th_server_local_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)

self.step(7)
self.step(6)
th_server_directly_read_result = await self.default_controller.ReadAttribute(self.th_server_local_nodeid, [(root_endpoint, Clusters.BasicInformation)])
th_server_basic_info = th_server_directly_read_result[root_endpoint][Clusters.BasicInformation]

self.th_sed_later_nodeid = 2222
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.th_sed_later_nodeid, setupPinCode=self.sync_passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.th_sed_later_discriminator)
logging.info("Commissioning TH_SED_L complete")
self.step(7)
dut_read = await self.default_controller.ReadAttribute(self.dut_node_id, [(newly_added_endpoint, Clusters.BridgedDeviceBasicInformation)])
bridged_info_for_th_server = dut_read[newly_added_endpoint][Clusters.BridgedDeviceBasicInformation]
basic_info_attr = Clusters.BasicInformation.Attributes
bridged_device_info_attr = Clusters.BridgedDeviceBasicInformation.Attributes
asserts.assert_equal(th_server_basic_info[basic_info_attr.VendorName],
bridged_info_for_th_server[bridged_device_info_attr.VendorName], "VendorName incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.VendorID],
bridged_info_for_th_server[bridged_device_info_attr.VendorID], "VendorID incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.ProductName],
bridged_info_for_th_server[bridged_device_info_attr.ProductName], "ProductName incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.ProductID],
bridged_info_for_th_server[bridged_device_info_attr.ProductID], "ProductID incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.NodeLabel],
bridged_info_for_th_server[bridged_device_info_attr.NodeLabel], "NodeLabel incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.HardwareVersion],
bridged_info_for_th_server[bridged_device_info_attr.HardwareVersion], "HardwareVersion incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.HardwareVersionString],
bridged_info_for_th_server[bridged_device_info_attr.HardwareVersionString], "HardwareVersionString incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.SoftwareVersion],
bridged_info_for_th_server[bridged_device_info_attr.SoftwareVersion], "SoftwareVersion incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.SoftwareVersionString],
bridged_info_for_th_server[bridged_device_info_attr.SoftwareVersionString], "SoftwareVersionString incorrectly reported by DUT")
asserts.assert_equal(th_server_basic_info[basic_info_attr.UniqueID],
bridged_info_for_th_server[bridged_device_info_attr.UniqueID], "UniqueID incorrectly reported by DUT")


if __name__ == "__main__":
default_matter_test_main()
