Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

da_revocation: align the revocation set generation algorithm with spec changes #36225

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 208 additions & 101 deletions credentials/generate-revocation-set.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import subprocess
import sys
from enum import Enum
from typing import Optional

import click
import requests
Expand Down Expand Up @@ -91,6 +92,112 @@ def parse_vid_pid_from_distinguished_name(distinguished_name):
return vid, pid


def get_akid(cert: x509.Certificate) -> Optional[bytes]:
try:
return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
except Exception:
logging.warning("AKID not found in certificate")
return None


def get_skid(cert: x509.Certificate) -> Optional[bytes]:
try:
return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
except Exception:
logging.warning("SKID not found in certificate")
return None
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved


def get_subject_b64(cert: x509.Certificate) -> str:
return base64.b64encode(cert.subject.public_bytes()).decode('utf-8')


def get_issuer_b64(cert: x509.Certificate) -> str:
return base64.b64encode(cert.issuer.public_bytes()).decode('utf-8')


def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool:
'''
Verifies if the cert is signed by root.
'''

cert_akid = get_akid(cert)
root_skid = get_skid(root)
if cert_akid is None or root_skid is None or cert_akid != root_skid:
return False

if cert.issuer != root.subject:
return False

# public_key().verify() do not return anything if signature is valid,
# will raise an exception if signature is invalid
try:
root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm))
except Exception:
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
logging.warning(f"Signature verification failed for cert subject: {get_subject_b64(cert)}, issuer: {get_issuer_b64(cert)}")
return False

return True


def is_self_signed_certificate(cert: x509.Certificate) -> bool:
return verify_cert(cert, cert)


# delegator is optional so can be None, but crl_signer and paa has to be present
def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509.Certificate, paa: x509.Certificate):
'''
There could be four scenarios:
1. CRL Signer is PAA itself, hence its self-signed certificate
2. CRL Signer is PAI certificate, and we can validate (crl_signer -> paa) chain
3. CRL Signer delegator is PAA, and we can validate (crl_signer -> crl_signer_delegator(paa) -> paa) chain
4. CRL Signer delegator is PAI, and we can validate (crl_signer -> crl_signer_delegator -> paa) chain
'''

if crl_signer_delegator:
return verify_cert(crl_signer, crl_signer_delegator) and verify_cert(crl_signer_delegator, paa)
else:
return verify_cert(crl_signer, paa)


def validate_vid_pid(revocation_point: dict, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool:
crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)

if revocation_point["isPAA"]:
if crl_signer_vid is not None:
if revocation_point["vid"] != crl_signer_vid:
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
return False
else:
vid_to_match = crl_signer_vid
pid_to_match = crl_signer_pid

# if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator
if crl_signer_delegator_certificate:
vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject)

if vid_to_match is None or revocation_point["vid"] != vid_to_match:
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
return False

if pid_to_match is not None:
if revocation_point["pid"] != pid_to_match:
logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...")
return False

return True


def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList:
logging.debug(f"Fetching CRL from {url}")

try:
r = requests.get(url, timeout=timeout)
return x509.load_der_x509_crl(r.content)
except Exception:
logging.error('Failed to fetch a valid CRL')


class DCLDClient:
'''
A client for interacting with DCLD using either the REST API or command line interface (CLI).
Expand Down Expand Up @@ -172,30 +279,50 @@ def get_revocation_points(self) -> list[dict]:

return response["PkiRevocationDistributionPoint"]

def get_paa_cert_for_crl_issuer(self, crl_signer_issuer_name_b64, crl_signer_authority_key_id) -> str:
shubhamdp marked this conversation as resolved.
Show resolved Hide resolved
def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:
'''
Get PAA certificate for CRL issuer
Get the issuer certificate for

Parameters
----------
crl_signer_issuer_name_b64: str
The issuer name of the CRL signer.
crl_signer_authority_key_id: str
The authority key ID of the CRL signer.
cert: x509.Certificate
Certificate

Returns
-------
str
PAA certificate in PEM format
Issuer certificate in PEM format
'''
issuer_name_b64 = get_issuer_b64(cert)
akid = get_akid(cert)
if akid is None:
return

# Convert CRL Signer AKID to colon separated hex
akid_hex = akid.hex().upper()
akid_hex = ':'.join([akid_hex[i:i+2] for i in range(0, len(akid_hex), 2)])

logging.debug(
f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}")

if self.use_rest:
response = requests.get(
f"{self.rest_node_url}/dcl/pki/certificates/{crl_signer_issuer_name_b64}/{crl_signer_authority_key_id}").json()
f"{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}").json()
else:
response = self.get_dcld_cmd_output_json(
['query', 'pki', 'x509-cert', '-u', crl_signer_issuer_name_b64, '-k', crl_signer_authority_key_id])
['query', 'pki', 'x509-cert', '-u', issuer_name_b64, '-k', akid_hex])

issuer_certificate = response["approvedCertificates"]["certs"][0]["pemCert"]

logging.debug(f"issuer: {issuer_certificate}")

return response["approvedCertificates"]["certs"][0]["pemCert"]
try:
issuer_certificate_object = x509.load_pem_x509_certificate(bytes(issuer_certificate, 'utf-8'))
except Exception:
logging.error('Failed to parse PAA certificate')
return

return issuer_certificate_object

def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
'''
Expand All @@ -211,6 +338,7 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
list[dict]
List of revocation points
'''

if self.use_rest:
response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json()
else:
Expand Down Expand Up @@ -268,97 +396,55 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
continue

# 2. Parse the certificate
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))

vid = revocation_point["vid"]
pid = revocation_point["pid"]
is_paa = revocation_point["isPAA"]

# 3. && 4. Validate VID/PID
crl_vid, crl_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)

if is_paa:
if crl_vid is not None:
if vid != crl_vid:
logging.warning("VID is not CRL VID, continue...")
continue
else:
if crl_vid is None or vid != crl_vid:
logging.warning("VID is not CRL VID, continue...")
continue
if crl_pid is not None:
if pid != crl_pid:
logging.warning("PID is not CRL PID, continue...")
continue

# 5. Validate the certification path containing CRLSignerCertificate.
crl_signer_issuer_name = base64.b64encode(crl_signer_certificate.issuer.public_bytes()).decode('utf-8')

crl_signer_authority_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier

# Convert CRL Signer AKID to colon separated hex
crl_signer_authority_key_id = crl_signer_authority_key_id.hex().upper()
crl_signer_authority_key_id = ':'.join([crl_signer_authority_key_id[i:i+2]
for i in range(0, len(crl_signer_authority_key_id), 2)])

paa_certificate = dcld_client.get_paa_cert_for_crl_issuer(crl_signer_issuer_name, crl_signer_authority_key_id)

if paa_certificate is None:
logging.warning("PAA Certificate not found, continue...")
try:
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))
except Exception:
logging.warning("CRL Signer Certificate is not valid, continue...")
continue

paa_certificate_object = x509.load_pem_x509_certificate(bytes(paa_certificate, 'utf-8'))
# Parse the crl signer delegator
crl_signer_delegator_cert = None
if "crlSignerDelegator" in revocation_point:
crl_signer_delegator_cert_pem = revocation_point["crlSignerDelegator"]
logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}")
try:
crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8'))
except Exception:
logging.warning("CRL Signer Delegator Certificate not found...")

# TODO: use verify_directly_issued_by() method when we upgrade cryptography to v40.0.0
# Verify issuer matches with subject
if crl_signer_certificate.issuer != paa_certificate_object.subject:
logging.warning("CRL Signer Certificate issuer does not match with PAA Certificate subject, continue...")
# 3. and 4. Validate VID/PID
if not validate_vid_pid(revocation_point, crl_signer_certificate, crl_signer_delegator_cert):
logging.warning("Failed to validate VID/PID, continue...")
continue

# Check crl signers AKID matches with SKID of paa_certificate_object's AKID
paa_skid = paa_certificate_object.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
crl_akid = crl_signer_certificate.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
if paa_skid != crl_akid:
logging.warning("CRL Signer's AKID does not match with PAA Certificate SKID, continue...")
# 5. Validate the certification path containing CRLSignerCertificate.
paa_certificate_object = dcld_client.get_issuer_cert(crl_signer_certificate)
if paa_certificate_object is None:
logging.warning("PAA Certificate not found, continue...")
continue

# verify if PAA singed the crl signer certificate
try:
paa_certificate_object.public_key().verify(crl_signer_certificate.signature,
crl_signer_certificate.tbs_certificate_bytes,
ec.ECDSA(crl_signer_certificate.signature_hash_algorithm))
except Exception:
logging.warning("CRL Signer Certificate is not signed by PAA Certificate, continue...")
if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False:
logging.warning("Failed to validate CRL Signer Certificate chain, continue...")
continue

# 6. Obtain the CRL
logging.debug(f"Fetching CRL from {revocation_point['dataURL']}")
try:
r = requests.get(revocation_point["dataURL"], timeout=5)
except Exception:
logging.error('Failed to fetch CRL')
continue

try:
crl_file = x509.load_der_x509_crl(r.content)
except Exception:
logging.error('Failed to load CRL')
crl_file = fetch_crl_from_url(revocation_point["dataURL"], 5) # timeout in seconds
if crl_file is None:
continue

# 7. Perform CRL File Validation
crl_authority_key_id = crl_file.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
crl_signer_subject_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
if crl_authority_key_id != crl_signer_subject_key_id:
logging.warning("CRL Authority Key ID is not CRL Signer Subject Key ID, continue...")
# a.
crl_signer_skid = get_skid(crl_signer_certificate)
crl_akid = get_akid(crl_file)
if crl_akid != crl_signer_skid:
logging.warning("CRL AKID is not CRL Signer SKID, continue...")
continue

issuer_subject_key_id = ''.join('{:02X}'.format(x) for x in crl_authority_key_id)
crl_akid_hex = ''.join('{:02X}'.format(x) for x in crl_akid)

# b.
same_issuer_points = dcld_client.get_revocations_points_by_skid(issuer_subject_key_id)
count_with_matching_vid_issuer_skid = sum(item.get('vid') == vid for item in same_issuer_points)
same_issuer_points = dcld_client.get_revocations_points_by_skid(crl_akid_hex)
count_with_matching_vid_issuer_skid = sum(item.get('vid') == revocation_point["vid"] for item in same_issuer_points)

if count_with_matching_vid_issuer_skid > 1:
try:
Expand All @@ -377,40 +463,61 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...")
continue

# 9. Assign CRL File Issuer
certificate_authority_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
logging.debug(f"CRL File Issuer: {certificate_authority_name}")
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280

# 9. decide on certificate authority name and AKID
if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate):
certificate_authority_name_b64 = get_subject_b64(paa_certificate_object)
certificate_akid = get_skid(paa_certificate_object)
elif crl_signer_delegator_cert:
certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert)
certificate_akid = get_skid(crl_signer_delegator_cert)
else:
certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate)
certificate_akid = get_skid(crl_signer_certificate)

# validate issuer skid matchces with the one in revocation points
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")
logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}")

if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex:
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
continue

serialnumber_list = []
# 10. Iterate through the Revoked Certificates List
for revoked_cert in crl_file:
# a.
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
if revoked_cert_issuer != certificate_authority_name:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
logging.warning("certificateIssuer entry extension not found in CRL")
pass

# b.
# TODO: Verify that the certificate chain of the entry is linking to the same PAA
# that issued the CRLSignerCertificate for this entry, including path through
# CRLSignerDelegator if present. If the PAAs under which were issued the certificate
# and the CRLSignerCertificate are different, ignore the entry.

# c. and d.
serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

issuer_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": revocation_point["crlSignerCertificate"],
}

if "crlSignerDelegator" in revocation_point:
entry["crl_signer_delegator"] = revocation_point["crlSignerDelegator"]

revocation_set.append({"type": "revocation_set",
"issuer_subject_key_id": issuer_subject_key_id,
"issuer_name": issuer_name,
"revoked_serial_numbers": serialnumber_list})
logging.debug(f"Entry to append: {entry}")
revocation_set.append(entry)

with open(output, 'w+') as outfile:
json.dump(revocation_set, outfile, indent=4)
Expand Down
Loading