Skip to content

Commit

Permalink
Add integration test for signing csr
Browse files Browse the repository at this point in the history
  • Loading branch information
sonicaj committed Oct 13, 2024
1 parent eea4356 commit 7de3300
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions tests/api2/test_certs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import datetime
import os.path
import textwrap

import cryptography
import pytest
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509 import Name, NameAttribute, CertificateBuilder, BasicConstraints

from middlewared.test.integration.assets.crypto import (
certificate_signing_request, get_cert_params, intermediate_certificate_authority, root_certificate_authority
Expand Down Expand Up @@ -249,6 +256,77 @@ def test_signing_csr():
call('certificate.delete', cert['id'], job=True)


def test_sign_csr_with_imported_ca():
# Creating Root CA
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)

# Serialize the private key
private_key_bytes = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption() # No encryption for test purposes
)

# Create a self-signed certificate
subject = Name([
NameAttribute(cryptography.x509.NameOID.COUNTRY_NAME, u'US'),
NameAttribute(cryptography.x509.NameOID.STATE_OR_PROVINCE_NAME, u'Ohio'),
NameAttribute(cryptography.x509.NameOID.LOCALITY_NAME, u'Texas'),
NameAttribute(cryptography.x509.NameOID.ORGANIZATION_NAME, u'MyCA'),
NameAttribute(cryptography.x509.NameOID.COMMON_NAME, u'MyCA'),
])
issuer = subject

cert = CertificateBuilder().subject_name(subject).issuer_name(issuer).not_valid_before(
datetime.datetime.utcnow()).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=3650)).serial_number(
x509.random_serial_number()).public_key(
private_key.public_key()).add_extension(
BasicConstraints(ca=True, path_length=None), critical=True).sign(
private_key, hashes.SHA256(), default_backend()
)

# Serialize the certificate
cert_bytes = cert.public_bytes(serialization.Encoding.PEM)

imported_root_ca = call('certificateauthority.create', {
'certificate': cert_bytes.decode('utf-8'),
'privatekey': private_key_bytes.decode('utf-8'),
'name': 'test_imported_root_ca',
'create_type': 'CA_CREATE_IMPORTED',
})

with certificate_signing_request('csr_test') as csr:
cert = call('certificateauthority.ca_sign_csr', {
'ca_id': imported_root_ca['id'],
'csr_cert_id': csr['id'],
'name': 'inter_signed_csr'
})

cert_pem_data = cert['certificate'].encode()
cert_data = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
cert_issuer = cert_data.issuer
ca_pem_data = imported_root_ca['certificate'].encode()
ca_data = x509.load_pem_x509_certificate(ca_pem_data, default_backend())
ca_subject = ca_data.subject
imported_root_ca = call('certificateauthority.get_instance', imported_root_ca['id'])

try:
assert imported_root_ca['CA_type_existing'] is True, imported_root_ca
assert isinstance(cert['signedby'], dict), cert
assert cert['signedby']['id'] == imported_root_ca['id'], cert
assert cert['chain_list'] == [cert['certificate'], imported_root_ca['certificate']], cert
assert cert['issuer'] == imported_root_ca, cert
assert cert_issuer == ca_subject
finally:
call('certificate.delete', cert['id'], job=True)
call('certificateauthority.delete', imported_root_ca['id'])


def test_revoking_cert():
with intermediate_certificate_authority('root_ca', 'intermediate_ca') as (root_ca, intermediate_ca):
cert = call('certificate.create', {
Expand Down

0 comments on commit 7de3300

Please sign in to comment.