Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Post certification checks script #33256

Merged
merged 14 commits into from
Jun 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
PRODUCTION_NODE_URL_REST = "https://on.dcl.csa-iot.org"
TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org"

MATTER_CERT_CA_SUBJECT = "MFIxDDAKBgNVBAoMA0NTQTEsMCoGA1UEAwwjTWF0dGVyIENlcnRpZmljYXRpb24gYW5kIFRlc3RpbmcgQ0ExFDASBgorBgEEAYKifAIBDARDNUEw"
MATTER_CERT_CA_SUBJECT_KEY_ID = "97:E4:69:D0:C5:04:14:C2:6F:C7:01:F7:7E:94:77:39:09:8D:F6:A5"


def parse_paa_root_certs(cmdpipe, paa_list):
"""
Expand Down Expand Up @@ -73,13 +76,14 @@ def parse_paa_root_certs(cmdpipe, paa_list):
else:
if b': ' in line:
key, value = line.split(b': ')
result[key.strip(b' -').decode("utf-8")] = value.strip().decode("utf-8")
result[key.strip(b' -').decode("utf-8")
] = value.strip().decode("utf-8")
parse_paa_root_certs.counter += 1
if parse_paa_root_certs.counter % 2 == 0:
paa_list.append(copy.deepcopy(result))


def write_paa_root_cert(certificate, subject):
def write_cert(certificate, subject):
filename = 'dcld_mirror_' + \
re.sub('[^a-zA-Z0-9_-]', '', re.sub('[=, ]', '_', subject))
with open(filename + '.pem', 'w+') as outfile:
Expand All @@ -93,7 +97,8 @@ def write_paa_root_cert(certificate, subject):
serialization.Encoding.DER)
outfile.write(der_certificate)
except (IOError, ValueError) as e:
print(f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...")
print(
f"ERROR: Failed to convert {filename + '.pem'}: {str(e)}. Skipping...")


def parse_paa_root_cert_from_dcld(cmdpipe):
Expand Down Expand Up @@ -133,7 +138,38 @@ def use_dcld(dcld, production, cmdlist):
@optgroup.option('--paa-trust-store-path', default='paa-root-certs', type=str, metavar='PATH', help="PAA trust store path (default: paa-root-certs)")
def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
"""DCL PAA mirroring tools"""
fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path)


def get_cert_from_rest(rest_node_url, subject, subject_key_id):
response = requests.get(
f"{rest_node_url}/dcl/pki/certificates/{subject}/{subject_key_id}").json()["approvedCertificates"]["certs"][0]
certificate = response["pemCert"].rstrip("\n")
subject = response["subjectAsText"]
return certificate, subject


def fetch_cd_signing_certs(store_path):
''' Only supports using main net http currently.'''
rest_node_url = PRODUCTION_NODE_URL_REST
os.makedirs(store_path, exist_ok=True)
original_dir = os.getcwd()
os.chdir(store_path)
andy31415 marked this conversation as resolved.
Show resolved Hide resolved

cd_signer_ids = requests.get(
f"{rest_node_url}/dcl/pki/child-certificates/{MATTER_CERT_CA_SUBJECT}/{MATTER_CERT_CA_SUBJECT_KEY_ID}").json()['childCertificates']['certIds']
for signer in cd_signer_ids:
subject = signer['subject']
subject_key_id = signer['subjectKeyId']
certificate, subject = get_cert_from_rest(rest_node_url, subject, subject_key_id)

print(f"Downloaded CD signing cert with subject: {subject}")
write_cert(certificate, subject)

os.chdir(original_dir)


def fetch_paa_certs(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, paa_trust_store_path):
cecille marked this conversation as resolved.
Show resolved Hide resolved
production = False
dcld = use_test_net_dcld

Expand All @@ -148,36 +184,43 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h
rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST

os.makedirs(paa_trust_store_path, exist_ok=True)
original_dir = os.getcwd()
andy31415 marked this conversation as resolved.
Show resolved Hide resolved
os.chdir(paa_trust_store_path)

if use_rest:
paa_list = requests.get(f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"]
paa_list = requests.get(
f"{rest_node_url}/dcl/pki/root-certificates").json()["approvedRootCertificates"]["certs"]
else:
cmdlist = ['query', 'pki', 'all-x509-root-certs']

cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmdpipe = subprocess.Popen(use_dcld(
dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

paa_list = []
parse_paa_root_certs.counter = 0
parse_paa_root_certs(cmdpipe, paa_list)

for paa in paa_list:
if paa['subject'] == MATTER_CERT_CA_SUBJECT and paa['subjectKeyId'] == MATTER_CERT_CA_SUBJECT_KEY_ID:
# Don't include the CD signing cert as a PAA root.
continue
if use_rest:
response = requests.get(
f"{rest_node_url}/dcl/pki/certificates/{paa['subject']}/{paa['subjectKeyId']}").json()["approvedCertificates"]["certs"][0]
certificate = response["pemCert"]
subject = response["subjectAsText"]
certificate, subject = get_cert_from_rest(rest_node_url, paa['subject'], paa['subjectKeyId'])
else:
cmdlist = ['query', 'pki', 'x509-cert', '-u', paa['subject'], '-k', paa['subjectKeyId']]
cmdlist = ['query', 'pki', 'x509-cert', '-u',
paa['subject'], '-k', paa['subjectKeyId']]

cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmdpipe = subprocess.Popen(use_dcld(
dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

(certificate, subject) = parse_paa_root_cert_from_dcld(cmdpipe)

certificate = certificate.rstrip('\n')

print(f"Downloaded certificate with subject: {subject}")
write_paa_root_cert(certificate, subject)
print(f"Downloaded PAA certificate with subject: {subject}")
write_cert(certificate, subject)

os.chdir(original_dir)


if __name__ == "__main__":
Expand Down
35 changes: 28 additions & 7 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,18 @@ class DeviceProxyWrapper():
that is not an issue that needs to be accounted for and it will become very apparent
if that happens.
'''
class DeviceProxyType(enum.Enum):
OPERATIONAL = enum.auto(),
COMMISSIONEE = enum.auto(),

def __init__(self, deviceProxy: ctypes.c_void_p, dmLib=None):
def __init__(self, deviceProxy: ctypes.c_void_p, proxyType, dmLib=None):
self._deviceProxy = deviceProxy
self._dmLib = dmLib
self._proxyType = proxyType

def __del__(self):
if (self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
# Commissionee device proxies are owned by the DeviceCommissioner. See #33031
if (self._proxyType == self.DeviceProxyType.OPERATIONAL and self.self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None):
# This destructor is called from any threading context, including on the Matter threading context.
# So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to
# actually be executed. Instead, we just post/schedule the work and move on.
Expand Down Expand Up @@ -861,7 +866,23 @@ def GetClusterHandler(self):

return self._Cluster

def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int = None):
def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutMs: int = None) -> typing.Optional[DeviceProxyWrapper]:
''' Returns CommissioneeDeviceProxy if we can find or establish a PASE connection to the specified device'''
self.CheckIsActive()
returnDevice = c_void_p(None)
res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

self.EstablishPASESession(setupCode, nodeid)

res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: int = None):
''' Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node.

nodeId: Target's Node ID
Expand All @@ -882,7 +903,7 @@ def GetConnectedDeviceSync(self, nodeid, allowPASE: bool = True, timeoutMs: int
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
logging.info('Using PASE connection')
return DeviceProxyWrapper(returnDevice)
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

class DeviceAvailableClosure():
def deviceAvailable(self, device, err):
Expand Down Expand Up @@ -916,7 +937,7 @@ def deviceAvailable(self, device, err):
if returnDevice.value is None:
returnErr.raise_on_error()

return DeviceProxyWrapper(returnDevice, self._dmLib)
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib)

async def WaitForActive(self, nodeid, *, timeoutSeconds=30.0, stayActiveDurationMs=30000):
''' Waits a LIT ICD device to become active. Will send a StayActive command to the device on active to allow human operations.
Expand Down Expand Up @@ -948,7 +969,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
logging.info('Using PASE connection')
return DeviceProxyWrapper(returnDevice)
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

eventLoop = asyncio.get_running_loop()
future = eventLoop.create_future()
Expand Down Expand Up @@ -987,7 +1008,7 @@ def deviceAvailable(self, device, err):
else:
await future

return DeviceProxyWrapper(future.result(), self._dmLib)
return DeviceProxyWrapper(future.result(), DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib)

def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0):
''' Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to
Expand Down
14 changes: 11 additions & 3 deletions src/python_testing/TC_DA_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import re

import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from chip.interaction_model import InteractionModelError, Status
from chip.tlv import TLVReader
from cryptography import x509
Expand Down Expand Up @@ -104,7 +105,7 @@ def parse_ids_from_certs(dac: x509.Certificate, pai: x509.Certificate) -> tuple(
# default is 'credentials/development/cd-certs'.


class TC_DA_1_2(MatterBaseTest):
class TC_DA_1_2(MatterBaseTest, BasicCompositionTests):
def desc_TC_DA_1_2(self):
return "Device Attestation Request Validation [DUT - Commissionee]"

Expand Down Expand Up @@ -164,6 +165,11 @@ def steps_TC_DA_1_2(self):
async def test_TC_DA_1_2(self):
is_ci = self.check_pics('PICS_SDK_CI_ONLY')
cd_cert_dir = self.user_params.get("cd_cert_dir", 'credentials/development/cd-certs')
post_cert_test = self.user_params.get("post_cert_test", False)

do_test_over_pase = self.user_params.get("use_pase_only", False)
if do_test_over_pase:
self.connect_over_pase(self.default_controller)

# Commissioning - done
self.step(0)
Expand Down Expand Up @@ -308,7 +314,9 @@ async def test_TC_DA_1_2(self):
self.step("6.8")
asserts.assert_in(version_number, range(0, 65535), "Version number out of range")
self.step("6.9")
if is_ci:
if post_cert_test:
asserts.assert_equal(certification_type, 2, "Certification declaration is not marked as production.")
elif is_ci:
asserts.assert_in(certification_type, [0, 1, 2], "Certification type is out of range")
else:
asserts.assert_in(certification_type, [1, 2], "Certification type is out of range")
Expand Down Expand Up @@ -392,7 +400,7 @@ async def test_TC_DA_1_2(self):
self.mark_current_step_skipped()

self.step(12)
proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False)
proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, do_test_over_pase)
asserts.assert_equal(len(proxy.attestationChallenge), 16, "Attestation challenge is the wrong length")
attestation_tbs = elements + proxy.attestationChallenge

Expand Down
9 changes: 6 additions & 3 deletions src/python_testing/basic_composition_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ def ConvertValue(value) -> Any:


class BasicCompositionTests:
def connect_over_pase(self, dev_ctrl):
setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code
asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.")
dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id)

def dump_wildcard(self, dump_device_composition_path: typing.Optional[str]):
node_dump_dict = {endpoint_id: MatterTlvToJson(self.endpoints_tlv[endpoint_id]) for endpoint_id in self.endpoints_tlv}
logging.debug(f"Raw TLV contents of Node: {json.dumps(node_dump_dict, indent=2)}")
Expand All @@ -116,10 +121,8 @@ async def setup_class_helper(self, default_to_pase: bool = True):
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code
asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.")
self.connect_over_pase(dev_ctrl)
node_id = self.dut_node_id
dev_ctrl.EstablishPASESession(setupCode, node_id)
else:
# Using the already commissioned node
node_id = self.dut_node_id
Expand Down
2 changes: 1 addition & 1 deletion src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest

if hooks:
# Right now, we only support running a single test class at once,
# but it's relatively easy to exapand that to make the test process faster
# but it's relatively easy to expand that to make the test process faster
# TODO: support a list of tests
hooks.start(count=1)
# Mobly gives the test run time in seconds, lets be a bit more precise
Expand Down
Loading
Loading