diff --git a/credentials/fetch-paa-certs-from-dcl.py b/credentials/fetch_paa_certs_from_dcl.py similarity index 69% rename from credentials/fetch-paa-certs-from-dcl.py rename to credentials/fetch_paa_certs_from_dcl.py index d440398c472be7..3bcd74b6534e26 100644 --- a/credentials/fetch-paa-certs-from-dcl.py +++ b/credentials/fetch_paa_certs_from_dcl.py @@ -37,6 +37,9 @@ PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org" TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org" +MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw" +MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5" + def parse_paa_root_certs(cmdpipe, paa_list): """ @@ -73,13 +76,14 @@ def parse_paa_root_certs(cmdpipe, paa_list): else: if b': ' in line: key, value = line.split(b': ') - result[key.strip(b' -').decode("utf-8")] = value.strip().decode("utf-8") + result[key.strip(b' -').decode("utf-8") + ] = value.strip().decode("utf-8") parse_paa_root_certs.counter += 1 if parse_paa_root_certs.counter % 2 == 0: paa_list.append(copy.deepcopy(result)) -def write_paa_root_cert(certificate, subject): +def write_cert(certificate, subject): filename = 'dcld_mirror_' + \ re.sub('[^a-zA-Z0-9_-]', '', re.sub('[=, ]', '_', subject)) with open(filename + '.pem', 'w+') as outfile: @@ -93,7 +97,8 @@ def write_paa_root_cert(certificate, subject): serialization.Encoding.DER) outfile.write(der_certificate) except (IOError, ValueError) as e: - print(f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...") + print( + f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...") def parse_paa_root_cert_from_dcld(cmdpipe): @@ -133,7 +138,38 @@ def use_dcld(dcld, production, cmdlist): @optgroup.option('--paa-trust-store-path', default='paa-root-certs', type=str, metavar='PATH', help="PAA trust store path (default: paa-root-certs)") def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path): """DCL PAA mirroring tools""" + fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path) + + +def get_cert_from_rest(rest_node_url, subject, subject_key_id): + response = requests.get( + f"{rest_node_url}/dcl/pki/certificates/{subject}/{subject_key_id}").json()["approvedCertificates"]["certs"][0] + certificate = response["pemCert"].rstrip("\n") + subject = response["subjectAsText"] + return certificate, subject + + +def fetch_cd_signing_certs(store_path): + ''' Only supports using main net http currently.''' + rest_node_url = PRODUCTION_NODE_URL_REST + os.makedirs(store_path, exist_ok=True) + original_dir = os.getcwd() + os.chdir(store_path) + cd_signer_ids = requests.get( + f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds'] + for signer in cd_signer_ids: + subject = signer['subject'] + subject_key_id = signer['subjectKeyId'] + certificate, subject = get_cert_from_rest(rest_node_url, subject, subject_key_id) + + print(f"Downloaded CD signing cert with subject: {subject}") + write_cert(certificate, subject) + + os.chdir(original_dir) + + +def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path): production = False dcld = use_test_net_dcld @@ -148,36 +184,43 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST os.makedirs(paa_trust_store_path, exist_ok=True) + original_dir = os.getcwd() os.chdir(paa_trust_store_path) if use_rest: - paa_list = requests.get(f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"] + paa_list = requests.get( + f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"] else: cmdlist = ['query', 'pki', 'all-x509-root-certs'] - cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmdpipe = subprocess.Popen(use_dcld( + dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) paa_list = [] parse_paa_root_certs.counter = 0 parse_paa_root_certs(cmdpipe, paa_list) for paa in paa_list: + if paa['subject'] == MATTER_CERT_CA_SUBJECT and paa['subjectKeyId'] == MATTER_CERT_CA_SUBJECT_KEY_ID: + # Don't include the CD signing cert as a PAA root. + continue if use_rest: - response = requests.get( - f"{rest_node_url}/dcl/pki/certificates/{paa['subject']}/{paa['subjectKeyId']}").json()["approvedCertificates"]["certs"][0] - certificate = response["pemCert"] - subject = response["subjectAsText"] + certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId']) else: - cmdlist = ['query', 'pki', 'x509-cert', '-u', paa['subject'], '-k', paa['subjectKeyId']] + cmdlist = ['query', 'pki', 'x509-cert', '-u', + paa['subject'], '-k', paa['subjectKeyId']] - cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmdpipe = subprocess.Popen(use_dcld( + dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) (certificate, subject) = parse_paa_root_cert_from_dcld(cmdpipe) certificate = certificate.rstrip('\n') - print(f"Downloaded certificate with subject: {subject}") - write_paa_root_cert(certificate, subject) + print(f"Downloaded PAA certificate with subject: {subject}") + write_cert(certificate, subject) + + os.chdir(original_dir) if __name__ == "__main__": diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index 6e2e1336b5c837..736bfae0a5cf85 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -254,13 +254,18 @@ class DeviceProxyWrapper(): that is not an issue that needs to be accounted for and it will become very apparent if that happens. ''' + class DeviceProxyType(enum.Enum): + OPERATIONAL = enum.auto(), + COMMISSIONEE = enum.auto(), - def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None): + def __init__(self, deviceProxy: ctypes.c_void_p, proxyType, dmLib=None): self._deviceProxy = deviceProxy self._dmLib = dmLib + self._proxyType = proxyType def __del__(self): - if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None): + # Commissionee device proxies are owned by the DeviceCommissioner. See #33031 + if (self._proxyType == self.DeviceProxyType.OPERATIONAL and self.self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None): # This destructor is called from any threading context, including on the Matter threading context. # So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to # actually be executed. Instead, we just post/schedule the work and move on. @@ -861,7 +866,23 @@ def GetClusterHandler(self): return self._Cluster - def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int = None): + def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutMs: int = None) -> typing.Optional[DeviceProxyWrapper]: + ''' Returns CommissioneeDeviceProxy if we can find or establish a PASE connection to the specified device''' + self.CheckIsActive() + returnDevice = c_void_p(None) + res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( + self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) + if res.is_success: + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) + + self.EstablishPASESession(setupCode, nodeid) + + res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( + self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) + if res.is_success: + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) + + def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None): ''' Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node. nodeId: Target's Node ID @@ -882,7 +903,7 @@ def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) if res.is_success: logging.info('Using PASE connection') - return DeviceProxyWrapper(returnDevice) + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) class DeviceAvailableClosure(): def deviceAvailable(self, device, err): @@ -916,7 +937,7 @@ def deviceAvailable(self, device, err): if returnDevice.value is None: returnErr.raise_on_error() - return DeviceProxyWrapper(returnDevice, self._dmLib) + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib) async def WaitForActive(self, nodeid, *, timeoutSeconds=30.0, stayActiveDurationMs=30000): ''' Waits a LIT ICD device to become active. Will send a StayActive command to the device on active to allow human operations. @@ -948,7 +969,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) if res.is_success: logging.info('Using PASE connection') - return DeviceProxyWrapper(returnDevice) + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) eventLoop = asyncio.get_running_loop() future = eventLoop.create_future() @@ -987,7 +1008,7 @@ def deviceAvailable(self, device, err): else: await future - return DeviceProxyWrapper(future.result(), self._dmLib) + return DeviceProxyWrapper(future.result(), DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib) def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0): ''' Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to diff --git a/src/python_testing/TC_DA_1_2.py b/src/python_testing/TC_DA_1_2.py index 759fa3eae3e2a7..08fb375139088b 100644 --- a/src/python_testing/TC_DA_1_2.py +++ b/src/python_testing/TC_DA_1_2.py @@ -20,6 +20,7 @@ import re import chip.clusters as Clusters +from basic_composition_support import BasicCompositionTests from chip.interaction_model import InteractionModelError, Status from chip.tlv import TLVReader from cryptography import x509 @@ -104,7 +105,7 @@ def parse_ids_from_certs(dac: x509.Certificate, pai: x509.Certificate) -> tuple( # default is 'credentials/development/cd-certs'. -class TC_DA_1_2(MatterBaseTest): +class TC_DA_1_2(MatterBaseTest, BasicCompositionTests): def desc_TC_DA_1_2(self): return "Device Attestation Request Validation [DUT - Commissionee]" @@ -164,6 +165,11 @@ def steps_TC_DA_1_2(self): async def test_TC_DA_1_2(self): is_ci = self.check_pics('PICS_SDK_CI_ONLY') cd_cert_dir = self.user_params.get("cd_cert_dir", 'credentials/development/cd-certs') + post_cert_test = self.user_params.get("post_cert_test", False) + + do_test_over_pase = self.user_params.get("use_pase_only", False) + if do_test_over_pase: + self.connect_over_pase(self.default_controller) # Commissioning - done self.step(0) @@ -308,7 +314,9 @@ async def test_TC_DA_1_2(self): self.step("6.8") asserts.assert_in(version_number, range(0, 65535), "Version number out of range") self.step("6.9") - if is_ci: + if post_cert_test: + asserts.assert_equal(certification_type, 2, "Certification declaration is not marked as production.") + elif is_ci: asserts.assert_in(certification_type, [0, 1, 2], "Certification type is out of range") else: asserts.assert_in(certification_type, [1, 2], "Certification type is out of range") @@ -392,7 +400,7 @@ async def test_TC_DA_1_2(self): self.mark_current_step_skipped() self.step(12) - proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False) + proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, do_test_over_pase) asserts.assert_equal(len(proxy.attestationChallenge), 16, "Attestation challenge is the wrong length") attestation_tbs = elements + proxy.attestationChallenge diff --git a/src/python_testing/basic_composition_support.py b/src/python_testing/basic_composition_support.py index 299b7da194b1bb..8cc958a7207b3d 100644 --- a/src/python_testing/basic_composition_support.py +++ b/src/python_testing/basic_composition_support.py @@ -98,6 +98,11 @@ def ConvertValue(value) -> Any: class BasicCompositionTests: + def connect_over_pase(self, dev_ctrl): + setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code + asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.") + dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id) + def dump_wildcard(self, dump_device_composition_path: typing.Optional[str]): node_dump_dict = {endpoint_id: MatterTlvToJson(self.endpoints_tlv[endpoint_id]) for endpoint_id in self.endpoints_tlv} logging.debug(f"Raw TLV contents of Node: {json.dumps(node_dump_dict, indent=2)}") @@ -116,10 +121,8 @@ async def setup_class_helper(self, default_to_pase: bool = True): dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None) if do_test_over_pase: - setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code - asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.") + self.connect_over_pase(dev_ctrl) node_id = self.dut_node_id - dev_ctrl.EstablishPASESession(setupCode, node_id) else: # Using the already commissioned node node_id = self.dut_node_id diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index e0acefd1c61782..9fad2a0d316178 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -1670,7 +1670,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest if hooks: # Right now, we only support running a single test class at once, - # but it's relatively easy to exapand that to make the test process faster + # but it's relatively easy to expand that to make the test process faster # TODO: support a list of tests hooks.start(count=1) # Mobly gives the test run time in seconds, lets be a bit more precise diff --git a/src/python_testing/post_certification_tests/production_device_checks.py b/src/python_testing/post_certification_tests/production_device_checks.py new file mode 100644 index 00000000000000..0e8fd617c44110 --- /dev/null +++ b/src/python_testing/post_certification_tests/production_device_checks.py @@ -0,0 +1,477 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This test is used to evaluate that all the proper post-certification +# work has been done to make a Matter device production ready. +# This test ensure that: +# - DAC chain is valid and spec compliant, and chains up to a PAA that +# is registered in the main net DCL +# - CD is valid and, signed by one of the known CSA signing certs and +# is marked as a production CD +# - DCL entries for this device and vendor have all been registered +# - DCL OTA entries have proper sizes and checksums +# - TestEventTriggers have been turned off +# +# This test is performed over PASE on a factory reset device. +# +# To run this test, first build and install the python chip wheel +# files, then add the extra dependencies. From the root: +# +# . scripts/activate.sh +# ./scripts/build_python.sh -i py +# source py/bin/activate +# pip install opencv-python requests click_option_group +# python src/python_testing/post_certification_tests/production_device_checks.py + +import base64 +import hashlib +import importlib +import logging +import os +import shutil +import sys +import time +import typing +import uuid +from dataclasses import dataclass +from enum import Enum, auto +from pathlib import Path + +import chip.clusters as Clusters +import cv2 +import requests +from mobly import asserts + +DEFAULT_CHIP_ROOT = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +try: + from basic_composition_support import BasicCompositionTests + from matter_testing_support import (MatterBaseTest, MatterStackState, MatterTestConfig, TestStep, async_test_body, + run_tests_no_exit) +except ImportError: + sys.path.append(os.path.abspath( + os.path.join(os.path.dirname(__file__), '..'))) + from basic_composition_support import BasicCompositionTests + from matter_testing_support import (MatterBaseTest, MatterStackState, MatterTestConfig, TestStep, async_test_body, + run_tests_no_exit) + +try: + import fetch_paa_certs_from_dcl +except ImportError: + sys.path.append(os.path.abspath( + os.path.join(DEFAULT_CHIP_ROOT, 'credentials'))) + import fetch_paa_certs_from_dcl + + +@dataclass +class Failure: + step: str + exception: typing.Optional[Exception] + + +class Hooks(): + def __init__(self): + self.failures = {} + self.current_step = 'unknown' + self.current_test = 'unknown' + + def start(self, count: int): + pass + + def stop(self, duration: int): + pass + + def test_start(self, filename: str, name: str, count: int): + self.current_test = name + pass + + def test_stop(self, exception: Exception, duration: int): + # Exception is the test assertion that caused the failure + if exception: + self.failures[self.current_test].exception = exception + + def step_skipped(self, name: str, expression: str): + pass + + def step_start(self, name: str): + self.current_step = name + + def step_success(self, logger, logs, duration: int, request): + pass + + def step_failure(self, logger, logs, duration: int, request, received): + self.failures[self.current_test] = Failure(self.current_step, None) + + def step_unknown(self): + pass + + def get_failures(self) -> list[str]: + return self.failures + + +class TestEventTriggersCheck(MatterBaseTest, BasicCompositionTests): + @async_test_body + async def test_TestEventTriggersCheck(self): + self.connect_over_pase(self.default_controller) + gd = Clusters.GeneralDiagnostics + ret = await self.read_single_attribute_check_success(cluster=gd, attribute=gd.Attributes.TestEventTriggersEnabled) + asserts.assert_equal(ret, 0, "TestEventTriggers are still on") + + +class DclCheck(MatterBaseTest, BasicCompositionTests): + @async_test_body + async def setup_class(self): + self.connect_over_pase(self.default_controller) + bi = Clusters.BasicInformation + self.vid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.VendorID) + self.pid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.ProductID) + self.software_version = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.SoftwareVersion) + self.url = fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST + + self.vid_str = f'vid = 0x{self.vid:04X}' + self.vid_pid_str = f'{self.vid_str} pid = 0x{self.pid:04X}' + self.vid_pid_sv_str = f'{self.vid_pid_str} software version = {self.software_version}' + + def steps_Vendor(self): + return [TestStep(1, "Check if device VID is listed in the DCL vendor schema", "Listing found")] + + def test_Vendor(self): + self.step(1) + entry = requests.get(f"{self.url}/dcl/vendorinfo/vendors/{self.vid}").json() + key = 'vendorInfo' + asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid_str}") + logging.info(f'Found vendor key for {self.vid_str} in the DCL:') + logging.info(f'{entry[key]}') + + def steps_Model(self): + return [TestStep(1, "Check if device VID/PID are listed in the DCL model schema", "Listing found")] + + def test_Model(self): + self.step(1) + key = 'model' + entry = requests.get(f"{self.url}/dcl/model/models/{self.vid}/{self.pid}").json() + asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid_pid_str}") + logging.info(f'Found model entry for {self.vid_pid_str} in the DCL:') + logging.info(f'{entry[key]}') + + def steps_Compliance(self): + return [TestStep(1, "Check if device VID/PID/SoftwareVersion are listed in the DCL compliance info schema", "Listing found")] + + def test_Compliance(self): + self.step(1) + key = 'complianceInfo' + entry = requests.get( + f"{self.url}/dcl/compliance/compliance-info/{self.vid}/{self.pid}/{self.software_version}/matter").json() + asserts.assert_true(key in entry.keys(), + f"Unable to find compliance entry for {self.vid_pid_sv_str}") + logging.info( + f'Found compliance info for {self.vid_pid_sv_str} in the DCL:') + logging.info(f'{entry[key]}') + + def steps_CertifiedModel(self): + return [TestStep(1, "Check if device VID/PID/SoftwareVersion are listed in the DCL certified model schema", "Listing found")] + + def test_CertifiedModel(self): + self.step(1) + key = 'certifiedModel' + entry = requests.get( + f"{self.url}/dcl/compliance/certified-models/{self.vid}/{self.pid}/{self.software_version}/matter").json() + asserts.assert_true(key in entry.keys(), + f"Unable to find certified model entry for {self.vid_pid_sv_str}") + logging.info( + f'Found certified model for {self.vid_pid_sv_str} in the DCL:') + logging.info(f'{entry[key]}') + + def steps_AllSoftwareVersions(self): + return [TestStep(1, "Query the version information for this software version", "DCL entry exists"), + TestStep(2, "For each valid software version with an OtaUrl, verify the OtaChecksumType is in the valid range and the OtaChecksum is a base64. If the softwareVersion matches the current softwareVersion on the device, ensure the entry is valid.", "OtaChecksum is base64 and OtaChecksumType is in the valid set")] + + def test_AllSoftwareVersions(self): + self.step(1) + versions_entry = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}").json() + key_model_versions = 'modelVersions' + asserts.assert_true(key_model_versions in versions_entry.keys(), + f"Unable to find {key_model_versions} in software versions schema for {self.vid_pid_str}") + logging.info(f'Found version info for vid=0x{self.vid_pid_str} in the DCL:') + logging.info(f'{versions_entry[key_model_versions]}') + key_software_versions = 'softwareVersions' + asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys( + ), f"Unable to find {key_software_versions} in software versions schema for {self.vid_pid_str}") + + problems = [] + self.step(2) + for software_version in versions_entry[key_model_versions][key_software_versions]: + entry_wrapper = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}/{software_version}").json() + key_model_version = 'modelVersion' + if key_model_version not in entry_wrapper: + problems.append( + f'Missing key {key_model_version} in entry for {self.vid_pid_str} software version={software_version}') + continue + logging.info(f'Found entry version entry for {self.vid_pid_str} software version={software_version}') + logging.info(entry_wrapper) + entry = entry_wrapper[key_model_version] + key_ota_url = 'otaUrl' + key_software_version_valid = 'softwareVersionValid' + key_ota_checksum = 'otaChecksum' + key_ota_checksum_type = 'otaChecksumType' + key_ota_file_size = 'otaFileSize' + + def check_key(key): + if key not in entry.keys(): + problems.append( + f'Missing key {key} in DCL versions entry for {self.vid_pid_str} software version={software_version}') + check_key(key_ota_url) + check_key(key_software_version_valid) + if entry[key_software_version_valid] and entry[key_ota_url]: + check_key(key_ota_checksum) + check_key(key_ota_checksum_type) + checksum_types = {1: hashlib.sha256, 7: hashlib.sha384, 8: hashlib.sha256, + 10: hashlib.sha3_256, 11: hashlib.sha3_384, 12: hashlib.sha3_512} + if entry[key_ota_checksum_type] not in checksum_types.keys(): + problems.append( + f'OtaChecksumType for entry {self.vid_pid_str} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {checksum_types.keys()}') + continue + checksum = entry[key_ota_checksum] + try: + is_base64 = base64.b64encode(base64.b64decode(checksum)).decode('utf-8') == checksum + except (ValueError, TypeError): + is_base64 = False + if not is_base64: + problems.append( + f"Checksum {checksum} is not base64 encoded for for entry {self.vid_pid_str} software version={software_version}") + continue + + response = requests.get(entry[key_ota_url]) + if not response.ok: + problems.append( + f"Unable to get OTA object from {entry[key_ota_url]} for {self.vid_pid_str} software version = {software_version}") + continue + + ota_len = str(len(response.content)) + dcl_len = entry[key_ota_file_size] + if ota_len != dcl_len: + problems.append( + f'Incorrect OTA size for {self.vid_pid_str} software_version = {software_version}, received size: {len(response.content)} DCL states {entry[key_ota_file_size]}') + continue + + checksum = checksum_types[entry[key_ota_checksum_type]](response.content).digest() + dcl_checksum = base64.b64decode(entry[key_ota_checksum]) + if checksum != dcl_checksum: + problems.append( + f'Incorrect checksum for {self.vid_pid_str} software version = {software_version}, calculated: {checksum}, DCL: {dcl_checksum}') + + msg = 'Problems found in software version DCL checks:\n' + for problem in problems: + msg += f'{problem}\n' + asserts.assert_false(problems, msg) + + +def get_qr() -> str: + qr_code_detector = cv2.QRCodeDetector() + camera_id = 0 + video_capture = cv2.VideoCapture(camera_id) + window_name = 'Post-certification check QR code reader' + qr = '' + while not qr: + ret, frame = video_capture.read() + if ret: + ret_qr, decoded_info, points, _ = qr_code_detector.detectAndDecodeMulti( + frame) + if ret_qr: + for s, p in zip(decoded_info, points): + if s and s.startswith("MT:"): + qr = s + color = (0, 255, 0) + else: + color = (0, 0, 255) + frame = cv2.polylines( + frame, [p.astype(int)], True, color, 8) + cv2.imshow(window_name, frame) + + if (cv2.waitKey(1) & 0xFF == ord('q')): + break + if qr: + time.sleep(1) + break + + cv2.destroyWindow(window_name) + return qr + + +class SetupCodeType(Enum): + UNKNOWN = auto(), + QR = auto(), + MANUAL = auto(), + + +def get_setup_code() -> (str, bool): + ''' Returns the setup code and an enum indicating the code type.''' + while True: + print('Press q for qr code or m for manual code') + pref = input() + if pref in ['q', 'Q']: + return (get_qr(), SetupCodeType.QR) + elif pref in ['m', 'M']: + print('please enter manual code') + m = input() + m = ''.join([i for i in m if m.isnumeric()]) + if len(m) == 11 or len(m) == 21: + return (m, SetupCodeType.MANUAL) + else: + print("Invalid manual code - please try again") + + +class TestConfig(object): + def __init__(self, code: str, code_type: SetupCodeType): + tmp_uuid = str(uuid.uuid4()) + tmpdir_paa = f'paas_{tmp_uuid}' + tmpdir_cd = f'cd_{tmp_uuid}' + self.paa_path = os.path.join('.', tmpdir_paa) + self.cd_path = os.path.join('.', tmpdir_cd) + os.mkdir(self.paa_path) + os.mkdir(self.cd_path) + fetch_paa_certs_from_dcl.fetch_paa_certs(use_main_net_dcld='', use_test_net_dcld='', + use_main_net_http=True, use_test_net_http=False, paa_trust_store_path=tmpdir_paa) + fetch_paa_certs_from_dcl.fetch_cd_signing_certs(tmpdir_cd) + self.admin_storage = f'admin_storage_{tmp_uuid}.json' + global_test_params = {'use_pase_only': True, 'post_cert_test': True} + self.config = MatterTestConfig(endpoint=0, dut_node_ids=[ + 1], global_test_params=global_test_params, storage_path=self.admin_storage) + if code_type == SetupCodeType.QR: + self.config.qr_code_content = code + else: + self.config.manual_code = code + self.config.paa_trust_store_path = Path(self.paa_path) + # Set for DA-1.2, which uses the CD signing certs for verification. This test is now set to use the production CD signing certs from the DCL. + self.config.global_test_params['cd_cert_dir'] = tmpdir_cd + self.stack = MatterStackState(self.config) + self.default_controller = self.stack.certificate_authorities[0].adminList[0].NewController( + nodeId=112233, + paaTrustStorePath=str(self.config.paa_trust_store_path) + ) + + def get_stack(self): + return self.stack + + def get_controller(self): + return self.default_controller + + def get_config(self, tests: list[str]): + self.config.tests = tests + return self.config + + def __enter__(self): + return self + + def __exit__(self, *args): + self.default_controller.Shutdown() + self.stack.Shutdown() + os.remove(self.admin_storage) + shutil.rmtree(self.paa_path) + shutil.rmtree(self.cd_path) + + +def run_test(test_class: MatterBaseTest, tests: typing.List[str], test_config: TestConfig) -> list[str]: + hooks = Hooks() + stack = test_config.get_stack() + controller = test_config.get_controller() + matter_config = test_config.get_config(tests) + ok = run_tests_no_exit(test_class, matter_config, hooks, controller, stack) + if not ok: + print(f"Test failure. Failed on step: {hooks.get_failures()}") + return hooks.get_failures() + + +def run_cert_test(test: str, test_config: TestConfig) -> list[str]: + ''' Runs the specified test, returns a list of failures''' + # for simplicity and because I know the tests we're running follow this pattern, + # just assume the naming convention based off the base name - ie, file and class + # share a name, test is test_classname + module = importlib.import_module(test) + test_class = getattr(module, test) + return run_test(test_class, [f'test_{test}'], test_config) + + +def main(): + code, code_type = get_setup_code() + with TestConfig(code, code_type) as test_config: + # DA-1.2 is a test of the certification declaration + failures_DA_1_2 = run_cert_test('TC_DA_1_2', test_config) + # DA-1.7 is a test of the DAC chain (up to a PAA in the given directory) + failures_DA_1_7 = run_cert_test('TC_DA_1_7', test_config) + + failures_test_event_trigger = run_test(TestEventTriggersCheck, ['test_TestEventTriggersCheck'], test_config) + + # [] means all tests. + failures_dcl = run_test(DclCheck, [], test_config) + + report = [] + for test, failure in failures_DA_1_2.items(): + # Check for known failures first + # step 6.9 - non-production CD + # 9 - not signed by CSA CA + # other steps - should have been caught in cert, but we should report none the less + if failure.step.startswith('6.9'): + report.append('Device is using a non-production certification declaration') + elif failure.step.startswith('9'): + report.append('Device is using a certification declaration that was not signed by the CSA CA') + else: + report.append(f'Device attestation failure: TC-DA-1.2: {failure.step}') + report.append(f'\t{str(failure.exception)}\n') + + for test, failure in failures_DA_1_7.items(): + # Notable failures in DA-1.7: + # 1.3 - PAI signature does not chain to a PAA in the main net DCL + if failure.step.startswith('1.3'): + report.append('Device DAC chain does not chain to a PAA in the main net DCL') + else: + report.append(f'Device attestation failure: TC-DA-1.7: {failure.step}') + report.append(f'\t{str(failure.exception)}\n') + + for test, failure in failures_test_event_trigger.items(): + # only one possible failure here + report.append('Device has test event triggers enabled in production') + report.append(f'\t{str(failure.exception)}\n') + + for test, failure in failures_dcl.items(): + if test == 'test_Vendor': + report.append('Device vendor ID is not present in the DCL') + elif test == 'test_Model': + report.append('Device model is not present in the DCL') + elif test == 'test_Compliance': + report.append('Device compliance information is not present in the DCL') + elif test == 'test_CertifiedModel': + report.append('Device certified model is not present in the DCL') + elif test == 'test_AllSoftwareVersions': + report.append('Problems with device software version in the DCL') + else: + report.append(f'unknown DCL failure in test {test}: {failure.step}') + report.append(f'\t{str(failure.exception)}\n') + + print('\n\n\n') + if report: + print('TEST FAILED:') + for s in report: + print(f'\t{s}') + return 1 + else: + print('TEST PASSED!') + return 0 + + +if __name__ == "__main__": + sys.exit(main())