Skip to content

Commit

Permalink
Move x509_crl_info code to module_utils.
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein committed Mar 8, 2021
1 parent 2e69113 commit 3cd0056
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 102 deletions.
99 changes: 99 additions & 0 deletions plugins/module_utils/crypto/module_backends/crl_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2020, Felix Fontein <[email protected]>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type


import traceback

from distutils.version import LooseVersion

from ansible.module_utils.basic import missing_required_lib

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_oid_to_name,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import (
TIMESTAMP_FORMAT,
cryptography_decode_revoked_certificate,
cryptography_dump_revoked,
cryptography_get_signature_algorithm_oid_from_crl,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
identify_pem_format,
)

# crypto_utils

MINIMAL_CRYPTOGRAPHY_VERSION = '1.2'

CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography import x509
from cryptography.hazmat.backends import default_backend
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True


class CRLInfoRetrieval(object):
def __init__(self, module, content):
# content must be a bytes string
self.module = module
self.content = content

def get_info(self):
self.crl_pem = identify_pem_format(self.content)
try:
if self.crl_pem:
self.crl = x509.load_pem_x509_crl(self.content, default_backend())
else:
self.crl = x509.load_der_x509_crl(self.content, default_backend())
except Exception as e:
self.module.fail_json(msg='Error while decoding CRL: {0}'.format(e))

result = {
'changed': False,
'format': 'pem' if self.crl_pem else 'der',
'last_update': None,
'next_update': None,
'digest': None,
'issuer_ordered': None,
'issuer': None,
'revoked_certificates': [],
}

result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT)
result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT)
result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl))
issuer = []
for attribute in self.crl.issuer:
issuer.append([cryptography_oid_to_name(attribute.oid), attribute.value])
result['issuer_ordered'] = issuer
result['issuer'] = {}
for k, v in issuer:
result['issuer'][k] = v
result['revoked_certificates'] = []
for cert in self.crl:
entry = cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(cryptography_dump_revoked(entry))

return result


def get_crl_info(module, content):
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)

info = CRLInfoRetrieval(module, content)
return info.get_info()
120 changes: 18 additions & 102 deletions plugins/modules/x509_crl_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,8 @@


import base64
import traceback

from distutils.version import LooseVersion

from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
Expand All @@ -164,99 +161,9 @@
identify_pem_format,
)

# crypto_utils

MINIMAL_CRYPTOGRAPHY_VERSION = '1.2'

CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography import x509
from cryptography.hazmat.backends import default_backend
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True


class CRLError(OpenSSLObjectError):
pass


class CRLInfo(OpenSSLObject):
"""The main module implementation."""

def __init__(self, module):
super(CRLInfo, self).__init__(
module.params['path'] or '',
'present',
False,
module.check_mode
)

self.content = module.params['content']

self.module = module

self.crl = None
if self.content is None:
try:
with open(self.path, 'rb') as f:
data = f.read()
except Exception as e:
self.module.fail_json(msg='Error while reading CRL file from disk: {0}'.format(e))
else:
data = self.content.encode('utf-8')
if not identify_pem_format(data):
data = base64.b64decode(self.content)

self.crl_pem = identify_pem_format(data)
try:
if self.crl_pem:
self.crl = x509.load_pem_x509_crl(data, default_backend())
else:
self.crl = x509.load_der_x509_crl(data, default_backend())
except Exception as e:
self.module.fail_json(msg='Error while decoding CRL: {0}'.format(e))

def get_info(self):
result = {
'changed': False,
'format': 'pem' if self.crl_pem else 'der',
'last_update': None,
'next_update': None,
'digest': None,
'issuer_ordered': None,
'issuer': None,
'revoked_certificates': [],
}

result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT)
result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT)
result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl))
issuer = []
for attribute in self.crl.issuer:
issuer.append([cryptography_oid_to_name(attribute.oid), attribute.value])
result['issuer_ordered'] = issuer
result['issuer'] = {}
for k, v in issuer:
result['issuer'][k] = v
result['revoked_certificates'] = []
for cert in self.crl:
entry = cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(cryptography_dump_revoked(entry))

return result

def generate(self):
# Empty method because OpenSSLObject wants this
pass

def dump(self):
# Empty method because OpenSSLObject wants this
pass
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.crl_info import (
get_crl_info,
)


def main():
Expand All @@ -274,13 +181,22 @@ def main():
supports_check_mode=True,
)

if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)
if module.params['content'] is None:
try:
with open(module.params['path'], 'rb') as f:
data = f.read()
except Exception as e:
module.fail_json(msg='Error while reading CRL file from disk: {0}'.format(e))
else:
data = module.params['content'].encode('utf-8')
if not identify_pem_format(data):
try:
data = base64.b64decode(module.params['content'])
except Exception as e:
module.fail_json(msg='Error while Base64 decoding content: {0}'.format(e))

try:
crl = CRLInfo(module)
result = crl.get_info()
result = get_crl_info(module, data)
module.exit_json(**result)
except OpenSSLObjectError as e:
module.fail_json(msg=to_native(e))
Expand Down

0 comments on commit 3cd0056

Please sign in to comment.