From b3b2f98a0c53dac1672745ab29abe213fbca61ba Mon Sep 17 00:00:00 2001 From: Yufeng Wang Date: Thu, 29 Aug 2024 12:40:36 -0700 Subject: [PATCH] Update TC_MCORE_FS_1_3 to align with the Test Spec (#35270) * Update TC_MCORE_FS_1_3 to align with the test spec * Instead of th_server_app_path use th_server_no_uid_app_path --- src/python_testing/TC_MCORE_FS_1_3.py | 60 +++++++++------------------ 1 file changed, 19 insertions(+), 41 deletions(-) diff --git a/src/python_testing/TC_MCORE_FS_1_3.py b/src/python_testing/TC_MCORE_FS_1_3.py index e707b500a6fba8..4245bc139ceb38 100644 --- a/src/python_testing/TC_MCORE_FS_1_3.py +++ b/src/python_testing/TC_MCORE_FS_1_3.py @@ -15,7 +15,7 @@ # limitations under the License. # -# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_app_path: +# This test requires a TH_SERVER application that returns UnsupportedAttribute when reading UniqueID from BasicInformation Cluster. Please specify with --string-arg th_server_no_uid_app_path: import logging import os @@ -36,14 +36,10 @@ class TC_MCORE_FS_1_3(MatterBaseTest): @async_test_body async def setup_class(self): super().setup_class() - self.device_for_th_eco_nodeid = 1111 - self.device_for_th_eco_kvs = None - self.device_for_th_eco_port = 5543 - self.app_process_for_th_eco = None - - self.device_for_dut_eco_nodeid = 1112 - self.device_for_dut_eco_kvs = None - self.device_for_dut_eco_port = 5544 + + self.th_server_nodeid = 1111 + self.th_server_kvs = None + self.th_server_port = 5543 self.app_process_for_dut_eco = None # Create a second controller on a new fabric to communicate to the server @@ -57,20 +53,14 @@ def teardown_class(self): logging.warning("Stopping app with SIGTERM") self.app_process_for_dut_eco.send_signal(signal.SIGTERM.value) self.app_process_for_dut_eco.wait() - if self.app_process_for_th_eco is not None: - logging.warning("Stopping app with SIGTERM") - self.app_process_for_th_eco.send_signal(signal.SIGTERM.value) - self.app_process_for_th_eco.wait() - os.remove(self.device_for_dut_eco_kvs) - if self.device_for_th_eco_kvs is not None: - os.remove(self.device_for_th_eco_kvs) + os.remove(self.th_server_kvs) super().teardown_class() async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for_th, device_info): - app = self.user_params.get("th_server_app_path", None) + app = self.user_params.get("th_server_no_uid_app_path", None) if not app: - asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') + asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_no_uid_app_path:') if not os.path.exists(app): asserts.fail(f'The path {app} does not exist') @@ -95,33 +85,33 @@ async def create_device_and_commission_to_th_fabric(self, kvs, port, node_id_for logging.info("Commissioning device for DUT ecosystem onto TH for managing") def steps_TC_MCORE_FS_1_3(self) -> list[TestStep]: - steps = [TestStep(1, "DUT_FSA commissions TH_SED_DUT to DUT_FSAs fabric and generates a UniqueID", is_commissioning=True), - TestStep(2, "TH_FSA commissions TH_SED_TH onto TH_FSAs fabric and generates a UniqueID."), - TestStep(3, "Follow manufacturer provided instructions to enable DUT_FSA to synchronize TH_SED_TH onto DUT_FSAs fabric."), - TestStep(4, "DUT_FSA synchronizes TH_SED_TH onto DUT_FSAs fabric and copies the UniqueID presented by TH_FSAs Bridged Device Basic Information Cluster.")] + steps = [TestStep(1, "TH commissions TH_SERVER to TH’s fabric.", is_commissioning=True), + TestStep(2, "DUT_FSA commissions TH_SERVER to DUT_FSA’s fabric and generates a UniqueID.")] return steps @async_test_body async def test_TC_MCORE_FS_1_3(self): self.is_ci = self.check_pics('PICS_SDK_CI_ONLY') self.print_step(0, "Commissioning DUT to TH, already done") + self.step(1) - # These steps are not explicitly in step 1, but they help identify the dynamically added endpoint in step 1. root_node_endpoint = 0 root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint) set_of_endpoints_before_adding_device = set(root_part_list) + logging.info(f"Set of endpoints before adding the device: {set_of_endpoints_before_adding_device}") kvs = f'kvs_{str(uuid.uuid4())}' - device_info = "for DUT ecosystem" - await self.create_device_and_commission_to_th_fabric(kvs, self.device_for_dut_eco_port, self.device_for_dut_eco_nodeid, device_info) + device_info = "for TH ecosystem" + await self.create_device_and_commission_to_th_fabric(kvs, self.th_server_port, self.th_server_nodeid, device_info) - self.device_for_dut_eco_kvs = kvs - read_result = await self.TH_server_controller.ReadAttribute(self.device_for_dut_eco_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)]) + self.th_server_kvs = kvs + read_result = await self.TH_server_controller.ReadAttribute(self.th_server_nodeid, [(root_node_endpoint, Clusters.BasicInformation.Attributes.UniqueID)]) result = read_result[root_node_endpoint][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.UniqueID] asserts.assert_true(type_matches(result, Clusters.Attribute.ValueDecodeFailure), "We were expecting a value decode failure") asserts.assert_equal(result.Reason.status, Status.UnsupportedAttribute, "Incorrect error returned from reading UniqueID") - params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.device_for_dut_eco_nodeid) + self.step(2) + params = await self.openCommissioningWindow(dev_ctrl=self.TH_server_controller, node_id=self.th_server_nodeid) self.wait_for_user_input( prompt_msg=f"Using the DUT vendor's provided interface, commission the device using the following parameters:\n" @@ -134,6 +124,7 @@ async def test_TC_MCORE_FS_1_3(self): root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=root_node_endpoint) set_of_endpoints_after_adding_device = set(root_part_list) + logging.info(f"Set of endpoints after adding the device: {set_of_endpoints_after_adding_device}") asserts.assert_true(set_of_endpoints_after_adding_device.issuperset( set_of_endpoints_before_adding_device), "Expected only new endpoints to be added") @@ -145,19 +136,6 @@ async def test_TC_MCORE_FS_1_3(self): asserts.assert_true(type_matches(th_sed_dut_unique_id, str), "UniqueID should be a string") asserts.assert_true(th_sed_dut_unique_id, "UniqueID should not be an empty string") - self.step(2) - kvs = f'kvs_{str(uuid.uuid4())}' - device_info = "for TH_FSA ecosystem" - await self.create_device_and_commission_to_th_fabric(kvs, self.device_for_th_eco_port, self.device_for_th_eco_nodeid, device_info) - self.device_for_th_eco_kvs = kvs - # TODO(https://github.com/CHIP-Specifications/chip-test-plans/issues/4375) During setup we need to create the TH_FSA device - # where we would commission device created in create_device_and_commission_to_th_fabric to be commissioned into TH_FSA. - - # TODO(https://github.com/CHIP-Specifications/chip-test-plans/issues/4375) Because we cannot create a TH_FSA and there is - # no way to mock it the following 2 test steps are skipped for now. - self.skip_step(3) - self.skip_step(4) - if __name__ == "__main__": default_matter_test_main()