From 975b15e5d8ea80171edb7ddad32eb969a6fa04a6 Mon Sep 17 00:00:00 2001 From: Arkadiusz Bokowy Date: Fri, 20 Sep 2024 05:30:19 +0200 Subject: [PATCH] [Fabric-Sync] Run MCORE-FS-1.3 and MCORE-FS-1.4 on CI (#35402) * [Fabric-Sync] Run MCORE-FS-1.3 and MCORE-FS-1.4 on CI * Adopt TC_MCORE_FS_1_1 to run in CI * Reuse AppServer from TC_MCORE_FS_1_1 * Fix typo * Reuse AppServer from TC_MCORE_FS_1_1 * Restyled by isort * Fix TH server app name * Add json and perfetto tracing * Do not exit fabric-sync-app before apps are terminated * Wait for process termination * Turn off verbose output --------- Co-authored-by: Restyled.io --- .../fabric-admin/scripts/fabric-sync-app.py | 55 ++++----- src/python_testing/TC_MCORE_FS_1_1.py | 104 ++++++++++++------ src/python_testing/TC_MCORE_FS_1_2.py | 83 +++++++------- src/python_testing/TC_MCORE_FS_1_3.py | 27 +---- src/python_testing/TC_MCORE_FS_1_4.py | 62 ++++------- src/python_testing/TC_MCORE_FS_1_5.py | 84 +++++++------- src/python_testing/execute_python_tests.py | 3 - .../chip/testing/tasks.py | 7 +- 8 files changed, 209 insertions(+), 216 deletions(-) diff --git a/examples/fabric-admin/scripts/fabric-sync-app.py b/examples/fabric-admin/scripts/fabric-sync-app.py index 3967f97478940e..797a08a9451047 100755 --- a/examples/fabric-admin/scripts/fabric-sync-app.py +++ b/examples/fabric-admin/scripts/fabric-sync-app.py @@ -31,10 +31,7 @@ async def forward_f(prefix: bytes, f_in: asyncio.StreamReader, This function can optionally feed received lines to a callback function. """ - while True: - line = await f_in.readline() - if not line: - break + while line := await f_in.readline(): if cb is not None: cb(line) f_out.buffer.write(prefix) @@ -68,11 +65,7 @@ async def forward_stdin(f_out: asyncio.StreamWriter): reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(reader) await loop.connect_read_pipe(lambda: protocol, sys.stdin) - while True: - line = await reader.readline() - if not line: - # Exit on Ctrl-D (EOF). - sys.exit(0) + while line := await reader.readline(): f_out.write(line) await f_out.drain() @@ -206,12 +199,16 @@ async def main(args): passcode=args.passcode, )) + loop = asyncio.get_event_loop() + def terminate(): - admin.terminate() - bridge.terminate() - sys.exit(0) + with contextlib.suppress(ProcessLookupError): + admin.terminate() + with contextlib.suppress(ProcessLookupError): + bridge.terminate() + loop.remove_signal_handler(signal.SIGINT) + loop.remove_signal_handler(signal.SIGTERM) - loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, terminate) loop.add_signal_handler(signal.SIGTERM, terminate) @@ -238,7 +235,8 @@ def terminate(): cmd, # Wait for the log message indicating that the bridge has been # added to the fabric. - f"Commissioning complete for node ID {bridge_node_id:#018x}: success") + f"Commissioning complete for node ID {bridge_node_id:#018x}: success", + timeout=30) # Open commissioning window with original setup code for the bridge. cw_endpoint_id = 0 @@ -250,18 +248,23 @@ def terminate(): f" {cw_option} {cw_timeout} {cw_iteration} {cw_discriminator}") try: - await asyncio.gather( - forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin), - admin.wait(), - bridge.wait(), - ) - except SystemExit: - admin.terminate() - bridge.terminate() - except Exception: - admin.terminate() - bridge.terminate() - raise + forward = forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin) + # Wait for any of the tasks to complete. + _, pending = await asyncio.wait([ + asyncio.create_task(admin.wait()), + asyncio.create_task(bridge.wait()), + asyncio.create_task(forward), + ], return_when=asyncio.FIRST_COMPLETED) + # Cancel the remaining tasks. + for task in pending: + task.cancel() + except Exception as e: + print(e, file=sys.stderr) + + terminate() + # Make sure that we will not return until both processes are terminated. + await admin.wait() + await bridge.wait() if __name__ == "__main__": diff --git a/src/python_testing/TC_MCORE_FS_1_1.py b/src/python_testing/TC_MCORE_FS_1_1.py index 4ea9d362655884..0d77bad8c6f268 100755 --- a/src/python_testing/TC_MCORE_FS_1_1.py +++ b/src/python_testing/TC_MCORE_FS_1_1.py @@ -17,44 +17,83 @@ # This test requires a TH_SERVER application. Please specify with --string-arg th_server_app_path: +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py +# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234 +# test-runner-run/run1/factoryreset: true +# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_app_path:${ALL_CLUSTERS_APP} --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# test-runner-run/run1/script-start-delay: 5 +# test-runner-run/run1/quiet: true +# === END CI TEST ARGUMENTS === + import logging import os import random -import signal -import subprocess +import tempfile import time -import uuid import chip.clusters as Clusters from chip import ChipDeviceCtrl +from chip.testing.tasks import Subprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts +class AppServer(Subprocess): + """Wrapper class for starting an application server in a subprocess.""" + + # Prefix for log messages from the application server. + PREFIX = "[SERVER]" + + def __init__(self, app: str, storage_dir: str, discriminator: int, passcode: int, port: int = 5540): + storage_kvs_dir = tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1] + # Start the server application with dedicated KVS storage. + super().__init__(app, "--KVS", storage_kvs_dir, + '--secured-device-port', str(port), + "--discriminator", str(discriminator), + "--passcode", str(passcode), + prefix=self.PREFIX) + + def start(self): + # Start process and block until it prints the expected output. + super().start(expected_output="Server initialization complete") + + class TC_MCORE_FS_1_1(MatterBaseTest): @async_test_body async def setup_class(self): super().setup_class() - self.app_process = None - app = self.user_params.get("th_server_app_path", None) - if not app: - asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') - - self.kvs = f'kvs_{str(uuid.uuid4())}' - self.port = 5543 - discriminator = random.randint(0, 4095) - passcode = 20202021 - cmd = [app] - cmd.extend(['--secured-device-port', str(5543)]) - cmd.extend(['--discriminator', str(discriminator)]) - cmd.extend(['--passcode', str(passcode)]) - cmd.extend(['--KVS', self.kvs]) - # TODO: Determine if we want these logs cooked or pushed to somewhere else - logging.info("Starting application to acts mock a server portion of TH_FSA") - self.app_process = subprocess.Popen(cmd) - logging.info("Started application to acts mock a server portion of TH_FSA") - time.sleep(3) + + self.th_server = None + self.storage = None + + th_server_app = self.user_params.get("th_server_app_path", None) + if not th_server_app: + asserts.fail("This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:") + if not os.path.exists(th_server_app): + asserts.fail(f"The path {th_server_app} does not exist") + + # Create a temporary storage directory for keeping KVS files. + self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) + logging.info("Temporary storage directory: %s", self.storage.name) + + self.th_server_port = 5543 + self.th_server_discriminator = random.randint(0, 4095) + self.th_server_passcode = 20202021 + + # Start the TH_SERVER_NO_UID app. + self.th_server = AppServer( + th_server_app, + storage_dir=self.storage.name, + port=self.th_server_port, + discriminator=self.th_server_discriminator, + passcode=self.th_server_passcode) + self.th_server.start() logging.info("Commissioning from separate fabric") # Create a second controller on a new fabric to communicate to the server @@ -63,25 +102,24 @@ async def setup_class(self): paa_path = str(self.matter_test_config.paa_trust_store_path) self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path) self.server_nodeid = 1111 - await self.TH_server_controller.CommissionOnNetwork(nodeId=self.server_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator) + await self.TH_server_controller.CommissionOnNetwork( + nodeId=self.server_nodeid, + setupPinCode=self.th_server_passcode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, + filter=self.th_server_discriminator) logging.info("Commissioning TH_SERVER complete") def teardown_class(self): - # In case the th_server_app_path does not exist, then we failed the test - # and there is nothing to remove - if self.app_process is not None: - logging.warning("Stopping app with SIGTERM") - self.app_process.send_signal(signal.SIGTERM.value) - self.app_process.wait() - - if os.path.exists(self.kvs): - os.remove(self.kvs) + if self.th_server is not None: + self.th_server.terminate() + if self.storage is not None: + self.storage.cleanup() super().teardown_class() def steps_TC_MCORE_FS_1_1(self) -> list[TestStep]: steps = [TestStep(1, "Enable Fabric Synchronization on DUT_FSA using the manufacturer specified mechanism.", is_commissioning=True), TestStep(2, "Commission DUT_FSA onto TH_FSA fabric."), - TestStep(3, "Reverse Commision Commission TH_FSAs onto DUT_FSA fabric."), + TestStep(3, "Reverse Commission TH_FSAs onto DUT_FSA fabric."), TestStep("3a", "TH_FSA sends RequestCommissioningApproval"), TestStep("3b", "TH_FSA sends CommissionNode"), TestStep("3c", "DUT_FSA commissions TH_FSA")] diff --git a/src/python_testing/TC_MCORE_FS_1_2.py b/src/python_testing/TC_MCORE_FS_1_2.py index b18dcc5b42ef64..3c155e9d188334 100644 --- a/src/python_testing/TC_MCORE_FS_1_2.py +++ b/src/python_testing/TC_MCORE_FS_1_2.py @@ -24,11 +24,9 @@ import os import queue import secrets -import signal import struct -import subprocess +import tempfile import time -import uuid from dataclasses import dataclass import chip.clusters as Clusters @@ -36,6 +34,7 @@ from ecdsa.curves import NIST256p from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts +from TC_MCORE_FS_1_1 import AppServer from TC_SC_3_6 import AttributeChangeAccumulator # Length of `w0s` and `w1s` elements @@ -52,7 +51,7 @@ def _generate_verifier(passcode: int, salt: bytes, iterations: int) -> bytes: @dataclass -class _SetupParamters: +class _SetupParameters: setup_qr_code: str manual_code: int discriminator: int @@ -63,45 +62,49 @@ class TC_MCORE_FS_1_2(MatterBaseTest): @async_test_body async def setup_class(self): super().setup_class() + self._partslist_subscription = None - self._app_th_server_process = None - self._th_server_kvs = None + self.th_server = None + self.storage = None + + th_server_port = self.user_params.get("th_server_port", 5543) + th_server_app = self.user_params.get("th_server_app_path", None) + if not th_server_app: + asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') + if not os.path.exists(th_server_app): + asserts.fail(f'The path {th_server_app} does not exist') + + # Create a temporary storage directory for keeping KVS files. + self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) + logging.info("Temporary storage directory: %s", self.storage.name) + + self.th_server_port = th_server_port + self.th_server_setup_params = _SetupParameters( + setup_qr_code="MT:-24J0AFN00KA0648G00", + manual_code=34970112332, + discriminator=3840, + passcode=20202021) + + # Start the TH_SERVER_NO_UID app. + self.th_server = AppServer( + th_server_app, + storage_dir=self.storage.name, + port=self.th_server_port, + discriminator=self.th_server_setup_params.discriminator, + passcode=self.th_server_setup_params.passcode) + self.th_server.start() def teardown_class(self): if self._partslist_subscription is not None: self._partslist_subscription.Shutdown() self._partslist_subscription = None - - if self._app_th_server_process is not None: - logging.warning("Stopping app with SIGTERM") - self._app_th_server_process.send_signal(signal.SIGTERM.value) - self._app_th_server_process.wait() - - if self._th_server_kvs is not None: - os.remove(self._th_server_kvs) + if self.th_server is not None: + self.th_server.terminate() + if self.storage is not None: + self.storage.cleanup() super().teardown_class() - async def _create_th_server(self, port): - # These are default testing values - setup_params = _SetupParamters(setup_qr_code="MT:-24J0AFN00KA0648G00", - manual_code=34970112332, discriminator=3840, passcode=20202021) - kvs = f'kvs_{str(uuid.uuid4())}' - - cmd = [self._th_server_app_path] - cmd.extend(['--secured-device-port', str(port)]) - cmd.extend(['--discriminator', str(setup_params.discriminator)]) - cmd.extend(['--passcode', str(setup_params.passcode)]) - cmd.extend(['--KVS', kvs]) - - # TODO: Determine if we want these logs cooked or pushed to somewhere else - logging.info("Starting TH_SERVER") - self._app_th_server_process = subprocess.Popen(cmd) - self._th_server_kvs = kvs - logging.info("Started TH_SERVER") - time.sleep(3) - return setup_params - - def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParamters): + def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParameters): self.wait_for_user_input( prompt_msg=f"Using the DUT vendor's provided interface, commission the ICD device using the following parameters:\n" f"- discriminator: {setup_params.discriminator}\n" @@ -115,7 +118,6 @@ def steps_TC_MCORE_FS_1_2(self) -> list[TestStep]: steps = [TestStep(1, "TH subscribes to PartsList attribute of the Descriptor cluster of DUT_FSA endpoint 0."), TestStep(2, "Follow manufacturer provided instructions to have DUT_FSA commission TH_SERVER"), TestStep(3, "TH waits up to 30 seconds for subscription report from the PartsList attribute of the Descriptor to contain new endpoint"), - TestStep(4, "TH uses DUT to open commissioning window to TH_SERVER"), TestStep(5, "TH commissions TH_SERVER"), TestStep(6, "TH reads all attributes in Basic Information cluster from TH_SERVER directly"), @@ -134,12 +136,6 @@ async def test_TC_MCORE_FS_1_2(self): min_report_interval_sec = self.user_params.get("min_report_interval_sec", 0) max_report_interval_sec = self.user_params.get("max_report_interval_sec", 30) - th_server_port = self.user_params.get("th_server_port", 5543) - self._th_server_app_path = self.user_params.get("th_server_app_path", None) - if not self._th_server_app_path: - asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') - if not os.path.exists(self._th_server_app_path): - asserts.fail(f'The path {self._th_server_app_path} does not exist') self.step(1) # Subscribe to the PartsList @@ -164,8 +160,7 @@ async def test_TC_MCORE_FS_1_2(self): asserts.assert_true(type_matches(step_1_dut_parts_list, list), "PartsList is expected to be a list") self.step(2) - setup_params = await self._create_th_server(th_server_port) - self._ask_for_vendor_commissioning_ux_operation(setup_params) + self._ask_for_vendor_commissioning_ux_operation(self.th_server_setup_params) self.step(3) report_waiting_timeout_delay_sec = 30 diff --git a/src/python_testing/TC_MCORE_FS_1_3.py b/src/python_testing/TC_MCORE_FS_1_3.py index 1270b5682e8ad2..e57487a24966a2 100644 --- a/src/python_testing/TC_MCORE_FS_1_3.py +++ b/src/python_testing/TC_MCORE_FS_1_3.py @@ -26,10 +26,10 @@ # test-runner-runs: run1 # test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py # test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234 -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} +# test-runner-run/run1/factoryreset: true +# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto # test-runner-run/run1/script-start-delay: 5 -# test-runner-run/run1/quiet: false +# test-runner-run/run1/quiet: true # === END CI TEST ARGUMENTS === import asyncio @@ -41,28 +41,9 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.interaction_model import Status -from chip.testing.tasks import Subprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts - - -class AppServer: - - def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None): - args = [ - "--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1], - ] - args.extend(['--secured-device-port', str(port)]) - args.extend(["--discriminator", str(discriminator)]) - args.extend(["--passcode", str(passcode)]) - self.app = Subprocess(app, *args, prefix="[SERVER]") - - def start(self): - # Start process and block until it prints the expected output. - self.app.start(expected_output="Server initialization complete") - - def terminate(self): - self.app.terminate() +from TC_MCORE_FS_1_1 import AppServer class TC_MCORE_FS_1_3(MatterBaseTest): diff --git a/src/python_testing/TC_MCORE_FS_1_4.py b/src/python_testing/TC_MCORE_FS_1_4.py index 7b101bb9273349..d76b0625202e1f 100644 --- a/src/python_testing/TC_MCORE_FS_1_4.py +++ b/src/python_testing/TC_MCORE_FS_1_4.py @@ -26,10 +26,10 @@ # test-runner-runs: run1 # test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py # test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234 -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_fsa_app_path:examples/fabric-admin/scripts/fabric-sync-app.py th_fsa_admin_path:${FABRIC_ADMIN_APP} th_fsa_bridge_path:${FABRIC_BRIDGE_APP} th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} dut_fsa_stdin_pipe:dut-fsa-stdin +# test-runner-run/run1/factoryreset: true +# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_fsa_app_path:examples/fabric-admin/scripts/fabric-sync-app.py th_fsa_admin_path:${FABRIC_ADMIN_APP} th_fsa_bridge_path:${FABRIC_BRIDGE_APP} th_server_no_uid_app_path:${LIGHTING_APP_NO_UNIQUE_ID} dut_fsa_stdin_pipe:dut-fsa-stdin --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto # test-runner-run/run1/script-start-delay: 5 -# test-runner-run/run1/quiet: false +# test-runner-run/run1/quiet: true # === END CI TEST ARGUMENTS === import asyncio @@ -44,14 +44,16 @@ from chip.testing.tasks import Subprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts +from TC_MCORE_FS_1_1 import AppServer -class FabricSyncApp: +class FabricSyncApp(Subprocess): - def __init__(self, fabric_sync_app_path, fabric_admin_app_path, fabric_bridge_app_path, - storage_dir, fabric_name=None, node_id=None, vendor_id=None, - paa_trust_store_path=None, bridge_port=None, bridge_discriminator=None, - bridge_passcode=None): + def __init__(self, fabric_sync_app_path: str, fabric_admin_app_path: str, + fabric_bridge_app_path: str, storage_dir: str, paa_trust_store_path: str = None, + fabric_name: str = None, node_id: int = None, vendor_id: int = None, + bridge_discriminator: int = None, bridge_passcode: int = None, + bridge_port: int = 5540): args = [ f"--app-admin={fabric_admin_app_path}", f"--app-bridge={fabric_bridge_app_path}", @@ -68,45 +70,27 @@ def __init__(self, fabric_sync_app_path, fabric_admin_app_path, fabric_bridge_ap args.append(f"--commissioner-name={fabric_name}") if node_id is not None: args.append(f"--commissioner-node-id={node_id}") - args.append(f"--commissioner-vendor-id={vendor_id}") - args.append(f"--secured-device-port={bridge_port}") - args.append(f"--discriminator={bridge_discriminator}") - args.append(f"--passcode={bridge_passcode}") - - self.fabric_sync_app = Subprocess(fabric_sync_app_path, *args) + if vendor_id is not None: + args.append(f"--commissioner-vendor-id={vendor_id}") + if bridge_port is not None: + args.append(f"--secured-device-port={bridge_port}") + if bridge_discriminator is not None: + args.append(f"--discriminator={bridge_discriminator}") + if bridge_passcode is not None: + args.append(f"--passcode={bridge_passcode}") + # Start the FSA application with dedicated storage and RPC ports. + super().__init__(fabric_sync_app_path, *args) def start(self): # Start process and block until it prints the expected output. - self.fabric_sync_app.start(expected_output="Successfully opened pairing window on the device") - - def terminate(self): - self.fabric_sync_app.terminate() + super().start(expected_output="Successfully opened pairing window on the device") - def commission_on_network(self, node_id, setup_pin_code=None, filter_type=None, filter=None): - self.fabric_sync_app.send( + def commission_on_network(self, node_id: int, setup_pin_code: int, filter_type=None, filter=None): + self.send( f"pairing onnetwork {node_id} {setup_pin_code}", expected_output=f"Commissioning complete for node ID {node_id:#018x}: success") -class AppServer: - - def __init__(self, app, storage_dir, port=None, discriminator=None, passcode=None): - args = [ - "--KVS", tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1], - ] - args.extend(['--secured-device-port', str(port)]) - args.extend(["--discriminator", str(discriminator)]) - args.extend(["--passcode", str(passcode)]) - self.app = Subprocess(app, *args, prefix="[SERVER]") - - def start(self): - # Start process and block until it prints the expected output. - self.app.start(expected_output="Server initialization complete") - - def terminate(self): - self.app.terminate() - - class TC_MCORE_FS_1_4(MatterBaseTest): @property diff --git a/src/python_testing/TC_MCORE_FS_1_5.py b/src/python_testing/TC_MCORE_FS_1_5.py index 22654c994bf41f..2373f66c1cadff 100755 --- a/src/python_testing/TC_MCORE_FS_1_5.py +++ b/src/python_testing/TC_MCORE_FS_1_5.py @@ -22,11 +22,9 @@ import os import queue import secrets -import signal import struct -import subprocess +import tempfile import time -import uuid from dataclasses import dataclass import chip.clusters as Clusters @@ -34,6 +32,7 @@ from ecdsa.curves import NIST256p from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts +from TC_MCORE_FS_1_1 import AppServer from TC_SC_3_6 import AttributeChangeAccumulator # Length of `w0s` and `w1s` elements @@ -50,7 +49,7 @@ def _generate_verifier(passcode: int, salt: bytes, iterations: int) -> bytes: @dataclass -class _SetupParamters: +class _SetupParameters: setup_qr_code: str manual_code: int discriminator: int @@ -61,50 +60,54 @@ class TC_MCORE_FS_1_5(MatterBaseTest): @async_test_body async def setup_class(self): super().setup_class() + self._partslist_subscription = None self._cadmin_subscription = None - self._app_th_server_process = None - self._th_server_kvs = None + self.th_server = None + self.storage = None + + th_server_port = self.user_params.get("th_server_port", 5543) + th_server_app = self.user_params.get("th_server_app_path", None) + if not th_server_app: + asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') + if not os.path.exists(th_server_app): + asserts.fail(f'The path {th_server_app} does not exist') + + # Create a temporary storage directory for keeping KVS files. + self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) + logging.info("Temporary storage directory: %s", self.storage.name) + + self.th_server_port = th_server_port + # These are default testing values + self.th_server_setup_params = _SetupParameters( + setup_qr_code="MT:-24J0AFN00KA0648G00", + manual_code=34970112332, + discriminator=3840, + passcode=20202021) + + # Start the TH_SERVER_NO_UID app. + self.th_server = AppServer( + th_server_app, + storage_dir=self.storage.name, + port=self.th_server_port, + discriminator=self.th_server_setup_params.discriminator, + passcode=self.th_server_setup_params.passcode) + self.th_server.start() def teardown_class(self): if self._partslist_subscription is not None: self._partslist_subscription.Shutdown() self._partslist_subscription = None - if self._cadmin_subscription is not None: self._cadmin_subscription.Shutdown() self._cadmin_subscription = None - - if self._app_th_server_process is not None: - logging.warning("Stopping app with SIGTERM") - self._app_th_server_process.send_signal(signal.SIGTERM.value) - self._app_th_server_process.wait() - - if self._th_server_kvs is not None: - os.remove(self._th_server_kvs) + if self.th_server is not None: + self.th_server.terminate() + if self.storage is not None: + self.storage.cleanup() super().teardown_class() - async def _create_th_server(self, port): - # These are default testing values - setup_params = _SetupParamters(setup_qr_code="MT:-24J0AFN00KA0648G00", - manual_code=34970112332, discriminator=3840, passcode=20202021) - kvs = f'kvs_{str(uuid.uuid4())}' - - cmd = [self._th_server_app_path] - cmd.extend(['--secured-device-port', str(port)]) - cmd.extend(['--discriminator', str(setup_params.discriminator)]) - cmd.extend(['--passcode', str(setup_params.passcode)]) - cmd.extend(['--KVS', kvs]) - - # TODO: Determine if we want these logs cooked or pushed to somewhere else - logging.info("Starting TH_SERVER") - self._app_th_server_process = subprocess.Popen(cmd) - self._th_server_kvs = kvs - logging.info("Started TH_SERVER") - time.sleep(3) - return setup_params - - def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParamters): + def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParameters): self.wait_for_user_input( prompt_msg=f"Using the DUT vendor's provided interface, commission the ICD device using the following parameters:\n" f"- discriminator: {setup_params.discriminator}\n" @@ -139,12 +142,6 @@ async def test_TC_MCORE_FS_1_5(self): min_report_interval_sec = 0 max_report_interval_sec = 30 - th_server_port = self.user_params.get("th_server_port", 5543) - self._th_server_app_path = self.user_params.get("th_server_app_path", None) - if not self._th_server_app_path: - asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:') - if not os.path.exists(self._th_server_app_path): - asserts.fail(f'The path {self._th_server_app_path} does not exist') self.step(1) # Subscribe to the PartsList @@ -169,8 +166,7 @@ async def test_TC_MCORE_FS_1_5(self): asserts.assert_true(type_matches(step_1_dut_parts_list, list), "PartsList is expected to be a list") self.step(2) - setup_params = await self._create_th_server(th_server_port) - self._ask_for_vendor_commissioning_ux_operation(setup_params) + self._ask_for_vendor_commissioning_ux_operation(self.th_server_setup_params) self.step(3) report_waiting_timeout_delay_sec = 30 diff --git a/src/python_testing/execute_python_tests.py b/src/python_testing/execute_python_tests.py index 1f8621ca2a9b05..f03e31b2e57a50 100644 --- a/src/python_testing/execute_python_tests.py +++ b/src/python_testing/execute_python_tests.py @@ -69,10 +69,7 @@ def main(search_directory, env_file): "TC_OpstateCommon.py", # Shared code for TC_OPSTATE, not a standalone test "TC_pics_checker.py", # Currently isn't enabled because we don't have any examples with conformant PICS "TC_TMP_2_1.py", # src/python_testing/test_testing/test_TC_TMP_2_1.py is the Unit test of this test - "TC_MCORE_FS_1_1.py", # This test requires a TH_SERVER application, hence not ready run with CI "TC_MCORE_FS_1_2.py", # This test requires a TH_SERVER application, hence not ready run with CI - "TC_MCORE_FS_1_3.py", # This test requires a TH_SERVER application, hence not ready run with CI - "TC_MCORE_FS_1_4.py", # This test requires a TH_SERVER application, hence not ready run with CI "TC_MCORE_FS_1_5.py", # This test requires a TH_SERVER application, hence not ready run with CI "TC_OCC_3_1.py", # There are CI issues for the test cases that implements manually controlling sensor device for the occupancy state ON/OFF change "TC_OCC_3_2.py", # There are CI issues for the test cases that implements manually controlling sensor device for the occupancy state ON/OFF change diff --git a/src/python_testing/matter_testing_infrastructure/chip/testing/tasks.py b/src/python_testing/matter_testing_infrastructure/chip/testing/tasks.py index 873cec5e7c234e..a73e73fbeb2bf2 100644 --- a/src/python_testing/matter_testing_infrastructure/chip/testing/tasks.py +++ b/src/python_testing/matter_testing_infrastructure/chip/testing/tasks.py @@ -28,10 +28,7 @@ def forward_f(prefix: bytes, This function can optionally feed received lines to a callback function. """ - while True: - line = f_in.readline() - if not line: - break + while line := f_in.readline(): if cb is not None: cb(line, is_stderr) f_out.buffer.write(prefix) @@ -121,4 +118,6 @@ def send(self, message: str, end: str = "\n", self.expected_output = None def terminate(self): + """Terminate the subprocess and wait for it to finish.""" self.p.terminate() + self.join()