From 7de3300afd11c2f751858f440e8fbd996c40029c Mon Sep 17 00:00:00 2001 From: Waqar Ahmed Date: Sun, 13 Oct 2024 19:58:30 +0500 Subject: [PATCH] Add integration test for signing csr --- tests/api2/test_certs.py | 78 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/api2/test_certs.py b/tests/api2/test_certs.py index a7bbd622ef781..7a742a9080c89 100644 --- a/tests/api2/test_certs.py +++ b/tests/api2/test_certs.py @@ -1,7 +1,14 @@ +import datetime import os.path import textwrap +import cryptography import pytest +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509 import Name, NameAttribute, CertificateBuilder, BasicConstraints from middlewared.test.integration.assets.crypto import ( certificate_signing_request, get_cert_params, intermediate_certificate_authority, root_certificate_authority @@ -249,6 +256,77 @@ def test_signing_csr(): call('certificate.delete', cert['id'], job=True) +def test_sign_csr_with_imported_ca(): + # Creating Root CA + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + + # Serialize the private key + private_key_bytes = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() # No encryption for test purposes + ) + + # Create a self-signed certificate + subject = Name([ + NameAttribute(cryptography.x509.NameOID.COUNTRY_NAME, u'US'), + NameAttribute(cryptography.x509.NameOID.STATE_OR_PROVINCE_NAME, u'Ohio'), + NameAttribute(cryptography.x509.NameOID.LOCALITY_NAME, u'Texas'), + NameAttribute(cryptography.x509.NameOID.ORGANIZATION_NAME, u'MyCA'), + NameAttribute(cryptography.x509.NameOID.COMMON_NAME, u'MyCA'), + ]) + issuer = subject + + cert = CertificateBuilder().subject_name(subject).issuer_name(issuer).not_valid_before( + datetime.datetime.utcnow()).not_valid_after( + datetime.datetime.utcnow() + datetime.timedelta(days=3650)).serial_number( + x509.random_serial_number()).public_key( + private_key.public_key()).add_extension( + BasicConstraints(ca=True, path_length=None), critical=True).sign( + private_key, hashes.SHA256(), default_backend() + ) + + # Serialize the certificate + cert_bytes = cert.public_bytes(serialization.Encoding.PEM) + + imported_root_ca = call('certificateauthority.create', { + 'certificate': cert_bytes.decode('utf-8'), + 'privatekey': private_key_bytes.decode('utf-8'), + 'name': 'test_imported_root_ca', + 'create_type': 'CA_CREATE_IMPORTED', + }) + + with certificate_signing_request('csr_test') as csr: + cert = call('certificateauthority.ca_sign_csr', { + 'ca_id': imported_root_ca['id'], + 'csr_cert_id': csr['id'], + 'name': 'inter_signed_csr' + }) + + cert_pem_data = cert['certificate'].encode() + cert_data = x509.load_pem_x509_certificate(cert_pem_data, default_backend()) + cert_issuer = cert_data.issuer + ca_pem_data = imported_root_ca['certificate'].encode() + ca_data = x509.load_pem_x509_certificate(ca_pem_data, default_backend()) + ca_subject = ca_data.subject + imported_root_ca = call('certificateauthority.get_instance', imported_root_ca['id']) + + try: + assert imported_root_ca['CA_type_existing'] is True, imported_root_ca + assert isinstance(cert['signedby'], dict), cert + assert cert['signedby']['id'] == imported_root_ca['id'], cert + assert cert['chain_list'] == [cert['certificate'], imported_root_ca['certificate']], cert + assert cert['issuer'] == imported_root_ca, cert + assert cert_issuer == ca_subject + finally: + call('certificate.delete', cert['id'], job=True) + call('certificateauthority.delete', imported_root_ca['id']) + + def test_revoking_cert(): with intermediate_certificate_authority('root_ca', 'intermediate_ca') as (root_ca, intermediate_ca): cert = call('certificate.create', {