Skip to content

Commit

Permalink
cryptography backend: parse dirName, RID and otherName names (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein authored Jun 21, 2020
1 parent 8651a6a commit cb38444
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 79 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/67669-cryptography-names.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- "openssl_* modules - the cryptography backend now properly supports ``dirName``, ``otherName`` and ``RID`` (Registered ID) names."
101 changes: 89 additions & 12 deletions plugins/module_utils/crypto/cryptography_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import base64
import binascii
import re

from ansible.module_utils._text import to_text

Expand Down Expand Up @@ -124,6 +125,66 @@ def cryptography_oid_to_name(oid, short=False):
return NORMALIZE_NAMES.get(name, name)


def _get_hex(bytesstr):
if bytesstr is None:
return bytesstr
data = binascii.hexlify(bytesstr)
data = to_text(b':'.join(data[i:i + 2] for i in range(0, len(data), 2)))
return data


def _parse_hex(bytesstr):
if bytesstr is None:
return bytesstr
data = ''.join([('0' * (2 - len(p)) + p) if len(p) < 2 else p for p in to_text(bytesstr).split(':')])
data = binascii.unhexlify(data)
return data


def _parse_dn(name):
'''
Parse a Distinguished Name.
Can be of the form ``CN=Test, O = Something`` or ``CN = Test,O= Something``.
'''
original_name = name
name = name.lstrip()
sep = ','
if name.startswith('/'):
sep = '/'
name = name[1:]
sep_str = sep + '\\'
result = []
start_re = re.compile(r'^ *([a-zA-z0-9]+) *= *')
while name:
m = start_re.match(name)
if not m:
raise OpenSSLObjectError('Error while parsing distinguished name "{0}": cannot start part in "{1}"'.format(original_name, name))
oid = cryptography_name_to_oid(m.group(1))
idx = len(m.group(0))
decoded_name = []
length = len(name)
while idx < length:
i = idx
while i < length and name[i] not in sep_str:
i += 1
if i > idx:
decoded_name.append(name[idx:i])
idx = i
while idx + 1 < length and name[idx] == '\\':
decoded_name.append(name[idx + 1])
idx += 2
if idx < length and name[idx] == sep:
break
result.append(x509.NameAttribute(oid, ''.join(decoded_name)))
name = name[idx:]
if name:
if name[0] != sep or len(name) < 2:
raise OpenSSLObjectError('Error while parsing distinguished name "{0}": unexpected end of string'.format(original_name))
name = name[1:]
return result


def cryptography_get_name(name):
'''
Given a name string, returns a cryptography x509.Name object.
Expand All @@ -138,19 +199,35 @@ def cryptography_get_name(name):
return x509.RFC822Name(to_text(name[6:]))
if name.startswith('URI:'):
return x509.UniformResourceIdentifier(to_text(name[4:]))
if name.startswith('RID:'):
m = re.match(r'^([0-9]+(?:\.[0-9]+)*)$', to_text(name[4:]))
if not m:
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}"'.format(name))
return x509.RegisteredID(x509.oid.ObjectIdentifier(m.group(1)))
if name.startswith('otherName:'):
m = re.match(r'^([0-9]+(?:\.[0-9]+)*);([0-9a-fA-F]{1,2}(?::[0-9a-fA-F]{1,2})*)$', to_text(name[10:]))
if not m:
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}"'.format(name))
return x509.OtherName(x509.oid.ObjectIdentifier(m.group(1)), _parse_hex(m.group(2)))
if name.startswith('dirName:'):
return x509.DirectoryName(x509.Name(_parse_dn(to_text(name[8:]))))
except Exception as e:
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}": {1}'.format(name, e))
if ':' not in name:
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}" (forgot "DNS:" prefix?)'.format(name))
raise OpenSSLObjectError('Cannot parse Subject Alternative Name "{0}" (potentially unsupported by cryptography backend)'.format(name))


def _get_hex(bytesstr):
if bytesstr is None:
return bytesstr
data = binascii.hexlify(bytesstr)
data = to_text(b':'.join(data[i:i + 2] for i in range(0, len(data), 2)))
return data
def _dn_escape_value(value):
'''
Escape Distinguished Name's attribute value.
'''
value = value.replace('\\', '\\\\')
for ch in [',', '#', '+', '<', '>', ';', '"', '=', '/']:
value = value.replace(ch, '\\%s' % ch)
if value.startswith(' '):
value = r'\ ' + value[1:]
return value


def cryptography_decode_name(name):
Expand All @@ -167,14 +244,14 @@ def cryptography_decode_name(name):
if isinstance(name, x509.UniformResourceIdentifier):
return 'URI:{0}'.format(name.value)
if isinstance(name, x509.DirectoryName):
# FIXME: test
return 'DirName:' + ''.join(['/{0}:{1}'.format(attribute.oid._name, attribute.value) for attribute in name.value])
return 'dirName:' + ''.join([
'/{0}={1}'.format(cryptography_oid_to_name(attribute.oid, short=True), _dn_escape_value(attribute.value))
for attribute in name.value
])
if isinstance(name, x509.RegisteredID):
# FIXME: test
return 'RegisteredID:{0}'.format(name.value)
return 'RID:{0}'.format(name.value.dotted_string)
if isinstance(name, x509.OtherName):
# FIXME: test
return '{0}:{1}'.format(name.type_id.dotted_string, _get_hex(name.value))
return 'otherName:{0};{1}'.format(name.type_id.dotted_string, _get_hex(name.value))
raise OpenSSLObjectError('Cannot decode name "{0}"'.format(name))


Expand Down
19 changes: 19 additions & 0 deletions plugins/module_utils/crypto/pyopenssl_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from ansible.module_utils._text import to_bytes, to_text

from ansible_collections.community.crypto.plugins.module_utils.compat import ipaddress as compat_ipaddress

try:
import OpenSSL
except ImportError:
Expand All @@ -48,6 +50,23 @@ def pyopenssl_normalize_name(name, short=False):
return NORMALIZE_NAMES.get(name, name)


def pyopenssl_normalize_name_attribute(san):
# apparently openssl returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string
# although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004)
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = compat_ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)

if san.startswith('Registered ID:'):
san = 'RID:' + san[len('Registered ID:'):]
# Some versions of OpenSSL apparently forgot the colon. Happens in CI with Ubuntu 16.04 and FreeBSD 11.1
if san.startswith('Registered ID'):
san = 'RID:' + san[len('Registered ID'):]
return san


def pyopenssl_get_extensions_from_cert(cert):
# While pyOpenSSL allows us to get an extension's DER value, it won't
# give us the dotted string for an OID. So we have to do some magic to
Expand Down
20 changes: 6 additions & 14 deletions plugins/modules/openssl_csr.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils._text import to_native, to_bytes, to_text

from ansible_collections.community.crypto.plugins.module_utils.compat import ipaddress as compat_ipaddress

from ansible_collections.community.crypto.plugins.module_utils.io import (
load_file_if_exists,
write_file,
Expand All @@ -449,6 +447,10 @@
cryptography_parse_key_usage_params,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
pyopenssl_normalize_name_attribute,
)

MINIMAL_PYOPENSSL_VERSION = '0.15'
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'

Expand Down Expand Up @@ -705,16 +707,6 @@ def _load_private_key(self):
except OpenSSLBadPassphraseError as exc:
raise CertificateSigningRequestError(exc)

def _normalize_san(self, san):
# Apparently OpenSSL returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string
# although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004)
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = compat_ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)
return san

def _check_csr(self):
def _check_subject(csr):
subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject]
Expand All @@ -726,10 +718,10 @@ def _check_subject(csr):

def _check_subjectAltName(extensions):
altnames_ext = next((ext for ext in extensions if ext.get_short_name() == b'subjectAltName'), '')
altnames = [self._normalize_san(altname.strip()) for altname in
altnames = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
to_text(altnames_ext, errors='surrogate_or_strict').split(',') if altname.strip()]
if self.subjectAltName:
if (set(altnames) != set([self._normalize_san(to_text(name)) for name in self.subjectAltName]) or
if (set(altnames) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.subjectAltName]) or
altnames_ext.get_critical() != self.subjectAltName_critical):
return False
else:
Expand Down
17 changes: 3 additions & 14 deletions plugins/modules/openssl_csr_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils._text import to_native, to_text, to_bytes

from ansible_collections.community.crypto.plugins.module_utils.compat import ipaddress as compat_ipaddress

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)
Expand All @@ -229,8 +227,9 @@
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
pyopenssl_normalize_name,
pyopenssl_get_extensions_from_csr,
pyopenssl_normalize_name,
pyopenssl_normalize_name_attribute,
)

MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
Expand Down Expand Up @@ -551,20 +550,10 @@ def _get_ocsp_must_staple(self):
else:
return None, False

def _normalize_san(self, san):
# apparently openssl returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string
# although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004)
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = compat_ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)
return san

def _get_subject_alt_name(self):
for extension in self.csr.get_extensions():
if extension.get_short_name() == b'subjectAltName':
result = [self._normalize_san(altname.strip()) for altname in
result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
to_text(extension, errors='surrogate_or_strict').split(', ')]
return result, bool(extension.get_critical())
return None, False
Expand Down
18 changes: 6 additions & 12 deletions plugins/modules/x509_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,10 @@
cryptography_serial_number_of_cert,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
pyopenssl_normalize_name_attribute,
)

MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
MINIMAL_PYOPENSSL_VERSION = '0.15'

Expand Down Expand Up @@ -2228,25 +2232,15 @@ def _validate_extended_key_usage(self):
if self.extended_key_usage:
return NO_EXTENSION

def _normalize_san(self, san):
# Apparently OpenSSL returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string
# although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004)
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = compat_ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)
return san

def _validate_subject_alt_name(self):
found = False
for extension_idx in range(0, self.cert.get_extension_count()):
extension = self.cert.get_extension(extension_idx)
if extension.get_short_name() == b'subjectAltName':
found = True
l_altnames = [self._normalize_san(altname.strip()) for altname in
l_altnames = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
to_text(extension, errors='surrogate_or_strict').split(', ')]
sans = [self._normalize_san(to_text(san, errors='surrogate_or_strict')) for san in self.subject_alt_name]
sans = [pyopenssl_normalize_name_attribute(to_text(san, errors='surrogate_or_strict')) for san in self.subject_alt_name]
if not compare_sets(sans, l_altnames, self.subject_alt_name_strict):
return self.subject_alt_name, l_altnames
if not found:
Expand Down
11 changes: 2 additions & 9 deletions plugins/modules/x509_certificate_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
pyopenssl_get_extensions_from_cert,
pyopenssl_normalize_name,
pyopenssl_normalize_name_attribute,
)

MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
Expand Down Expand Up @@ -740,19 +741,11 @@ def _get_ocsp_must_staple(self):
else:
return None, False

def _normalize_san(self, san):
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = compat_ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)
return san

def _get_subject_alt_name(self):
for extension_idx in range(0, self.cert.get_extension_count()):
extension = self.cert.get_extension(extension_idx)
if extension.get_short_name() == b'subjectAltName':
result = [self._normalize_san(altname.strip()) for altname in
result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
to_text(extension, errors='surrogate_or_strict').split(', ')]
return result, bool(extension.get_critical())
return None, False
Expand Down
Loading

0 comments on commit cb38444

Please sign in to comment.