diff --git a/scripts/constraints.txt b/scripts/constraints.txt index e0d1d1323d4f36..91885154e23d14 100644 --- a/scripts/constraints.txt +++ b/scripts/constraints.txt @@ -153,7 +153,7 @@ mbed-ls==1.8.11 ; platform_machine != "aarch64" and sys_platform == "linux" # via -r requirements.mbed.txt mbed-os-tools==1.8.13 # via mbed-ls -mobly==1.10.1 +mobly==1.11.1 # via -r requirements.txt numpy==1.23.0 # via pandas diff --git a/src/controller/python/BUILD.gn b/src/controller/python/BUILD.gn index 3baf852afba26c..ab84ff86c864cf 100644 --- a/src/controller/python/BUILD.gn +++ b/src/controller/python/BUILD.gn @@ -55,6 +55,8 @@ shared_library("ChipDeviceCtrl") { "ChipDeviceController-ScriptBinding.cpp", "ChipDeviceController-ScriptDevicePairingDelegate.cpp", "ChipDeviceController-ScriptDevicePairingDelegate.h", + "ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.cpp", + "ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.h", "ChipDeviceController-StorageDelegate.cpp", "ChipDeviceController-StorageDelegate.h", "OpCredsBinding.cpp", @@ -281,6 +283,12 @@ chip_python_wheel_action("chip-core") { "pyyaml", "ipdb", "deprecation", + "mobly", + + # Crypto libraries for complex tests and internal Python controller usage + "cryptography", + "pycrypto", + "ecdsa", ] if (current_os == "mac") { @@ -397,6 +405,7 @@ chip_python_wheel_action("chip-repl") { "ipython!=8.1.0", "rich", "ipykernel", + "mobly", ] py_package_name = "chip-repl" diff --git a/src/controller/python/ChipDeviceController-ScriptBinding.cpp b/src/controller/python/ChipDeviceController-ScriptBinding.cpp index 9c90625e644885..96a4a51d76209b 100644 --- a/src/controller/python/ChipDeviceController-ScriptBinding.cpp +++ b/src/controller/python/ChipDeviceController-ScriptBinding.cpp @@ -51,6 +51,7 @@ #include #include +#include #include #include @@ -98,6 +99,7 @@ chip::Controller::CommissioningParameters sCommissioningParameters; } // namespace chip::Controller::ScriptDevicePairingDelegate sPairingDelegate; +chip::Controller::ScriptPairingDeviceDiscoveryDelegate sPairingDeviceDiscoveryDelegate; chip::Controller::Python::StorageAdapter * sStorageAdapter = nullptr; chip::Credentials::GroupDataProviderImpl sGroupDataProvider; chip::Credentials::PersistentStorageOpCertStore sPersistentStorageOpCertStore; @@ -152,6 +154,11 @@ ChipError::StorageType pychip_DeviceController_DiscoverCommissionableNodesDevice uint16_t device_type); ChipError::StorageType pychip_DeviceController_DiscoverCommissionableNodesCommissioningEnabled(chip::Controller::DeviceCommissioner * devCtrl); + +ChipError::StorageType pychip_DeviceController_OnNetworkCommission(chip::Controller::DeviceCommissioner * devCtrl, uint64_t nodeId, + uint32_t setupPasscode, const uint8_t filterType, + const char * filterParam); + ChipError::StorageType pychip_DeviceController_PostTaskOnChipThread(ChipThreadTaskRunnerFunct callback, void * pythonContext); ChipError::StorageType pychip_DeviceController_OpenCommissioningWindow(chip::Controller::DeviceCommissioner * devCtrl, @@ -381,6 +388,48 @@ ChipError::StorageType pychip_DeviceController_ConnectWithCode(chip::Controller: return devCtrl->PairDevice(nodeid, onboardingPayload, sCommissioningParameters).AsInteger(); } +ChipError::StorageType pychip_DeviceController_OnNetworkCommission(chip::Controller::DeviceCommissioner * devCtrl, uint64_t nodeId, + uint32_t setupPasscode, const uint8_t filterType, + const char * filterParam) +{ + Dnssd::DiscoveryFilter filter(static_cast(filterType)); + switch (static_cast(filterType)) + { + case chip::Dnssd::DiscoveryFilterType::kNone: + break; + case chip::Dnssd::DiscoveryFilterType::kShortDiscriminator: + case chip::Dnssd::DiscoveryFilterType::kLongDiscriminator: + case chip::Dnssd::DiscoveryFilterType::kCompressedFabricId: + case chip::Dnssd::DiscoveryFilterType::kVendorId: + case chip::Dnssd::DiscoveryFilterType::kDeviceType: { + // For any numerical filter, convert the string to a filter value + errno = 0; + unsigned long long int numericalArg = strtoull(filterParam, nullptr, 0); + if ((numericalArg == ULLONG_MAX) && (errno == ERANGE)) + { + return CHIP_ERROR_INVALID_ARGUMENT.AsInteger(); + } + filter.code = static_cast(numericalArg); + break; + } + case chip::Dnssd::DiscoveryFilterType::kCommissioningMode: + break; + case chip::Dnssd::DiscoveryFilterType::kCommissioner: + filter.code = 1; + break; + case chip::Dnssd::DiscoveryFilterType::kInstanceName: + filter.code = 0; + filter.instanceName = filterParam; + break; + default: + return CHIP_ERROR_INVALID_ARGUMENT.AsInteger(); + } + + sPairingDeviceDiscoveryDelegate.Init(nodeId, setupPasscode, sCommissioningParameters, &sPairingDelegate, devCtrl); + devCtrl->RegisterDeviceDiscoveryDelegate(&sPairingDeviceDiscoveryDelegate); + return devCtrl->DiscoverCommissionableNodes(filter).AsInteger(); +} + ChipError::StorageType pychip_DeviceController_SetThreadOperationalDataset(const char * threadOperationalDataset, uint32_t size) { ReturnErrorCodeIf(!sThreadBuf.Alloc(size), CHIP_ERROR_NO_MEMORY.AsInteger()); diff --git a/src/controller/python/ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.cpp b/src/controller/python/ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.cpp new file mode 100644 index 00000000000000..1752232b786167 --- /dev/null +++ b/src/controller/python/ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.cpp @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2022 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.h" + +namespace chip { +namespace Controller { + +void ScriptPairingDeviceDiscoveryDelegate::OnDiscoveredDevice(const Dnssd::DiscoveredNodeData & nodeData) +{ + // Ignore nodes with closed comissioning window + VerifyOrReturn(nodeData.commissionData.commissioningMode != 0); + VerifyOrReturn(mActiveDeviceCommissioner != nullptr); + + const uint16_t port = nodeData.resolutionData.port; + char buf[chip::Inet::IPAddress::kMaxStringLength]; + nodeData.resolutionData.ipAddress[0].ToString(buf); + ChipLogProgress(chipTool, "Discovered Device: %s:%u", buf, port); + + // Stop Mdns discovery. + mActiveDeviceCommissioner->RegisterDeviceDiscoveryDelegate(nullptr); + + Inet::InterfaceId interfaceId = + nodeData.resolutionData.ipAddress[0].IsIPv6LinkLocal() ? nodeData.resolutionData.interfaceId : Inet::InterfaceId::Null(); + PeerAddress peerAddress = PeerAddress::UDP(nodeData.resolutionData.ipAddress[0], port, interfaceId); + + RendezvousParameters keyExchangeParams = RendezvousParameters().SetSetupPINCode(mSetupPasscode).SetPeerAddress(peerAddress); + + CHIP_ERROR err = mActiveDeviceCommissioner->PairDevice(mNodeId, keyExchangeParams, mParams); + if (err != CHIP_NO_ERROR) + { + VerifyOrReturn(mPairingDelegate != nullptr); + mPairingDelegate->OnCommissioningComplete(mNodeId, err); + } +} +} // namespace Controller +} // namespace chip diff --git a/src/controller/python/ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.h b/src/controller/python/ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.h new file mode 100644 index 00000000000000..d829f261aa8701 --- /dev/null +++ b/src/controller/python/ChipDeviceController-ScriptPairingDeviceDiscoveryDelegate.h @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2022 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "ChipDeviceController-ScriptDevicePairingDelegate.h" + +#include + +namespace chip { +namespace Controller { + +class ScriptPairingDeviceDiscoveryDelegate : public DeviceDiscoveryDelegate +{ +public: + void Init(NodeId nodeId, uint32_t setupPasscode, CommissioningParameters commissioningParams, + ScriptDevicePairingDelegate * pairingDelegate, DeviceCommissioner * activeDeviceCommissioner) + { + mNodeId = nodeId; + mSetupPasscode = setupPasscode; + mParams = commissioningParams; + mPairingDelegate = pairingDelegate; + mActiveDeviceCommissioner = activeDeviceCommissioner; + } + void OnDiscoveredDevice(const Dnssd::DiscoveredNodeData & nodeData); + +private: + ScriptDevicePairingDelegate * mPairingDelegate; + DeviceCommissioner * mActiveDeviceCommissioner = nullptr; + + CommissioningParameters mParams; + NodeId mNodeId; + uint32_t mSetupPasscode; +}; + +} // namespace Controller +} // namespace chip diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index 5c10070dc20835..089fdae966f943 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -117,6 +117,19 @@ class DCState(enum.IntEnum): COMMISSIONING = 5 +class DiscoveryFilterType(enum.IntEnum): + # These must match chip::Dnssd::DiscoveryFilterType values (barring the naming convention) + NONE = 0 + SHORT_DISCRIMINATOR = 1 + LONG_DISCRIMINATOR = 2 + VENDOR_ID = 3 + DEVICE_TYPE = 4 + COMMISSIONING_MODE = 5 + INSTANCE_NAME = 6 + COMMISSIONER = 7 + COMPRESSED_FABRIC_ID = 8 + + class ChipDeviceController(): activeList = set() @@ -150,6 +163,17 @@ def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, nodeId: int, def GetNodeId(self): return self.nodeId + def HandleCommissioningComplete(nodeid, err): + if err != 0: + print("Failed to commission: {}".format(err)) + else: + print("Commissioning complete") + self.state = DCState.IDLE + self._ChipStack.callbackRes = err + self._ChipStack.commissioningEventRes = err + self._ChipStack.commissioningCompleteEvent.set() + self._ChipStack.completeEvent.set() + def HandleKeyExchangeComplete(err): if err != 0: print("Failed to establish secure session to device: {}".format(err)) @@ -157,23 +181,18 @@ def HandleKeyExchangeComplete(err): err) else: print("Established secure session with Device") + if self.state != DCState.COMMISSIONING: # During Commissioning, HandleKeyExchangeComplete will also be called, # in this case the async operation should be marked as finished by # HandleCommissioningComplete instead this function. self.state = DCState.IDLE self._ChipStack.completeEvent.set() - - def HandleCommissioningComplete(nodeid, err): - if err != 0: - print("Failed to commission: {}".format(err)) else: - print("Commissioning complete") - self.state = DCState.IDLE - self._ChipStack.callbackRes = err - self._ChipStack.commissioningEventRes = err - self._ChipStack.commissioningCompleteEvent.set() - self._ChipStack.completeEvent.set() + # When commissioning, getting an error during key exhange + # needs to unblock the entire commissioning flow. + if err != 0: + HandleCommissioningComplete(0, err) self.cbHandleKeyExchangeCompleteFunct = _DevicePairingDelegate_OnPairingCompleteFunct( HandleKeyExchangeComplete) @@ -345,6 +364,44 @@ def CheckTestCommissionerCallbacks(self): def CheckTestCommissionerPaseConnection(self, nodeid): return self._dmLib.pychip_TestPaseConnection(nodeid) + def CommissionOnNetwork(self, nodeId: int, setupPinCode: int, filterType: DiscoveryFilterType = DiscoveryFilterType.NONE, filter: typing.Any = None): + ''' + Does the routine for OnNetworkCommissioning, with a filter for mDNS discovery. + Supported filters are: + + DiscoveryFilterType.NONE + DiscoveryFilterType.SHORT_DISCRIMINATOR + DiscoveryFilterType.LONG_DISCRIMINATOR + DiscoveryFilterType.VENDOR_ID + DiscoveryFilterType.DEVICE_TYPE + DiscoveryFilterType.COMMISSIONING_MODE + DiscoveryFilterType.INSTANCE_NAME + DiscoveryFilterType.COMMISSIONER + DiscoveryFilterType.COMPRESSED_FABRIC_ID + + The filter can be an integer, a string or None depending on the actual type of selected filter. + ''' + self.CheckIsActive() + + # IP connection will run through full commissioning, so we need to wait + # for the commissioning complete event, not just any callback. + self.state = DCState.COMMISSIONING + + # Convert numerical filters to string for passing down to binding. + if isinstance(filter, int): + filter = str(filter) + + self._ChipStack.commissioningCompleteEvent.clear() + + self._ChipStack.CallAsync( + lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( + self.devCtrl, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None) + ) + if not self._ChipStack.commissioningCompleteEvent.isSet(): + # Error 50 is a timeout + return False + return self._ChipStack.commissioningEventRes == 0 + def CommissionWithCode(self, setupPayload: str, nodeid: int): self.CheckIsActive() @@ -1076,6 +1133,9 @@ def _InitLib(self): c_void_p, c_uint64] self._dmLib.pychip_DeviceController_Commission.restype = c_uint32 + self._dmLib.pychip_DeviceController_OnNetworkCommission.argtypes = [c_void_p, c_uint64, c_uint32, c_uint8, c_char_p] + self._dmLib.pychip_DeviceController_OnNetworkCommission.restype = c_uint32 + self._dmLib.pychip_DeviceController_DiscoverAllCommissionableNodes.argtypes = [ c_void_p] self._dmLib.pychip_DeviceController_DiscoverAllCommissionableNodes.restype = c_uint32 diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py index ffe321a5fc4bdf..5120e68fb8b3b2 100644 --- a/src/controller/python/chip/ChipStack.py +++ b/src/controller/python/chip/ChipStack.py @@ -324,6 +324,7 @@ def setLogFunct(self, logFunct): def Shutdown(self): # Make sure PersistentStorage is destructed before chipStack # to avoid accessing builtins.chipStack after destruction. + self._persistentStorage.Shutdown() self._persistentStorage = None self.Call(lambda: self._ChipStackLib.pychip_DeviceController_StackShutdown()) @@ -339,6 +340,8 @@ def Shutdown(self): self.devMgr = None self.callbackRes = None + delattr(builtins, "chipStack") + def Call(self, callFunct, timeoutMs: int = None): '''Run a Python function on CHIP stack, and wait for the response. This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics. diff --git a/src/controller/python/chip/FabricAdmin.py b/src/controller/python/chip/FabricAdmin.py index a1be9a180be237..4af0e38ad4f746 100644 --- a/src/controller/python/chip/FabricAdmin.py +++ b/src/controller/python/chip/FabricAdmin.py @@ -34,9 +34,9 @@ class FabricAdmin: - ''' Administers a specific fabric as identified by the tuple of RCAC subject public key and Fabric ID. - The Fabric ID can be passed into the constructor while the RCAC and ICAC are generated. - The Fabric ID *does not* have to be unique across multiple FabricAdmin instances as + ''' Administers a specific fabric as identified by the tuple of RCAC subject public key and Fabric ID. + The Fabric ID can be passed into the constructor while the RCAC and ICAC are generated. + The Fabric ID *does not* have to be unique across multiple FabricAdmin instances as it is scoped to the key pair used by the root CA and whose public key is in the RCAC. Each admin is identified by an 'admin index' that is unique to the running @@ -51,10 +51,10 @@ class FabricAdmin: Each instance of the fabric admin is associated with a single instance of the OperationalCredentialsAdapter. This adapter instance implements - the OperationalCredentialsDelegate and is meant to provide a Python + the OperationalCredentialsDelegate and is meant to provide a Python adapter to the functions in that delegate so that the fabric admin can in turn, provide users the ability to generate their own NOCs for devices - on the network (not implemented yet). For now, it relies on the in-built + on the network (not implemented yet). For now, it relies on the in-built ExampleOperationalCredentialsIssuer to do that. TODO: Add support for FabricAdmin to permit callers to hook up their own GenerateNOC @@ -89,13 +89,14 @@ def __init__(self, vendorId: int, adminIndex: int = None, fabricId: int = 1): vendorId: Valid operational Vendor ID associated with this fabric. adminIndex: Local index to be associated with this fabric. This is NOT the fabric index. Each controller on the fabric - is assigned a unique fabric index. + is assigned a unique fabric index. If omitted, one will be automatically assigned. fabricId: Fabric ID to be associated with this fabric. This is scoped to the public key of the resultant root generated by the underlying ExampleOperationalCredentialsIssuer. ''' + self._handle = chip.native.GetLibraryHandle() if (vendorId is None or vendorId == 0): raise ValueError( diff --git a/src/controller/python/chip/storage/__init__.py b/src/controller/python/chip/storage/__init__.py index 20d04eca00ca83..521fd15f699586 100644 --- a/src/controller/python/chip/storage/__init__.py +++ b/src/controller/python/chip/storage/__init__.py @@ -90,6 +90,7 @@ class PersistentStorage: def __init__(self, path: str): self._path = path self._handle = chip.native.GetLibraryHandle() + self._isActive = True try: self._file = open(path, 'r') @@ -170,7 +171,15 @@ def DeleteSdkKey(self, key: str): def GetUnderlyingStorageAdapter(self): return self._storageAdapterObj - def __del__(self): + def Shutdown(self): builtins.chipStack.Call( lambda: self._handle.pychip_Storage_ShutdownAdapter() ) + + self._isActive = False + + def __del__(self): + if (self._isActive): + builtins.chipStack.Call( + lambda: self._handle.pychip_Storage_ShutdownAdapter() + ) diff --git a/src/python_testing/hello_test.py b/src/python_testing/hello_test.py new file mode 100644 index 00000000000000..51a380b9e909f9 --- /dev/null +++ b/src/python_testing/hello_test.py @@ -0,0 +1,26 @@ +from matter_testing_support import MatterBaseTest, default_matter_test_main, async_test_body +from chip.interaction_model import Status +import chip.clusters as Clusters +import logging +from mobly import asserts + + +class HelloTest(MatterBaseTest): + @async_test_body + async def test_names_as_expected(self): + dev_ctrl = self.default_controller + vendor_name = await self.read_single_attribute(dev_ctrl, self.dut_node_id, 0, Clusters.Basic.Attributes.VendorName) + + logging.info("Found VendorName: %s" % (vendor_name)) + asserts.assert_equal(vendor_name, "TEST_VENDOR", "VendorName must be TEST_VENDOR!") + + @async_test_body + async def test_failure_on_wrong_endpoint(self): + dev_ctrl = self.default_controller + result = await self.read_single_attribute(dev_ctrl, self.dut_node_id, 9999, Clusters.Basic.Attributes.ProductName) + asserts.assert_true(isinstance(result, Clusters.Attribute.ValueDecodeFailure), "Should fail to read on endpoint 9999") + asserts.assert_equal(result.Reason.status, Status.UnsupportedEndpoint, "Failure reason should be UnsupportedEndpoint") + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py new file mode 100644 index 00000000000000..660f1c85d12d49 --- /dev/null +++ b/src/python_testing/matter_testing_support.py @@ -0,0 +1,704 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import argparse +from binascii import unhexlify, hexlify +import logging +from chip import ChipDeviceCtrl +import chip.clusters as Clusters +from chip.ChipStack import * +from chip.storage import PersistentStorage +import chip.logging +import chip.native +import chip.FabricAdmin +from chip.utils import CommissioningBuildingBlocks +import builtins +from typing import Optional +from dataclasses import dataclass, field +from dataclasses import asdict as dataclass_asdict +import re +import os +import sys +import pathlib +import json +import uuid +import asyncio + +from mobly import base_test +from mobly.config_parser import TestRunConfig, ENV_MOBLY_LOGPATH +from mobly import logger +from mobly import signals +from mobly import utils +from mobly.test_runner import TestRunner + + +# TODO: Add utility to commission a device if needed +# TODO: Add utilities to keep track of controllers/fabrics + +logger = logging.getLogger("matter.python_testing") +logger.setLevel(logging.INFO) + +DiscoveryFilterType = ChipDeviceCtrl.DiscoveryFilterType + +_DEFAULT_ADMIN_VENDOR_ID = 0xFFF1 +_DEFAULT_STORAGE_PATH = "admin_storage.json" +_DEFAULT_LOG_PATH = "/tmp/matter_testing/logs" +_DEFAULT_CONTROLLER_NODE_ID = 112233 +_DEFAULT_DUT_NODE_ID = 111 +_DEFAULT_TRUST_ROOT_INDEX = 1 + +# Mobly cannot deal with user config passing of ctypes objects, +# so we use this dict of uuid -> object to recover items stashed +# by reference. +_GLOBAL_DATA = {} + + +def stash_globally(o: object) -> str: + id = str(uuid.uuid1()) + _GLOBAL_DATA[id] = o + return id + + +def unstash_globally(id: str) -> object: + return _GLOBAL_DATA.get(id) + + +def default_paa_rootstore_from_root(root_path: pathlib.Path) -> Optional[pathlib.Path]: + """Attempt to find a PAA trust store following SDK convention at `root_path` + + This attempts to find {root_path}/credentials/development/paa-root-certs. + + Returns the fully resolved path on success or None if not found. + """ + start_path = root_path.resolve() + cred_path = start_path.joinpath("credentials") + dev_path = cred_path.joinpath("development") + paa_path = dev_path.joinpath("paa-root-certs") + + return paa_path.resolve() if all([path.exists() for path in [cred_path, dev_path, paa_path]]) else None + + +def get_default_paa_trust_store(root_path: pathlib.Path) -> pathlib.Path: + """Attempt to find a PAA trust store starting at `root_path`. + + This tries to find by various heuristics, and goes up one level at a time + until found. After a given number of levels, it will stop. + + This returns `root_path` if not PAA store is not found. + """ + # TODO: Add heuristics about TH default PAA location + cur_dir = pathlib.Path.cwd() + max_levels = 10 + + for level in range(max_levels): + paa_trust_store_path = default_paa_rootstore_from_root(cur_dir) + if paa_trust_store_path is not None: + return paa_trust_store_path + + # Go back one level + cur_dir = cur_dir.joinpath("..") + else: + # On not having found a PAA dir, just return current dir to avoid blow-ups + return pathlib.Path.cwd() + + +@dataclass +class MatterTestConfig: + storage_path: pathlib.Path = None + logs_path: pathlib.Path = None + paa_trust_store_path: pathlib.Path = None + + admin_vendor_id: int = _DEFAULT_ADMIN_VENDOR_ID + global_test_params: dict = field(default_factory=dict) + # List of explicit tests to run by name. If empty, all tests will run + tests: list[str] = field(default_factory=list) + + commissioning_method: str = None + discriminator: int = None + setup_passcode: int = None + + qr_code_content: str = None + manual_code: str = None + + wifi_ssid: str = None + wifi_passphrase: str = None + thread_operational_dataset: str = None + + # Node ID for basic DUT + dut_node_id: int = _DEFAULT_DUT_NODE_ID + # Node ID to use for controller/commissioner + controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID + # Fabric ID which to use + fabric_id: int = None + # "Alpha" by default + root_of_trust_index: int = _DEFAULT_TRUST_ROOT_INDEX + + # If this is set, we will reuse root of trust keys at that location + chip_tool_credentials_path: pathlib.Path = None + + +class MatterStackState: + def __init__(self, config: MatterTestConfig): + self._logger = logger + self._config = config + self._fabric_admins = [] + + if not hasattr(builtins, "chipStack"): + chip.native.Init() + if config.storage_path is None: + raise ValueError("Must have configured a MatterTestConfig.storage_path") + self._init_stack(already_initialized=False, persistentStoragePath=config.storage_path) + self._we_initialized_the_stack = True + else: + self._init_stack(already_initialized=True) + self._we_initialized_the_stack = False + + def _init_stack(self, already_initialized: bool, **kwargs): + if already_initialized: + self._chip_stack = builtins.chipStack + self._logger.warn( + "Re-using existing ChipStack object found in current interpreter: storage path %s will be ignored!" % (self._config.storage_path)) + # TODO: Warn that storage will not follow what we set in config + else: + self._chip_stack = ChipStack(**kwargs) + builtins.chipStack = self._chip_stack + + self._storage = self._chip_stack.GetStorageManager() + + try: + admin_list = self._storage.GetReplKey('fabricAdmins') + found_admin_list = True + except KeyError: + found_admin_list = False + + if not found_admin_list: + self._logger.warn("No previous fabric administrative data found in persistent data: initializing a new one") + self._fabric_admins.append(chip.FabricAdmin.FabricAdmin(self._config.admin_vendor_id)) + else: + for admin_idx in admin_list: + self._logger.info( + f"Restoring FabricAdmin from storage to manage FabricId {admin_list[admin_idx]['fabricId']}, AdminIndex {admin_idx}") + self._fabric_admins.append(chip.FabricAdmin.FabricAdmin(vendorId=int(admin_list[admin_idx]['vendorId']), + fabricId=admin_list[admin_idx]['fabricId'], adminIndex=int(admin_idx))) + + # TODO: support getting access to chip-tool credentials issuer's data + + def Shutdown(self): + if self._we_initialized_the_stack: + # Unfortunately, all the below are singleton and possibly + # managed elsewhere so we have to be careful not to touch unless + # we initialized ourselves. + ChipDeviceCtrl.ChipDeviceController.ShutdownAll() + chip.FabricAdmin.FabricAdmin.ShutdownAll() + global_chip_stack = builtins.chipStack + global_chip_stack.Shutdown() + + @property + def fabric_admins(self): + return self._fabric_admins + + @property + def storage(self) -> PersistentStorage: + return self._storage + + @property + def stack(self) -> ChipStack: + return builtins.chipStack + + +def bytes_from_hex(hex: str) -> bytes: + """Converts any `hex` string representation including `01:ab:cd` to bytes + + Handles any whitespace including newlines, which are all stripped. + """ + return unhexlify("".join(hex.replace(":", "").replace(" ", "").split())) + + +def hex_from_bytes(b: bytes) -> str: + """Converts a bytes object `b` into a hex string (reverse of bytes_from_hex)""" + return hexlify(b).decode("utf-8") + + +class MatterBaseTest(base_test.BaseTestClass): + def __init__(self, *args): + super().__init__(*args) + + @property + def matter_test_config(self) -> MatterTestConfig: + return unstash_globally(self.user_params.get("matter_test_config")) + + @property + def default_controller(self) -> ChipDeviceCtrl: + return unstash_globally(self.user_params.get("default_controller")) + + @property + def matter_stack(self) -> MatterStackState: + return unstash_globally(self.user_params.get("matter_stack")) + + @property + def dut_node_id(self) -> int: + return self.matter_test_config.dut_node_id + + async def read_single_attribute(self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object) -> object: + result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) + data = result[endpoint] + return list(data.values())[0][attribute] + + +def generate_mobly_test_config(matter_test_config: MatterTestConfig): + test_run_config = TestRunConfig() + # We use a default name. We don't use Mobly YAML configs, so that we can be + # freestanding without relying + test_run_config.testbed_name = "MatterTest" + + log_path = matter_test_config.logs_path + log_path = _DEFAULT_LOG_PATH if log_path is None else log_path + if ENV_MOBLY_LOGPATH in os.environ: + log_path = os.environ[ENV_MOBLY_LOGPATH] + + test_run_config.log_path = log_path + # TODO: For later, configure controllers + test_run_config.controller_configs = {} + + test_run_config.user_params = matter_test_config.global_test_params + + return test_run_config + + +def _find_test_class(): + """Finds the test class in a test script. + Walk through module members and find the subclass of MatterBaseTest. Only + one subclass is allowed in a test script. + Returns: + The test class in the test module. + Raises: + SystemExit: Raised if the number of test classes is not exactly one. + """ + subclasses = utils.find_subclasses_in_module([MatterBaseTest], sys.modules['__main__']) + subclasses = [c for c in subclasses if c.__name__ != "MatterBaseTest"] + if len(subclasses) != 1: + print( + 'Exactly one subclass of `MatterBaseTest` should be in the main file. Found %s.' % + str([subclass.__name__ for subclass in subclasses])) + sys.exit(1) + + return subclasses[0] + + +def int_decimal_or_hex(s: str) -> int: + val = int(s, 0) + if val < 0: + raise ValueError("Negative values not supported") + return val + + +def byte_string_from_hex(s: str) -> bytes: + return unhexlify(s.replace(":", "").replace(" ", "").replace("0x", "")) + + +def int_from_manual_code(s: str) -> int: + regex = r"^([0-9]{11}|[0-9]{21})$" + match = re.match(regex, s) + if not match: + raise ValueError("Invalid manual code format, does not match %s" % regex) + + return int(s, 10) + + +def int_named_arg(s: str) -> tuple[str, int]: + regex = r"^(?P[a-zA-Z_0-9.]+):((?P0x[0-9a-fA-F_]+)|(?P-?\d+))$" + match = re.match(regex, s) + if not match: + raise ValueError("Invalid int argument format, does not match %s" % regex) + + name = match.group("name") + if match.group("hex_value"): + value = int(match.group("hex_value"), 0) + else: + value = int(match.group("decimal_value"), 10) + return (name, value) + + +def str_named_arg(s: str) -> tuple[str, str]: + regex = r"^(?P[a-zA-Z_0-9.]+):(?P.*)$" + match = re.match(regex, s) + if not match: + raise ValueError("Invalid string argument format, does not match %s" % regex) + + return (match.group("name"), match.group("value")) + + +def float_named_arg(s: str) -> tuple[str, float]: + regex = r"^(?P[a-zA-Z_0-9.]+):(?P.*)$" + match = re.match(regex, s) + if not match: + raise ValueError("Invalid float argument format, does not match %s" % regex) + + name = match.group("name") + value = float(match.group("value")) + + return (name, value) + + +def json_named_arg(s: str) -> tuple[str, object]: + regex = r"^(?P[a-zA-Z_0-9.]+):(?P.*)$" + match = re.match(regex, s) + if not match: + raise ValueError("Invalid JSON argument format, does not match %s" % regex) + + name = match.group("name") + value = json.loads(match.group("value")) + + return (name, value) + + +def bool_named_arg(s: str) -> tuple[str, bool]: + regex = r"^(?P[a-zA-Z_0-9.]+):((?Ptrue|false)|(?P[01]))$" + match = re.match(regex, s.lower()) + if not match: + raise ValueError("Invalid bool argument format, does not match %s" % regex) + + name = match.group("name") + if match.group("truth_value"): + value = True if match.group("truth_value") == "true" else False + else: + value = int(match.group("decimal_value")) != 0 + + return (name, value) + + +def bytes_as_hex_named_arg(s: str) -> tuple[str, bytes]: + regex = r"^(?P[a-zA-Z_0-9.]+):(?P[0-9a-fA-F:]+)$" + match = re.match(regex, s) + if not match: + raise ValueError("Invalid bytes as hex argument format, does not match %s" % regex) + + name = match.group("name") + value_str = match.group("value") + value_str = value_str.replace(":", "") + if len(value_str) % 2 != 0: + raise ValueError("Byte string argument value needs to be event number of hex chars") + value = unhexlify(value_str) + + return (name, value) + + +def root_index(s: str) -> int: + CHIP_TOOL_COMPATIBILITY = { + "alpha": 1, + "beta": 2, + "gamma": 3 + } + + for name, id in CHIP_TOOL_COMPATIBILITY.items(): + if s.lower() == name: + return id + else: + root_index = int(s) + if root_index == 0: + raise ValueError("Only support root index >= 1") + return root_index + + +def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConfig) -> bool: + if args.commissioning_method is None: + return True + + config.commissioning_method = args.commissioning_method + + if args.dut_node_id is None: + print("error: When --commissioning-method present, --dut-node-id is mandatory!") + return False + config.dut_node_id = args.dut_node_id + + if args.discriminator is None and (args.qr_code is None and args.manual_code is None): + print("error: Missing --discriminator when no --qr-code/--manual-code present!") + return False + config.discriminator = args.discriminator + + if args.passcode is None and (args.qr_code is None and args.manual_code is None): + print("error: Missing --passcode when no --qr-code/--manual-code present!") + return False + config.setup_passcode = args.passcode + + if args.qr_code is not None and args.manual_code is not None: + print("error: Cannot have both --qr-code and --manual-code present!") + return False + + config.qr_code_content = args.qr_code + config.manual_code = args.manual_code + + config.root_of_trust_index = args.root_index + # Follow root of trust index if ID not provided to have same behavior as legacy + # chip-tool that fabricID == commissioner_name == root of trust index + config.fabric_id = args.fabric_id if args.fabric_id is not None else config.root_of_trust_index + + if args.chip_tool_credentials_path is not None and not args.chip_tool_credentials_path.exists(): + print("error: chip-tool credentials path %s doesn't exist!" % args.chip_tool_credentials_path) + return False + config.chip_tool_credentials_path = args.chip_tool_credentials_path + + if config.commissioning_method == "ble-wifi": + if args.wifi_ssid is None: + print("error: missing --wifi-ssid for --commissioning-method ble-wifi!") + return False + + if args.wifi_passphrase is None: + print("error: missing --wifi-passphrase for --commissioning-method ble-wifi!") + return False + + config.wifi_ssid = args.wifi_ssid + config.wifi_passphrase = args.wifi_passphrase + elif config.commissioning_method == "ble-thread": + if args.thread_dataset_hex is None: + print("error: missing --thread-dataset-hex for --commissioning-method ble-thread!") + return False + config.thread_operational_dataset = args.thread_dataset_hex + + return True + + +def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: + config = MatterTestConfig() + + # Populate commissioning config if present, exiting on error + if not populate_commissioning_args(args, config): + sys.exit(1) + + config.storage_path = pathlib.Path(_DEFAULT_STORAGE_PATH) if args.storage_path is None else args.storage_path + config.logs_path = pathlib.Path(_DEFAULT_LOG_PATH) if args.logs_path is None else args.logs_path + config.paa_trust_store_path = args.paa_trust_store_path + + config.controller_node_id = args.controller_node_id + + # Accumulate all command-line-passed named args + all_global_args = [] + argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg, + args.hex_arg, args.bool_arg) if item is not None] + for argset in argsets: + all_global_args.extend(argset) + + config.global_test_params = {} + for name, value in all_global_args: + config.global_test_params[name] = value + + # Embed the rest of the config in the global test params dict which will be passed to Mobly tests + config.global_test_params["meta_config"] = {k: v for k, v in dataclass_asdict(config).items() if k != "global_test_params"} + + return config + + +def parse_matter_test_args(argv: list[str]) -> MatterTestConfig: + parser = argparse.ArgumentParser(description='Matter standalone Python test') + + basic_group = parser.add_argument_group(title="Basic arguments", description="Overall test execution arguments") + + basic_group.add_argument('--tests', + '--test_case', + action="store", + nargs='+', + type=str, + metavar='test_a test_b...', + help='A list of tests in the test class to execute.') + + basic_group.add_argument('--storage-path', action="store", type=pathlib.Path, + metavar="PATH", help="Location for persisted storage of instance") + basic_group.add_argument('--logs-path', action="store", type=pathlib.Path, metavar="PATH", help="Location for test logs") + paa_path_default = get_default_paa_trust_store(pathlib.Path.cwd()) + basic_group.add_argument('--paa-trust-store-path', action="store", type=pathlib.Path, metavar="PATH", default=paa_path_default, + help="PAA trust store path (default: %s)" % str(paa_path_default)) + basic_group.add_argument('-N', '--controller-node-id', type=int_decimal_or_hex, + metavar='NODE_ID', + default=_DEFAULT_CONTROLLER_NODE_ID, + help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID) + basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex, + metavar='NODE_ID', default=_DEFAULT_DUT_NODE_ID, + help='Node ID for primary DUT communication, and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID) + + commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node") + + commission_group.add_argument('-m', '--commissioning-method', type=str, + metavar='METHOD_NAME', + choices=["on-network", "ble-wifi", "ble-thread"], + help='Name of commissioning method to use') + commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex, + metavar='LONG_DISCRIMINATOR', + help='Discriminator to use for commissioning') + commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex, + metavar='PASSCODE', + help='PAKE passcode to use') + + commission_group.add_argument('--wifi-ssid', type=str, + metavar='SSID', + help='Wi-Fi SSID for ble-wifi commissioning') + commission_group.add_argument('--wifi-passphrase', type=str, + metavar='PASSPHRASE', + help='Wi-Fi passphrase for ble-wifi commissioning') + + commission_group.add_argument('--thread-dataset-hex', type=byte_string_from_hex, + metavar='OPERATIONAL_DATASET_HEX', + help='Thread operational dataset as a hex string for ble-thread commissioning') + + commission_group.add_argument('--admin-vendor-id', action="store", type=int_decimal_or_hex, default=_DEFAULT_ADMIN_VENDOR_ID, + metavar="VENDOR_ID", help="VendorID to use during commissioning (default 0x%04X)" % _DEFAULT_ADMIN_VENDOR_ID) + + code_group = parser.add_mutually_exclusive_group(required=False) + + code_group.add_argument('-q', '--qr-code', type=str, + metavar="QR_CODE", help="QR setup code content (overrides passcode and discriminator)") + code_group.add_argument('--manual-code', type=int_from_manual_code, + metavar="MANUAL_CODE", help="Manual setup code content (overrides passcode and discriminator)") + + fabric_group = parser.add_argument_group( + title="Fabric selection", description="Fabric selection for single-fabric basic usage, and commissioning") + fabric_group.add_argument('-f', '--fabric-id', type=int_decimal_or_hex, + metavar='FABRIC_ID', + help='Fabric ID on which to operate under the root of trust') + + fabric_group.add_argument('-r', '--root-index', type=root_index, + metavar='ROOT_INDEX_OR_NAME', default=_DEFAULT_TRUST_ROOT_INDEX, + help='Root of trust under which to operate/commission for single-fabric basic usage. alpha/beta/gamma are aliases for 1/2/3. Default (%d)' % _DEFAULT_TRUST_ROOT_INDEX) + + fabric_group.add_argument('-c', '--chip-tool-credentials-path', type=pathlib.Path, + metavar='PATH', + help='Path to chip-tool credentials file root') + + args_group = parser.add_argument_group(title="Config arguments", description="Test configuration global arguments set") + args_group.add_argument('--int-arg', nargs='*', type=int_named_arg, metavar="NAME:VALUE", + help="Add a named test argument for an integer as hex or decimal (e.g. -2 or 0xFFFF_1234)") + args_group.add_argument('--bool-arg', nargs='*', type=bool_named_arg, metavar="NAME:VALUE", + help="Add a named test argument for an boolean value (e.g. true/false or 0/1)") + args_group.add_argument('--float-arg', nargs='*', type=float_named_arg, metavar="NAME:VALUE", + help="Add a named test argument for a floating point value (e.g. -2.1 or 6.022e23)") + args_group.add_argument('--string-arg', nargs='*', type=str_named_arg, metavar="NAME:VALUE", + help="Add a named test argument for a string value") + args_group.add_argument('--json-arg', nargs='*', type=json_named_arg, metavar="NAME:VALUE", + help="Add a named test argument for JSON stored as a list or dict") + args_group.add_argument('--hex-arg', nargs='*', type=bytes_as_hex_named_arg, metavar="NAME:VALUE", + help="Add a named test argument for an octet string in hex (e.g. 0011cafe or 00:11:CA:FE)") + + if not argv: + argv = sys.argv[1:] + + return convert_args_to_matter_config(parser.parse_known_args(argv)[0]) + + +def async_test_body(body): + """Decorator required to be applied whenever a `test_*` method is `async def`. + + Since Mobly doesn't support asyncio directly, and the test methods are called + synchronously, we need a mechanism to allow an `async def` to be converted to + a asyncio-run synchronous method. This decorator does the wrapping. + """ + def async_runner(*args, **kwargs): + return asyncio.run(body(*args, **kwargs)) + + return async_runner + + +class CommissionDeviceTest(MatterBaseTest): + """Test class auto-injected at the start of test list to commission a device when requested""" + + def test_run_commissioning(self): + conf = self.matter_test_config + logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % + (conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id)) + logging.info("Commissioning method: %s" % conf.commissioning_method) + + if not self._commission_device(): + raise signals.TestAbortAll("Failed to commission node") + + def _commission_device(self) -> bool: + dev_ctrl = self.default_controller + conf = self.matter_test_config + + # TODO: support by manual code and QR + + if conf.commissioning_method == "on-network": + return dev_ctrl.CommissionOnNetwork(nodeId=conf.dut_node_id, setupPinCode=conf.setup_passcode, filterType=DiscoveryFilterType.LONG_DISCRIMINATOR, filter=conf.discriminator) + elif conf.commissioning_method == "ble-wifi": + return dev_ctrl.CommissionWiFi(conf.discriminator, conf.setup_passcode, conf.dut_node_id, conf.wifi_ssid, conf.wifi_passphrase) + elif conf.commissioning_method == "ble-thread": + return dev_ctrl.CommissionThread(conf.discriminator, conf.setup_passcode, conf.dut_node_id, conf.thread_operational_dataset) + else: + raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method) + + +def default_matter_test_main(argv=None): + """Execute the test class in a test module. + This is the default entry point for running a test script file directly. + In this case, only one test class in a test script is allowed. + To make your test script executable, add the following to your file: + .. code-block:: python + from matter_testing_support.py import default_matter_test_main + ... + if __name__ == '__main__': + default_matter_test_main.main() + Args: + argv: A list that is then parsed as command line args. If None, defaults to sys.argv + """ + matter_test_config = parse_matter_test_args(argv) + + # Find the test class in the test script. + test_class = _find_test_class() + + # Load test config file. + test_config = generate_mobly_test_config(matter_test_config) + + # Parse test specifiers if exist. + tests = None + if len(matter_test_config.tests) > 0: + tests = matter_test_config.tests + + stack = MatterStackState(matter_test_config) + test_config.user_params["matter_stack"] = stash_globally(stack) + + # TODO: Steer to right FabricAdmin! + default_controller = stack.fabric_admins[0].NewController(nodeId=matter_test_config.controller_node_id, + paaTrustStorePath=str(matter_test_config.paa_trust_store_path)) + test_config.user_params["default_controller"] = stash_globally(default_controller) + + test_config.user_params["matter_test_config"] = stash_globally(matter_test_config) + + # Execute the test class with the config + ok = True + + runner = TestRunner(log_dir=test_config.log_path, + testbed_name=test_config.testbed_name) + + with runner.mobly_logger(): + if matter_test_config.commissioning_method is not None: + runner.add_test_class(test_config, CommissionDeviceTest, None) + + runner.add_test_class(test_config, test_class, tests) + try: + runner.run() + ok = runner.results.is_all_pass and ok + except signals.TestAbortAll: + ok = False + except Exception: + logging.exception('Exception when executing %s.', test_config.testbed_name) + ok = False + + # Shutdown the stack when all done + stack.Shutdown() + + if ok: + logging.info("Final result: PASS !") + else: + logging.error("Final result: FAIL !") + sys.exit(1)