diff --git a/src/python_testing/TC_MCORE_FS_1_2.py b/src/python_testing/TC_MCORE_FS_1_2.py
index 1085b0fac0a07c..c199225b33f99c 100644
--- a/src/python_testing/TC_MCORE_FS_1_2.py
+++ b/src/python_testing/TC_MCORE_FS_1_2.py
@@ -19,27 +19,102 @@
 # for details about the block below.
 #
 
-import base64
+import hashlib
 import logging
+import os
 import queue
+import secrets
+import signal
+import struct
+import subprocess
 import time
+import uuid
+from dataclasses import dataclass
 
 import chip.clusters as Clusters
 from chip import ChipDeviceCtrl
-from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
+from ecdsa.curves import NIST256p
+from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
 from mobly import asserts
 from TC_SC_3_6 import AttributeChangeAccumulator
 
+# Length of `w0s` and `w1s` elements
+WS_LENGTH = NIST256p.baselen + 8
+
+
+def _generate_verifier(passcode: int, salt: bytes, iterations: int) -> bytes:
+    ws = hashlib.pbkdf2_hmac('sha256', struct.pack('<I', passcode), salt, iterations, WS_LENGTH * 2)
+    w0 = int.from_bytes(ws[:WS_LENGTH], byteorder='big') % NIST256p.order
+    w1 = int.from_bytes(ws[WS_LENGTH:], byteorder='big') % NIST256p.order
+    L = NIST256p.generator * w1
+
+    return w0.to_bytes(NIST256p.baselen, byteorder='big') + L.to_bytes('uncompressed')
+
+
+@dataclass
+class _SetupParameters:
+    setup_qr_code: str
+    manual_code: int
+    discriminator: int
+    passcode: int
+
+
 class TC_MCORE_FS_1_2(MatterBaseTest):
+    @async_test_body
+    async def setup_class(self):
+        super().setup_class()
+        self._app_th_server_process = None
+        self._th_server_kvs = None
+
+    def teardown_class(self):
+        if self._app_th_server_process is not None:
+            logging.warning("Stopping app with SIGTERM")
+            self._app_th_server_process.send_signal(signal.SIGTERM.value)
+            self._app_th_server_process.wait()
+
+        if self._th_server_kvs is not None:
+            os.remove(self._th_server_kvs)
+        super().teardown_class()
+
+    async def _create_th_server(self, port):
+        setup_params = _SetupParameters(
+            setup_qr_code="MT:-24J0AFN00KA0648G00",
+            manual_code=34970112332,
+            discriminator=3840,
+            passcode=20202021)
+        kvs = f'kvs_{str(uuid.uuid4())}'
+        cmd = [self._th_server_app_path]
+        cmd.extend(['--secured-device-port', str(port)])
+        cmd.extend(['--discriminator', str(setup_params.discriminator)])
+        cmd.extend(['--passcode', str(setup_params.passcode)])
+        cmd.extend(['--KVS', kvs])
+        logging.info("Starting TH_SERVER")
+        self._app_th_server_process = subprocess.Popen(cmd)
+        self._th_server_kvs = kvs
+        logging.info("Started TH_SERVER")
+        time.sleep(3)
+
+        return setup_params
+
+    def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParameters):
+        self.wait_for_user_input(
+            prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n"
+            f"- discriminator: {setup_params.discriminator}\n"
+            f"- setupPinCode: {setup_params.passcode}\n"
+            f"- setupQRCode: {setup_params.setup_qr_code}\n"
+            f"- setupManualcode: {setup_params.manual_code}\n"
+            f"If using FabricSync Admin test app, you may type:\n"
+            f">>> pairing onnetwork 111 {setup_params.passcode}")
+
     def steps_TC_MCORE_FS_1_2(self) -> list[TestStep]:
-        steps = [TestStep(1, "TH_FSA subscribes to all the Bridged Device Basic Information clusters provided by DUT_FSA to identify the presence of a Bridged Node endpoint with a UniqueID matching the UniqueID provided by the BasicInformationCluster of the TH_SED_DUT."),
-                 TestStep(2, "TH_FSA initiates commissioning of TH_SED_DUT by sending the OpenCommissioningWindow command to the Administrator Commissioning Cluster on the endpoint with the uniqueID matching that of TH_SED_DUT."),
-                 TestStep(3, "TH_FSA completes commissioning of TH_SED_DUT using the Enhanced Commissioning Method."),
-                 TestStep(4, "Commission TH_SED_L onto DUT_FSA’s fabric using the manufacturer specified mechanism."),
-                 TestStep(5, "TH_FSA waits for subscription report from a the Bridged Device Basic Information clusters provided by DUT_FSA to identify the presence of a Bridged Node endpoint with a UniqueID matching the UniqueID provided by the BasicInformationCluster of the TH_SED_L."),
-                 TestStep(6, "TH_FSA initiates commissions of TH_SED_L by sending the OpenCommissioningWindow command to the Administrator Commissioning Cluster on the endpoint with the uniqueID matching that of TH_SED_L."),
-                 TestStep(7, "TH_FSA completes commissioning of TH_SED_L using the Enhanced Commissioning Method.")]
+        steps = [TestStep(1, "TH subscribes to PartsList attribute of the Descriptor cluster of DUT_FSA endpoint 0."),
+                 TestStep(2, "Follow manufacturer provided instructions to have DUT_FSA commission TH_SERVER"),
+                 TestStep(3, "TH waits up to 30 seconds for subscription report from the PartsList attribute of the Descriptor to contain new endpoint"),
+
+                 TestStep(4, "TH uses DUT to open commissioning window to TH_SERVER"),
+                 TestStep(5, "TH commissions TH_SERVER"),
+                 TestStep(6, "TH reads all attributes in Basic Information cluster from TH_SERVER directly"),
+                 TestStep(7, "TH reads all attributes in the Bridged Device Basic Information cluster on new endpoint identified in step 3 from the DUT_FSA")]
         return steps
 
     @property
@@ -49,16 +124,21 @@ def default_timeout(self) -> int:
     @async_test_body
     async def test_TC_MCORE_FS_1_2(self):
         self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
+
         min_report_interval_sec = self.user_params.get("min_report_interval_sec", 0)
-        max_report_interval_sec = self.user_params.get("max_report_interval_sec", 2)
self.user_params.get("report_waiting_timeout_delay_sec", 10) + max_report_interval_sec = self.user_params.get("max_report_interval_sec", 30) + th_server_port = self.user_params.get("th_server_port", 5543) + self._th_server_app_path = self.user_params.get("th_server_app_path", None) + if not self._th_server_app_path: + asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') + if not os.path.exists(self._th_server_app_path): + asserts.fail(f'The path {self._th_server_app_path} does not exist') self.step(1) - - # Subscribe to the UniqueIDs - unique_id_queue = queue.Queue() + # Subscribe to the PartsList + root_endpoint = 0 subscription_contents = [ - (Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID) # On all endpoints + (root_endpoint, Clusters.Descriptor.Attributes.PartsList) ] sub = await self.default_controller.ReadAttribute( nodeid=self.dut_node_id, @@ -66,82 +146,38 @@ async def test_TC_MCORE_FS_1_2(self): reportInterval=(min_report_interval_sec, max_report_interval_sec), keepSubscriptions=False ) + + parts_list_queue = queue.Queue() attribute_handler = AttributeChangeAccumulator( - name=self.default_controller.name, expected_attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, output=unique_id_queue) + name=self.default_controller.name, expected_attribute=Clusters.Descriptor.Attributes.PartsList, output=parts_list_queue) sub.SetAttributeUpdateCallback(attribute_handler) + cached_attributes = sub.GetAttributes() + step_1_dut_parts_list = cached_attributes[root_endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] - logging.info("Waiting for First BridgedDeviceBasicInformation.") - start_time = time.time() - elapsed = 0 - time_remaining = report_waiting_timeout_delay_sec - - th_sed_dut_bdbi_endpoint = -1 - th_sed_dut_unique_id = -1 - - while time_remaining > 0 and th_sed_dut_bdbi_endpoint < 0: - try: - item = unique_id_queue.get(block=True, timeout=time_remaining) - endpoint, attribute, value = item['endpoint'], item['attribute'], item['value'] - - # Record arrival of an expected subscription change when seen - if attribute == Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID: - th_sed_dut_bdbi_endpoint = endpoint - th_sed_dut_unique_id = value - - except queue.Empty: - # No error, we update timeouts and keep going - pass - - elapsed = time.time() - start_time - time_remaining = report_waiting_timeout_delay_sec - elapsed - - asserts.assert_greater(th_sed_dut_bdbi_endpoint, 0, "Failed to find any BDBI instances with UniqueID present.") - logging.info("Found BDBI with UniqueID (%d) on endpoint %d." 
-        logging.info("Found BDBI with UniqueID (%d) on endpoint %d." % th_sed_dut_unique_id, th_sed_dut_bdbi_endpoint)
+        asserts.assert_true(type_matches(step_1_dut_parts_list, list), "PartsList is expected to be a list")
 
         self.step(2)
-
-        self.sync_passcode = 20202024
-        self.th_sed_dut_discriminator = 2222
-        cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
-                                                                                   PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
-                                                                                   discriminator=self.th_sed_dut_discriminator,
-                                                                                   iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
-        await self.send_single_cmd(cmd, endpoint=th_sed_dut_bdbi_endpoint, timedRequestTimeoutMs=5000)
-
-        logging.info("Commissioning Window open for TH_SED_DUT.")
+        setup_params = await self._create_th_server(th_server_port)
+        self._ask_for_vendor_commissioning_ux_operation(setup_params)
 
         self.step(3)
-
-        self.th_sed_dut_nodeid = 1111
-        await self.TH_server_controller.CommissionOnNetwork(nodeId=self.th_sed_dut_nodeid, setupPinCode=self.sync_passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.th_sed_dut_discriminator)
-        logging.info("Commissioning TH_SED_DUT complete")
-
-        self.step(4)
-        if not self.is_ci:
-            self.wait_for_user_input(
-                "Commission TH_SED_DUT onto DUT_FSA’s fabric using the manufacturer specified mechanism. (ensure Synchronization is enabled.)")
-        else:
-            logging.info("Stopping after step 3 while running in CI to avoid manual steps.")
-            return
-
-        self.step(5)
-
-        th_sed_later_bdbi_endpoint = -1
-        th_sed_later_unique_id = -1
-        logging.info("Waiting for Second BridgedDeviceBasicInformation.")
+        report_waiting_timeout_delay_sec = 30
+        logging.info("Waiting for update to PartsList.")
         start_time = time.time()
         elapsed = 0
         time_remaining = report_waiting_timeout_delay_sec
 
-        while time_remaining > 0 and th_sed_later_bdbi_endpoint < 0:
+        parts_list_endpoint_count_from_step_1 = len(step_1_dut_parts_list)
+        step_3_dut_parts_list = None
+        while time_remaining > 0:
             try:
-                item = unique_id_queue.get(block=True, timeout=time_remaining)
+                item = parts_list_queue.get(block=True, timeout=time_remaining)
                 endpoint, attribute, value = item['endpoint'], item['attribute'], item['value']
 
                 # Record arrival of an expected subscription change when seen
-                if attribute == Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID and endpoint != th_sed_dut_bdbi_endpoint and th_sed_later_unique_id != th_sed_dut_unique_id:
-                    th_sed_later_bdbi_endpoint = endpoint
-                    th_sed_later_unique_id = value
+                if endpoint == root_endpoint and attribute == Clusters.Descriptor.Attributes.PartsList and len(value) > parts_list_endpoint_count_from_step_1:
+                    step_3_dut_parts_list = value
+                    break
 
             except queue.Empty:
                 # No error, we update timeouts and keep going
@@ -150,26 +186,63 @@ async def test_TC_MCORE_FS_1_2(self):
             elapsed = time.time() - start_time
             time_remaining = report_waiting_timeout_delay_sec - elapsed
 
-        asserts.assert_greater(th_sed_later_bdbi_endpoint, 0, "Failed to find any BDBI instances with UniqueID present.")
-        logging.info("Found another BDBI with UniqueID (%d) on endpoint %d." % th_sed_later_unique_id, th_sed_later_bdbi_endpoint)
+        asserts.assert_not_equal(step_3_dut_parts_list, None, "Timed out getting updated PartsList with new endpoint")
+        set_of_step_1_parts_list_endpoint = set(step_1_dut_parts_list)
+        set_of_step_3_parts_list_endpoint = set(step_3_dut_parts_list)
+        unique_endpoints_set = set_of_step_3_parts_list_endpoint - set_of_step_1_parts_list_endpoint
+        asserts.assert_equal(len(unique_endpoints_set), 1, "Expected only one new endpoint")
+        newly_added_endpoint = list(unique_endpoints_set)[0]
 
-        self.step(6)
+        self.step(4)
 
-        self.th_sed_later_discriminator = 3333
-        # min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
+        discriminator = 3840
+        passcode = 20202021
+        salt = secrets.token_bytes(16)
+        iterations = 2000
+        verifier = _generate_verifier(passcode, salt, iterations)
+
+        # min commissioning timeout is 3*60 seconds
         cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
-                                                                                   PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
-                                                                                   discriminator=self.th_sed_later_discriminator,
-                                                                                   iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
-        await self.send_single_cmd(cmd, endpoint=th_sed_later_bdbi_endpoint, timedRequestTimeoutMs=5000)
+                                                                                   PAKEPasscodeVerifier=verifier,
+                                                                                   discriminator=discriminator,
+                                                                                   iterations=iterations,
+                                                                                   salt=salt)
+        await self.send_single_cmd(cmd, dev_ctrl=self.default_controller, node_id=self.dut_node_id, endpoint=newly_added_endpoint, timedRequestTimeoutMs=5000)
 
-        logging.info("Commissioning Window open for TH_SED_L.")
+        self.step(5)
+        self.th_server_local_nodeid = 1111
+        await self.default_controller.CommissionOnNetwork(nodeId=self.th_server_local_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
 
-        self.step(7)
+        self.step(6)
+        th_server_directly_read_result = await self.default_controller.ReadAttribute(self.th_server_local_nodeid, [(root_endpoint, Clusters.BasicInformation)])
+        th_server_basic_info = th_server_directly_read_result[root_endpoint][Clusters.BasicInformation]
 
-        self.th_sed_later_nodeid = 2222
-        await self.TH_server_controller.CommissionOnNetwork(nodeId=self.th_sed_later_nodeid, setupPinCode=self.sync_passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.th_sed_later_discriminator)
-        logging.info("Commissioning TH_SED_L complete")
+        self.step(7)
+        dut_read = await self.default_controller.ReadAttribute(self.dut_node_id, [(newly_added_endpoint, Clusters.BridgedDeviceBasicInformation)])
+        bridged_info_for_th_server = dut_read[newly_added_endpoint][Clusters.BridgedDeviceBasicInformation]
+        basic_info_attr = Clusters.BasicInformation.Attributes
+        bridged_device_info_attr = Clusters.BridgedDeviceBasicInformation.Attributes
+
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.VendorName],
+                             bridged_info_for_th_server[bridged_device_info_attr.VendorName], "VendorName incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.VendorID],
+                             bridged_info_for_th_server[bridged_device_info_attr.VendorID], "VendorID incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.ProductName],
+                             bridged_info_for_th_server[bridged_device_info_attr.ProductName], "ProductName incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.ProductID],
+                             bridged_info_for_th_server[bridged_device_info_attr.ProductID], "ProductID incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.NodeLabel],
+                             bridged_info_for_th_server[bridged_device_info_attr.NodeLabel], "NodeLabel incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.HardwareVersion],
+                             bridged_info_for_th_server[bridged_device_info_attr.HardwareVersion], "HardwareVersion incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.HardwareVersionString],
+                             bridged_info_for_th_server[bridged_device_info_attr.HardwareVersionString], "HardwareVersionString incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.SoftwareVersion],
+                             bridged_info_for_th_server[bridged_device_info_attr.SoftwareVersion], "SoftwareVersion incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.SoftwareVersionString],
+                             bridged_info_for_th_server[bridged_device_info_attr.SoftwareVersionString], "SoftwareVersionString incorrectly reported by DUT")
+        asserts.assert_equal(th_server_basic_info[basic_info_attr.UniqueID],
+                             bridged_info_for_th_server[bridged_device_info_attr.UniqueID], "UniqueID incorrectly reported by DUT")
 
 
 if __name__ == "__main__":