Skip to content

Commit

Permalink
Introduce TC-DA-1.7 Python test
Browse files Browse the repository at this point in the history
- Test TC-DA-1.7 is very error-prone with chip-tool due to
  large amount of complex manual operations on manual log
  extractions of certificate values.

Fixes project-chip#21735

This PR:
- Adds a version of TC-DA-1.7 in Python
- **Touches no C++ SDK code**

Testing done:

- Ran the test as it could be done by an end-user, passed on Linux
  and on ESP32

To run:

- Build all-clusters app Linux:
  - `scripts/examples/gn_build_example.sh examples/all-clusters-app/linux out/debug/standalone chip_config_network_layer_ble=false`
  - In a shell, run: `clear && rm -f kvs1 && out/debug/standalone/chip-all-clusters-app --discriminator 3840 --KVS kvs1`
- Build the Python environment, activate it, then run the test
  - `./scripts/build_python.sh -m platform -i separate`
  - `. ./out/python_env/bin/activate`
  - Run the test: `rm -f admin_storage.json && python src/python_testing/TC_DA_1_7.py -m on-network -d 3840 -p 20202021`
    - Add `--bool-arg allow_sdk_dac:true` to the end of the command line if running against a device that
      has DACs chaining to SDK PAAs (which are forbidden by TC-DA-1.7 step 4 check 2)
  • Loading branch information
tcarmelveilleux authored and woody-apple committed Aug 10, 2022
1 parent 9b85c7a commit e9b797e
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions TC_DA_1_7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from matter_testing_support import MatterBaseTest, default_matter_test_main, async_test_body
from matter_testing_support import hex_from_bytes, bytes_from_hex
from chip.interaction_model import Status
import chip.clusters as Clusters
import logging
from mobly import asserts
from pathlib import Path
from glob import glob
from cryptography.x509 import load_der_x509_certificate, SubjectKeyIdentifier, AuthorityKeyIdentifier, Certificate
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding
from typing import Optional

FORBIDDEN_AKID = [
bytes_from_hex("78:5C:E7:05:B8:6B:8F:4E:6F:C7:93:AA:60:CB:43:EA:69:68:82:D5"),
bytes_from_hex("6A:FD:22:77:1F:51:1F:EC:BF:16:41:97:67:10:DC:DC:31:A1:71:7E")
]


def load_all_paa(paa_path: Path) -> dict:
logging.info("Loading all PAAs in %s" % paa_path)

paa_by_skid = {}
for filename in glob(str(paa_path.joinpath("*.der"))):
with open(filename, "rb") as derfile:
# Load cert
paa_der = derfile.read()
paa_cert = load_der_x509_certificate(paa_der)

# Find the subject key identifier (if present), and record it
for extension in paa_cert.extensions:
if extension.oid == SubjectKeyIdentifier.oid:
skid = extension.value.key_identifier
paa_by_skid[skid] = (Path(filename).name, paa_cert)

return paa_by_skid


def extract_akid(cert: Certificate) -> Optional[bytes]:
# Find the authority key identifier (if present)
for extension in cert.extensions:
if extension.oid == AuthorityKeyIdentifier.oid:
return extension.value.key_identifier
else:
return None


class TC_DA_1_7(MatterBaseTest):
@async_test_body
async def test_TC_DA_1_7(self):
# Option to allow SDK roots (skip step 4 check 2)
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)

logging.info("Pre-condition: load all PAAs SKIDs")
conf = self.matter_test_config
paa_by_skid = load_all_paa(conf.paa_trust_store_path)
logging.info("Found %d PAAs" % len(paa_by_skid))

logging.info("Step 1: Commissioning, already done")
dev_ctrl = self.default_controller

logging.info("Step 2: Get PAI of DUT1 with certificate chain request")
result = await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(2))
pai_1 = result.certificate
asserts.assert_less_equal(len(pai_1), 600, "PAI cert must be at most 600 bytes")
self.record_data({"pai_1": hex_from_bytes(pai_1)})

logging.info("Step 3: Get DAC of DUT1 with certificate chain request")
result = await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(1))
dac_1 = result.certificate
asserts.assert_less_equal(len(dac_1), 600, "DAC cert must be at most 600 bytes")
self.record_data({"dac_1": hex_from_bytes(dac_1)})

logging.info("Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid")
pai1_cert = load_der_x509_certificate(pai_1)
pai1_akid = extract_akid(pai1_cert)
if pai1_akid not in paa_by_skid:
asserts.fail("DUT1's PAI (%s) not matched in PAA trust store" % hex_from_bytes(pai1_akid))

filename, paa_cert = paa_by_skid[pai1_akid]
logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject))
public_key = paa_cert.public_key()

try:
public_key.verify(signature=pai1_cert.signature, data=pai1_cert.tbs_certificate_bytes,
signature_algorithm=ec.ECDSA(hashes.SHA256()))
except InvalidSignature as e:
asserts.fail("Failed to verify PAI signature against PAA public key: %s" % str(e))
logging.info("Validated PAI signature against PAA")

logging.info("Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs")
if allow_sdk_dac:
logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!")
else:
for candidate in FORBIDDEN_AKID:
asserts.assert_not_equal(hex_from_bytes(pai1_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

logging.info("Step 5: Extract subject public key of DAC and save")
dac1_cert = load_der_x509_certificate(dac_1)
pk_1 = dac1_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
logging.info("Subject public key pk_1: %s" % hex_from_bytes(pk_1))
self.record_data({"pk_1": hex_from_bytes(pk_1)})


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit e9b797e

Please sign in to comment.