From 5350fa9bb5d67b79d652a57b766ed1b1167a92eb Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Wed, 10 Aug 2022 10:54:13 -0400 Subject: [PATCH] Introduce TC-DA-1.7 Python test (#21775) - Test TC-DA-1.7 is very error-prone with chip-tool due to large amount of complex manual operations on manual log extractions of certificate values. Fixes #21735 This PR: - Adds a version of TC-DA-1.7 in Python - **Touches no C++ SDK code** Testing done: - Ran the test as it could be done by an end-user, passed on Linux and on ESP32 To run: - Build all-clusters app Linux: - `scripts/examples/gn_build_example.sh examples/all-clusters-app/linux out/debug/standalone chip_config_network_layer_ble=false` - In a shell, run: `clear && rm -f kvs1 && out/debug/standalone/chip-all-clusters-app --discriminator 3840 --KVS kvs1` - Build the Python environment, activate it, then run the test - `./scripts/build_python.sh -m platform -i separate` - `. ./out/python_env/bin/activate` - Run the test: `rm -f admin_storage.json && python src/python_testing/TC_DA_1_7.py -m on-network -d 3840 -p 20202021` - Add `--bool-arg allow_sdk_dac:true` to the end of the command line if running against a device that has DACs chaining to SDK PAAs (which are forbidden by TC-DA-1.7 step 4 check 2) --- TC_DA_1_7.py | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 TC_DA_1_7.py diff --git a/TC_DA_1_7.py b/TC_DA_1_7.py new file mode 100644 index 00000000000000..d36aa4e241eddc --- /dev/null +++ b/TC_DA_1_7.py @@ -0,0 +1,125 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from matter_testing_support import MatterBaseTest, default_matter_test_main, async_test_body +from matter_testing_support import hex_from_bytes, bytes_from_hex +from chip.interaction_model import Status +import chip.clusters as Clusters +import logging +from mobly import asserts +from pathlib import Path +from glob import glob +from cryptography.x509 import load_der_x509_certificate, SubjectKeyIdentifier, AuthorityKeyIdentifier, Certificate +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding +from typing import Optional + +FORBIDDEN_AKID = [ + bytes_from_hex("78:5C:E7:05:B8:6B:8F:4E:6F:C7:93:AA:60:CB:43:EA:69:68:82:D5"), + bytes_from_hex("6A:FD:22:77:1F:51:1F:EC:BF:16:41:97:67:10:DC:DC:31:A1:71:7E") +] + + +def load_all_paa(paa_path: Path) -> dict: + logging.info("Loading all PAAs in %s" % paa_path) + + paa_by_skid = {} + for filename in glob(str(paa_path.joinpath("*.der"))): + with open(filename, "rb") as derfile: + # Load cert + paa_der = derfile.read() + paa_cert = load_der_x509_certificate(paa_der) + + # Find the subject key identifier (if present), and record it + for extension in paa_cert.extensions: + if extension.oid == SubjectKeyIdentifier.oid: + skid = extension.value.key_identifier + paa_by_skid[skid] = (Path(filename).name, paa_cert) + + return paa_by_skid + + +def extract_akid(cert: Certificate) -> Optional[bytes]: + # Find the authority key identifier (if present) + for extension in cert.extensions: + if extension.oid == AuthorityKeyIdentifier.oid: + return extension.value.key_identifier + else: + return None + + +class TC_DA_1_7(MatterBaseTest): + @async_test_body + async def test_TC_DA_1_7(self): + # Option to allow SDK roots (skip step 4 check 2) + allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) + + logging.info("Pre-condition: load all PAAs SKIDs") + conf = self.matter_test_config + paa_by_skid = load_all_paa(conf.paa_trust_store_path) + logging.info("Found %d PAAs" % len(paa_by_skid)) + + logging.info("Step 1: Commissioning, already done") + dev_ctrl = self.default_controller + + logging.info("Step 2: Get PAI of DUT1 with certificate chain request") + result = await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(2)) + pai_1 = result.certificate + asserts.assert_less_equal(len(pai_1), 600, "PAI cert must be at most 600 bytes") + self.record_data({"pai_1": hex_from_bytes(pai_1)}) + + logging.info("Step 3: Get DAC of DUT1 with certificate chain request") + result = await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(1)) + dac_1 = result.certificate + asserts.assert_less_equal(len(dac_1), 600, "DAC cert must be at most 600 bytes") + self.record_data({"dac_1": hex_from_bytes(dac_1)}) + + logging.info("Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid") + pai1_cert = load_der_x509_certificate(pai_1) + pai1_akid = extract_akid(pai1_cert) + if pai1_akid not in paa_by_skid: + asserts.fail("DUT1's PAI (%s) not matched in PAA trust store" % hex_from_bytes(pai1_akid)) + + filename, paa_cert = paa_by_skid[pai1_akid] + logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject)) + public_key = paa_cert.public_key() + + try: + public_key.verify(signature=pai1_cert.signature, data=pai1_cert.tbs_certificate_bytes, + signature_algorithm=ec.ECDSA(hashes.SHA256())) + except InvalidSignature as e: + asserts.fail("Failed to verify PAI signature against PAA public key: %s" % str(e)) + logging.info("Validated PAI signature against PAA") + + logging.info("Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs") + if allow_sdk_dac: + logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!") + else: + for candidate in FORBIDDEN_AKID: + asserts.assert_not_equal(hex_from_bytes(pai1_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist") + + logging.info("Step 5: Extract subject public key of DAC and save") + dac1_cert = load_der_x509_certificate(dac_1) + pk_1 = dac1_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) + logging.info("Subject public key pk_1: %s" % hex_from_bytes(pk_1)) + self.record_data({"pk_1": hex_from_bytes(pk_1)}) + + +if __name__ == "__main__": + default_matter_test_main()