Skip to content

Commit

Permalink
add serialization type hinting (#5718)
Browse files Browse the repository at this point in the history
* add serialization type hinting

* reorganize to prevent circular dependency

* review feedback

* damn you black
  • Loading branch information
reaperhulk authored Jan 31, 2021
1 parent 4372d3f commit f5940f0
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 49 deletions.
29 changes: 29 additions & 0 deletions src/cryptography/hazmat/_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import typing

from cryptography.hazmat.primitives.asymmetric import (
dsa,
ec,
ed25519,
ed448,
rsa,
)


_PUBLIC_KEY_TYPES = typing.Union[
dsa.DSAPublicKey,
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
]
_PRIVATE_KEY_TYPES = typing.Union[
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
]
28 changes: 16 additions & 12 deletions src/cryptography/hazmat/primitives/serialization/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,44 @@


import abc
import typing
from enum import Enum

from cryptography import utils
from cryptography.hazmat._types import _PRIVATE_KEY_TYPES, _PUBLIC_KEY_TYPES
from cryptography.hazmat.backends import _get_backend
from cryptography.hazmat.primitives.asymmetric import dh


def load_pem_private_key(data, password, backend=None):
def load_pem_private_key(
data: bytes, password: typing.Optional[bytes], backend=None
) -> _PRIVATE_KEY_TYPES:
backend = _get_backend(backend)
return backend.load_pem_private_key(data, password)


def load_pem_public_key(data, backend=None):
def load_pem_public_key(data: bytes, backend=None) -> _PUBLIC_KEY_TYPES:
backend = _get_backend(backend)
return backend.load_pem_public_key(data)


def load_pem_parameters(data, backend=None):
def load_pem_parameters(data: bytes, backend=None) -> dh.DHParameters:
backend = _get_backend(backend)
return backend.load_pem_parameters(data)


def load_der_private_key(data, password, backend=None):
def load_der_private_key(
data: bytes, password: typing.Optional[bytes], backend=None
) -> _PRIVATE_KEY_TYPES:
backend = _get_backend(backend)
return backend.load_der_private_key(data, password)


def load_der_public_key(data, backend=None):
def load_der_public_key(data: bytes, backend=None) -> _PUBLIC_KEY_TYPES:
backend = _get_backend(backend)
return backend.load_der_public_key(data)


def load_der_parameters(data, backend=None):
def load_der_parameters(data: bytes, backend=None) -> dh.DHParameters:
backend = _get_backend(backend)
return backend.load_der_parameters(data)

Expand Down Expand Up @@ -73,15 +79,13 @@ class KeySerializationEncryption(metaclass=abc.ABCMeta):
pass


@utils.register_interface(KeySerializationEncryption)
class BestAvailableEncryption(object):
def __init__(self, password):
class BestAvailableEncryption(KeySerializationEncryption):
def __init__(self, password: bytes):
if not isinstance(password, bytes) or len(password) == 0:
raise ValueError("Password must be 1 or more bytes.")

self.password = password


@utils.register_interface(KeySerializationEncryption)
class NoEncryption(object):
class NoEncryption(KeySerializationEncryption):
pass
20 changes: 18 additions & 2 deletions src/cryptography/hazmat/primitives/serialization/pkcs12.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,35 @@
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import typing

from cryptography import x509
from cryptography.hazmat.backends import _get_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa


def load_key_and_certificates(data, password, backend=None):
def load_key_and_certificates(
data: bytes, password: typing.Optional[bytes], backend=None
):
backend = _get_backend(backend)
return backend.load_key_and_certificates_from_pkcs12(data, password)


def serialize_key_and_certificates(name, key, cert, cas, encryption_algorithm):
_ALLOWED_PKCS12_TYPES = typing.Union[
rsa.RSAPrivateKeyWithSerialization,
dsa.DSAPrivateKeyWithSerialization,
ec.EllipticCurvePrivateKeyWithSerialization,
]


def serialize_key_and_certificates(
name: bytes,
key: typing.Optional[_ALLOWED_PKCS12_TYPES],
cert: typing.Optional[x509.Certificate],
cas: typing.Optional[typing.Iterable[x509.Certificate]],
encryption_algorithm: serialization.KeySerializationEncryption,
):
if key is not None and not isinstance(
key,
(
Expand Down
57 changes: 41 additions & 16 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.


import typing
from enum import Enum

from cryptography import x509
Expand All @@ -12,30 +12,57 @@
from cryptography.utils import _check_byteslike


def load_pem_pkcs7_certificates(data):
def load_pem_pkcs7_certificates(data: bytes) -> typing.List[x509.Certificate]:
backend = _get_backend(None)
return backend.load_pem_pkcs7_certificates(data)


def load_der_pkcs7_certificates(data):
def load_der_pkcs7_certificates(data: bytes) -> typing.List[x509.Certificate]:
backend = _get_backend(None)
return backend.load_der_pkcs7_certificates(data)


_ALLOWED_PKCS7_HASH_TYPES = typing.Union[
hashes.SHA1,
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
]

_ALLOWED_PRIVATE_KEY_TYPES = typing.Union[
rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey
]


class PKCS7Options(Enum):
Text = "Add text/plain MIME type"
Binary = "Don't translate input data into canonical MIME format"
DetachedSignature = "Don't embed data in the PKCS7 structure"
NoCapabilities = "Don't embed SMIME capabilities"
NoAttributes = "Don't embed authenticatedAttributes"
NoCerts = "Don't embed signer certificate"


class PKCS7SignatureBuilder(object):
def __init__(self, data=None, signers=[], additional_certs=[]):
self._data = data
self._signers = signers
self._additional_certs = additional_certs

def set_data(self, data):
def set_data(self, data: bytes) -> "PKCS7SignatureBuilder":
_check_byteslike("data", data)
if self._data is not None:
raise ValueError("data may only be set once")

return PKCS7SignatureBuilder(data, self._signers)

def add_signer(self, certificate, private_key, hash_algorithm):
def add_signer(
self,
certificate: x509.Certificate,
private_key: _ALLOWED_PRIVATE_KEY_TYPES,
hash_algorithm: _ALLOWED_PKCS7_HASH_TYPES,
) -> "PKCS7SignatureBuilder":
if not isinstance(
hash_algorithm,
(
Expand Down Expand Up @@ -63,15 +90,22 @@ def add_signer(self, certificate, private_key, hash_algorithm):
self._signers + [(certificate, private_key, hash_algorithm)],
)

def add_certificate(self, certificate):
def add_certificate(
self, certificate: x509.Certificate
) -> "PKCS7SignatureBuilder":
if not isinstance(certificate, x509.Certificate):
raise TypeError("certificate must be a x509.Certificate")

return PKCS7SignatureBuilder(
self._data, self._signers, self._additional_certs + [certificate]
)

def sign(self, encoding, options, backend=None):
def sign(
self,
encoding: serialization.Encoding,
options: typing.Iterable[PKCS7Options],
backend=None,
) -> bytes:
if len(self._signers) == 0:
raise ValueError("Must have at least one signer")
if self._data is None:
Expand Down Expand Up @@ -120,12 +154,3 @@ def sign(self, encoding, options, backend=None):

backend = _get_backend(backend)
return backend.pkcs7_sign(self, encoding, options)


class PKCS7Options(Enum):
Text = "Add text/plain MIME type"
Binary = "Don't translate input data into canonical MIME format"
DetachedSignature = "Don't embed data in the PKCS7 structure"
NoCapabilities = "Don't embed SMIME capabilities"
NoAttributes = "Don't embed authenticatedAttributes"
NoCerts = "Don't embed signer certificate"
35 changes: 30 additions & 5 deletions src/cryptography/hazmat/primitives/serialization/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import re
import struct
import typing
from base64 import encodebytes as _base64_encode

from cryptography import utils
Expand Down Expand Up @@ -464,7 +465,17 @@ def _lookup_kformat(key_type):
raise UnsupportedAlgorithm("Unsupported key type: %r" % key_type)


def load_ssh_private_key(data, password, backend=None):
_SSH_PRIVATE_KEY_TYPES = typing.Union[
ec.EllipticCurvePrivateKey,
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ed25519.Ed25519PrivateKey,
]


def load_ssh_private_key(
data: bytes, password: typing.Optional[bytes], backend=None
) -> _SSH_PRIVATE_KEY_TYPES:
"""Load private key from OpenSSH custom encoding."""
utils._check_byteslike("data", data)
backend = _get_backend(backend)
Expand Down Expand Up @@ -538,7 +549,10 @@ def load_ssh_private_key(data, password, backend=None):
return private_key


def serialize_ssh_private_key(private_key, password=None):
def serialize_ssh_private_key(
private_key: _SSH_PRIVATE_KEY_TYPES,
password: typing.Optional[bytes] = None,
):
"""Serialize private key with OpenSSH custom encoding."""
if password is not None:
utils._check_bytes("password", password)
Expand Down Expand Up @@ -613,11 +627,22 @@ def serialize_ssh_private_key(private_key, password=None):
ciph.encryptor().update_into(buf[ofs:mlen], buf[ofs:])

txt = _ssh_pem_encode(buf[:mlen])
buf[ofs:mlen] = bytearray(slen)
# Ignore the following type because mypy wants
# Sequence[bytes] but what we're passing is fine.
# https://github.com/python/mypy/issues/9999
buf[ofs:mlen] = bytearray(slen) # type: ignore
return txt


def load_ssh_public_key(data, backend=None):
_SSH_PUBLIC_KEY_TYPES = typing.Union[
ec.EllipticCurvePublicKey,
rsa.RSAPublicKey,
dsa.DSAPublicKey,
ed25519.Ed25519PublicKey,
]


def load_ssh_public_key(data: bytes, backend=None) -> _SSH_PUBLIC_KEY_TYPES:
"""Load public key from OpenSSH one-line format."""
backend = _get_backend(backend)
utils._check_byteslike("data", data)
Expand Down Expand Up @@ -660,7 +685,7 @@ def load_ssh_public_key(data, backend=None):
return public_key


def serialize_ssh_public_key(public_key):
def serialize_ssh_public_key(public_key: _SSH_PUBLIC_KEY_TYPES) -> bytes:
"""One-line public key format for OpenSSH"""
if isinstance(public_key, ec.EllipticCurvePublicKey):
key_type = _ecdsa_key_type(public_key)
Expand Down
15 changes: 1 addition & 14 deletions src/cryptography/x509/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import typing
from enum import Enum

from cryptography.hazmat._types import _PRIVATE_KEY_TYPES, _PUBLIC_KEY_TYPES
from cryptography.hazmat.backends import _get_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
Expand All @@ -24,20 +25,6 @@


_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)
_PUBLIC_KEY_TYPES = typing.Union[
dsa.DSAPublicKey,
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
]
_PRIVATE_KEY_TYPES = typing.Union[
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
]


class AttributeNotFound(Exception):
Expand Down

0 comments on commit f5940f0

Please sign in to comment.