Skip to content

Commit

Permalink
[fix] #37 _pkcs7_to_pem()
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Jul 6, 2020
1 parent 428c720 commit 9a580fc
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions examples/ca_handler/mscertsrv_ca_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,44 @@
""" ca handler for Microsoft Webenrollment service (certsrv) """
from __future__ import print_function
import textwrap
from OpenSSL import crypto
from OpenSSL.crypto import _lib, _ffi, X509
# pylint: disable=E0401
from certsrv import Certsrv
# pylint: disable=E0401
from acme.helper import load_config, b64_url_recode, convert_byte_to_string

def _get_certificates(self):
"""
https://github.com/pyca/pyopenssl/pull/367/files#r67300900
Returns all certificates for the PKCS7 structure, if present. Only
objects of type ``signedData`` or ``signedAndEnvelopedData`` can embed
certificates.
:return: The certificates in the PKCS7, or :const:`None` if
there are none.
:rtype: :class:`tuple` of :class:`X509` or :const:`None`
"""
certs = _ffi.NULL
if self.type_is_signed():
# pylint: disable=W0212
certs = self._pkcs7.d.sign.cert
elif self.type_is_signedAndEnveloped():
# pylint: disable=W0212
certs = self._pkcs7.d.signed_and_enveloped.cert

pycerts = []
for i in range(_lib.sk_X509_num(certs)):
pycert = X509.__new__(X509)
# pylint: disable=W0212
pycert._x509 = _lib.sk_X509_value(certs, i)
pycerts.append(pycert)

if not pycerts:
return None
return tuple(pycerts)

class CAhandler(object):
""" EST CA handler """

Expand All @@ -29,6 +62,35 @@ def __enter__(self):
def __exit__(self, *args):
""" cose the connection at the end of the context """

def _pkcs7_to_pem(self, pkcs7_content, outform='string'):
""" convert pkcs7 to pem """
self.logger.debug('CAhandler._pkcs7_to_pem()')
for filetype in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
try:
pkcs7 = crypto.load_pkcs7_data(filetype, pkcs7_content)
break
except crypto.Error as _err:
pkcs7 = None
# print(err)

cert_pem_list = []
if pkcs7:
# convert cert pkcs#7 to pem
cert_list = _get_certificates(pkcs7)
for cert in cert_list:
cert_pem_list.append(convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)))

# define output format
if outform == 'string':
result = ''.join(cert_pem_list)
elif outform == 'list':
result = cert_pem_list
else:
result = None

self.logger.debug('Certificate._pkcs7_to_pem() ended')
return result

def _check_credentials(self, ca_server):
""" check creadentials """
self.logger.debug('CAhandler.__check_credentials()')
Expand All @@ -55,16 +117,17 @@ def enroll(self, csr):

# get ca_chain
try:
ca_pem = convert_byte_to_string(ca_server.get_chain(encoding='b64'))
ca_pkcs7 = convert_byte_to_string(ca_server.get_chain(encoding='b64'))
ca_pem = self._pkcs7_to_pem(ca_pkcs7)
# replace crlf with lf
ca_pem = ca_pem.replace('\r\n', '\n')
# ca_pem = ca_pem.replace('\r\n', '\n')
except BaseException as err_:
ca_pem = None
self.logger.error('ca_server.get_chain() failed with error: {0}'.format(err_))

try:
cert_raw = convert_byte_to_string(ca_server.get_cert(csr, self.template))
# replace crlf with lf
# replace crlf with lf
cert_raw = cert_raw.replace('\r\n', '\n')
except BaseException as err_:
cert_raw = None
Expand Down

0 comments on commit 9a580fc

Please sign in to comment.