Skip to content

Commit

Permalink
Update PKIDeployer.retrieve_cert_chain()
Browse files Browse the repository at this point in the history
The code that retrieves the cert chain from CA has been moved
into PKIDeployer.retrieve_cert_chain().
  • Loading branch information
edewata committed May 14, 2024
1 parent ad8602b commit 09ea4f7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 46 deletions.
55 changes: 45 additions & 10 deletions base/server/python/pki/server/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2305,21 +2305,56 @@ def import_cert_chain(self, nssdb):
cert_chain_file=cert_chain_path,
trust_attributes='CT,C,C')

def retrieve_cert_chain(self, url):
def retrieve_cert_chain(self, nssdb, subsystem):

external = self.configuration_file.external
standalone = self.configuration_file.standalone
subordinate = self.configuration_file.subordinate
clone = self.configuration_file.clone

system_certs_imported = \
self.mdict['pki_server_pkcs12_path'] != '' or \
self.mdict['pki_clone_pkcs12_path'] != ''

if subsystem.type == 'CA' and external or \
subsystem.type in ['KRA', 'OCSP'] and standalone:

# For external sub-CA and standalone KRA/OCSP, no need to retrieve
# the cert chain since it's already imported from a local file by
# import_cert_chain().

return

elif (subsystem.type == 'CA' and subordinate or subsystem.type != 'CA') and \
not clone and not system_certs_imported:

# For primary (not clone) sub-CA and KRA, OCSP, TKS, and TPS,
# retrieve the cert chain from the issuing CA unless it's already
# imported from PKCS #12 file by import_server_pkcs12().

url = self.mdict['pki_issuing_ca']

elif subsystem.type == 'CA' and clone and not system_certs_imported:

# For root CA and sub-CA clone, retrieve the cert chain from the
# primary server unless it's already imported from a PKCS #12 file
# by import_clone_pkcs12().

url = self.mdict['pki_clone_uri']

else:
return

logger.info('Retrieving cert chain from %s', url)
cert_chain = self.get_ca_signing_cert(url)

logger.info('Importing cert chain from %s', url)
nssdb = self.instance.open_nssdb()
try:
nssdb.import_pkcs7(
pkcs7_data=cert_chain,
trust_attributes='CT,C,C')
finally:
nssdb.close()
base64_chain = pki.nssdb.convert_pkcs7(cert_chain, 'pem', 'base64')
subsystem.set_config('preop.ca.pkcs7', base64_chain)

return cert_chain
logger.info('Importing cert chain from %s', url)
nssdb.import_pkcs7(
pkcs7_data=cert_chain,
trust_attributes='CT,C,C')

def import_system_certs(self, nssdb, subsystem):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def spawn(self, deployer):

external = deployer.configuration_file.external
standalone = deployer.configuration_file.standalone
subordinate = deployer.configuration_file.subordinate
clone = deployer.configuration_file.clone
step_one = deployer.configuration_file.external_step_one
skip_configuration = deployer.configuration_file.skip_configuration
Expand All @@ -61,6 +60,8 @@ def spawn(self, deployer):
nssdb = instance.open_nssdb()
try:
deployer.import_system_certs(nssdb, subsystem)
deployer.retrieve_cert_chain(nssdb, subsystem)

finally:
nssdb.close()

Expand All @@ -72,41 +73,6 @@ def spawn(self, deployer):
if config.str2bool(deployer.mdict['pki_security_domain_setup']):
deployer.setup_security_domain(subsystem)

system_certs_imported = \
deployer.mdict['pki_server_pkcs12_path'] != '' or \
deployer.mdict['pki_clone_pkcs12_path'] != ''

if subsystem.type == 'CA' and external or \
subsystem.type in ['KRA', 'OCSP'] and standalone:

# For external sub-CA and standalone KRA/OCSP, no need to retrieve
# the cert chain since it's already imported from a local file by
# PKIDeployer.import_cert_chain().

subsystem.set_config('preop.ca.pkcs7', '')

elif (subsystem.type == 'CA' and subordinate or subsystem.type != 'CA') and \
not clone and not system_certs_imported:

# For primary (not clone) sub-CA and KRA, OCSP, TKS, and TPS,
# retrieve the cert chain from the issuing CA unless it's already
# imported from PKCS #12 file by PKIDeployer.import_server_pkcs12().

issuing_ca = deployer.mdict['pki_issuing_ca']
pem_chain = deployer.retrieve_cert_chain(issuing_ca)

base64_chain = pki.nssdb.convert_pkcs7(pem_chain, 'pem', 'base64')
subsystem.set_config('preop.ca.pkcs7', base64_chain)

elif subsystem.type == 'CA' and clone and not system_certs_imported:

# For root CA and sub-CA clone, retrieve the cert chain from the
# primary server unless it's already imported from a PKCS #12 file
# by PKIDeployer.import_clone_pkcs12().

master_url = deployer.mdict['pki_clone_uri']
pem_chain = deployer.retrieve_cert_chain(master_url)

subsystem.save()

if clone:
Expand Down

0 comments on commit 09ea4f7

Please sign in to comment.