Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dac-revocation: Some refactoring to revocation set generation script #33520

Merged
merged 8 commits into from
May 24, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 120 additions & 63 deletions credentials/generate-revocation-set.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/python
#!/usr/bin/env python3

#
# Copyright (c) 2023 Project CHIP Authors
# Copyright (c) 2023-2024 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
import requests
from click_option_group import RequiredMutuallyExclusiveOptionGroup, optgroup
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import ec


class RevocationType(Enum):
Expand All @@ -44,20 +45,61 @@ class RevocationType(Enum):
TEST_NODE_URL_REST = "https://on.test-net.dcl.csa-iot.org"


def use_dcld(dcld, production, cmdlist):
return [dcld] + cmdlist + (['--node', PRODUCTION_NODE_URL] if production else [])


def extract_single_integer_attribute(subject, oid):
attribute_list = subject.get_attributes_for_oid(oid)

if len(attribute_list) == 1:
if attribute_list[0].value.isdigit():
return int(attribute_list[0].value)
return int(attribute_list[0].value, 16)

return None


class DCLDRequestsHelper:
def __init__(self, use_rest, dcld, production, rest_node_url):
self.use_rest = use_rest
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
self.dcld = dcld
self.production = production
self.rest_node_url = rest_node_url

def use_dcld(self, dcld, production, cmdlist):
return [dcld] + cmdlist + (['--node', PRODUCTION_NODE_URL] if production else [])
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved

def get_dcld_cmd_output(self, cmdlist):
# Set the output as JSON
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
subprocess.Popen([self.dcld, 'config', 'output', 'json'])

cmdpipe = subprocess.Popen(self.use_dcld(self.dcld, self.production, cmdlist),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return json.loads(cmdpipe.stdout.read())

def get_revocation_points(self):
if self.use_rest:
response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points").json()
else:
response = self.get_dcld_cmd_output(['query', 'pki', 'all-revocation-points'])

return response["PkiRevocationDistributionPoint"]

def get_paa_cert_for_crl_issuer(self, crl_signer_issuer_name, crl_signer_authority_key_id):
if self.use_rest:
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
response = requests.get(
f"{self.rest_node_url}/dcl/pki/certificates/{crl_signer_issuer_name}/{crl_signer_authority_key_id}").json()
else:
response = self.get_dcld_cmd_output(
['query', 'pki', 'x509-cert', '-u', crl_signer_issuer_name, '-k', crl_signer_authority_key_id])

return response["approvedCertificates"]["certs"][0]["pemCert"]

def get_revocations_points_by_skid(self, issuer_subject_key_id):
if self.use_rest:
response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json()
else:
response = self.get_dcld_cmd_output(['query', 'pki', 'revocation-points',
'--issuer-subject-key-id', issuer_subject_key_id])

return response["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]


@click.command()
@click.help_option('-h', '--help')
@optgroup.group('Input data sources', cls=RequiredMutuallyExclusiveOptionGroup)
Expand All @@ -68,7 +110,7 @@ def extract_single_integer_attribute(subject, oid):
@optgroup.group('Optional arguments')
@optgroup.option('--output', default='sample_revocation_set_list.json', type=str, metavar='FILEPATH', help="Output filename (default: sample_revocation_set_list.json)")
def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_http, output):
"""DCL PAA mirroring tools"""
"""Tool to construct revocation set from DCL"""

production = False
dcld = use_test_net_dcld
Expand All @@ -83,28 +125,20 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h

rest_node_url = PRODUCTION_NODE_URL_REST if production else TEST_NODE_URL_REST

# TODO: Extract this to a helper function
if use_rest:
revocation_point_list = requests.get(f"{rest_node_url}/dcl/pki/revocation-points").json()["PkiRevocationDistributionPoint"]
else:
cmdlist = ['config', 'output', 'json']
subprocess.Popen([dcld] + cmdlist)

cmdlist = ['query', 'pki', 'all-revocation-points']
dcld_helper = DCLDRequestsHelper(use_rest, dcld, production, rest_node_url)

shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

revocation_point_list = json.loads(cmdpipe.stdout.read())["PkiRevocationDistributionPoint"]
revocation_point_list = dcld_helper.get_revocation_points()

revocation_set = []

for revocation_point in revocation_point_list:
# 1. Validate Revocation Type
if revocation_point["revocationType"] != RevocationType.CRL:
if revocation_point["revocationType"] != RevocationType.CRL.value:
print("Revocation Type is not CRL, continue...")
continue

# 2. Parse the certificate
crl_signer_certificate = x509.load_pem_x509_certificate(revocation_point["crlSignerCertificate"])
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))

vid = revocation_point["vid"]
pid = revocation_point["pid"]
Expand All @@ -119,12 +153,15 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h
if crl_vid is not None:
if vid != crl_vid:
# TODO: Need to log all situations where a continue is called
print("VID is not CRL VID, continue...")
continue
else:
if crl_vid is None or vid != crl_vid:
print("VID is not CRL VID, continue...")
continue
if crl_pid is not None:
if pid != crl_pid:
print("PID is not CRL PID, continue...")
continue
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved

# 5. Validate the certification path containing CRLSignerCertificate.
Expand All @@ -133,99 +170,119 @@ def main(use_main_net_dcld, use_test_net_dcld, use_main_net_http, use_test_net_h
crl_signer_authority_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier

paa_certificate = None
# Convert CRL Signer AKID to colon separated hex
crl_signer_authority_key_id = crl_signer_authority_key_id.hex().upper()
crl_signer_authority_key_id = ':'.join([crl_signer_authority_key_id[i:i+2]
for i in range(0, len(crl_signer_authority_key_id), 2)])

# TODO: Extract this to a helper function
if use_rest:
response = requests.get(
f"{rest_node_url}/dcl/pki/certificates/{crl_signer_issuer_name}/{crl_signer_authority_key_id}").json()["approvedCertificates"]["certs"][0]
paa_certificate = response["pemCert"]
else:
cmdlist = ['query', 'pki', 'x509-cert', '-u', crl_signer_issuer_name, '-k', crl_signer_authority_key_id]
cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
paa_certificate = json.loads(cmdpipe.stdout.read())["approvedCertificates"]["certs"][0]["pemCert"]
paa_certificate = dcld_helper.get_paa_cert_for_crl_issuer(crl_signer_issuer_name, crl_signer_authority_key_id)

if paa_certificate is None:
print("PAA Certificate not found, continue...")
continue

paa_certificate_object = x509.load_pem_x509_certificate(paa_certificate)
paa_certificate_object = x509.load_pem_x509_certificate(bytes(paa_certificate, 'utf-8'))

# This requires cryptography 40.0.0 and this haults on 40.0.0
# try:
# crl_signer_certificate.verify_directly_issued_by(paa_certificate_object)
# except Exception as e:
# print("CRL Signer Certificate is not issued by PAA Certificate, continue...")
# continue

shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
# Verify issuer matches with subject
if crl_signer_certificate.issuer != paa_certificate_object.subject:
print("CRL Signer Certificate issuer does not match with PAA Certificate subject, continue...")
continue

# Check crl signers AKID matches with SKID of paa_certificate_object's AKID
paa_skid = paa_certificate_object.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
crl_akid = crl_signer_certificate.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
if paa_skid != crl_akid:
print("CRL Signer's AKID does not match with PAA Certificate SKID, continue...")
continue

# verify if PAA singed the crl's public key
try:
crl_signer_certificate.verify_directly_issued_by(paa_certificate_object)
except Exception:
paa_certificate_object.public_key().verify(crl_signer_certificate.signature,
crl_signer_certificate.tbs_certificate_bytes,
ec.ECDSA(crl_signer_certificate.signature_hash_algorithm))
except Exception as e:
print("CRL Signer Certificate is not signed by PAA Certificate, continue...")
print("Error: ", e)
continue

# 6. Obtain the CRL
r = requests.get(revocation_point["dataURL"])
try:
r = requests.get(revocation_point["dataURL"], timeout=5)
except requests.exceptions.Timeout:
print("Timeout fetching CRL from ", revocation_point["dataURL"])
continue

crl_file = x509.load_der_x509_crl(r.content)

# 7. Perform CRL File Validation
crl_authority_key_id = crl_file.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
crl_signer_subject_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
if crl_authority_key_id != crl_signer_subject_key_id:
print("CRL Authority Key ID is not CRL Signer Subject Key ID, continue...")
continue

issuer_subject_key_id = ''.join('{:02X}'.format(x) for x in crl_authority_key_id)

same_issuer_points = None
# b.
same_issuer_points = dcld_helper.get_revocations_points_by_skid(issuer_subject_key_id)
count_with_matching_vid_issuer_skid = sum(item.get('vid') == vid for item in same_issuer_points)

# TODO: Extract this to a helper function
if use_rest:
response = requests.get(
f"{rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json()["pkiRevocationDistributionPointsByIssuerSubjectKeyID"]
same_issuer_points = response["points"]
else:
cmdlist = ['query', 'pki', 'revocation-points', '--issuer-subject-key-id', issuer_subject_key_id]
cmdpipe = subprocess.Popen(use_dcld(dcld, production, cmdlist), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
same_issuer_points = json.loads(cmdpipe.stdout.read())[
"pkiRevocationDistributionPointsByIssuerSubjectKeyID"]["points"]

matching_entries = False
for same_issuer_point in same_issuer_points:
if same_issuer_point["vid"] == vid:
matching_entries = True
break

if matching_entries:
if count_with_matching_vid_issuer_skid > 1:
try:
issuing_distribution_point = crl_file.extensions.get_extension_for_oid(
x509.OID_ISSUING_DISTRIBUTION_POINT).value
except Exception:
print("CRL Issuing Distribution Point not found, continue...")
continue

uri_list = issuing_distribution_point.full_name
if len(uri_list) == 1 and isinstance(uri_list[0], x509.UniformResourceIdentifier):
if uri_list[0].value != revocation_point["dataURL"]:
print("CRL Issuing Distribution Point URI is not CRL URL, continue...")
continue
else:
print("CRL Issuing Distribution Point URI is not CRL URL, continue...")
continue

# 9. Assign CRL File Issuer
certificate_authority_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
print(f"CRL File Issuer: {certificate_authority_name}")

serialnumber_list = []
# 10. Iterate through the Revoked Certificates List
for revoked_cert in crl_file:
print(revoked_cert)
# a.
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
if revoked_cert_issuer != certificate_authority_name:
print("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
pass

# b.
try:
revoked_cert_authority_key_id = revoked_cert.extensions.get_extension_for_oid(
x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier

if revoked_cert_authority_key_id is None or revoked_cert_authority_key_id != crl_signer_subject_key_id:
continue
except Exception:
continue
# try:
# revoked_cert_authority_key_id = revoked_cert.extensions.get_extension_for_oid(
# x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
#
# if revoked_cert_authority_key_id is None or revoked_cert_authority_key_id != crl_signer_subject_key_id:
# print("CRL Authority Key ID is not CRL Signer Subject Key ID, continue...")
# continue
# except Exception:
# print("CRL Authority Key ID not found, continue...")
# continue

shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
# c. and d.
serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))
Expand Down
Loading