Skip to content

Commit

Permalink
[IOT] Test updates for beta branch (#22262)
Browse files Browse the repository at this point in the history
  • Loading branch information
vilit1 authored May 19, 2022
1 parent a7d5d6d commit 4f1e948
Show file tree
Hide file tree
Showing 22 changed files with 6,415 additions and 8,814 deletions.
56 changes: 39 additions & 17 deletions src/azure-cli/azure/cli/command_modules/iot/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,52 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import datetime
from os.path import exists, join
import base64
from OpenSSL import crypto
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa


def create_self_signed_certificate(device_id, valid_days, cert_output_dir):
cert_file = device_id + '-cert.pem'
key_file = device_id + '-key.pem'

# create a key pair
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# create a self-signed cert
cert = crypto.X509()
cert.get_subject().CN = device_id
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.sign(key, 'sha256')

cert_dump = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
key_dump = crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode('utf-8')
thumbprint = cert.digest('sha1').replace(b':', b'').decode('utf-8')
subject_name = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, device_id),
]
)
cert = (
x509.CertificateBuilder()
.subject_name(subject_name)
.issuer_name(subject_name)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=valid_days)
)
.sign(key, hashes.SHA256())
)

key_dump = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
cert_dump = cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")
thumbprint = cert.fingerprint(hashes.SHA1()).hex().upper()

if cert_output_dir is not None and exists(cert_output_dir):
open(join(cert_output_dir, cert_file), "wt").write(cert_dump)
open(join(cert_output_dir, key_file), "wt").write(key_dump)

return {
'certificate': cert_dump,
'privateKey': key_dump,
Expand All @@ -45,8 +61,14 @@ def open_certificate(certificate_path):
if certificate_path.endswith('.pem') or certificate_path.endswith('.cer'):
with open(certificate_path, "rb") as cert_file:
certificate = cert_file.read()
certificate = base64.b64encode(certificate).decode("utf-8")
return certificate
try:
certificate = certificate.decode("utf-8")
except UnicodeError:
certificate = base64.b64encode(certificate).decode("utf-8")
else:
raise ValueError("Certificate file type must be either '.pem' or '.cer'.")
# Remove trailing white space from the certificate content
return certificate.rstrip()


def generate_key(byte_length=32):
Expand Down
17 changes: 12 additions & 5 deletions src/azure-cli/azure/cli/command_modules/iot/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from azure.cli.core.commands import LongRunningOperation
from azure.cli.core.util import sdk_no_wait
from azure.cli.core.profiles._shared import AZURE_API_PROFILES, ResourceType

from azure.mgmt.iothub.models import (IotHubSku,
AccessRights,
Expand Down Expand Up @@ -423,8 +424,11 @@ def iot_hub_certificate_create(client, hub_name, certificate_name, certificate_p
if not certificate:
raise CLIError("Error uploading certificate '{0}'.".format(certificate_path))
cert_properties = CertificateProperties(certificate=certificate, is_verified=is_verified)
cert_description = CertificateDescription(properties=cert_properties)
return client.certificates.create_or_update(resource_group_name, hub_name, certificate_name, cert_description)

if AZURE_API_PROFILES["latest"][ResourceType.MGMT_IOTHUB] in client.profile.label:
cert_description = CertificateDescription(properties=cert_properties)
return client.certificates.create_or_update(resource_group_name, hub_name, certificate_name, cert_description)
return client.certificates.create_or_update(resource_group_name, hub_name, certificate_name, cert_properties)


def iot_hub_certificate_update(client, hub_name, certificate_name, certificate_path, etag, resource_group_name=None, is_verified=None):
Expand All @@ -436,8 +440,11 @@ def iot_hub_certificate_update(client, hub_name, certificate_name, certificate_p
if not certificate:
raise CLIError("Error uploading certificate '{0}'.".format(certificate_path))
cert_properties = CertificateProperties(certificate=certificate, is_verified=is_verified)
cert_description = CertificateDescription(properties=cert_properties)
return client.certificates.create_or_update(resource_group_name, hub_name, certificate_name, cert_description, etag)

if AZURE_API_PROFILES["latest"][ResourceType.MGMT_IOTHUB] in client.profile.label:
cert_description = CertificateDescription(properties=cert_properties)
return client.certificates.create_or_update(resource_group_name, hub_name, certificate_name, cert_description, etag)
return client.certificates.create_or_update(resource_group_name, hub_name, certificate_name, cert_properties, etag)
raise CLIError("Certificate '{0}' does not exist. Use 'iot hub certificate create' to create a new certificate."
.format(certificate_name))

Expand Down Expand Up @@ -1175,7 +1182,7 @@ def iot_message_enrichment_list(cmd, client, hub_name, resource_group_name=None)


def iot_hub_devicestream_show(cmd, client, hub_name, resource_group_name=None):
from azure.cli.core.commands.client_factory import get_mgmt_service_client, ResourceType
from azure.cli.core.commands.client_factory import get_mgmt_service_client
resource_group_name = _ensure_hub_resource_group_name(client, resource_group_name, hub_name)
# DeviceStreams property is still in preview, so until GA we need to use a preview API-version
client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_IOTHUB, api_version='2019-07-01-preview')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,78 +3,90 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import datetime
from os.path import exists
import os
from OpenSSL import crypto
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import load_pem_private_key


def _create_test_cert(cert_file, key_file, subject, valid_days, serial_number):
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 2046)

# create a self-signed cert with some basic constraints
cert = crypto.X509()
cert.get_subject().CN = subject
cert.gmtime_adj_notBefore(-1 * 24 * 60 * 60)
cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
cert.set_version(2)
cert.set_serial_number(serial_number)
cert.add_extensions([
crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:1"),
crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
subject=cert),
])
cert.add_extensions([
crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
issuer=cert)
])
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, 'sha256')

cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('ascii')
key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode('ascii')

open(cert_file, 'w').write(cert_str)
open(key_file, 'w').write(key_str)
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# create a self-signed cert
subject_name = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, subject),
]
)
cert = (
x509.CertificateBuilder()
.subject_name(subject_name)
.issuer_name(subject_name)
.public_key(key.public_key())
.serial_number(serial_number)
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=valid_days)
)
.sign(key, hashes.SHA256())
)

key_dump = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
cert_dump = cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")

with open(cert_file, "wt", encoding="utf-8") as f:
f.write(cert_dump)

with open(key_file, "wt", encoding="utf-8") as f:
f.write(key_dump)


def _delete_test_cert(cert_file, key_file, verification_file):
if exists(cert_file) and exists(key_file):
os.remove(cert_file)
os.remove(key_file)

if exists(verification_file):
os.remove(verification_file)


def _create_verification_cert(cert_file, key_file, verification_file, nonce, valid_days, serial_number):
if exists(cert_file) and exists(key_file):
# create a key pair
public_key = crypto.PKey()
public_key.generate_key(crypto.TYPE_RSA, 2046)

key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
# open the root cert and key
signing_cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read())
k = crypto.load_privatekey(crypto.FILETYPE_PEM, open(key_file).read())
signing_cert = x509.load_pem_x509_certificate(open(cert_file, "rb").read())
k = load_pem_private_key(open(key_file, "rb").read(), None)


subject_name = x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, nonce),
]
)

# create a cert signed by the root
verification_cert = crypto.X509()
verification_cert.get_subject().CN = nonce
verification_cert.gmtime_adj_notBefore(-1 * 24 * 60 * 60)
verification_cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
verification_cert.set_version(2)
verification_cert.set_serial_number(serial_number)

verification_cert.set_pubkey(public_key)
verification_cert.set_issuer(signing_cert.get_subject())
verification_cert.add_extensions([
crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
issuer=signing_cert)
])
verification_cert.sign(k, 'sha256')

verification_cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, verification_cert).decode('ascii')
verification_cert = (
x509.CertificateBuilder()
.subject_name(subject_name)
.issuer_name(signing_cert.subject)
.public_key(key.public_key())
.serial_number(serial_number)
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=valid_days)
)
.sign(k, hashes.SHA256())
)

verification_cert_str = verification_cert.public_bytes(serialization.Encoding.PEM).decode('ascii')

open(verification_file, 'w').write(verification_cert_str)
Loading

0 comments on commit 4f1e948

Please sign in to comment.