Skip to content

Commit

Permalink
[Fabric-Sync] Run MCORE-FS-1.3 and MCORE-FS-1.4 on CI (project-chip#3…
Browse files Browse the repository at this point in the history
…5402)

* [Fabric-Sync] Run MCORE-FS-1.3 and MCORE-FS-1.4 on CI

* Adopt TC_MCORE_FS_1_1 to run in CI

* Reuse AppServer from TC_MCORE_FS_1_1

* Fix typo

* Reuse AppServer from TC_MCORE_FS_1_1

* Restyled by isort

* Fix TH server app name

* Add json and perfetto tracing

* Do not exit fabric-sync-app before apps are terminated

* Wait for process termination

* Turn off verbose output

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and yyzhong-g committed Dec 11, 2024
1 parent b0c489a commit 975b15e
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 216 deletions.
55 changes: 29 additions & 26 deletions examples/fabric-admin/scripts/fabric-sync-app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ async def forward_f(prefix: bytes, f_in: asyncio.StreamReader,
This function can optionally feed received lines to a callback function.
"""
while True:
line = await f_in.readline()
if not line:
break
while line := await f_in.readline():
if cb is not None:
cb(line)
f_out.buffer.write(prefix)
Expand Down Expand Up @@ -68,11 +65,7 @@ async def forward_stdin(f_out: asyncio.StreamWriter):
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
while True:
line = await reader.readline()
if not line:
# Exit on Ctrl-D (EOF).
sys.exit(0)
while line := await reader.readline():
f_out.write(line)
await f_out.drain()

Expand Down Expand Up @@ -206,12 +199,16 @@ async def main(args):
passcode=args.passcode,
))

loop = asyncio.get_event_loop()

def terminate():
admin.terminate()
bridge.terminate()
sys.exit(0)
with contextlib.suppress(ProcessLookupError):
admin.terminate()
with contextlib.suppress(ProcessLookupError):
bridge.terminate()
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM)

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, terminate)
loop.add_signal_handler(signal.SIGTERM, terminate)

Expand All @@ -238,7 +235,8 @@ def terminate():
cmd,
# Wait for the log message indicating that the bridge has been
# added to the fabric.
f"Commissioning complete for node ID {bridge_node_id:#018x}: success")
f"Commissioning complete for node ID {bridge_node_id:#018x}: success",
timeout=30)

# Open commissioning window with original setup code for the bridge.
cw_endpoint_id = 0
Expand All @@ -250,18 +248,23 @@ def terminate():
f" {cw_option} {cw_timeout} {cw_iteration} {cw_discriminator}")

try:
await asyncio.gather(
forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin),
admin.wait(),
bridge.wait(),
)
except SystemExit:
admin.terminate()
bridge.terminate()
except Exception:
admin.terminate()
bridge.terminate()
raise
forward = forward_pipe(pipe, admin.p.stdin) if pipe else forward_stdin(admin.p.stdin)
# Wait for any of the tasks to complete.
_, pending = await asyncio.wait([
asyncio.create_task(admin.wait()),
asyncio.create_task(bridge.wait()),
asyncio.create_task(forward),
], return_when=asyncio.FIRST_COMPLETED)
# Cancel the remaining tasks.
for task in pending:
task.cancel()
except Exception as e:
print(e, file=sys.stderr)

terminate()
# Make sure that we will not return until both processes are terminated.
await admin.wait()
await bridge.wait()


if __name__ == "__main__":
Expand Down
104 changes: 71 additions & 33 deletions src/python_testing/TC_MCORE_FS_1_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,83 @@

# This test requires a TH_SERVER application. Please specify with --string-arg th_server_app_path:<path_to_app>

# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs: run1
# test-runner-run/run1/app: examples/fabric-admin/scripts/fabric-sync-app.py
# test-runner-run/run1/app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234
# test-runner-run/run1/factoryreset: true
# test-runner-run/run1/script-args: --PICS src/app/tests/suites/certification/ci-pics-values --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --string-arg th_server_app_path:${ALL_CLUSTERS_APP} --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# test-runner-run/run1/script-start-delay: 5
# test-runner-run/run1/quiet: true
# === END CI TEST ARGUMENTS ===

import logging
import os
import random
import signal
import subprocess
import tempfile
import time
import uuid

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.testing.tasks import Subprocess
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from mobly import asserts


class AppServer(Subprocess):
"""Wrapper class for starting an application server in a subprocess."""

# Prefix for log messages from the application server.
PREFIX = "[SERVER]"

def __init__(self, app: str, storage_dir: str, discriminator: int, passcode: int, port: int = 5540):
storage_kvs_dir = tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1]
# Start the server application with dedicated KVS storage.
super().__init__(app, "--KVS", storage_kvs_dir,
'--secured-device-port', str(port),
"--discriminator", str(discriminator),
"--passcode", str(passcode),
prefix=self.PREFIX)

def start(self):
# Start process and block until it prints the expected output.
super().start(expected_output="Server initialization complete")


class TC_MCORE_FS_1_1(MatterBaseTest):

@async_test_body
async def setup_class(self):
super().setup_class()
self.app_process = None
app = self.user_params.get("th_server_app_path", None)
if not app:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')

self.kvs = f'kvs_{str(uuid.uuid4())}'
self.port = 5543
discriminator = random.randint(0, 4095)
passcode = 20202021
cmd = [app]
cmd.extend(['--secured-device-port', str(5543)])
cmd.extend(['--discriminator', str(discriminator)])
cmd.extend(['--passcode', str(passcode)])
cmd.extend(['--KVS', self.kvs])
# TODO: Determine if we want these logs cooked or pushed to somewhere else
logging.info("Starting application to acts mock a server portion of TH_FSA")
self.app_process = subprocess.Popen(cmd)
logging.info("Started application to acts mock a server portion of TH_FSA")
time.sleep(3)

self.th_server = None
self.storage = None

th_server_app = self.user_params.get("th_server_app_path", None)
if not th_server_app:
asserts.fail("This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>")
if not os.path.exists(th_server_app):
asserts.fail(f"The path {th_server_app} does not exist")

# Create a temporary storage directory for keeping KVS files.
self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__)
logging.info("Temporary storage directory: %s", self.storage.name)

self.th_server_port = 5543
self.th_server_discriminator = random.randint(0, 4095)
self.th_server_passcode = 20202021

# Start the TH_SERVER_NO_UID app.
self.th_server = AppServer(
th_server_app,
storage_dir=self.storage.name,
port=self.th_server_port,
discriminator=self.th_server_discriminator,
passcode=self.th_server_passcode)
self.th_server.start()

logging.info("Commissioning from separate fabric")
# Create a second controller on a new fabric to communicate to the server
Expand All @@ -63,25 +102,24 @@ async def setup_class(self):
paa_path = str(self.matter_test_config.paa_trust_store_path)
self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)
self.server_nodeid = 1111
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.server_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
await self.TH_server_controller.CommissionOnNetwork(
nodeId=self.server_nodeid,
setupPinCode=self.th_server_passcode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR,
filter=self.th_server_discriminator)
logging.info("Commissioning TH_SERVER complete")

def teardown_class(self):
# In case the th_server_app_path does not exist, then we failed the test
# and there is nothing to remove
if self.app_process is not None:
logging.warning("Stopping app with SIGTERM")
self.app_process.send_signal(signal.SIGTERM.value)
self.app_process.wait()

if os.path.exists(self.kvs):
os.remove(self.kvs)
if self.th_server is not None:
self.th_server.terminate()
if self.storage is not None:
self.storage.cleanup()
super().teardown_class()

def steps_TC_MCORE_FS_1_1(self) -> list[TestStep]:
steps = [TestStep(1, "Enable Fabric Synchronization on DUT_FSA using the manufacturer specified mechanism.", is_commissioning=True),
TestStep(2, "Commission DUT_FSA onto TH_FSA fabric."),
TestStep(3, "Reverse Commision Commission TH_FSAs onto DUT_FSA fabric."),
TestStep(3, "Reverse Commission TH_FSAs onto DUT_FSA fabric."),
TestStep("3a", "TH_FSA sends RequestCommissioningApproval"),
TestStep("3b", "TH_FSA sends CommissionNode"),
TestStep("3c", "DUT_FSA commissions TH_FSA")]
Expand Down
83 changes: 39 additions & 44 deletions src/python_testing/TC_MCORE_FS_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@
import os
import queue
import secrets
import signal
import struct
import subprocess
import tempfile
import time
import uuid
from dataclasses import dataclass

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from ecdsa.curves import NIST256p
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
from mobly import asserts
from TC_MCORE_FS_1_1 import AppServer
from TC_SC_3_6 import AttributeChangeAccumulator

# Length of `w0s` and `w1s` elements
Expand All @@ -52,7 +51,7 @@ def _generate_verifier(passcode: int, salt: bytes, iterations: int) -> bytes:


@dataclass
class _SetupParamters:
class _SetupParameters:
setup_qr_code: str
manual_code: int
discriminator: int
Expand All @@ -63,45 +62,49 @@ class TC_MCORE_FS_1_2(MatterBaseTest):
@async_test_body
async def setup_class(self):
super().setup_class()

self._partslist_subscription = None
self._app_th_server_process = None
self._th_server_kvs = None
self.th_server = None
self.storage = None

th_server_port = self.user_params.get("th_server_port", 5543)
th_server_app = self.user_params.get("th_server_app_path", None)
if not th_server_app:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
if not os.path.exists(th_server_app):
asserts.fail(f'The path {th_server_app} does not exist')

# Create a temporary storage directory for keeping KVS files.
self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__)
logging.info("Temporary storage directory: %s", self.storage.name)

self.th_server_port = th_server_port
self.th_server_setup_params = _SetupParameters(
setup_qr_code="MT:-24J0AFN00KA0648G00",
manual_code=34970112332,
discriminator=3840,
passcode=20202021)

# Start the TH_SERVER_NO_UID app.
self.th_server = AppServer(
th_server_app,
storage_dir=self.storage.name,
port=self.th_server_port,
discriminator=self.th_server_setup_params.discriminator,
passcode=self.th_server_setup_params.passcode)
self.th_server.start()

def teardown_class(self):
if self._partslist_subscription is not None:
self._partslist_subscription.Shutdown()
self._partslist_subscription = None

if self._app_th_server_process is not None:
logging.warning("Stopping app with SIGTERM")
self._app_th_server_process.send_signal(signal.SIGTERM.value)
self._app_th_server_process.wait()

if self._th_server_kvs is not None:
os.remove(self._th_server_kvs)
if self.th_server is not None:
self.th_server.terminate()
if self.storage is not None:
self.storage.cleanup()
super().teardown_class()

async def _create_th_server(self, port):
# These are default testing values
setup_params = _SetupParamters(setup_qr_code="MT:-24J0AFN00KA0648G00",
manual_code=34970112332, discriminator=3840, passcode=20202021)
kvs = f'kvs_{str(uuid.uuid4())}'

cmd = [self._th_server_app_path]
cmd.extend(['--secured-device-port', str(port)])
cmd.extend(['--discriminator', str(setup_params.discriminator)])
cmd.extend(['--passcode', str(setup_params.passcode)])
cmd.extend(['--KVS', kvs])

# TODO: Determine if we want these logs cooked or pushed to somewhere else
logging.info("Starting TH_SERVER")
self._app_th_server_process = subprocess.Popen(cmd)
self._th_server_kvs = kvs
logging.info("Started TH_SERVER")
time.sleep(3)
return setup_params

def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParamters):
def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParameters):
self.wait_for_user_input(
prompt_msg=f"Using the DUT vendor's provided interface, commission the ICD device using the following parameters:\n"
f"- discriminator: {setup_params.discriminator}\n"
Expand All @@ -115,7 +118,6 @@ def steps_TC_MCORE_FS_1_2(self) -> list[TestStep]:
steps = [TestStep(1, "TH subscribes to PartsList attribute of the Descriptor cluster of DUT_FSA endpoint 0."),
TestStep(2, "Follow manufacturer provided instructions to have DUT_FSA commission TH_SERVER"),
TestStep(3, "TH waits up to 30 seconds for subscription report from the PartsList attribute of the Descriptor to contain new endpoint"),

TestStep(4, "TH uses DUT to open commissioning window to TH_SERVER"),
TestStep(5, "TH commissions TH_SERVER"),
TestStep(6, "TH reads all attributes in Basic Information cluster from TH_SERVER directly"),
Expand All @@ -134,12 +136,6 @@ async def test_TC_MCORE_FS_1_2(self):

min_report_interval_sec = self.user_params.get("min_report_interval_sec", 0)
max_report_interval_sec = self.user_params.get("max_report_interval_sec", 30)
th_server_port = self.user_params.get("th_server_port", 5543)
self._th_server_app_path = self.user_params.get("th_server_app_path", None)
if not self._th_server_app_path:
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
if not os.path.exists(self._th_server_app_path):
asserts.fail(f'The path {self._th_server_app_path} does not exist')

self.step(1)
# Subscribe to the PartsList
Expand All @@ -164,8 +160,7 @@ async def test_TC_MCORE_FS_1_2(self):
asserts.assert_true(type_matches(step_1_dut_parts_list, list), "PartsList is expected to be a list")

self.step(2)
setup_params = await self._create_th_server(th_server_port)
self._ask_for_vendor_commissioning_ux_operation(setup_params)
self._ask_for_vendor_commissioning_ux_operation(self.th_server_setup_params)

self.step(3)
report_waiting_timeout_delay_sec = 30
Expand Down
Loading

0 comments on commit 975b15e

Please sign in to comment.