diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a51bacd2340a84..bda66ca507030b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -509,6 +509,8 @@ jobs: scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --int-arg PIXIT.ACE.APPENDPOINT:1 PIXIT.ACE.APPDEVTYPEID:0x0100 --string-arg PIXIT.ACE.APPCLUSTER:OnOff PIXIT.ACE.APPATTRIBUTE:OnOff"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_CGEN_2_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"' + scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_DA_1_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values"' + scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_DA_1_5.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py"' - name: Uploading core files uses: actions/upload-artifact@v3 diff --git a/src/controller/python/ChipDeviceController-ScriptBinding.cpp b/src/controller/python/ChipDeviceController-ScriptBinding.cpp index 5b2b3a01759ec4..39d4fa58647ce3 100644 --- a/src/controller/python/ChipDeviceController-ScriptBinding.cpp +++ b/src/controller/python/ChipDeviceController-ScriptBinding.cpp @@ -198,6 +198,7 @@ PyChipError pychip_GetConnectedDeviceByNodeId(chip::Controller::DeviceCommission PyChipError pychip_FreeOperationalDeviceProxy(chip::OperationalDeviceProxy * deviceProxy); PyChipError pychip_GetLocalSessionId(chip::OperationalDeviceProxy * deviceProxy, uint16_t * localSessionId); PyChipError pychip_GetNumSessionsToPeer(chip::OperationalDeviceProxy * deviceProxy, uint32_t * numSessions); +PyChipError pychip_GetAttestationChallenge(chip::OperationalDeviceProxy * deviceProxy, uint8_t * buf, size_t * size); PyChipError pychip_GetDeviceBeingCommissioned(chip::Controller::DeviceCommissioner * devCtrl, chip::NodeId nodeId, CommissioneeDeviceProxy ** proxy); PyChipError pychip_ExpireSessions(chip::Controller::DeviceCommissioner * devCtrl, chip::NodeId nodeId); @@ -713,6 +714,19 @@ PyChipError pychip_GetNumSessionsToPeer(chip::OperationalDeviceProxy * devicePro return ToPyChipError(CHIP_NO_ERROR); } +PyChipError pychip_GetAttestationChallenge(chip::OperationalDeviceProxy * deviceProxy, uint8_t * buf, size_t * size) +{ + VerifyOrReturnError(deviceProxy->GetSecureSession().HasValue(), ToPyChipError(CHIP_ERROR_MISSING_SECURE_SESSION)); + VerifyOrReturnError(buf != nullptr, ToPyChipError(CHIP_ERROR_INVALID_ARGUMENT)); + + ByteSpan challenge = deviceProxy->GetSecureSession().Value()->AsSecureSession()->GetCryptoContext().GetAttestationChallenge(); + VerifyOrReturnError(challenge.size() <= *size, ToPyChipError(CHIP_ERROR_INVALID_ARGUMENT)); + memcpy(buf, challenge.data(), challenge.size()); + *size = challenge.size(); + + return ToPyChipError(CHIP_NO_ERROR); +} + PyChipError pychip_GetDeviceBeingCommissioned(chip::Controller::DeviceCommissioner * devCtrl, chip::NodeId nodeId, CommissioneeDeviceProxy ** proxy) { diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index e41bb4257434bc..7982e3f644911c 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -201,6 +201,23 @@ def numTotalSessions(self) -> int: return numSessions.value + @property + def attestationChallenge(self) -> bytes: + self._dmLib.pychip_GetAttestationChallenge.argtypes = (c_void_p, POINTER(c_uint8), POINTER(c_size_t)) + self._dmLib.pychip_GetAttestationChallenge.restype = PyChipError + + # this buffer is overly large, but we shall resize + size = 64 + buf = ctypes.c_uint8(size) + csize = ctypes.c_size_t(size) + builtins.chipStack.Call( + lambda: self._dmLib.pychip_GetAttestationChallenge(self._deviceProxy, buf, ctypes.byref(csize)) + ).raise_on_error() + + resize(buf, csize.value) + + return bytes(buf) + DiscoveryFilterType = discovery.FilterType diff --git a/src/python_testing/TC_DA_1_2.py b/src/python_testing/TC_DA_1_2.py new file mode 100644 index 00000000000000..2140330be3ca97 --- /dev/null +++ b/src/python_testing/TC_DA_1_2.py @@ -0,0 +1,391 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import random + +import chip.clusters as Clusters +from chip.interaction_model import InteractionModelError, Status +from chip.tlv import TLVReader +from cryptography import x509 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat._oid import ExtensionOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec, utils +from ecdsa.curves import curve_by_name +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, hex_from_bytes, type_matches +from mobly import asserts +from pyasn1.codec.der.decoder import decode as der_decoder +from pyasn1.error import PyAsn1Error +from pyasn1.type import univ +from pyasn1_modules import rfc5652 + + +def get_value_for_oid(oid_dotted_str: str, cert: x509.Certificate) -> str: + rdn = list(filter(lambda rdn: oid_dotted_str in rdn.oid.dotted_string, cert.subject)) + if len(rdn) != 1: + return None + + return rdn[0].value.strip() + + +def parse_ids_from_subject(cert: x509.Certificate) -> tuple([str, str]): + vid_str = get_value_for_oid('1.3.6.1.4.1.37244.2.1', cert) + pid_str = get_value_for_oid('1.3.6.1.4.1.37244.2.2', cert) + + return vid_str, pid_str + + +def parse_single_vidpid_from_common_name(commonName: str, tag_str: str) -> str: + sp = commonName.split(tag_str) + if (len(sp)) != 2: + return None + + s = sp[1][:4] + if not s.isupper() or len(s) != 4: + return None + + return s + + +def parse_ids_from_common_name(cert: x509.Certificate) -> tuple([str, str]): + common = get_value_for_oid('2.5.4.3', cert) + vid_str = parse_single_vidpid_from_common_name(common, 'Mvid:') + pid_str = parse_single_vidpid_from_common_name(common, 'Mpid:') + + return vid_str, pid_str + + +def parse_ids_from_certs(dac: x509.Certificate, pai: x509.Certificate) -> tuple([int, int, int, int]): + dac_vid_str, dac_pid_str = parse_ids_from_subject(dac) + pai_vid_str, pai_pid_str = parse_ids_from_subject(pai) + + # Fallback methods - parse from commonName + if dac_vid_str is None and dac_pid_str is None: + dac_vid_str, dac_pid_str = parse_ids_from_common_name(dac) + + if pai_vid_str is None and pai_pid_str is None: + pai_vid_str, pai_pid_str = parse_ids_from_common_name(pai) + + # PID is not required in the PAI + asserts.assert_true(dac_vid_str is not None, "VID must be present in the DAC") + asserts.assert_true(dac_pid_str is not None, "PID must be present in the DAC") + asserts.assert_true(pai_vid_str is not None, "VID must be present in the PAI") + + dac_vid = int(dac_vid_str, 16) + dac_pid = int(dac_pid_str, 16) + pai_vid = int(pai_vid_str, 16) + if pai_pid_str: + pai_pid = int(pai_pid_str, 16) + else: + pai_pid = None + + return dac_vid, dac_pid, pai_vid, pai_pid + + +class TC_DA_1_2(MatterBaseTest): + @async_test_body + async def test_TC_DA_1_2(self): + is_ci = self.check_pics('PICS_SDK_CI_ONLY') + # These PICS will be ignored on the CI because we're going to test a bunch of combos + pics_origin_pid = self.check_pics('MCORE.DA.CERTDECL_ORIGIN_PRODUCTID') + pics_origin_vid = self.check_pics('MCORE.DA.CERTDECL_ORIGIN_VENDORID') + pics_paa_list = self.check_pics('MCORE.DA.CERTDECL_AUTH_PAA') + pics_firmware_info = self.check_pics('MCORE.DA.ATTESTELEMENT_FW_INFO') + if pics_origin_pid != pics_origin_vid: + asserts.fail("MCORE.DA.CERTDECL_ORIGIN_PRODUCTID and MCORE.DA.CERTDECL_ORIGIN_VENDORID PICS codes must match") + + self.print_step(0, "Commissioning, already done") + + opcreds = Clusters.Objects.OperationalCredentials + basic = Clusters.Objects.BasicInformation + + self.print_step(1, "Generate 32-byte nonce") + nonce = random.randbytes(32) + + self.print_step(2, "Send AttestationRequest") + attestation_resp = await self.send_single_cmd(cmd=opcreds.Commands.AttestationRequest(attestationNonce=nonce)) + + self.print_step("3a", "Verify AttestationResponse is correct type") + asserts.assert_true(type_matches(attestation_resp, opcreds.Commands.AttestationResponse), + "DUT returned invalid response to AttestationRequest") + + self.print_step("3b", "Send CertificateChainRequest for DAC") + type = opcreds.Enums.CertificateChainTypeEnum.kDACCertificate + dac_resp = await self.send_single_cmd(cmd=opcreds.Commands.CertificateChainRequest(certificateType=type)) + + self.print_step("3c", "Verify DAC is x509v3 and <= 600 bytes") + asserts.assert_true(type_matches(dac_resp, opcreds.Commands.CertificateChainResponse), + "DUT returned invalid response to CertificateChainRequest") + der_dac = dac_resp.certificate + asserts.assert_less_equal(len(der_dac), 600, "Returned DAC is > 600 bytes") + # This throws an exception for a non-x509 cert + try: + parsed_dac = x509.load_der_x509_certificate(der_dac) + except ValueError: + asserts.assert_true(False, "Unable to parse certificate from CertificateChainResponse") + asserts.assert_equal(parsed_dac.version, x509.Version.v3, "DUT returned incorrect certificate type") + + self.print_step("3d", "Send CertificateChainRequest for PAI and verifies PAI is x509v3 and <= 600 bytes") + type = opcreds.Enums.CertificateChainTypeEnum.kPAICertificate + pai_resp = await self.send_single_cmd(cmd=opcreds.Commands.CertificateChainRequest(certificateType=type)) + asserts.assert_true(type_matches(pai_resp, opcreds.Commands.CertificateChainResponse), + "DUT returned invalid response to CertificateChainRequest") + der_pai = pai_resp.certificate + asserts.assert_less_equal(len(der_pai), 600, "Returned PAI is > 600 bytes") + # This throws an exception for a non-x509 cert + try: + parsed_pai = x509.load_der_x509_certificate(der_pai) + except ValueError: + asserts.assert_true(False, "Unable to parse certificate from CertificateChainResponse") + asserts.assert_equal(parsed_pai.version, x509.Version.v3, "DUT returned incorrect certificate type") + + self.print_step("3e", "TH1 saves PAI") + # already saved above + + self.print_step("4a", "Read VendorID from basic info") + basic_info_vendor_id = await self.read_single_attribute_check_success(basic, basic.Attributes.VendorID) + + self.print_step("4b", "Read ProductID from basic info") + basic_info_product_id = await self.read_single_attribute_check_success(basic, basic.Attributes.ProductID) + + self.print_step(5, "Extract the attestation_elements_message") + elements = attestation_resp.attestationElements + + self.print_step(6, "Verify the AttestationResponse has the following fields") + # OK, it's a bit weird that we're doing this after extracting the elements already, but sure. + # We type checked earlier, but let's grab the signature here. + signature_attestation_raw = attestation_resp.attestationSignature + + self.print_step(7, "Read the attestation_elements_message structure fields") + # Already done + + self.print_step(8, "Verify that the attestation_elements_message structure fields satisfy the following conditions") + # Not sure why this is a separate step, but I'm ready...let's check. + + self.print_step("8.1", "Verify attestation elements size is < = 900 bytes") + asserts.assert_less_equal(len(elements), 900, "AttestationElements field is more than 900 bytes") + + self.print_step("8.2", "Verify certification declaration is present and follows spec format") + decoded = TLVReader(elements).get()["Any"] + # Certification declaration is tag 1 + asserts.assert_in(1, decoded.keys(), "CD is not present in the attestation elements") + cd_der = decoded[1] + + try: + temp, _ = der_decoder(cd_der, asn1Spec=rfc5652.ContentInfo()) + except PyAsn1Error: + asserts.fail("Unable to decode CD - improperly encoded DER") + + # turn this into a dict so I don't have to keep parsing tuples + layer1 = dict(temp) + + id_sha256 = univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1') + + asserts.assert_equal(layer1['contentType'], rfc5652.id_signedData, "Incorrect object type") + + # uh, is this actually right? Doesn't the spec say enveloped data? + temp, _ = der_decoder(layer1['content'].asOctets(), asn1Spec=rfc5652.SignedData()) + + signed_data = dict(temp) + + asserts.assert_equal(signed_data['version'], 3, "Signed data version is not 3") + asserts.assert_equal(len(signed_data['digestAlgorithms']), 1, "More than one digest algorithm listed") + + # DigestAlgorithmIdentifier + algo_id = dict(signed_data['digestAlgorithms'][0]) + asserts.assert_equal(algo_id['algorithm'], id_sha256, "Reported digest algorithm is not SHA256") + + encap_content_info = dict(signed_data['encapContentInfo']) + + id_pkcs7_data = univ.ObjectIdentifier('1.2.840.113549.1.7.1') + asserts.assert_equal(encap_content_info['eContentType'], id_pkcs7_data, "Incorrect encapsulated content type") + + cd_tlv = bytes(encap_content_info['eContent']) + + # Check the signer info + # There should be only one signer info + asserts.assert_equal(len(signed_data['signerInfos']), 1, "Too many signer infos provided") + + # version shoule be 3 + signer_info = dict(signed_data['signerInfos'][0]) + asserts.assert_equal(signer_info['version'], 3, "Incorrect version on signer info") + + # subject key identifier needs to match the connectivity standards aliance key + subject_key_identifier = bytes(dict(signer_info['sid'])['subjectKeyIdentifier']) + + # digest algorithm is sha256, only one allowed + algo_id = dict(signer_info['digestAlgorithm']) + asserts.assert_equal(algo_id['algorithm'], id_sha256, "Incorrect digest algorithm for the signer info") + + # signature algorithm is ecdsa-with-sha256 + id_ecdsa_with_sha256 = univ.ObjectIdentifier('1.2.840.10045.4.3.2') + algo_id = dict(signer_info['signatureAlgorithm']) + asserts.assert_equal(algo_id['algorithm'], id_ecdsa_with_sha256, "Incorrect signature algorithm") + + self.print_step("8.3", "Verify mandatory cd contents") + # First, lets parse it + cd = TLVReader(cd_tlv).get()["Any"] + format_version = cd[0] + vendor_id = cd[1] + product_id_array = cd[2] + device_type_id = cd[3] + certificate_id = cd[4] + security_level = cd[5] + security_info = cd[6] + version_number = cd[7] + certification_type = cd[8] + + asserts.assert_equal(format_version, 1, "Format version is incorrect") + asserts.assert_equal(vendor_id, basic_info_vendor_id, "Vendor ID is incorrect") + if not is_ci: + asserts.assert_in(vendor_id, range(1, 0xfff0), "Vendor ID is out of range") + asserts.assert_true(basic_info_product_id in product_id_array, "Product ID not found in CD product array") + asserts.assert_in(device_type_id, range(0, (2**31)-1), "Device type ID is out of range") + asserts.assert_equal(len(certificate_id), 19, "Certificate id is the incorrect length") + asserts.assert_equal(security_level, 0, "Incorrect value for security level") + asserts.assert_equal(security_info, 0, "Incorrect value for security information") + asserts.assert_in(version_number, range(0, 65535), "Version number out of range") + if is_ci: + asserts.assert_in(certification_type, [0, 1, 2], "Certification type is out of range") + else: + asserts.assert_in(certification_type, [1, 2], "Certification type is out of range") + + self.print_step("8.4", "Confirm that both dac_origin_vendor_id and dac_origin_product_id are present") + if not is_ci and pics_origin_vid: + asserts.assert_in(9, cd.keys(), "Origin vendor ID not found in cert") + asserts.assert_in(10, cd.keys(), "Origin product ID not found in cert") + + self.print_step("8.5", "Confirm that neither dac_origin_vendor_id nor dac_origin_product_id are present") + if not is_ci and not pics_origin_vid: + asserts.assert_not_in(9, cd.keys(), "Origin vendor ID found in cert") + asserts.assert_not_in(10, cd.keys(), "Origin product ID found in cert") + + dac_vid, dac_pid, pai_vid, pai_pid = parse_ids_from_certs(parsed_dac, parsed_pai) + + self.print_step("8.6", "Check origin PID/VID against DAC and PAI") + has_origin_vid = 9 in cd.keys() + has_origin_pid = 10 in cd.keys() + if not is_ci and has_origin_vid != pics_origin_vid: + asserts.fail("Origin VID in CD does not match PICS") + if not is_ci and has_origin_pid and not pics_origin_pid: + asserts.fail("Origin PID in CD does not match PICS") + if has_origin_pid != has_origin_vid: + asserts.fail("Found one of origin PID or VID in CD but not both") + + # If this is the CI, ignore the PICS, we're going to try many cases. + if has_origin_vid: + origin_vid = cd[9] + origin_pid = cd[10] + + asserts.assert_equal(dac_vid, origin_vid, "Origin Vendor ID in the CD does not match the Vendor ID in the DAC") + asserts.assert_equal(pai_vid, origin_vid, "Origin Vendor ID in the CD does not match the Vendor ID in the PAI") + asserts.assert_equal(dac_pid, origin_pid, "Origin Product ID in the CD does not match the Product ID in the DAC") + if pai_pid: + asserts.assert_equal(pai_pid, origin_pid, "Origin Product ID in the CD does not match the Product ID in the PAI") + + self.print_step("8.7", "Check CD PID/VID against DAC and PAI") + if not has_origin_vid: + asserts.assert_equal(dac_vid, vendor_id, "Vendor ID in the CD does not match the Vendor ID in the DAC") + asserts.assert_equal(pai_vid, vendor_id, "Vendor ID in the CD does not match the Vendor ID in the PAI") + asserts.assert_in(dac_pid, product_id_array, "Product ID from the DAC is not present in the PID list in the CD") + if pai_pid: + asserts.assert_in(pai_pid, product_id_array, "Product ID from the PAI is not present in the PID list in the CD") + + self.print_step("8.8", "Check PAAs") + has_paa_list = 11 in cd.keys() + if not is_ci and pics_paa_list != has_paa_list: + asserts.fail("PAA list does not match PICS") + + if has_paa_list: + akids = [ext.value.key_identifier for ext in parsed_pai.extensions if ext.oid == ExtensionOID.AUTHORITY_KEY_IDENTIFIER] + asserts.assert_equal(len(akids), 1, "PAI requires exactly one AuthorityKeyIdentifier") + paa_authority_list = cd[11] + asserts.assert_in(akids[0], paa_authority_list, "PAI AKID not found in the authority list") + + self.print_step("8.9", "Check signature") + signature_cd = bytes(signer_info['signature']) + # TODO: Cecille - this path needs to be set as an input + cert_dir = 'credentials/development/cd-certs' + certs = {} + for filename in os.listdir(cert_dir): + if '.der' not in filename: + continue + with open(os.path.join(cert_dir, filename), 'rb') as f: + cert = x509.load_der_x509_certificate(f.read()) + pub = cert.public_key() + ski = x509.SubjectKeyIdentifier.from_public_key(pub).digest + certs[ski] = pub + + asserts.assert_true(subject_key_identifier in certs.keys(), "Subject key identifier not found in CD certs") + try: + certs[subject_key_identifier].verify(signature=signature_cd, data=cd_tlv, + signature_algorithm=ec.ECDSA(hashes.SHA256())) + except InvalidSignature: + asserts.fail("Failed to verify CD signature against known CD public key") + + self.print_step(9, "Verify nonce") + asserts.assert_in(2, decoded.keys(), "Attestation nonce is not present in the attestation elements") + returned_nonce = decoded[2] + asserts.assert_equal(returned_nonce, nonce, "Returned attestation nonce does not match request nonce") + asserts.assert_equal(len(returned_nonce), 32, "Returned nonce is incorrect size") + + self.print_step(10, "Verify firmware") + has_firmware_information = 4 in decoded.keys() + if not is_ci and has_firmware_information != pics_firmware_info: + asserts.fail("PICS for firmware information does not match returned value") + if has_firmware_information: + try: + int(decoded[4], 16) + except ValueError: + asserts.fail("Firmware is not an octet string") + + self.print_step(11, "Verify that the signature for the attestation response is valid") + proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False) + asserts.assert_equal(len(proxy.attestationChallenge), 16, "Attestation challenge is the wrong length") + attestation_tbs = elements + proxy.attestationChallenge + + # signature is a struct of r and s - see 3.5.3 + # Actual curve is secp256r1 / NIST P-256 per 2.7 + baselen = curve_by_name("NIST256p").baselen + signature_attestation_raw_r = int(hex_from_bytes(signature_attestation_raw[:baselen]), 16) + signature_attestation_raw_s = int(hex_from_bytes(signature_attestation_raw[baselen:]), 16) + + signature_attestation = utils.encode_dss_signature(signature_attestation_raw_r, signature_attestation_raw_s) + + parsed_dac.public_key().verify(signature=signature_attestation, data=attestation_tbs, + signature_algorithm=ec.ECDSA(hashes.SHA256())) + + self.print_step(12, "Send AttestationRequest with nonce > 32 bytes") + nonce = random.randbytes(33) + try: + await self.send_single_cmd(cmd=opcreds.Commands.AttestationRequest(attestationNonce=nonce)) + asserts.fail("Received Success response when an INVALID_COMMAND was expected") + except InteractionModelError as e: + asserts.assert_equal(e.status, Status.InvalidCommand, "Received incorrect error from AttestationRequest command") + + self.print_step(13, "Send AttestationRequest with nonce < 32 bytes") + nonce = random.randbytes(31) + try: + await self.send_single_cmd(cmd=opcreds.Commands.AttestationRequest(attestationNonce=nonce)) + asserts.fail("Received Success response when an INVALID_COMMAND was expected") + except InteractionModelError as e: + asserts.assert_equal(e.status, Status.InvalidCommand, "Received incorrect error from AttestationRequest command") + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_DA_1_5.py b/src/python_testing/TC_DA_1_5.py new file mode 100644 index 00000000000000..f27d2f136e5687 --- /dev/null +++ b/src/python_testing/TC_DA_1_5.py @@ -0,0 +1,211 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import random + +import chip.clusters as Clusters +from chip import ChipDeviceCtrl +from chip.interaction_model import InteractionModelError, Status +from chip.tlv import TLVReader +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec, utils +from ecdsa.curves import curve_by_name +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, hex_from_bytes, type_matches +from mobly import asserts +from pyasn1.codec.der.decoder import decode as der_decoder +from pyasn1.error import PyAsn1Error +from pyasn1_modules import rfc2986, rfc3279, rfc5480 + + +class TC_DA_1_5(MatterBaseTest): + @async_test_body + async def test_TC_DA_1_5(self): + + opcreds = Clusters.Objects.OperationalCredentials + + gcomm = Clusters.Objects.GeneralCommissioning + + self.print_step(1, "Commissioning, already done") + + self.print_step(2, "Save attestation challenge") + proxy = self.default_controller.GetConnectedDeviceSync(self.dut_node_id, False) + attestation_challenge = proxy.attestationChallenge + + self.print_step(3, "Send CertificateChainRequest for DAC") + certtype = opcreds.Enums.CertificateChainTypeEnum.kDACCertificate + dac_resp = await self.send_single_cmd(cmd=opcreds.Commands.CertificateChainRequest(certificateType=certtype)) + asserts.assert_true(type_matches(dac_resp, opcreds.Commands.CertificateChainResponse), + "Certificate request returned incorrect type") + der_dac = dac_resp.certificate + # This throws an exception for a non-x509 cert + try: + dac = x509.load_der_x509_certificate(der_dac) + except ValueError: + asserts.assert_true(False, "Unable to parse certificate from CertificateChainResponse") + + self.print_step(4, "Send ArmFailSafe") + await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=1)) + + self.print_step(5, "Send CSRRequest") + csr_nonce = random.randbytes(32) + csr_resp = await self.send_single_cmd(cmd=opcreds.Commands.CSRRequest(CSRNonce=csr_nonce, isForUpdateNOC=False)) + nocsr_elements = csr_resp.NOCSRElements + nocsr_attestation_signature_raw = csr_resp.attestationSignature + + self.print_step(6, "Extract TLV") + decoded = TLVReader(nocsr_elements).get()["Any"] + # CSR is field 1, nonce is field 2 + asserts.assert_in(1, decoded.keys(), "CSR is not present in the NOSCRElements") + asserts.assert_in(2, decoded.keys(), "Nonce is not present in the NOSCRElements") + csr_raw = decoded[1] + csr_nonce_returned = decoded[2] + vendor1 = None + vendor2 = None + vendor3 = None + if 3 in decoded.keys(): + vendor1 = decoded[3] + if 4 in decoded.keys(): + vendor2 = decoded[4] + if 5 in decoded.keys(): + vendor3 = decoded[5] + + # Verify that length of nocsr_elements is <= 900 + asserts.assert_less_equal(len(nocsr_elements), 900, "NOCSRElements is more than 900 bytes") + + # Verify der encoded and PKCS #10 (rfc2986 is PKCS #10) - next two requirements + try: + temp, _ = der_decoder(csr_raw, asn1Spec=rfc2986.CertificationRequest()) + except PyAsn1Error: + asserts.fail("Unable to decode CSR - improperly formatted DER file") + + layer1 = dict(temp) + info = dict(layer1['certificationRequestInfo']) + + # Verify public key is id-ecPublicKey with prime256v1 + requested_pk_algo = dict(dict(info['subjectPKInfo'])['algorithm']) + asserts.assert_equal(requested_pk_algo['algorithm'], rfc5480.id_ecPublicKey, "Incorrect public key algorithm") + der_parameters = requested_pk_algo['parameters'] + temp, _ = der_decoder(bytes(der_parameters), asn1Spec=rfc3279.EcpkParameters()) + parameters = dict(temp) + asserts.assert_in('namedCurve', parameters.keys(), "Unable to find namedCurve in EcpkParameters") + asserts.assert_equal(parameters['namedCurve'], rfc3279.prime256v1, "Incorrect curve specified for public key algorithm") + + # Verify public key is 256 bytes + csr = x509.load_der_x509_csr(csr_raw) + csr_pubkey = csr.public_key() + asserts.assert_equal(csr_pubkey.key_size, 256, "Incorrect key size") + + # Verify signature algorithm is ecdsa-with-SHA156 + signature_algorithm = dict(layer1['signatureAlgorithm'])['algorithm'] + asserts.assert_equal(signature_algorithm, rfc5480.ecdsa_with_SHA256, "CSR specifies incorrect signature key algorithm") + + # Verify signature is valid + asserts.assert_true(csr.is_signature_valid, "Signature is invalid") + + # Verify csr_nonce_returned is octet string of length 32 + try: + # csr_nonce_returned is an octet string if it can be converted to an int + int(hex_from_bytes(csr_nonce_returned), 16) + except ValueError: + asserts.fail("Returned CSR nonce is not an octet string") + + # Verify returned nonce matches sent nonce + asserts.assert_equal(csr_nonce_returned, csr_nonce, "Returned nonce is incorrect") + + nocsr_tbs = nocsr_elements + attestation_challenge + + self.print_step(7, "Verify signature") + baselen = curve_by_name("NIST256p").baselen + attestation_raw_r = int(hex_from_bytes(nocsr_attestation_signature_raw[:baselen]), 16) + attestation_raw_s = int(hex_from_bytes(nocsr_attestation_signature_raw[baselen:]), 16) + + nocsr_attestation = utils.encode_dss_signature(attestation_raw_r, attestation_raw_s) + + dac.public_key().verify(signature=nocsr_attestation, data=nocsr_tbs, signature_algorithm=ec.ECDSA(hashes.SHA256())) + + self.print_step(8, "Verify that attestation challenge does not appear in the vendor fields") + if vendor1: + asserts.assert_not_in(attestation_challenge, vendor1, "Attestation challenge appears in vendor 1") + if vendor2: + asserts.assert_not_in(attestation_challenge, vendor2, "Attestation challenge appears in vendor 2") + if vendor3: + asserts.assert_not_in(attestation_challenge, vendor3, "Attestation challenge appears in vendor 3") + + self.print_step(9, "Disarm failsafe") + await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1)) + + self.print_step(10, "Arm failsafe to 900s") + await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=1)) + + self.print_step(11, "Send CSRRequest wtih 31-byte nonce") + bad_nonce = random.randbytes(32) + try: + await self.send_single_cmd(cmd=opcreds.Commands.CSRRequest(CSRNonce=bad_nonce, isForUpdateNOC=False)) + except InteractionModelError as e: + asserts.assert_equal(e.status, Status.InvalidCommand, "Received incorrect error from CSRRequest command with bad nonce") + + self.print_step(12, "Disarm failsafe") + await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1)) + + self.print_step(13, "Open commissioning window") + pin, _ = self.default_controller.OpenCommissioningWindow( + nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1) + + self.print_step(14, "Commission to TH2") + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2) + TH2 = new_fabric_admin.NewController(nodeId=112233) + + success, _ = TH2.CommissionOnNetwork( + nodeId=self.dut_node_id, setupPinCode=pin, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234) + asserts.assert_true(success, 'Commissioning on TH2 did not complete successfully') + + self.print_step(15, "Read NOCs list for TH1") + temp = await self.read_single_attribute_check_success( + cluster=Clusters.OperationalCredentials, + attribute=Clusters.OperationalCredentials.Attributes.NOCs) + asserts.assert_equal(len(temp), 1, "Returned NOC list does not contain one entry") + th1_noc = temp[0].noc + + self.print_step(16, "Read NOCs list for TH2") + temp = await self.read_single_attribute_check_success( + cluster=Clusters.OperationalCredentials, + attribute=Clusters.OperationalCredentials.Attributes.NOCs, dev_ctrl=TH2) + asserts.assert_equal(len(temp), 1, "Returned NOC list does not contain one entry") + th2_noc = temp[0].noc + + self.print_step(17, "Extract the public keys") + # NOCs are TLV encoded, public key is field 9 + th1_decoded = TLVReader(th1_noc).get()["Any"] + th2_decoded = TLVReader(th2_noc).get()["Any"] + + th1_pk = th1_decoded[9] + th2_pk = th2_decoded[9] + asserts.assert_not_equal(th1_pk, th2_pk, "Publc keys are the same") + + self.print_step(17, "Read the fabric index for TH2") + th2_idx = await self.read_single_attribute_check_success( + Clusters.OperationalCredentials, + attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=TH2) + + self.print_step(18, "Remove TH2") + await self.send_single_cmd(cmd=Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=th2_idx)) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 366c71f3ad12c0..fd8bece2a64dd4 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -400,8 +400,8 @@ async def send_single_cmd( result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs) return result - def print_step(self, stepnum: int, title: str) -> None: - logging.info('***** Test Step %d : %s', stepnum, title) + def print_step(self, stepnum: typing.Union[int, str], title: str) -> None: + logging.info('***** Test Step {} : {}'.format(stepnum, title)) def generate_mobly_test_config(matter_test_config: MatterTestConfig): @@ -560,6 +560,8 @@ def root_index(s: str) -> int: def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConfig) -> bool: + config.dut_node_id = args.dut_node_id + if args.commissioning_method is None: return True @@ -569,7 +571,6 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf if args.dut_node_id is None: print("error: When --commissioning-method present, --dut-node-id is mandatory!") return False - config.dut_node_id = args.dut_node_id if args.discriminator is None and (args.qr_code is None and args.manual_code is None): print("error: Missing --discriminator when no --qr-code/--manual-code present!") diff --git a/src/python_testing/test_testing/test_TC_DA_1_2.py b/src/python_testing/test_testing/test_TC_DA_1_2.py new file mode 100755 index 00000000000000..52a45407ba4aa6 --- /dev/null +++ b/src/python_testing/test_testing/test_TC_DA_1_2.py @@ -0,0 +1,99 @@ +#!/usr/bin/env -S python3 -B +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import os +import subprocess + +CHIP_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')) +RUNNER_SCRIPT_DIR = os.path.join(CHIP_ROOT, 'scripts/tests') + + +def run_single_test(dac_provider: str, product_id: int, factory_reset: bool = False) -> int: + + reset = "" + if factory_reset: + reset = ' --factoryreset' + + app = os.path.join(CHIP_ROOT, 'out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app') + + # Certs in the commissioner_dut directory use 0x8000 as the PID + app_args = '--discriminator 1234 --KVS kvs1 --product-id ' + \ + str(product_id) + ' --vendor-id 65521 --dac_provider ' + dac_provider + + ci_pics_values = os.path.abspath(os.path.join(CHIP_ROOT, 'src/app/tests/suites/certification/ci-pics-values')) + script_args = '--storage-path admin_storage.json --discriminator 1234 --passcode 20202021 --dut-node-id 1 --PICS ' + \ + str(ci_pics_values) + + # for any test with a dac_provider, we don't want to recommission because there's a chance the + # dac could be wrong and the commissioning would fail. Rely on the original commissioning. This is also faster. + if factory_reset: + script_args = script_args + ' --commissioning-method on-network' + + script = os.path.abspath(os.path.join(CHIP_ROOT, 'src/python_testing/TC_DA_1_2.py')) + + # run_python_test uses click so call as a command + run_python_test = os.path.abspath(os.path.join(RUNNER_SCRIPT_DIR, 'run_python_test.py')) + cmd = str(run_python_test) + reset + ' --app ' + str(app) + ' --app-args "' + \ + app_args + '" --script ' + str(script) + ' --script-args "' + script_args + '"' + + return subprocess.call(cmd, shell=True) + + +def main(): + cert_path = os.path.abspath(os.path.join(CHIP_ROOT, 'credentials/development/commissioner_dut')) + + # Commission first using a known good set, then run the rest of the tests without recommissioning + path = str(os.path.join(cert_path, "struct_cd_authorized_paa_list_count1_valid/test_case_vector.json")) + run_single_test(path, 32768, factory_reset=True) + + test_cases = {'struct_cd': 32768, 'fallback_encoding': 177} + + # struct_cd_version_number_wrong - excluded because this is a DCL test not covered by cert + # struct_cd_cert_id_mismatch - excluded because this is a DCL test not covered by cert + exclude_cases = ['struct_cd_version_number_wrong', 'struct_cd_cert_id_mismatch'] + + passes = [] + for p in os.listdir(cert_path): + matches = list(filter(lambda t: t in str(p), test_cases.keys())) + if len(matches) != 1: + continue + + if str(p) in exclude_cases: + continue + + path = str(os.path.join(cert_path, p, 'test_case_vector.json')) + with open(path, 'r') as f: + j = json.loads(f.read()) + success_expected = j['is_success_case'].lower() == 'true' + + ret = run_single_test(path, test_cases[matches[0]]) + passes.append((str(p), ret, success_expected)) + + retval = 0 + for p in passes: + success = p[1] == 0 + if p[2] != success: + print('INCORRECT: ' + p[0]) + retval = 1 + + return retval + + +if __name__ == '__main__': + main() diff --git a/src/python_testing/test_testing/test_TC_DA_1_5.py b/src/python_testing/test_testing/test_TC_DA_1_5.py new file mode 100755 index 00000000000000..126814ca615f29 --- /dev/null +++ b/src/python_testing/test_testing/test_TC_DA_1_5.py @@ -0,0 +1,85 @@ +#!/usr/bin/env -S python3 -B +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import subprocess + +CHIP_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')) +RUNNER_SCRIPT_DIR = os.path.join(CHIP_ROOT, 'scripts/tests') + + +def run_single_test(flag: str, factory_reset: bool = False) -> int: + + reset = "" + if factory_reset: + reset = ' --factoryreset' + + app = os.path.join(CHIP_ROOT, 'out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app') + + # Certs in the commissioner_dut directory use 0x8000 as the PID + app_args = '--discriminator 1234 --KVS kvs1 ' + flag + + ci_pics_values = os.path.abspath(os.path.join(CHIP_ROOT, 'src/app/tests/suites/certification/ci-pics-values')) + script_args = '--storage-path admin_storage.json --discriminator 1234 --passcode 20202021 --dut-node-id 1 --PICS ' + \ + str(ci_pics_values) + + # for any test with a dac_provider, we don't want to recommission because there's a chance the + # dac could be wrong and the commissioning would fail. Rely on the original commissioning. This is also faster. + if factory_reset: + script_args = script_args + ' --commissioning-method on-network' + + script = os.path.abspath(os.path.join(CHIP_ROOT, 'src/python_testing/TC_DA_1_5.py')) + + # run_python_test uses click so call as a command + run_python_test = os.path.abspath(os.path.join(RUNNER_SCRIPT_DIR, 'run_python_test.py')) + cmd = str(run_python_test) + reset + ' --app ' + str(app) + ' --app-args "' + \ + app_args + '" --script ' + str(script) + ' --script-args "' + script_args + '"' + + return subprocess.call(cmd, shell=True) + + +def main(): + # Commission first using a known good set, then run the rest of the tests without recommissioning + passes = [] + # test flag, test result, success expected + passes.append(("", run_single_test("", factory_reset=True), True)) + + failure_flags = ['--cert_error_csr_incorrect_type', + '--cert_error_csr_existing_keypair', + '--cert_error_csr_nonce_incorrect_type', + '--cert_error_csr_nonce_too_long', + '--cert_error_csr_nonce_invalid', + '--cert_error_nocsrelements_too_long', + '--cert_error_attestation_signature_incorrect_type', + '--cert_error_attestation_signature_invalid'] + + for f in failure_flags: + passes.append((f, run_single_test(f), False)) + + retval = 0 + for p in passes: + success = p[1] == 0 + if p[2] != success: + print('INCORRECT: ' + p[0]) + retval = 1 + + return retval + + +if __name__ == '__main__': + main()