diff --git a/serles/backends/certbot.py b/serles/backends/certbot.py index b979a60..dd46e7b 100644 --- a/serles/backends/certbot.py +++ b/serles/backends/certbot.py @@ -92,7 +92,7 @@ def sign(self, csr, subjectDN, subjectAltNames, email): ] for csr_san in subjectAltNames: - cmd.extend(["-d", csr_san]) + cmd.extend(["-d", str(csr_san.value)]) res = subprocess.run(cmd, stdout=PIPE, stderr=STDOUT, check=False) output = res.stdout.decode("utf-8") diff --git a/serles/backends/ejbca.py b/serles/backends/ejbca.py index f9a6ec6..aa115f3 100644 --- a/serles/backends/ejbca.py +++ b/serles/backends/ejbca.py @@ -73,7 +73,7 @@ def __init__(self, config): self.userData = self.client.get_type("ns0:userDataVOWS") def sign(self, csr, subjectDN, subjectAltNames, email): - subjectAltName = ",".join(f"DNSNAME={name}" for name in subjectAltNames) + subjectAltName = ",".join(name.ejbca_identifier() for name in subjectAltNames) csr_obj = x509.load_pem_x509_csr(csr, x509_backend()) csr_der = csr_obj.public_bytes(serialization.Encoding.DER) diff --git a/serles/challenge.py b/serles/challenge.py index 31d8a2a..ed2bf1f 100644 --- a/serles/challenge.py +++ b/serles/challenge.py @@ -9,10 +9,13 @@ import dns.resolver from datetime import datetime, timezone +from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network, ip_address +from typing import Union from cryptography import x509 # python3-cryptography.x86_64 from cryptography.hazmat.backends import default_backend as x509_backend from cryptography.hazmat.primitives import serialization +from cryptography.x509 import SubjectAlternativeName from .utils import get_ptr, ip_in_ranges, normalize, ber_parse from .configloader import get_config @@ -22,6 +25,44 @@ config = {} backend = None +class ChallengeIdentifier: + type: IdentifierTypes + value: Union[str, IPv4Address, IPv6Address] + + def ejbca_identifier(self): + if self.type == IdentifierTypes.dns: + return f"DNSNAME={self.value}" + elif self.type == IdentifierTypes.ip: + return f"IPAddress={self.value}" + else: + return None + + def __init__(self, type: IdentifierTypes, value): + self.type = type + if self.type == IdentifierTypes.dns: + if not isinstance(value, str): + raise TypeError("DNS identifier value must be strings") + self.value = value + elif self.type == IdentifierTypes.ip: + if isinstance(value, (IPv4Address, IPv6Address)): + self.value = value + elif isinstance(value, (IPv4Network, IPv6Network)): + self.value = value.network_address + elif isinstance(value, str): + ip_address(value) + else: + raise TypeError("IP identifier value must be an supported IP type or string") + else: + raise ValueError("Invalid identifier type") + + def __eq__(self, other): + if isinstance(other, ChallengeIdentifier): + return self.type == other.type and self.value == other.value + else: + return False + + def __str__(self) -> str: + return f"{str(self.type).upper()}:{self.value}" def init_config(): global config, backend @@ -229,7 +270,7 @@ def alpn_challenge(challenge): # RFC 8737 §3 return None, None # no error occurred :) -def check_csr_and_return_cert(csr_der, order): +def check_csr_and_return_cert(csr_der: bytes, order: Order): """ validate CSR and pass to backend Checks that the CSR only contains domains from previously validated @@ -247,9 +288,18 @@ def check_csr_and_return_cert(csr_der, order): """ csr = x509.load_der_x509_csr(csr_der, x509_backend()) try: - alt_names = csr.extensions.get_extension_for_oid( + san: SubjectAlternativeName = csr.extensions.get_extension_for_oid( x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME - ).value.get_values_for_type(x509.DNSName) + ).value + alt_names_dns = san.get_values_for_type(x509.DNSName) + alt_names_ip = san.get_values_for_type(x509.IPAddress) + alt_names: list[ChallengeIdentifier] = [ + ChallengeIdentifier(IdentifierTypes.dns, dns) + for dns in alt_names_dns + ] + [ + ChallengeIdentifier(IdentifierTypes.ip, ip) + for ip in alt_names_ip + ] except: alt_names = [] try: @@ -259,14 +309,15 @@ def check_csr_and_return_cert(csr_der, order): except IndexError: # certbot does not set Subject Name, only SANs # https://github.com/certbot/certbot/issues/4922 - common_name = alt_names[0] + common_name = alt_names[0].value - if not common_name in alt_names: # chrome ignores CN, so write CN to SAN - alt_names.insert(0, common_name) + common_name_ident = ChallengeIdentifier(IdentifierTypes.dns, common_name) + if not common_name_ident in alt_names: # chrome ignores CN, so write CN to SAN + alt_names.insert(0, common_name_ident) # since we pass the CN and SANs to the backend, make sure the client only # specified those that we verified before: - order_identifiers = {ident.value for ident in order.identifiers} + order_identifiers = {ChallengeIdentifier(ident.type, ident.value) for ident in order.identifiers} csr_identifiers = {*alt_names} # convert list to set if order_identifiers != csr_identifiers: raise ACMEError(f"{order_identifiers} != {csr_identifiers}", 400, "badCSR") diff --git a/serles/models.py b/serles/models.py index 0fc8cb4..59e1c2d 100644 --- a/serles/models.py +++ b/serles/models.py @@ -91,6 +91,7 @@ def serialized(self): class IdentifierTypes(Enum): dns = "dns" + ip = "ip" class Identifier(db.Model): diff --git a/serles/views.py b/serles/views.py index 0d9770c..5e2e673 100644 --- a/serles/views.py +++ b/serles/views.py @@ -1,3 +1,4 @@ +from ipaddress import ip_address import jwcrypto.jwk # fedora package: python3-jwcrypto.noarch import jwcrypto.jws @@ -121,18 +122,35 @@ def post(self): raise ACMEError("identifier not valid", 400, "malformed") type_ = identifier.get("type") value = identifier.get("value") - if type_ != "dns": + try: + ident_type = IdentifierTypes(type_) + except ValueError: raise ACMEError( - "can only do 'dns' type identifiers", 400, "rejectedIdentifier" + "unsupported identifier type", 400, "rejectedIdentifier" ) - - identifier = Identifier(type=IdentifierTypes(type_), value=value) + if ident_type == IdentifierTypes.ip: + try: + value = ip_address(value) # raises on invalid IP + except ValueError: + raise ACMEError( + "invalid IP address", 400, "rejectedIdentifier" + ) + + identifier = Identifier(type=ident_type, value=value) db.session.add(identifier) - challenges = [ - Challenge(type=ChallengeTypes.http_01), - Challenge(type=ChallengeTypes.dns_01), - Challenge(type=ChallengeTypes.tls_alpn_01), - ] + if ident_type == IdentifierTypes.ip: + # RFC8738 § 7 + # "dns-01" challenge MUST NOT be used to validate IP identifiers. + challenges = [ + Challenge(type=ChallengeTypes.http_01), + Challenge(type=ChallengeTypes.tls_alpn_01), + ] + else: + challenges = [ + Challenge(type=ChallengeTypes.http_01), + Challenge(type=ChallengeTypes.dns_01), + Challenge(type=ChallengeTypes.tls_alpn_01), + ] for c in challenges: db.session.add(c) authz = Authorization(identifier=identifier, challenges=challenges)