Skip to content

Commit

Permalink
python testing: Support QR / manual codes for commissioning (#28479)
Browse files Browse the repository at this point in the history
* Support QR / manual codes for commissioning

* Remove now unused imports
  • Loading branch information
cecille authored and pull[bot] committed Nov 30, 2023
1 parent 2980c0f commit 076f543
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 40 deletions.
31 changes: 3 additions & 28 deletions src/python_testing/TC_DeviceBasicComposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@

import chip.clusters as Clusters
import chip.tlv
from chip import discovery
from chip.clusters.Attribute import ValueDecodeFailure
from chip.exceptions import ChipStackError
from chip.setup_payload import SetupPayload
from matter_testing_support import AttributePathLocation, MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts

Expand Down Expand Up @@ -171,32 +168,10 @@ async def setup_class(self):
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
if self.matter_test_config.qr_code_content is not None:
qr_code = self.matter_test_config.qr_code_content
try:
setup_payload = SetupPayload().ParseQrCode(qr_code)
except ChipStackError:
asserts.fail(f"QR code '{qr_code} failed to parse properly as a Matter setup code.")

elif self.matter_test_config.manual_code is not None:
manual_code = self.matter_test_config.manual_code
try:
setup_payload = SetupPayload().ParseManualPairingCode(manual_code)
except ChipStackError:
asserts.fail(
f"Manual code code '{manual_code}' failed to parse properly as a Matter setup code. Check that all digits are correct and length is 11 or 21 characters.")
else:
asserts.fail("Require either --qr-code or --manual-code to proceed with PASE needed for test.")

if setup_payload.short_discriminator is not None:
filter_type = discovery.FilterType.SHORT_DISCRIMINATOR
filter_value = setup_payload.short_discriminator
else:
filter_type = discovery.FilterType.LONG_DISCRIMINATOR
filter_value = setup_payload.long_discriminator
info = self.get_setup_payload_info()

commissionable_nodes = dev_ctrl.DiscoverCommissionableNodes(
filter_type, filter_value, stopOnFirst=True, timeoutSecond=15)
info.filter_type, info.filter_value, stopOnFirst=True, timeoutSecond=15)
logging.info(f"Commissionable nodes: {commissionable_nodes}")
# TODO: Support BLE
if commissionable_nodes is not None and len(commissionable_nodes) > 0:
Expand All @@ -208,7 +183,7 @@ async def setup_class(self):
logging.info(f"Found instance {instance_name}, VID={vid}, PID={pid}, Address={address}")

node_id = 1
dev_ctrl.EstablishPASESessionIP(address, setup_payload.setup_passcode, node_id)
dev_ctrl.EstablishPASESessionIP(address, info.passcode, node_id)
else:
asserts.fail("Failed to find the DUT according to command line arguments.")
else:
Expand Down
74 changes: 62 additions & 12 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@
import chip.clusters as Clusters
import chip.logging
import chip.native
from chip import discovery
from chip.ChipStack import ChipStack
from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction
from chip.exceptions import ChipStackError
from chip.interaction_model import InteractionModelError, Status
from chip.setup_payload import SetupPayload
from chip.storage import PersistentStorage
from chip.tracing import TracingContext
from mobly import asserts, base_test, signals, utils
Expand Down Expand Up @@ -350,6 +353,13 @@ class ProblemNotice:
spec_location: str = ""


@dataclass
class SetupPayloadInfo:
filter_type: discovery.FilterType = discovery.FilterType.LONG_DISCRIMINATOR
filter_value: int = 0
passcode: int = 0


class MatterStackState:
def __init__(self, config: MatterTestConfig):
self._logger = logger
Expand Down Expand Up @@ -550,6 +560,35 @@ def record_warning(self, test_name: str, location: Union[AttributePathLocation,
def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""):
self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location))

def get_setup_payload_info(self) -> SetupPayloadInfo:
if self.matter_test_config.qr_code_content is not None:
qr_code = self.matter_test_config.qr_code_content
try:
setup_payload = SetupPayload().ParseQrCode(qr_code)
except ChipStackError:
asserts.fail(f"QR code '{qr_code} failed to parse properly as a Matter setup code.")

elif self.matter_test_config.manual_code is not None:
manual_code = self.matter_test_config.manual_code
try:
setup_payload = SetupPayload().ParseManualPairingCode(manual_code)
except ChipStackError:
asserts.fail(
f"Manual code code '{manual_code}' failed to parse properly as a Matter setup code. Check that all digits are correct and length is 11 or 21 characters.")
else:
asserts.fail("Require either --qr-code or --manual-code.")

info = SetupPayloadInfo()
info.passcode = setup_payload.setup_passcode
if setup_payload.short_discriminator is not None:
info.filter_type = discovery.FilterType.SHORT_DISCRIMINATOR
info.filter_value = setup_payload.short_discriminator
else:
info.filter_type = discovery.FilterType.LONG_DISCRIMINATOR
info.filter_value = setup_payload.long_discriminator

return info


def generate_mobly_test_config(matter_test_config: MatterTestConfig):
test_run_config = TestRunConfig()
Expand Down Expand Up @@ -755,12 +794,15 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf
print("error: supplied number of discriminators does not match number of passcodes")
return False

if len(config.dut_node_ids) > len(config.discriminators):
device_descriptors = [config.qr_code_content] if config.qr_code_content is not None else [
config.manual_code] if config.manual_code is not None else config.discriminators

if len(config.dut_node_ids) > len(device_descriptors):
print("error: More node IDs provided than discriminators")
return False

if len(config.dut_node_ids) < len(config.discriminators):
missing = len(config.discriminators) - len(config.dut_node_ids)
if len(config.dut_node_ids) < len(device_descriptors):
missing = len(device_descriptors) - len(config.dut_node_ids)
# We generate new node IDs sequentially from the last one seen for all
# missing NodeIDs when commissioning many nodes at once.
for i in range(missing):
Expand Down Expand Up @@ -985,35 +1027,43 @@ def _commission_device(self, i) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

# TODO: support by manual code and QR
# TODO: qr code and manual code aren't lists

if conf.qr_code_content or conf.manual_code:
info = self.get_setup_payload_info()
else:
info = SetupPayloadInfo()
info.passcode = conf.setup_passcodes[i]
info.filter_type = DiscoveryFilterType.LONG_DISCRIMINATOR
info.filter_value = conf.discriminators[i]

if conf.commissioning_method == "on-network":
return dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_ids[i],
setupPinCode=conf.setup_passcodes[i],
filterType=DiscoveryFilterType.LONG_DISCRIMINATOR,
filter=conf.discriminators[i]
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
elif conf.commissioning_method == "ble-wifi":
return dev_ctrl.CommissionWiFi(
conf.discriminators[i],
conf.setup_passcodes[i],
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
conf.wifi_ssid,
conf.wifi_passphrase
)
elif conf.commissioning_method == "ble-thread":
return dev_ctrl.CommissionThread(
conf.discriminators[i],
conf.setup_passcodes[i],
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
conf.thread_operational_dataset
)
elif conf.commissioning_method == "on-network-ip":
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
return dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=conf.setup_passcodes[i], nodeid=conf.dut_node_ids[i]
setupPinCode=info.passcode, nodeid=conf.dut_node_ids[i]
)
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)
Expand Down

0 comments on commit 076f543

Please sign in to comment.