diff --git a/examples/ca_handler/mscertsrv_ca_handler.py b/examples/ca_handler/mscertsrv_ca_handler.py index 3b4197e6..2210314a 100644 --- a/examples/ca_handler/mscertsrv_ca_handler.py +++ b/examples/ca_handler/mscertsrv_ca_handler.py @@ -3,11 +3,44 @@ """ ca handler for Microsoft Webenrollment service (certsrv) """ from __future__ import print_function import textwrap +from OpenSSL import crypto +from OpenSSL.crypto import _lib, _ffi, X509 # pylint: disable=E0401 from certsrv import Certsrv # pylint: disable=E0401 from acme.helper import load_config, b64_url_recode, convert_byte_to_string +def _get_certificates(self): + """ + https://github.com/pyca/pyopenssl/pull/367/files#r67300900 + + Returns all certificates for the PKCS7 structure, if present. Only + objects of type ``signedData`` or ``signedAndEnvelopedData`` can embed + certificates. + + :return: The certificates in the PKCS7, or :const:`None` if + there are none. + :rtype: :class:`tuple` of :class:`X509` or :const:`None` + """ + certs = _ffi.NULL + if self.type_is_signed(): + # pylint: disable=W0212 + certs = self._pkcs7.d.sign.cert + elif self.type_is_signedAndEnveloped(): + # pylint: disable=W0212 + certs = self._pkcs7.d.signed_and_enveloped.cert + + pycerts = [] + for i in range(_lib.sk_X509_num(certs)): + pycert = X509.__new__(X509) + # pylint: disable=W0212 + pycert._x509 = _lib.sk_X509_value(certs, i) + pycerts.append(pycert) + + if not pycerts: + return None + return tuple(pycerts) + class CAhandler(object): """ EST CA handler """ @@ -29,6 +62,35 @@ def __enter__(self): def __exit__(self, *args): """ cose the connection at the end of the context """ + def _pkcs7_to_pem(self, pkcs7_content, outform='string'): + """ convert pkcs7 to pem """ + self.logger.debug('CAhandler._pkcs7_to_pem()') + for filetype in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1): + try: + pkcs7 = crypto.load_pkcs7_data(filetype, pkcs7_content) + break + except crypto.Error as _err: + pkcs7 = None + # print(err) + + cert_pem_list = [] + if pkcs7: + # convert cert pkcs#7 to pem + cert_list = _get_certificates(pkcs7) + for cert in cert_list: + cert_pem_list.append(convert_byte_to_string(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))) + + # define output format + if outform == 'string': + result = ''.join(cert_pem_list) + elif outform == 'list': + result = cert_pem_list + else: + result = None + + self.logger.debug('Certificate._pkcs7_to_pem() ended') + return result + def _check_credentials(self, ca_server): """ check creadentials """ self.logger.debug('CAhandler.__check_credentials()') @@ -55,16 +117,17 @@ def enroll(self, csr): # get ca_chain try: - ca_pem = convert_byte_to_string(ca_server.get_chain(encoding='b64')) + ca_pkcs7 = convert_byte_to_string(ca_server.get_chain(encoding='b64')) + ca_pem = self._pkcs7_to_pem(ca_pkcs7) # replace crlf with lf - ca_pem = ca_pem.replace('\r\n', '\n') + # ca_pem = ca_pem.replace('\r\n', '\n') except BaseException as err_: ca_pem = None self.logger.error('ca_server.get_chain() failed with error: {0}'.format(err_)) try: cert_raw = convert_byte_to_string(ca_server.get_cert(csr, self.template)) - # replace crlf with lf + # replace crlf with lf cert_raw = cert_raw.replace('\r\n', '\n') except BaseException as err_: cert_raw = None