diff --git a/src/python_testing/TC_DeviceBasicComposition.py b/src/python_testing/TC_DeviceBasicComposition.py index 295a15cc3c559a..67f5bc4f952b8c 100644 --- a/src/python_testing/TC_DeviceBasicComposition.py +++ b/src/python_testing/TC_DeviceBasicComposition.py @@ -27,10 +27,7 @@ import chip.clusters as Clusters import chip.tlv -from chip import discovery from chip.clusters.Attribute import ValueDecodeFailure -from chip.exceptions import ChipStackError -from chip.setup_payload import SetupPayload from matter_testing_support import AttributePathLocation, MatterBaseTest, async_test_body, default_matter_test_main from mobly import asserts @@ -171,32 +168,10 @@ async def setup_class(self): dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None) if do_test_over_pase: - if self.matter_test_config.qr_code_content is not None: - qr_code = self.matter_test_config.qr_code_content - try: - setup_payload = SetupPayload().ParseQrCode(qr_code) - except ChipStackError: - asserts.fail(f"QR code '{qr_code} failed to parse properly as a Matter setup code.") - - elif self.matter_test_config.manual_code is not None: - manual_code = self.matter_test_config.manual_code - try: - setup_payload = SetupPayload().ParseManualPairingCode(manual_code) - except ChipStackError: - asserts.fail( - f"Manual code code '{manual_code}' failed to parse properly as a Matter setup code. Check that all digits are correct and length is 11 or 21 characters.") - else: - asserts.fail("Require either --qr-code or --manual-code to proceed with PASE needed for test.") - - if setup_payload.short_discriminator is not None: - filter_type = discovery.FilterType.SHORT_DISCRIMINATOR - filter_value = setup_payload.short_discriminator - else: - filter_type = discovery.FilterType.LONG_DISCRIMINATOR - filter_value = setup_payload.long_discriminator + info = self.get_setup_payload_info() commissionable_nodes = dev_ctrl.DiscoverCommissionableNodes( - filter_type, filter_value, stopOnFirst=True, timeoutSecond=15) + info.filter_type, info.filter_value, stopOnFirst=True, timeoutSecond=15) logging.info(f"Commissionable nodes: {commissionable_nodes}") # TODO: Support BLE if commissionable_nodes is not None and len(commissionable_nodes) > 0: @@ -208,7 +183,7 @@ async def setup_class(self): logging.info(f"Found instance {instance_name}, VID={vid}, PID={pid}, Address={address}") node_id = 1 - dev_ctrl.EstablishPASESessionIP(address, setup_payload.setup_passcode, node_id) + dev_ctrl.EstablishPASESessionIP(address, info.passcode, node_id) else: asserts.fail("Failed to find the DUT according to command line arguments.") else: diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 008848177b3271..a394952445de60 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -47,9 +47,12 @@ import chip.clusters as Clusters import chip.logging import chip.native +from chip import discovery from chip.ChipStack import ChipStack from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction +from chip.exceptions import ChipStackError from chip.interaction_model import InteractionModelError, Status +from chip.setup_payload import SetupPayload from chip.storage import PersistentStorage from chip.tracing import TracingContext from mobly import asserts, base_test, signals, utils @@ -350,6 +353,13 @@ class ProblemNotice: spec_location: str = "" +@dataclass +class SetupPayloadInfo: + filter_type: discovery.FilterType = discovery.FilterType.LONG_DISCRIMINATOR + filter_value: int = 0 + passcode: int = 0 + + class MatterStackState: def __init__(self, config: MatterTestConfig): self._logger = logger @@ -550,6 +560,35 @@ def record_warning(self, test_name: str, location: Union[AttributePathLocation, def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location)) + def get_setup_payload_info(self) -> SetupPayloadInfo: + if self.matter_test_config.qr_code_content is not None: + qr_code = self.matter_test_config.qr_code_content + try: + setup_payload = SetupPayload().ParseQrCode(qr_code) + except ChipStackError: + asserts.fail(f"QR code '{qr_code} failed to parse properly as a Matter setup code.") + + elif self.matter_test_config.manual_code is not None: + manual_code = self.matter_test_config.manual_code + try: + setup_payload = SetupPayload().ParseManualPairingCode(manual_code) + except ChipStackError: + asserts.fail( + f"Manual code code '{manual_code}' failed to parse properly as a Matter setup code. Check that all digits are correct and length is 11 or 21 characters.") + else: + asserts.fail("Require either --qr-code or --manual-code.") + + info = SetupPayloadInfo() + info.passcode = setup_payload.setup_passcode + if setup_payload.short_discriminator is not None: + info.filter_type = discovery.FilterType.SHORT_DISCRIMINATOR + info.filter_value = setup_payload.short_discriminator + else: + info.filter_type = discovery.FilterType.LONG_DISCRIMINATOR + info.filter_value = setup_payload.long_discriminator + + return info + def generate_mobly_test_config(matter_test_config: MatterTestConfig): test_run_config = TestRunConfig() @@ -755,12 +794,15 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf print("error: supplied number of discriminators does not match number of passcodes") return False - if len(config.dut_node_ids) > len(config.discriminators): + device_descriptors = [config.qr_code_content] if config.qr_code_content is not None else [ + config.manual_code] if config.manual_code is not None else config.discriminators + + if len(config.dut_node_ids) > len(device_descriptors): print("error: More node IDs provided than discriminators") return False - if len(config.dut_node_ids) < len(config.discriminators): - missing = len(config.discriminators) - len(config.dut_node_ids) + if len(config.dut_node_ids) < len(device_descriptors): + missing = len(device_descriptors) - len(config.dut_node_ids) # We generate new node IDs sequentially from the last one seen for all # missing NodeIDs when commissioning many nodes at once. for i in range(missing): @@ -985,27 +1027,35 @@ def _commission_device(self, i) -> bool: dev_ctrl = self.default_controller conf = self.matter_test_config - # TODO: support by manual code and QR + # TODO: qr code and manual code aren't lists + + if conf.qr_code_content or conf.manual_code: + info = self.get_setup_payload_info() + else: + info = SetupPayloadInfo() + info.passcode = conf.setup_passcodes[i] + info.filter_type = DiscoveryFilterType.LONG_DISCRIMINATOR + info.filter_value = conf.discriminators[i] if conf.commissioning_method == "on-network": return dev_ctrl.CommissionOnNetwork( nodeId=conf.dut_node_ids[i], - setupPinCode=conf.setup_passcodes[i], - filterType=DiscoveryFilterType.LONG_DISCRIMINATOR, - filter=conf.discriminators[i] + setupPinCode=info.passcode, + filterType=info.filter_type, + filter=info.filter_value ) elif conf.commissioning_method == "ble-wifi": return dev_ctrl.CommissionWiFi( - conf.discriminators[i], - conf.setup_passcodes[i], + info.filter_value, + info.passcode, conf.dut_node_ids[i], conf.wifi_ssid, conf.wifi_passphrase ) elif conf.commissioning_method == "ble-thread": return dev_ctrl.CommissionThread( - conf.discriminators[i], - conf.setup_passcodes[i], + info.filter_value, + info.passcode, conf.dut_node_ids[i], conf.thread_operational_dataset ) @@ -1013,7 +1063,7 @@ def _commission_device(self, i) -> bool: logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====") return dev_ctrl.CommissionIP( ipaddr=conf.commissionee_ip_address_just_for_testing, - setupPinCode=conf.setup_passcodes[i], nodeid=conf.dut_node_ids[i] + setupPinCode=info.passcode, nodeid=conf.dut_node_ids[i] ) else: raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)