Skip to content

Commit

Permalink
Post certification checks script (project-chip#33256)
Browse files Browse the repository at this point in the history
* Post certification checks script

This test is used to evaluate that all the proper post-certification
work has been done to make a Matter device production ready.
This test ensure that:
- DAC chain is valid and spec compliant, and chains up to a PAA that
  is registered in the main net DCL
- CD is valid and, signed by one of the known CSA signing certs and
  is marked as a production CD
- DCL entries for this device and vendor have all been registered
- TestEventTriggers have been turned off

This test is performed over PASE on a factory reset device.

To run this test, first build and install the python chip wheel
files, then add the extra dependencies. From the root:

./scripts/build_python.sh -i py
source py/bin/activate
pip install opencv-python requests click_option_group

* Restyled by autopep8

* linter

* Add a production CD check to DA-1.7

* report out results better

* fix post cert check in 1.2

* Add a check for software versions

NOTE: not done yet because as it turns out hex strings appear to
be valid base64. I mean, they're garbage, but this test won't
find that.

Need additional tests to detect hex vs. base64

* fix accidental checkin

* Proper checksum check on the downloaded OTA

* Update credentials/fetch_paa_certs_from_dcl.py

* Don't include CSA root as a PAA

* Restyled by autopep8

* rename file to production_device_checks.py

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and diogopintodsr committed Jun 19, 2024
1 parent 188e07e commit 6ba1f38
Show file tree
Hide file tree
Showing 6 changed files with 579 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org"
TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org"

MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw"
MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5"


def parse_paa_root_certs(cmdpipe, paa_list):
"""
Expand Down Expand Up @@ -73,13 +76,14 @@ def parse_paa_root_certs(cmdpipe, paa_list):
else:
if b': ' in line:
key, value = line.split(b': ')
result[key.strip(b' -').decode("utf-8")] = value.strip().decode("utf-8")
result[key.strip(b' -').decode("utf-8")
] = value.strip().decode("utf-8")
parse_paa_root_certs.counter += 1
if parse_paa_root_certs.counter % 2 == 0:
paa_list.append(copy.deepcopy(result))


def write_paa_root_cert(certificate, subject):
def write_cert(certificate, subject):
filename = 'dcld_mirror_' + \
re.sub('[^a-zA-Z0-9_-]', '', re.sub('[=, ]', '_', subject))
with open(filename + '.pem', 'w+') as outfile:
Expand All @@ -93,7 +97,8 @@ def write_paa_root_cert(certificate, subject):
serialization.Encoding.DER)
outfile.write(der_certificate)
except (IOError, ValueError) as e:
print(f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...")
print(
f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...")


def parse_paa_root_cert_from_dcld(cmdpipe):
Expand Down Expand Up @@ -133,7 +138,38 @@ def use_dcld(dcld, production, cmdlist):
@optgroup.option('--paa-trust-store-path', default='paa-root-certs', type=str, metavar='PATH', help="PAA trust store path (default: paa-root-certs)")
def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
"""DCL PAA mirroring tools"""
fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path)


def get_cert_from_rest(rest_node_url, subject, subject_key_id):
response = requests.get(
f"{rest_node_url}/dcl/pki/certificates/{subject}/{subject_key_id}").json()["approvedCertificates"]["certs"][0]
certificate = response["pemCert"].rstrip("\n")
subject = response["subjectAsText"]
return certificate, subject


def fetch_cd_signing_certs(store_path):
''' Only supports using main net http currently.'''
rest_node_url = PRODUCTION_NODE_URL_REST
os.makedirs(store_path, exist_ok=True)
original_dir = os.getcwd()
os.chdir(store_path)

cd_signer_ids = requests.get(
f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds']
for signer in cd_signer_ids:
subject = signer['subject']
subject_key_id = signer['subjectKeyId']
certificate, subject = get_cert_from_rest(rest_node_url, subject, subject_key_id)

print(f"Downloaded CD signing cert with subject: {subject}")
write_cert(certificate, subject)

os.chdir(original_dir)


def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
production = False
dcld = use_test_net_dcld

Expand All @@ -148,36 +184,43 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h
rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST

os.makedirs(paa_trust_store_path, exist_ok=True)
original_dir = os.getcwd()
os.chdir(paa_trust_store_path)

if use_rest:
paa_list = requests.get(f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"]
paa_list = requests.get(
f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"]
else:
cmdlist = ['query', 'pki', 'all-x509-root-certs']

cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmdpipe = subprocess.Popen(use_dcld(
dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

paa_list = []
parse_paa_root_certs.counter = 0
parse_paa_root_certs(cmdpipe, paa_list)

for paa in paa_list:
if paa['subject'] == MATTER_CERT_CA_SUBJECT and paa['subjectKeyId'] == MATTER_CERT_CA_SUBJECT_KEY_ID:
# Don't include the CD signing cert as a PAA root.
continue
if use_rest:
response = requests.get(
f"{rest_node_url}/dcl/pki/certificates/{paa['subject']}/{paa['subjectKeyId']}").json()["approvedCertificates"]["certs"][0]
certificate = response["pemCert"]
subject = response["subjectAsText"]
certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId'])
else:
cmdlist = ['query', 'pki', 'x509-cert', '-u', paa['subject'], '-k', paa['subjectKeyId']]
cmdlist = ['query', 'pki', 'x509-cert', '-u',
paa['subject'], '-k', paa['subjectKeyId']]

cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmdpipe = subprocess.Popen(use_dcld(
dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

(certificate, subject) = parse_paa_root_cert_from_dcld(cmdpipe)

certificate = certificate.rstrip('\n')

print(f"Downloaded certificate with subject: {subject}")
write_paa_root_cert(certificate, subject)
print(f"Downloaded PAA certificate with subject: {subject}")
write_cert(certificate, subject)

os.chdir(original_dir)


if __name__ == "__main__":
Expand Down
35 changes: 28 additions & 7 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,18 @@ class DeviceProxyWrapper():
that is not an issue that needs to be accounted for and it will become very apparent
if that happens.
'''
class DeviceProxyType(enum.Enum):
OPERATIONAL = enum.auto(),
COMMISSIONEE = enum.auto(),

def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None):
def __init__(self, deviceProxy: ctypes.c_void_p, proxyType, dmLib=None):
self._deviceProxy = deviceProxy
self._dmLib = dmLib
self._proxyType = proxyType

def __del__(self):
if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
# Commissionee device proxies are owned by the DeviceCommissioner. See #33031
if (self._proxyType == self.DeviceProxyType.OPERATIONAL and self.self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
# This destructor is called from any threading context, including on the Matter threading context.
# So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to
# actually be executed. Instead, we just post/schedule the work and move on.
Expand Down Expand Up @@ -861,7 +866,23 @@ def GetClusterHandler(self):

return self._Cluster

def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int = None):
def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutMs: int = None) -> typing.Optional[DeviceProxyWrapper]:
''' Returns CommissioneeDeviceProxy if we can find or establish a PASE connection to the specified device'''
self.CheckIsActive()
returnDevice = c_void_p(None)
res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

self.EstablishPASESession(setupCode, nodeid)

res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None):
''' Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node.
nodeId: Target's Node ID
Expand All @@ -882,7 +903,7 @@ def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
logging.info('Using PASE connection')
return DeviceProxyWrapper(returnDevice)
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

class DeviceAvailableClosure():
def deviceAvailable(self, device, err):
Expand Down Expand Up @@ -916,7 +937,7 @@ def deviceAvailable(self, device, err):
if returnDevice.value is None:
returnErr.raise_on_error()

return DeviceProxyWrapper(returnDevice, self._dmLib)
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib)

async def WaitForActive(self, nodeid, *, timeoutSeconds=30.0, stayActiveDurationMs=30000):
''' Waits a LIT ICD device to become active. Will send a StayActive command to the device on active to allow human operations.
Expand Down Expand Up @@ -948,7 +969,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
logging.info('Using PASE connection')
return DeviceProxyWrapper(returnDevice)
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()
Expand Down Expand Up @@ -987,7 +1008,7 @@ def deviceAvailable(self, device, err):
else:
await future

return DeviceProxyWrapper(future.result(), self._dmLib)
return DeviceProxyWrapper(future.result(), DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib)

def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0):
''' Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to
Expand Down
14 changes: 11 additions & 3 deletions src/python_testing/TC_DA_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import re

import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from chip.interaction_model import InteractionModelError, Status
from chip.tlv import TLVReader
from cryptography import x509
Expand Down Expand Up @@ -104,7 +105,7 @@ def parse_ids_from_certs(dac: x509.Certificate, pai: x509.Certificate) -> tuple(
# default is 'credentials/development/cd-certs'.


class TC_DA_1_2(MatterBaseTest):
class TC_DA_1_2(MatterBaseTest, BasicCompositionTests):
def desc_TC_DA_1_2(self):
return "Device Attestation Request Validation [DUT - Commissionee]"

Expand Down Expand Up @@ -164,6 +165,11 @@ def steps_TC_DA_1_2(self):
async def test_TC_DA_1_2(self):
is_ci = self.check_pics('PICS_SDK_CI_ONLY')
cd_cert_dir = self.user_params.get("cd_cert_dir", 'credentials/development/cd-certs')
post_cert_test = self.user_params.get("post_cert_test", False)

do_test_over_pase = self.user_params.get("use_pase_only", False)
if do_test_over_pase:
self.connect_over_pase(self.default_controller)

# Commissioning - done
self.step(0)
Expand Down Expand Up @@ -308,7 +314,9 @@ async def test_TC_DA_1_2(self):
self.step("6.8")
asserts.assert_in(version_number, range(0, 65535), "Version number out of range")
self.step("6.9")
if is_ci:
if post_cert_test:
asserts.assert_equal(certification_type, 2, "Certification declaration is not marked as production.")
elif is_ci:
asserts.assert_in(certification_type, [0, 1, 2], "Certification type is out of range")
else:
asserts.assert_in(certification_type, [1, 2], "Certification type is out of range")
Expand Down Expand Up @@ -392,7 +400,7 @@ async def test_TC_DA_1_2(self):
self.mark_current_step_skipped()

self.step(12)
proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False)
proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, do_test_over_pase)
asserts.assert_equal(len(proxy.attestationChallenge), 16, "Attestation challenge is the wrong length")
attestation_tbs = elements + proxy.attestationChallenge

Expand Down
9 changes: 6 additions & 3 deletions src/python_testing/basic_composition_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ def ConvertValue(value) -> Any:


class BasicCompositionTests:
def connect_over_pase(self, dev_ctrl):
setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code
asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.")
dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id)

def dump_wildcard(self, dump_device_composition_path: typing.Optional[str]):
node_dump_dict = {endpoint_id: MatterTlvToJson(self.endpoints_tlv[endpoint_id]) for endpoint_id in self.endpoints_tlv}
logging.debug(f"Raw TLV contents of Node: {json.dumps(node_dump_dict, indent=2)}")
Expand All @@ -116,10 +121,8 @@ async def setup_class_helper(self, default_to_pase: bool = True):
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code
asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.")
self.connect_over_pase(dev_ctrl)
node_id = self.dut_node_id
dev_ctrl.EstablishPASESession(setupCode, node_id)
else:
# Using the already commissioned node
node_id = self.dut_node_id
Expand Down
2 changes: 1 addition & 1 deletion src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest

if hooks:
# Right now, we only support running a single test class at once,
# but it's relatively easy to exapand that to make the test process faster
# but it's relatively easy to expand that to make the test process faster
# TODO: support a list of tests
hooks.start(count=1)
# Mobly gives the test run time in seconds, lets be a bit more precise
Expand Down
Loading

0 comments on commit 6ba1f38

Please sign in to comment.