From cf964954c11271a54352799dc589f47d7f71d4aa Mon Sep 17 00:00:00 2001 From: cecille Date: Fri, 12 Apr 2024 16:27:22 -0400 Subject: [PATCH 01/13] Post certification checks script This test is used to evaluate that all the proper post-certification work has been done to make a Matter device production ready. This test ensure that: - DAC chain is valid and spec compliant, and chains up to a PAA that is registered in the main net DCL - CD is valid and, signed by one of the known CSA signing certs and is marked as a production CD - DCL entries for this device and vendor have all been registered - TestEventTriggers have been turned off This test is performed over PASE on a factory reset device. To run this test, first build and install the python chip wheel files, then add the extra dependencies. From the root: ./scripts/build_python.sh -i py source py/bin/activate pip install opencv-python requests click_option_group --- ...rom-dcl.py => fetch_paa_certs_from_dcl.py} | 66 +++- src/controller/python/chip/ChipDeviceCtrl.py | 35 +- src/python_testing/TC_DA_1_2.py | 9 +- .../basic_composition_support.py | 9 +- src/python_testing/matter_testing_support.py | 2 +- .../post-cert-checks.py | 342 ++++++++++++++++++ 6 files changed, 437 insertions(+), 26 deletions(-) rename credentials/{fetch-paa-certs-from-dcl.py => fetch_paa_certs_from_dcl.py} (70%) create mode 100644 src/python_testing/post_certification_tests/post-cert-checks.py diff --git a/credentials/fetch-paa-certs-from-dcl.py b/credentials/fetch_paa_certs_from_dcl.py similarity index 70% rename from credentials/fetch-paa-certs-from-dcl.py rename to credentials/fetch_paa_certs_from_dcl.py index d440398c472be7..68ecf19fa35b80 100644 --- a/credentials/fetch-paa-certs-from-dcl.py +++ b/credentials/fetch_paa_certs_from_dcl.py @@ -37,6 +37,10 @@ PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org" TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org" +# TODO: really? We can't just get this by name from the DCL? +MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw" +MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5" + def parse_paa_root_certs(cmdpipe, paa_list): """ @@ -73,13 +77,14 @@ def parse_paa_root_certs(cmdpipe, paa_list): else: if b': ' in line: key, value = line.split(b': ') - result[key.strip(b' -').decode("utf-8")] = value.strip().decode("utf-8") + result[key.strip(b' -').decode("utf-8") + ] = value.strip().decode("utf-8") parse_paa_root_certs.counter += 1 if parse_paa_root_certs.counter % 2 == 0: paa_list.append(copy.deepcopy(result)) -def write_paa_root_cert(certificate, subject): +def write_cert(certificate, subject): filename = 'dcld_mirror_' + \ re.sub('[^a-zA-Z0-9_-]', '', re.sub('[=, ]', '_', subject)) with open(filename + '.pem', 'w+') as outfile: @@ -93,7 +98,8 @@ def write_paa_root_cert(certificate, subject): serialization.Encoding.DER) outfile.write(der_certificate) except (IOError, ValueError) as e: - print(f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...") + print( + f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...") def parse_paa_root_cert_from_dcld(cmdpipe): @@ -133,7 +139,37 @@ def use_dcld(dcld, production, cmdlist): @optgroup.option('--paa-trust-store-path', default='paa-root-certs', type=str, metavar='PATH', help="PAA trust store path (default: paa-root-certs)") def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path): """DCL PAA mirroring tools""" + fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path) + + +def get_cert_from_rest(rest_node_url, subject, subject_key_id): + response = requests.get( + f"{rest_node_url}/dcl/pki/certificates/{subject}/{subject_key_id}").json()["approvedCertificates"]["certs"][0] + certificate = response["pemCert"].rstrip("\n") + subject = response["subjectAsText"] + return certificate, subject + + +def fetch_cd_signing_certs(store_path): + ''' Only supports using main net http currently.''' + rest_node_url = PRODUCTION_NODE_URL_REST + os.makedirs(store_path, exist_ok=True) + original_dir = os.getcwd() + os.chdir(store_path) + cd_signer_ids = requests.get(f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds'] + for signer in cd_signer_ids: + subject = signer['subject'] + subject_key_id = signer['subjectKeyId'] + certificate, subject = get_cert_from_rest(rest_node_url, subject, subject_key_id) + + print(f"Downloaded CD signing cert with subject: {subject}") + write_cert(certificate, subject) + + os.chdir(original_dir) + + +def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path): production = False dcld = use_test_net_dcld @@ -148,14 +184,17 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST os.makedirs(paa_trust_store_path, exist_ok=True) + original_dir = os.getcwd() os.chdir(paa_trust_store_path) if use_rest: - paa_list = requests.get(f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"] + paa_list = requests.get( + f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"] else: cmdlist = ['query', 'pki', 'all-x509-root-certs'] - cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmdpipe = subprocess.Popen(use_dcld( + dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) paa_list = [] parse_paa_root_certs.counter = 0 @@ -163,21 +202,22 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h for paa in paa_list: if use_rest: - response = requests.get( - f"{rest_node_url}/dcl/pki/certificates/{paa['subject']}/{paa['subjectKeyId']}").json()["approvedCertificates"]["certs"][0] - certificate = response["pemCert"] - subject = response["subjectAsText"] + certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId']) else: - cmdlist = ['query', 'pki', 'x509-cert', '-u', paa['subject'], '-k', paa['subjectKeyId']] + cmdlist = ['query', 'pki', 'x509-cert', '-u', + paa['subject'], '-k', paa['subjectKeyId']] - cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmdpipe = subprocess.Popen(use_dcld( + dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE) (certificate, subject) = parse_paa_root_cert_from_dcld(cmdpipe) certificate = certificate.rstrip('\n') - print(f"Downloaded certificate with subject: {subject}") - write_paa_root_cert(certificate, subject) + print(f"Downloaded PAA certificate with subject: {subject}") + write_cert(certificate, subject) + + os.chdir(original_dir) if __name__ == "__main__": diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index 4e57e3d6044e67..09717df34792ba 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -178,13 +178,18 @@ class DeviceProxyWrapper(): that is not an issue that needs to be accounted for and it will become very apparent if that happens. ''' + class DeviceProxyType(enum.Enum): + OPERATIONAL = enum.auto(), + COMMISSIONEE = enum.auto(), - def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None): + def __init__(self, deviceProxy: ctypes.c_void_p, proxyType, dmLib=None): self._deviceProxy = deviceProxy self._dmLib = dmLib + self._proxyType = proxyType def __del__(self): - if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None): + # Commissionee device proxies are owned by the DeviceCommissioner. See #33031 + if (self._proxyType == self.DeviceProxyType.OPERATIONAL and self.self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None): # This destructor is called from any threading context, including on the Matter threading context. # So, we cannot call chipStack.Call or chipStack.CallAsync which waits for the posted work to # actually be executed. Instead, we just post/schedule the work and move on. @@ -787,7 +792,23 @@ def GetClusterHandler(self): return self._Cluster - def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int = None): + def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutMs: int = None) -> typing.Optional[DeviceProxyWrapper]: + ''' Returns CommissioneeDeviceProxy if we can find or establish a PASE connection to the specified device''' + self.CheckIsActive() + returnDevice = c_void_p(None) + res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( + self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) + if res.is_success: + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) + + self.EstablishPASESession(setupCode, nodeid) + + res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( + self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) + if res.is_success: + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) + + def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None): ''' Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node. nodeId: Target's Node ID @@ -808,7 +829,7 @@ def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) if res.is_success: logging.info('Using PASE connection') - return DeviceProxyWrapper(returnDevice) + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) class DeviceAvailableClosure(): def deviceAvailable(self, device, err): @@ -842,7 +863,7 @@ def deviceAvailable(self, device, err): if returnDevice.value is None: returnErr.raise_on_error() - return DeviceProxyWrapper(returnDevice, self._dmLib) + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib) async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: int = None): ''' Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node. @@ -862,7 +883,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) if res.is_success: logging.info('Using PASE connection') - return DeviceProxyWrapper(returnDevice) + return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) eventLoop = asyncio.get_running_loop() future = eventLoop.create_future() @@ -900,7 +921,7 @@ def deviceAvailable(self, device, err): else: await future - return DeviceProxyWrapper(future.result(), self._dmLib) + return DeviceProxyWrapper(future.result(), DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib) def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0): ''' Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to diff --git a/src/python_testing/TC_DA_1_2.py b/src/python_testing/TC_DA_1_2.py index 100f5813778966..d28ba42883974f 100644 --- a/src/python_testing/TC_DA_1_2.py +++ b/src/python_testing/TC_DA_1_2.py @@ -19,6 +19,7 @@ import random import chip.clusters as Clusters +from basic_composition_support import BasicCompositionTests from chip.interaction_model import InteractionModelError, Status from chip.tlv import TLVReader from cryptography import x509 @@ -103,7 +104,7 @@ def parse_ids_from_certs(dac: x509.Certificate, pai: x509.Certificate) -> tuple( # default is 'credentials/development/cd-certs'. -class TC_DA_1_2(MatterBaseTest): +class TC_DA_1_2(MatterBaseTest, BasicCompositionTests): def desc_TC_DA_1_2(self): return "Device Attestation Request Validation [DUT - Commissionee]" @@ -164,6 +165,10 @@ async def test_TC_DA_1_2(self): is_ci = self.check_pics('PICS_SDK_CI_ONLY') cd_cert_dir = self.user_params.get("cd_cert_dir", 'credentials/development/cd-certs') + do_test_over_pase = self.user_params.get("use_pase_only", False) + if do_test_over_pase: + self.connect_over_pase(self.default_controller) + # Commissioning - done self.step(0) @@ -391,7 +396,7 @@ async def test_TC_DA_1_2(self): self.mark_current_step_skipped() self.step(12) - proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False) + proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, do_test_over_pase) asserts.assert_equal(len(proxy.attestationChallenge), 16, "Attestation challenge is the wrong length") attestation_tbs = elements + proxy.attestationChallenge diff --git a/src/python_testing/basic_composition_support.py b/src/python_testing/basic_composition_support.py index 4a516f67ecbc97..c5d8f092ea3450 100644 --- a/src/python_testing/basic_composition_support.py +++ b/src/python_testing/basic_composition_support.py @@ -97,6 +97,11 @@ def ConvertValue(value) -> Any: class BasicCompositionTests: + def connect_over_pase(self, dev_ctrl): + setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code + asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.") + dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id) + async def setup_class_helper(self, default_to_pase: bool = True): dev_ctrl = self.default_controller self.problems = [] @@ -105,10 +110,8 @@ async def setup_class_helper(self, default_to_pase: bool = True): dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None) if do_test_over_pase: - setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code - asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.") + self.connect_over_pase(dev_ctrl) node_id = self.dut_node_id - dev_ctrl.EstablishPASESession(setupCode, node_id) else: # Using the already commissioned node node_id = self.dut_node_id diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 4861cae7ea8c61..a8f01b1dd018eb 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -1713,7 +1713,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest if hooks: # Right now, we only support running a single test class at once, - # but it's relatively easy to exapand that to make the test process faster + # but it's relatively easy to expand that to make the test process faster # TODO: support a list of tests hooks.start(count=1) # Mobly gives the test run time in seconds, lets be a bit more precise diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py new file mode 100644 index 00000000000000..436672910ad0f1 --- /dev/null +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -0,0 +1,342 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This test is used to evaluate that all the proper post-certification +# work has been done to make a Matter device production ready. +# This test ensure that: +# - DAC chain is valid and spec compliant, and chains up to a PAA that +# is registered in the main net DCL +# - CD is valid and, signed by one of the known CSA signing certs and +# is marked as a production CD +# - DCL entries for this device and vendor have all been registered +# - TestEventTriggers have been turned off +# +# This test is performed over PASE on a factory reset device. +# +# To run this test, first build and install the python chip wheel +# files, then add the extra dependencies. From the root: +# +# ./scripts/build_python.sh -i py +# source py/bin/activate +# pip install opencv-python requests click_option_group + + +import importlib +import logging +import os +import shutil +import sys +import time +import typing +import uuid +from dataclasses import dataclass +from enum import Enum, auto +from pathlib import Path + +import chip.clusters as Clusters +import cv2 +import requests +from mobly import asserts, signals + +DEFAULT_CHIP_ROOT = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..', '..')) + +try: + from basic_composition_support import BasicCompositionTests + from matter_testing_support import (MatterBaseTest, MatterStackState, MatterTestConfig, TestStep, async_test_body, + run_tests_no_exit) +except ImportError: + sys.path.append(os.path.abspath( + os.path.join(os.path.dirname(__file__), '..'))) + from basic_composition_support import BasicCompositionTests + from matter_testing_support import (MatterBaseTest, MatterStackState, MatterTestConfig, TestStep, async_test_body, + run_tests_no_exit) + +try: + import fetch_paa_certs_from_dcl +except ImportError: + sys.path.append(os.path.abspath( + os.path.join(DEFAULT_CHIP_ROOT, 'credentials'))) + import fetch_paa_certs_from_dcl + + +@dataclass +class Failure: + test: str + step: str + + +class Hooks(): + def __init__(self): + self.failures = [] + self.current_step = 'unknown' + self.current_test = 'unknown' + + def start(self, count: int): + pass + + def stop(self, duration: int): + pass + + def test_start(self, filename: str, name: str, count: int): + self.current_test = name + pass + + def test_stop(self, exception: Exception, duration: int): + pass + + def step_skipped(self, name: str, expression: str): + pass + + def step_start(self, name: str): + self.current_step = name + + def step_success(self, logger, logs, duration: int, request): + pass + + def step_failure(self, logger, logs, duration: int, request, received): + self.failures.append(Failure(self.current_test, self.current_step)) + + def step_unknown(self): + pass + + def get_failures(self) -> list[str]: + return self.failures + + +class TestEventTriggersCheck(MatterBaseTest, BasicCompositionTests): + @async_test_body + async def test_TestEventTriggersCheck(self): + self.connect_over_pase(self.default_controller) + gd = Clusters.GeneralDiagnostics + ret = await self.read_single_attribute_check_success(cluster=gd, attribute=gd.Attributes.TestEventTriggersEnabled) + asserts.assert_equal(ret, 0, "TestEventTriggers are still on") + + +def get_dcl_vendor(vid): + return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/vendorinfo/vendors/{vid}").json() + + +def get_dcl_model(vid, pid): + return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/model/models/{vid}/{pid}").json() + + +def get_dcl_compliance_info(vid, pid, software_version): + return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/compliance/compliance-info/{vid}/{pid}/{software_version}/matter").json() + + +def get_dcl_certified_model(vid, pid, software_version): + return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/compliance/certified-models/{vid}/{pid}/{software_version}/matter").json() + + +class DclCheck(MatterBaseTest, BasicCompositionTests): + @async_test_body + async def setup_class(self): + self.connect_over_pase(self.default_controller) + bi = Clusters.BasicInformation + self.vid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.VendorID) + self.pid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.ProductID) + self.software_version = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.SoftwareVersion) + + def steps_Vendor(self): + return [TestStep(1, "Check if device VID is listed in the DCL vendor schema", "Listing found")] + + def test_Vendor(self): + self.step(1) + entry = get_dcl_vendor(self.vid) + key = 'vendorInfo' + asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid:04x}") + logging.info(f'Found vendor key 0x{self.vid:04X} in DCL:') + logging.info(f'{entry[key]}') + + def steps_Model(self): + return [TestStep(1, "Check if device VID/PID are listed in the DCL model schema", "Listing found")] + + def test_Model(self): + self.step(1) + key = 'model' + entry = get_dcl_model(self.vid, self.pid) + asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid:04x} {self.pid:04x}") + logging.info(f'Found model entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:') + logging.info(f'{entry[key]}') + + def steps_Compliance(self): + return [TestStep(1, "Check if device VID/PID/SoftwareVersion are listed in the DCL compliance info schema", "Listing found")] + + def test_Compliance(self): + self.step(1) + key = 'complianceInfo' + entry = get_dcl_compliance_info(self.vid, self.pid, self.software_version) + asserts.assert_true(key in entry.keys(), f"Unable to find compliance entry for {self.vid:04x} {self.pid:04x} {self.software_version}") + logging.info(f'Found compliance info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') + logging.info(f'{entry[key]}') + + def steps_CertifiedModel(self): + return [TestStep(1, "Check if device VID/PID/SoftwareVersion are listed in the DCL certified model schema", "Listing found")] + + def test_CertifiedModel(self): + self.step(1) + key = 'certifiedModel' + entry = get_dcl_certified_model(self.vid, self.pid, self.software_version) + asserts.assert_true(key in entry.keys(), f"Unable to find certified model entry for {self.vid:04x} {self.pid:04x} {self.software_version}") + logging.info(f'Found certified model for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') + logging.info(f'{entry[key]}') + + +def get_qr() -> str: + qr_code_detector = cv2.QRCodeDetector() + camera_id = 0 + video_capture = cv2.VideoCapture(camera_id) + window_name = 'Post-certification check QR code reader' + qr = '' + while not qr: + ret, frame = video_capture.read() + if ret: + ret_qr, decoded_info, points, _ = qr_code_detector.detectAndDecodeMulti( + frame) + if ret_qr: + for s, p in zip(decoded_info, points): + if s and s.startswith("MT:"): + qr = s + color = (0, 255, 0) + else: + color = (0, 0, 255) + frame = cv2.polylines( + frame, [p.astype(int)], True, color, 8) + cv2.imshow(window_name, frame) + + if (cv2.waitKey(1) & 0xFF == ord('q')): + break + if qr: + time.sleep(1) + break + + cv2.destroyWindow(window_name) + return qr + + +class SetupCodeType(Enum): + UNKNOWN = auto(), + QR = auto(), + MANUAL = auto(), + + +def get_setup_code() -> (str, bool): + ''' Returns the setup code and an enum indicating the code type.''' + while True: + print('Press q for qr code or m for manual code') + pref = input() + if pref in ['q', 'Q']: + return (get_qr(), SetupCodeType.QR) + elif pref in ['m', 'M']: + print('please enter manual code') + m = input() + m = ''.join([i for i in m if m.isnumeric()]) + if len(m) == 11 or len(m) == 21: + return (m, SetupCodeType.MANUAL) + else: + print("Invalid manual code - please try again") + + +class TestConfig(object): + def __init__(self, code: str, code_type: SetupCodeType): + tmp_uuid = str(uuid.uuid4()) + tmpdir_paa = f'paas_{tmp_uuid}' + tmpdir_cd = f'cd_{tmp_uuid}' + self.paa_path = os.path.join('.', tmpdir_paa) + self.cd_path = os.path.join('.', tmpdir_cd) + os.mkdir(self.paa_path) + os.mkdir(self.cd_path) + fetch_paa_certs_from_dcl.fetch_paa_certs(use_main_net_dcld='', use_test_net_dcld='', use_main_net_http=True, use_test_net_http=False, paa_trust_store_path=tmpdir_paa) + fetch_paa_certs_from_dcl.fetch_cd_signing_certs(tmpdir_cd) + self.admin_storage = f'admin_storage_{tmp_uuid}.json' + global_test_params = {'use_pase_only': True, 'post_cert_test': True} + self.config = MatterTestConfig(endpoint=0, dut_node_ids=[1], global_test_params=global_test_params, storage_path=self.admin_storage) + if code_type == SetupCodeType.QR: + self.config.qr_code_content = code + else: + self.config.manual_code = code + self.config.paa_trust_store_path = Path(self.paa_path) + # Set for DA-1.2, which uses the CD signing certs for verification. This test is now set to use the production CD signing certs from the DCL. + self.config.global_test_params['cd_cert_dir'] = tmpdir_cd + self.stack = MatterStackState(self.config) + self.default_controller = self.stack.certificate_authorities[0].adminList[0].NewController( + nodeId=112233, + paaTrustStorePath=str(self.config.paa_trust_store_path) + ) + + def get_stack(self): + return self.stack + + def get_controller(self): + return self.default_controller + + def get_config(self, tests: list[str]): + self.config.tests = tests + return self.config + + def __enter__(self): + return self + + def __exit__(self, *args): + self.default_controller.Shutdown() + self.stack.Shutdown() + os.remove(self.admin_storage) + shutil.rmtree(self.paa_path) + shutil.rmtree(self.cd_path) + + +def run_test(test_class: MatterBaseTest, tests: typing.List[str], test_config: TestConfig) -> list[str]: + hooks = Hooks() + stack = test_config.get_stack() + controller = test_config.get_controller() + matter_config = test_config.get_config(tests) + ok = run_tests_no_exit(test_class, matter_config, hooks, controller, stack) + if not ok: + print(f"Test failure. Failed on step: {hooks.get_failures()}") + return hooks.get_failures() + + +def run_cert_test(test: str, test_config: TestConfig) -> list[str]: + ''' Runs the specified test, returns a list of failures''' + # for simplicity and because I know the tests we're running follow this pattern, + # just assume the naming convention based off the base name - ie, file and class + # share a name, test is test_classname + module = importlib.import_module(test) + test_class = getattr(module, test) + return run_test(test_class, [f'test_{test}'], test_config) + + +def main(): + code, code_type = get_setup_code() + with TestConfig(code, code_type) as test_config: + # DA-1.2 is a test of the certification declaration + failures_DA_1_2 = run_cert_test('TC_DA_1_2', test_config) + # DA-1.7 is a test of the DAC chain (up to a PAA in the given directory) + failures_DA_1_7 = run_cert_test('TC_DA_1_7', test_config) + + failures_test_event_trigger = run_test(TestEventTriggersCheck, ['test_TestEventTriggersCheck'], test_config) + + failures_dcl = run_test(DclCheck, ['test_Vendor', 'test_Model', 'test_Compliance', 'test_CertifiedModel'], test_config) + + failures = failures_DA_1_2 + failures_DA_1_7 + failures_test_event_trigger + failures_dcl + + print(failures) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From a5a8ee034131e7db7324c028d47602362ba60013 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 1 May 2024 14:48:16 +0000 Subject: [PATCH 02/13] Restyled by autopep8 --- credentials/fetch_paa_certs_from_dcl.py | 3 ++- .../post-cert-checks.py | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/credentials/fetch_paa_certs_from_dcl.py b/credentials/fetch_paa_certs_from_dcl.py index 68ecf19fa35b80..5fe776d090721e 100644 --- a/credentials/fetch_paa_certs_from_dcl.py +++ b/credentials/fetch_paa_certs_from_dcl.py @@ -157,7 +157,8 @@ def fetch_cd_signing_certs(store_path): original_dir = os.getcwd() os.chdir(store_path) - cd_signer_ids = requests.get(f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds'] + cd_signer_ids = requests.get( + f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds'] for signer in cd_signer_ids: subject = signer['subject'] subject_key_id = signer['subjectKeyId'] diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index 436672910ad0f1..e06a7861d2b0f4 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -180,8 +180,10 @@ def test_Compliance(self): self.step(1) key = 'complianceInfo' entry = get_dcl_compliance_info(self.vid, self.pid, self.software_version) - asserts.assert_true(key in entry.keys(), f"Unable to find compliance entry for {self.vid:04x} {self.pid:04x} {self.software_version}") - logging.info(f'Found compliance info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') + asserts.assert_true(key in entry.keys(), + f"Unable to find compliance entry for {self.vid:04x} {self.pid:04x} {self.software_version}") + logging.info( + f'Found compliance info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') logging.info(f'{entry[key]}') def steps_CertifiedModel(self): @@ -191,8 +193,10 @@ def test_CertifiedModel(self): self.step(1) key = 'certifiedModel' entry = get_dcl_certified_model(self.vid, self.pid, self.software_version) - asserts.assert_true(key in entry.keys(), f"Unable to find certified model entry for {self.vid:04x} {self.pid:04x} {self.software_version}") - logging.info(f'Found certified model for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') + asserts.assert_true(key in entry.keys(), + f"Unable to find certified model entry for {self.vid:04x} {self.pid:04x} {self.software_version}") + logging.info( + f'Found certified model for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') logging.info(f'{entry[key]}') @@ -260,11 +264,13 @@ def __init__(self, code: str, code_type: SetupCodeType): self.cd_path = os.path.join('.', tmpdir_cd) os.mkdir(self.paa_path) os.mkdir(self.cd_path) - fetch_paa_certs_from_dcl.fetch_paa_certs(use_main_net_dcld='', use_test_net_dcld='', use_main_net_http=True, use_test_net_http=False, paa_trust_store_path=tmpdir_paa) + fetch_paa_certs_from_dcl.fetch_paa_certs(use_main_net_dcld='', use_test_net_dcld='', + use_main_net_http=True, use_test_net_http=False, paa_trust_store_path=tmpdir_paa) fetch_paa_certs_from_dcl.fetch_cd_signing_certs(tmpdir_cd) self.admin_storage = f'admin_storage_{tmp_uuid}.json' global_test_params = {'use_pase_only': True, 'post_cert_test': True} - self.config = MatterTestConfig(endpoint=0, dut_node_ids=[1], global_test_params=global_test_params, storage_path=self.admin_storage) + self.config = MatterTestConfig(endpoint=0, dut_node_ids=[ + 1], global_test_params=global_test_params, storage_path=self.admin_storage) if code_type == SetupCodeType.QR: self.config.qr_code_content = code else: From 7384e5bd37cbf6ffc60e5101f5133d57f2a30cbd Mon Sep 17 00:00:00 2001 From: cecille Date: Wed, 1 May 2024 11:32:12 -0400 Subject: [PATCH 03/13] linter --- src/python_testing/post_certification_tests/post-cert-checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index e06a7861d2b0f4..07ee3a51e809be 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -49,7 +49,7 @@ import chip.clusters as Clusters import cv2 import requests -from mobly import asserts, signals +from mobly import asserts DEFAULT_CHIP_ROOT = os.path.abspath( os.path.join(os.path.dirname(__file__), '..', '..', '..')) From 45efbe8bae9171d1c68c841a236896f81b748619 Mon Sep 17 00:00:00 2001 From: cecille Date: Wed, 1 May 2024 15:15:36 -0400 Subject: [PATCH 04/13] Add a production CD check to DA-1.7 --- src/python_testing/TC_DA_1_2.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/python_testing/TC_DA_1_2.py b/src/python_testing/TC_DA_1_2.py index d28ba42883974f..4ac3b007c59e76 100644 --- a/src/python_testing/TC_DA_1_2.py +++ b/src/python_testing/TC_DA_1_2.py @@ -164,6 +164,7 @@ def steps_TC_DA_1_2(self): async def test_TC_DA_1_2(self): is_ci = self.check_pics('PICS_SDK_CI_ONLY') cd_cert_dir = self.user_params.get("cd_cert_dir", 'credentials/development/cd-certs') + self.post_cert_test = self.user_params.get("post_cert_test", False) do_test_over_pase = self.user_params.get("use_pase_only", False) if do_test_over_pase: @@ -312,7 +313,9 @@ async def test_TC_DA_1_2(self): self.step("6.8") asserts.assert_in(version_number, range(0, 65535), "Version number out of range") self.step("6.9") - if is_ci: + if post_cert_test: + asserts.assert_equal(certification_type, 2, "Certification declaration is not marked as production.") + elif is_ci: asserts.assert_in(certification_type, [0, 1, 2], "Certification type is out of range") else: asserts.assert_in(certification_type, [1, 2], "Certification type is out of range") From bc73bc849c54993db8b1dd8c74e11d60c5ca6f37 Mon Sep 17 00:00:00 2001 From: cecille Date: Wed, 1 May 2024 15:16:02 -0400 Subject: [PATCH 05/13] report out results better --- .../post-cert-checks.py | 49 ++++++++++++++++++- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index 07ee3a51e809be..05381edffd4729 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -340,8 +340,53 @@ def main(): failures = failures_DA_1_2 + failures_DA_1_7 + failures_test_event_trigger + failures_dcl - print(failures) - return 0 + report = [] + for failure in failures_DA_1_2: + # Check for known failures first + # step 6.9 - non-production CD + # 9 - not signed by CSA CA + # other steps - should have been caught in cert, but we should report none the less + if failure.step.startswith('6.9'): + report.append('Device is using a non-production certification declaration') + continue + if failure.step.startswith('9'): + report.append('Device is using a certification declaration that was not signed by the CSA CA') + continue + report.append(f'Device attestation failure: TC-DA-1.2: {failure.step}') + + for failure in failures_DA_1_7: + # Notable failures in DA-1.7: + # 1.3 - PAI signature does not chain to a PAA in the main net DCL + if failure.step.startswith('1.3'): + report.append('Device DAC chain does not chain to a PAA in the main net DCL') + continue + report.append(f'Device attestation failure: TC-DA-1.7: {failure.step}') + + for failure in failures_test_event_trigger: + # only one possible failure here + report.append('Device has test event triggers enabled in production') + + for failure in failures_dcl: + if failure.test == 'test_Vendor': + report.append('Device vendor ID is not present in the DCL') + elif failure.test == 'test_Model': + report.append('Device model is not present in the DCL') + elif failure.test == 'test_Compliance': + report.append('Device compliance information is not present in the DCL') + elif failure.test == 'test_CertifiedModel': + report.append('Device certified model is not present in the DCL') + else: + report.append(f'unknown DCL failure in test {failure.test}: {failure.step}') + + print('\n\n\n') + if report: + print('TEST FAILED:') + for s in report: + print(f'\t{s}') + return 1 + else: + print('TEST PASSED!') + return 0 if __name__ == "__main__": From f3f393c6673739ee12b757feae302f19420ca33f Mon Sep 17 00:00:00 2001 From: Cecille Freeman Date: Wed, 1 May 2024 15:20:42 -0400 Subject: [PATCH 06/13] fix post cert check in 1.2 --- src/python_testing/TC_DA_1_2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_testing/TC_DA_1_2.py b/src/python_testing/TC_DA_1_2.py index 4ac3b007c59e76..799d2c53cf40be 100644 --- a/src/python_testing/TC_DA_1_2.py +++ b/src/python_testing/TC_DA_1_2.py @@ -164,7 +164,7 @@ def steps_TC_DA_1_2(self): async def test_TC_DA_1_2(self): is_ci = self.check_pics('PICS_SDK_CI_ONLY') cd_cert_dir = self.user_params.get("cd_cert_dir", 'credentials/development/cd-certs') - self.post_cert_test = self.user_params.get("post_cert_test", False) + post_cert_test = self.user_params.get("post_cert_test", False) do_test_over_pase = self.user_params.get("use_pase_only", False) if do_test_over_pase: From 1f279c045e8b47004af378dfc07df8ebd168feaf Mon Sep 17 00:00:00 2001 From: Cecille Freeman Date: Thu, 2 May 2024 10:01:48 -0400 Subject: [PATCH 07/13] Add a check for software versions NOTE: not done yet because as it turns out hex strings appear to be valid base64. I mean, they're garbage, but this test won't find that. Need additional tests to detect hex vs. base64 --- .../post-cert-checks.py | 113 ++++++++++++------ 1 file changed, 79 insertions(+), 34 deletions(-) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index 05381edffd4729..451d38aa8ee086 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -33,7 +33,7 @@ # source py/bin/activate # pip install opencv-python requests click_option_group - +import base64 import importlib import logging import os @@ -75,13 +75,13 @@ @dataclass class Failure: - test: str step: str + exception: typing.Optional[Exception] class Hooks(): def __init__(self): - self.failures = [] + self.failures = {} self.current_step = 'unknown' self.current_test = 'unknown' @@ -96,7 +96,9 @@ def test_start(self, filename: str, name: str, count: int): pass def test_stop(self, exception: Exception, duration: int): - pass + # Exception is the test assertion that caused the failure + if exception: + self.failures[self.current_test].exception = exception def step_skipped(self, name: str, expression: str): pass @@ -108,7 +110,7 @@ def step_success(self, logger, logs, duration: int, request): pass def step_failure(self, logger, logs, duration: int, request, received): - self.failures.append(Failure(self.current_test, self.current_step)) + self.failures[self.current_test] = Failure(self.current_step) def step_unknown(self): pass @@ -126,22 +128,6 @@ async def test_TestEventTriggersCheck(self): asserts.assert_equal(ret, 0, "TestEventTriggers are still on") -def get_dcl_vendor(vid): - return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/vendorinfo/vendors/{vid}").json() - - -def get_dcl_model(vid, pid): - return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/model/models/{vid}/{pid}").json() - - -def get_dcl_compliance_info(vid, pid, software_version): - return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/compliance/compliance-info/{vid}/{pid}/{software_version}/matter").json() - - -def get_dcl_certified_model(vid, pid, software_version): - return requests.get(f"{fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST}/dcl/compliance/certified-models/{vid}/{pid}/{software_version}/matter").json() - - class DclCheck(MatterBaseTest, BasicCompositionTests): @async_test_body async def setup_class(self): @@ -150,13 +136,17 @@ async def setup_class(self): self.vid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.VendorID) self.pid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.ProductID) self.software_version = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.SoftwareVersion) + self.url = fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST + self.vid = 0x6006 + self.pid = 1 + self.software_version = 1300 def steps_Vendor(self): return [TestStep(1, "Check if device VID is listed in the DCL vendor schema", "Listing found")] def test_Vendor(self): self.step(1) - entry = get_dcl_vendor(self.vid) + entry = requests.get(f"{self.url}/dcl/vendorinfo/vendors/{self.vid}").json() key = 'vendorInfo' asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid:04x}") logging.info(f'Found vendor key 0x{self.vid:04X} in DCL:') @@ -168,7 +158,7 @@ def steps_Model(self): def test_Model(self): self.step(1) key = 'model' - entry = get_dcl_model(self.vid, self.pid) + entry = requests.get(f"{self.url}/dcl/model/models/{self.vid}/{self.pid}").json() asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid:04x} {self.pid:04x}") logging.info(f'Found model entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:') logging.info(f'{entry[key]}') @@ -179,7 +169,7 @@ def steps_Compliance(self): def test_Compliance(self): self.step(1) key = 'complianceInfo' - entry = get_dcl_compliance_info(self.vid, self.pid, self.software_version) + entry = requests.get(f"{self.url}/dcl/compliance/compliance-info/{self.vid}/{self.pid}/{self.software_version}/matter").json() asserts.assert_true(key in entry.keys(), f"Unable to find compliance entry for {self.vid:04x} {self.pid:04x} {self.software_version}") logging.info( @@ -192,13 +182,66 @@ def steps_CertifiedModel(self): def test_CertifiedModel(self): self.step(1) key = 'certifiedModel' - entry = get_dcl_certified_model(self.vid, self.pid, self.software_version) + entry = requests.get(f"{self.url}/dcl/compliance/certified-models/{self.vid}/{self.pid}/{self.software_version}/matter").json() asserts.assert_true(key in entry.keys(), f"Unable to find certified model entry for {self.vid:04x} {self.pid:04x} {self.software_version}") logging.info( f'Found certified model for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') logging.info(f'{entry[key]}') + def steps_AllSoftwareVersions(self): + return [TestStep(1, "Query the version information for this software version", "DCL entry exists"), + TestStep(2, "For each valid software version with an OtaUrl, verify the OtaChecksumType is in the valid range and the OtaChecksum is a base64. If the softwareVersion matches the current softwareVersion on the device, ensure the entry is valid.", "OtaChecksum is base64 and OtaChecksumType is in the valid set")] + def test_AllSoftwareVersions(self): + self.step(1) + versions_entry = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}").json() + key_model_versions = 'modelVersions' + asserts.assert_true(key_model_versions in versions_entry.keys(), f"Unable to find {key_model_versions} in software versions schema for vid=0x{self.vid:04X} pid=0x{self.pid:04X}") + logging.info( + f'Found version info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:') + logging.info(f'{versions_entry[key_model_versions]}') + key_software_versions = 'softwareVersions' + asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys(), f"Unable to find {key_software_versions} in software versions schema for vid=0x{self.vid:04X} pid=0x{self.pid:04X}") + + problems = [] + self.step(2) + for software_version in versions_entry[key_model_versions][key_software_versions]: + entry_wrapper = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}/{software_version}").json() + key_model_version = 'modelVersion' + if key_model_version not in entry_wrapper: + problems.append(f'Missing key {key_model_version} in entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}') + continue + logging.info(f'Found entry version entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}') + logging.info(entry_wrapper) + entry = entry_wrapper[key_model_version] + key_ota_url = 'otaUrl' + key_software_version_valid = 'softwareVersionValid' + key_ota_checksum = 'otaChecksum' + key_ota_checksum_type = 'otaChecksumType' + def check_key(key): + if key not in entry.keys(): + problems.append(f'Missing key {key} in DCL versions entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}') + check_key(key_ota_url) + check_key(key_software_version_valid) + if entry[key_software_version_valid] and entry[key_ota_url]: + check_key(key_ota_checksum) + check_key(key_ota_checksum_type) + valid_checksum_types = [1, 7, 8, 10, 11, 12] + if entry[key_ota_checksum_type] not in valid_checksum_types: + problems.append(f'OtaChecksumType for entry vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {valid_checksum_types}') + checksum = entry[key_ota_checksum] + try: + is_base64 = base64.b64encode(base64.b64decode(checksum)).decode('utf-8') == checksum + except (ValueError, TypeError): + is_base64 = False + if not is_base64: + problems.append(f"Checksum {checksum} is not base64 encoded for for entry vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}") + #TODO: download and actually checksum it? Maybe just for the current version? And size check? + msg = 'Problems found in software version DCL checks:\n' + for problem in problems: + msg += f'{problem}\n' + asserts.assert_false(problems, msg) + def get_qr() -> str: qr_code_detector = cv2.QRCodeDetector() @@ -336,9 +379,7 @@ def main(): failures_test_event_trigger = run_test(TestEventTriggersCheck, ['test_TestEventTriggersCheck'], test_config) - failures_dcl = run_test(DclCheck, ['test_Vendor', 'test_Model', 'test_Compliance', 'test_CertifiedModel'], test_config) - - failures = failures_DA_1_2 + failures_DA_1_7 + failures_test_event_trigger + failures_dcl + failures_dcl = run_test(DclCheck, ['test_Vendor', 'test_Model', 'test_Compliance', 'test_CertifiedModel', 'test_AllSoftwareVersions'], test_config) report = [] for failure in failures_DA_1_2: @@ -366,17 +407,21 @@ def main(): # only one possible failure here report.append('Device has test event triggers enabled in production') - for failure in failures_dcl: - if failure.test == 'test_Vendor': + for test, failure in failures_dcl.items(): + if test == 'test_Vendor': report.append('Device vendor ID is not present in the DCL') - elif failure.test == 'test_Model': + elif test == 'test_Model': report.append('Device model is not present in the DCL') - elif failure.test == 'test_Compliance': + elif test == 'test_Compliance': report.append('Device compliance information is not present in the DCL') - elif failure.test == 'test_CertifiedModel': + elif test == 'test_CertifiedModel': report.append('Device certified model is not present in the DCL') + elif test == 'test_AllSoftwareVersions': + report.append('Problems with device software version in the DCL') else: - report.append(f'unknown DCL failure in test {failure.test}: {failure.step}') + report.append(f'unknown DCL failure in test {test}: {failure.step}') + report.append('\n') + report.append(str(failure.exception)) print('\n\n\n') if report: From a9edbf490cff417e509a253b73290a00f5eaceec Mon Sep 17 00:00:00 2001 From: Cecille Freeman Date: Fri, 3 May 2024 12:13:04 -0400 Subject: [PATCH 08/13] fix accidental checkin --- .../post_certification_tests/post-cert-checks.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index 451d38aa8ee086..c309f7ff8c767a 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -137,9 +137,6 @@ async def setup_class(self): self.pid = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.ProductID) self.software_version = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.SoftwareVersion) self.url = fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST - self.vid = 0x6006 - self.pid = 1 - self.software_version = 1300 def steps_Vendor(self): return [TestStep(1, "Check if device VID is listed in the DCL vendor schema", "Listing found")] From e0d463483f70618bb42cbec26592d451808afab8 Mon Sep 17 00:00:00 2001 From: cecille Date: Mon, 6 May 2024 17:11:33 -0400 Subject: [PATCH 09/13] Proper checksum check on the downloaded OTA --- .../post-cert-checks.py | 93 ++++++++++++------- 1 file changed, 60 insertions(+), 33 deletions(-) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index c309f7ff8c767a..18dd9ae788db02 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -34,6 +34,7 @@ # pip install opencv-python requests click_option_group import base64 +import hashlib import importlib import logging import os @@ -110,7 +111,7 @@ def step_success(self, logger, logs, duration: int, request): pass def step_failure(self, logger, logs, duration: int, request, received): - self.failures[self.current_test] = Failure(self.current_step) + self.failures[self.current_test] = Failure(self.current_step, None) def step_unknown(self): pass @@ -138,6 +139,10 @@ async def setup_class(self): self.software_version = await self.read_single_attribute_check_success(cluster=bi, attribute=bi.Attributes.SoftwareVersion) self.url = fetch_paa_certs_from_dcl.PRODUCTION_NODE_URL_REST + self.vid_str = f'vid = 0x{self.vid:04X}' + self.vid_pid_str = f'{self.vid_str} pid = 0x{self.pid:04X}' + self.vid_pid_sv_str = f'{self.vid_pid_str} software version = {self.software_version}' + def steps_Vendor(self): return [TestStep(1, "Check if device VID is listed in the DCL vendor schema", "Listing found")] @@ -145,8 +150,8 @@ def test_Vendor(self): self.step(1) entry = requests.get(f"{self.url}/dcl/vendorinfo/vendors/{self.vid}").json() key = 'vendorInfo' - asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid:04x}") - logging.info(f'Found vendor key 0x{self.vid:04X} in DCL:') + asserts.assert_true(key in entry.keys(), f"Unable to find vendor entry for {self.vid_str}") + logging.info(f'Found vendor key for {self.vid_str} in the DCL:') logging.info(f'{entry[key]}') def steps_Model(self): @@ -156,8 +161,8 @@ def test_Model(self): self.step(1) key = 'model' entry = requests.get(f"{self.url}/dcl/model/models/{self.vid}/{self.pid}").json() - asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid:04x} {self.pid:04x}") - logging.info(f'Found model entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:') + asserts.assert_true(key in entry.keys(), f"Unable to find model entry for {self.vid_pid_str}") + logging.info(f'Found model entry for {self.vid_pid_str} in the DCL:') logging.info(f'{entry[key]}') def steps_Compliance(self): @@ -168,9 +173,9 @@ def test_Compliance(self): key = 'complianceInfo' entry = requests.get(f"{self.url}/dcl/compliance/compliance-info/{self.vid}/{self.pid}/{self.software_version}/matter").json() asserts.assert_true(key in entry.keys(), - f"Unable to find compliance entry for {self.vid:04x} {self.pid:04x} {self.software_version}") + f"Unable to find compliance entry for {self.vid_pid_sv_str}") logging.info( - f'Found compliance info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') + f'Found compliance info for {self.vid_pid_sv_str} in the DCL:') logging.info(f'{entry[key]}') def steps_CertifiedModel(self): @@ -181,24 +186,24 @@ def test_CertifiedModel(self): key = 'certifiedModel' entry = requests.get(f"{self.url}/dcl/compliance/certified-models/{self.vid}/{self.pid}/{self.software_version}/matter").json() asserts.assert_true(key in entry.keys(), - f"Unable to find certified model entry for {self.vid:04x} {self.pid:04x} {self.software_version}") + f"Unable to find certified model entry for {self.vid_pid_sv_str}") logging.info( - f'Found certified model for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={self.software_version} in the DCL:') + f'Found certified model for {self.vid_pid_sv_str} in the DCL:') logging.info(f'{entry[key]}') def steps_AllSoftwareVersions(self): return [TestStep(1, "Query the version information for this software version", "DCL entry exists"), TestStep(2, "For each valid software version with an OtaUrl, verify the OtaChecksumType is in the valid range and the OtaChecksum is a base64. If the softwareVersion matches the current softwareVersion on the device, ensure the entry is valid.", "OtaChecksum is base64 and OtaChecksumType is in the valid set")] + def test_AllSoftwareVersions(self): self.step(1) versions_entry = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}").json() key_model_versions = 'modelVersions' - asserts.assert_true(key_model_versions in versions_entry.keys(), f"Unable to find {key_model_versions} in software versions schema for vid=0x{self.vid:04X} pid=0x{self.pid:04X}") - logging.info( - f'Found version info for vid=0x{self.vid:04X} pid=0x{self.pid:04X} in the DCL:') + asserts.assert_true(key_model_versions in versions_entry.keys(), f"Unable to find {key_model_versions} in software versions schema for {self.vid_pid_str}") + logging.info(f'Found version info for vid=0x{self.vid_pid_str} in the DCL:') logging.info(f'{versions_entry[key_model_versions]}') key_software_versions = 'softwareVersions' - asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys(), f"Unable to find {key_software_versions} in software versions schema for vid=0x{self.vid:04X} pid=0x{self.pid:04X}") + asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys(), f"Unable to find {key_software_versions} in software versions schema for {self.vid_pid_str}") problems = [] self.step(2) @@ -206,34 +211,54 @@ def test_AllSoftwareVersions(self): entry_wrapper = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}/{software_version}").json() key_model_version = 'modelVersion' if key_model_version not in entry_wrapper: - problems.append(f'Missing key {key_model_version} in entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}') + problems.append(f'Missing key {key_model_version} in entry for {self.vid_pid_str} software version={software_version}') continue - logging.info(f'Found entry version entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}') + logging.info(f'Found entry version entry for {self.vid_pid_str} software version={software_version}') logging.info(entry_wrapper) entry = entry_wrapper[key_model_version] key_ota_url = 'otaUrl' key_software_version_valid = 'softwareVersionValid' key_ota_checksum = 'otaChecksum' key_ota_checksum_type = 'otaChecksumType' + key_ota_file_size = 'otaFileSize' + def check_key(key): if key not in entry.keys(): - problems.append(f'Missing key {key} in DCL versions entry for vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}') + problems.append(f'Missing key {key} in DCL versions entry for {self.vid_pid_str} software version={software_version}') check_key(key_ota_url) check_key(key_software_version_valid) if entry[key_software_version_valid] and entry[key_ota_url]: check_key(key_ota_checksum) check_key(key_ota_checksum_type) - valid_checksum_types = [1, 7, 8, 10, 11, 12] - if entry[key_ota_checksum_type] not in valid_checksum_types: - problems.append(f'OtaChecksumType for entry vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {valid_checksum_types}') + checksum_types = {1: hashlib.sha256, 7: hashlib.sha384, 8: hashlib.sha256, 10: hashlib.sha3_256, 11: hashlib.sha3_384, 12: hashlib.sha3_512} + if entry[key_ota_checksum_type] not in checksum_types.keys(): + problems.append(f'OtaChecksumType for entry {self.vid_pid_str} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {checksum_types.keys()}') + continue checksum = entry[key_ota_checksum] try: is_base64 = base64.b64encode(base64.b64decode(checksum)).decode('utf-8') == checksum except (ValueError, TypeError): is_base64 = False if not is_base64: - problems.append(f"Checksum {checksum} is not base64 encoded for for entry vid=0x{self.vid:04X} pid=0x{self.pid:04X} software version={software_version}") - #TODO: download and actually checksum it? Maybe just for the current version? And size check? + problems.append(f"Checksum {checksum} is not base64 encoded for for entry {self.vid_pid_str} software version={software_version}") + continue + + response = requests.get(entry[key_ota_url]) + if not response.ok: + problems.append(f"Unable to get OTA object from {entry[key_ota_url]} for {self.vid_pid_str} software version = {software_version}") + continue + + ota_len = str(len(response.content)) + dcl_len = entry[key_ota_file_size] + if ota_len != dcl_len: + problems.append(f'Incorrect OTA size for {self.vid_pid_str} software_version = {software_version}, received size: {len(response.content)} DCL states {entry[key_ota_file_size]}') + continue + + checksum = checksum_types[entry[key_ota_checksum_type]](response.content).digest() + dcl_checksum = base64.b64decode(entry[key_ota_checksum]) + if checksum != dcl_checksum: + problems.append(f'Incorrect checksum for {self.vid_pid_str} software version = {software_version}, calculated: {checksum}, DCL: {dcl_checksum}') + msg = 'Problems found in software version DCL checks:\n' for problem in problems: msg += f'{problem}\n' @@ -376,33 +401,36 @@ def main(): failures_test_event_trigger = run_test(TestEventTriggersCheck, ['test_TestEventTriggersCheck'], test_config) - failures_dcl = run_test(DclCheck, ['test_Vendor', 'test_Model', 'test_Compliance', 'test_CertifiedModel', 'test_AllSoftwareVersions'], test_config) + # [] means all tests. + failures_dcl = run_test(DclCheck, [], test_config) report = [] - for failure in failures_DA_1_2: + for test, failure in failures_DA_1_2.items(): # Check for known failures first # step 6.9 - non-production CD # 9 - not signed by CSA CA # other steps - should have been caught in cert, but we should report none the less if failure.step.startswith('6.9'): report.append('Device is using a non-production certification declaration') - continue - if failure.step.startswith('9'): + elif failure.step.startswith('9'): report.append('Device is using a certification declaration that was not signed by the CSA CA') - continue - report.append(f'Device attestation failure: TC-DA-1.2: {failure.step}') + else: + report.append(f'Device attestation failure: TC-DA-1.2: {failure.step}') + report.append(f'\t{str(failure.exception)}\n') - for failure in failures_DA_1_7: + for test, failure in failures_DA_1_7.items(): # Notable failures in DA-1.7: # 1.3 - PAI signature does not chain to a PAA in the main net DCL if failure.step.startswith('1.3'): report.append('Device DAC chain does not chain to a PAA in the main net DCL') - continue - report.append(f'Device attestation failure: TC-DA-1.7: {failure.step}') + else: + report.append(f'Device attestation failure: TC-DA-1.7: {failure.step}') + report.append(f'\t{str(failure.exception)}\n') - for failure in failures_test_event_trigger: + for test, failure in failures_test_event_trigger.items(): # only one possible failure here report.append('Device has test event triggers enabled in production') + report.append(f'\t{str(failure.exception)}\n') for test, failure in failures_dcl.items(): if test == 'test_Vendor': @@ -417,8 +445,7 @@ def main(): report.append('Problems with device software version in the DCL') else: report.append(f'unknown DCL failure in test {test}: {failure.step}') - report.append('\n') - report.append(str(failure.exception)) + report.append(f'\t{str(failure.exception)}\n') print('\n\n\n') if report: From 14726c871a4286fd790661fb494091120b4b5ec2 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Tue, 7 May 2024 12:37:08 -0400 Subject: [PATCH 10/13] Update credentials/fetch_paa_certs_from_dcl.py --- credentials/fetch_paa_certs_from_dcl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/credentials/fetch_paa_certs_from_dcl.py b/credentials/fetch_paa_certs_from_dcl.py index 5fe776d090721e..61decd5aa3bfae 100644 --- a/credentials/fetch_paa_certs_from_dcl.py +++ b/credentials/fetch_paa_certs_from_dcl.py @@ -37,7 +37,6 @@ PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org" TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org" -# TODO: really? We can't just get this by name from the DCL? MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw" MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5" From 528d8dcec81ed06108f915c7705d9b80b9643a9f Mon Sep 17 00:00:00 2001 From: cecille Date: Tue, 7 May 2024 13:02:57 -0400 Subject: [PATCH 11/13] Don't include CSA root as a PAA --- credentials/fetch_paa_certs_from_dcl.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/credentials/fetch_paa_certs_from_dcl.py b/credentials/fetch_paa_certs_from_dcl.py index 61decd5aa3bfae..3bcd74b6534e26 100644 --- a/credentials/fetch_paa_certs_from_dcl.py +++ b/credentials/fetch_paa_certs_from_dcl.py @@ -201,6 +201,9 @@ def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use parse_paa_root_certs(cmdpipe, paa_list) for paa in paa_list: + if paa['subject'] == MATTER_CERT_CA_SUBJECT and paa['subjectKeyId'] == MATTER_CERT_CA_SUBJECT_KEY_ID: + # Don't include the CD signing cert as a PAA root. + continue if use_rest: certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId']) else: From d3c297b106be5dd0e78859fb8d3aa4d30edb00e7 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Tue, 7 May 2024 17:04:00 +0000 Subject: [PATCH 12/13] Restyled by autopep8 --- .../post-cert-checks.py | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/post-cert-checks.py index 18dd9ae788db02..06414c251b3371 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/post-cert-checks.py @@ -171,7 +171,8 @@ def steps_Compliance(self): def test_Compliance(self): self.step(1) key = 'complianceInfo' - entry = requests.get(f"{self.url}/dcl/compliance/compliance-info/{self.vid}/{self.pid}/{self.software_version}/matter").json() + entry = requests.get( + f"{self.url}/dcl/compliance/compliance-info/{self.vid}/{self.pid}/{self.software_version}/matter").json() asserts.assert_true(key in entry.keys(), f"Unable to find compliance entry for {self.vid_pid_sv_str}") logging.info( @@ -184,7 +185,8 @@ def steps_CertifiedModel(self): def test_CertifiedModel(self): self.step(1) key = 'certifiedModel' - entry = requests.get(f"{self.url}/dcl/compliance/certified-models/{self.vid}/{self.pid}/{self.software_version}/matter").json() + entry = requests.get( + f"{self.url}/dcl/compliance/certified-models/{self.vid}/{self.pid}/{self.software_version}/matter").json() asserts.assert_true(key in entry.keys(), f"Unable to find certified model entry for {self.vid_pid_sv_str}") logging.info( @@ -199,11 +201,13 @@ def test_AllSoftwareVersions(self): self.step(1) versions_entry = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}").json() key_model_versions = 'modelVersions' - asserts.assert_true(key_model_versions in versions_entry.keys(), f"Unable to find {key_model_versions} in software versions schema for {self.vid_pid_str}") + asserts.assert_true(key_model_versions in versions_entry.keys(), + f"Unable to find {key_model_versions} in software versions schema for {self.vid_pid_str}") logging.info(f'Found version info for vid=0x{self.vid_pid_str} in the DCL:') logging.info(f'{versions_entry[key_model_versions]}') key_software_versions = 'softwareVersions' - asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys(), f"Unable to find {key_software_versions} in software versions schema for {self.vid_pid_str}") + asserts.assert_true(key_software_versions in versions_entry[key_model_versions].keys( + ), f"Unable to find {key_software_versions} in software versions schema for {self.vid_pid_str}") problems = [] self.step(2) @@ -211,7 +215,8 @@ def test_AllSoftwareVersions(self): entry_wrapper = requests.get(f"{self.url}/dcl/model/versions/{self.vid}/{self.pid}/{software_version}").json() key_model_version = 'modelVersion' if key_model_version not in entry_wrapper: - problems.append(f'Missing key {key_model_version} in entry for {self.vid_pid_str} software version={software_version}') + problems.append( + f'Missing key {key_model_version} in entry for {self.vid_pid_str} software version={software_version}') continue logging.info(f'Found entry version entry for {self.vid_pid_str} software version={software_version}') logging.info(entry_wrapper) @@ -224,15 +229,18 @@ def test_AllSoftwareVersions(self): def check_key(key): if key not in entry.keys(): - problems.append(f'Missing key {key} in DCL versions entry for {self.vid_pid_str} software version={software_version}') + problems.append( + f'Missing key {key} in DCL versions entry for {self.vid_pid_str} software version={software_version}') check_key(key_ota_url) check_key(key_software_version_valid) if entry[key_software_version_valid] and entry[key_ota_url]: check_key(key_ota_checksum) check_key(key_ota_checksum_type) - checksum_types = {1: hashlib.sha256, 7: hashlib.sha384, 8: hashlib.sha256, 10: hashlib.sha3_256, 11: hashlib.sha3_384, 12: hashlib.sha3_512} + checksum_types = {1: hashlib.sha256, 7: hashlib.sha384, 8: hashlib.sha256, + 10: hashlib.sha3_256, 11: hashlib.sha3_384, 12: hashlib.sha3_512} if entry[key_ota_checksum_type] not in checksum_types.keys(): - problems.append(f'OtaChecksumType for entry {self.vid_pid_str} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {checksum_types.keys()}') + problems.append( + f'OtaChecksumType for entry {self.vid_pid_str} software version={software_version} is invalid. Found {entry[key_ota_checksum_type]} valid values: {checksum_types.keys()}') continue checksum = entry[key_ota_checksum] try: @@ -240,24 +248,28 @@ def check_key(key): except (ValueError, TypeError): is_base64 = False if not is_base64: - problems.append(f"Checksum {checksum} is not base64 encoded for for entry {self.vid_pid_str} software version={software_version}") + problems.append( + f"Checksum {checksum} is not base64 encoded for for entry {self.vid_pid_str} software version={software_version}") continue response = requests.get(entry[key_ota_url]) if not response.ok: - problems.append(f"Unable to get OTA object from {entry[key_ota_url]} for {self.vid_pid_str} software version = {software_version}") + problems.append( + f"Unable to get OTA object from {entry[key_ota_url]} for {self.vid_pid_str} software version = {software_version}") continue ota_len = str(len(response.content)) dcl_len = entry[key_ota_file_size] if ota_len != dcl_len: - problems.append(f'Incorrect OTA size for {self.vid_pid_str} software_version = {software_version}, received size: {len(response.content)} DCL states {entry[key_ota_file_size]}') + problems.append( + f'Incorrect OTA size for {self.vid_pid_str} software_version = {software_version}, received size: {len(response.content)} DCL states {entry[key_ota_file_size]}') continue checksum = checksum_types[entry[key_ota_checksum_type]](response.content).digest() dcl_checksum = base64.b64decode(entry[key_ota_checksum]) if checksum != dcl_checksum: - problems.append(f'Incorrect checksum for {self.vid_pid_str} software version = {software_version}, calculated: {checksum}, DCL: {dcl_checksum}') + problems.append( + f'Incorrect checksum for {self.vid_pid_str} software version = {software_version}, calculated: {checksum}, DCL: {dcl_checksum}') msg = 'Problems found in software version DCL checks:\n' for problem in problems: From 5be6c143b240f1256b46c7a708efdbae273b28e4 Mon Sep 17 00:00:00 2001 From: cecille Date: Tue, 21 May 2024 11:09:48 -0400 Subject: [PATCH 13/13] rename file to production_device_checks.py --- .../{post-cert-checks.py => production_device_checks.py} | 3 +++ 1 file changed, 3 insertions(+) rename src/python_testing/post_certification_tests/{post-cert-checks.py => production_device_checks.py} (99%) diff --git a/src/python_testing/post_certification_tests/post-cert-checks.py b/src/python_testing/post_certification_tests/production_device_checks.py similarity index 99% rename from src/python_testing/post_certification_tests/post-cert-checks.py rename to src/python_testing/post_certification_tests/production_device_checks.py index 06414c251b3371..0e8fd617c44110 100644 --- a/src/python_testing/post_certification_tests/post-cert-checks.py +++ b/src/python_testing/post_certification_tests/production_device_checks.py @@ -22,6 +22,7 @@ # - CD is valid and, signed by one of the known CSA signing certs and # is marked as a production CD # - DCL entries for this device and vendor have all been registered +# - DCL OTA entries have proper sizes and checksums # - TestEventTriggers have been turned off # # This test is performed over PASE on a factory reset device. @@ -29,9 +30,11 @@ # To run this test, first build and install the python chip wheel # files, then add the extra dependencies. From the root: # +# . scripts/activate.sh # ./scripts/build_python.sh -i py # source py/bin/activate # pip install opencv-python requests click_option_group +# python src/python_testing/post_certification_tests/production_device_checks.py import base64 import hashlib