diff --git a/nassl/_nassl/nassl_SSL.c b/nassl/_nassl/nassl_SSL.c index 73ad780..f23f2b1 100755 --- a/nassl/_nassl/nassl_SSL.c +++ b/nassl/_nassl/nassl_SSL.c @@ -819,6 +819,66 @@ static PyObject* nassl_SSL_set_ciphersuites(nassl_SSL_Object *self, PyObject *ar } +// SSL_set1_sigalgs() is only available in OpenSSL 1.1.1 +static PyObject* nassl_SSL_set1_sigalgs(nassl_SSL_Object *self, PyObject *args) +{ + int i = 0; + PyObject *pyListOfOpensslNids; + Py_ssize_t nidsCount = 0; + int *listOfNids; + + // Parse the Python list + if (!PyArg_ParseTuple(args, "O!", &PyList_Type, &pyListOfOpensslNids)) + { + return NULL; + } + + // Extract each NID int from the list + nidsCount = PyList_Size(pyListOfOpensslNids); + listOfNids = (int *) PyMem_Malloc(nidsCount * sizeof(int)); + if (listOfNids == NULL) + { + return PyErr_NoMemory(); + } + + for (i=0; issl, listOfNids, nidsCount) != 1) + { + PyMem_Free(listOfNids); + return raise_OpenSSL_error(); + } + + PyMem_Free(listOfNids); + Py_RETURN_NONE; +} + + +static PyObject* nassl_get_peer_signature_nid(nassl_SSL_Object *self) +{ + int psig_nid; + + if(SSL_get_peer_signature_nid(self->ssl, &psig_nid) != 1) + { + return raise_OpenSSL_error(); + } + return PyLong_FromUnsignedLong((long)psig_nid); +} + + static PyObject* nassl_SSL_get0_verified_chain(nassl_SSL_Object *self, PyObject *args) { STACK_OF(X509) *verifiedCertChain = NULL; @@ -1187,6 +1247,12 @@ static PyMethodDef nassl_SSL_Object_methods[] = {"set_ciphersuites", (PyCFunction)nassl_SSL_set_ciphersuites, METH_VARARGS, "OpenSSL's SSL_set_ciphersuites()." }, + {"set1_sigalgs", (PyCFunction)nassl_SSL_set1_sigalgs, METH_VARARGS, + "OpenSSL's SSL_set1_sigalgs()." + }, + {"get_peer_signature_nid", (PyCFunction)nassl_get_peer_signature_nid, METH_NOARGS, + "OpenSSL's get_peer_signature_nid(). Returns a digest NID" + }, {"get0_verified_chain", (PyCFunction)nassl_SSL_get0_verified_chain, METH_NOARGS, "OpenSSL's SSL_get0_verified_chain(). Returns an array of _nassl.X509 objects." }, diff --git a/nassl/ephemeral_key_info.py b/nassl/ephemeral_key_info.py index 8d7fcfd..6452cd4 100644 --- a/nassl/ephemeral_key_info.py +++ b/nassl/ephemeral_key_info.py @@ -12,6 +12,9 @@ class OpenSslEvpPkeyEnum(IntEnum): EC = 408 X25519 = 1034 X448 = 1035 + RSA = 6 + DSA = 116 + RSA_PSS = 912 class OpenSslEcNidEnum(IntEnum): @@ -84,6 +87,9 @@ def get_supported_by_ssl_client(cls) -> List["OpenSslEcNidEnum"]: OpenSslEvpPkeyEnum.EC: "ECDH", OpenSslEvpPkeyEnum.X25519: "ECDH", OpenSslEvpPkeyEnum.X448: "ECDH", + OpenSslEvpPkeyEnum.RSA: "RSA", + OpenSslEvpPkeyEnum.DSA: "DSA", + OpenSslEvpPkeyEnum.RSA_PSS: "RSA-PSS", } diff --git a/nassl/ssl_client.py b/nassl/ssl_client.py index ff72e15..acd5466 100755 --- a/nassl/ssl_client.py +++ b/nassl/ssl_client.py @@ -6,7 +6,7 @@ from nassl._nassl import WantReadError, OpenSSLError, WantX509LookupError from enum import IntEnum -from typing import List, Any +from typing import List, Any, Tuple from typing import Protocol @@ -32,6 +32,17 @@ class OpenSslVerifyEnum(IntEnum): CLIENT_ONCE = 4 +class OpenSslDigestNidEnum(IntEnum): + """SSL digest algorithms used for the signature algorithm, per obj_mac.h.""" + + MD5 = 4 + SHA1 = 64 + SHA224 = 675 + SHA256 = 672 + SHA384 = 673 + SHA512 = 674 + + class OpenSslVersionEnum(IntEnum): """SSL version constants.""" @@ -455,6 +466,17 @@ def set_ciphersuites(self, cipher_suites: str) -> None: # TODO(AD): Eventually merge this method with get/set_cipher_list() self._ssl.set_ciphersuites(cipher_suites) + def set_signature_algorithms(self, algorithms: List[Tuple[OpenSslDigestNidEnum, OpenSslEvpPkeyEnum]]) -> None: + """Set the enabled signature algorithms for the key exchange. + + The algorithms parameter is a list of a public key algorithm and a digest.""" + flattened_sigalgs = [item for sublist in algorithms for item in sublist] + self._ssl.set1_sigalgs(flattened_sigalgs) + + def get_peer_signature_nid(self) -> OpenSslDigestNidEnum: + """Get the digest used for TLS message signing.""" + return OpenSslDigestNidEnum(self._ssl.get_peer_signature_nid()) + def set_groups(self, supported_groups: List[OpenSslEcNidEnum]) -> None: """Specify elliptic curves or DH groups that are supported by the client in descending order.""" self._ssl.set1_groups(supported_groups) diff --git a/tests/ssl_client_test.py b/tests/ssl_client_test.py index a6ac2f6..60fc9c0 100644 --- a/tests/ssl_client_test.py +++ b/tests/ssl_client_test.py @@ -13,6 +13,7 @@ SslClient, OpenSSLError, OpenSslEarlyDataStatusEnum, + OpenSslDigestNidEnum, ) from nassl.ephemeral_key_info import ( OpenSslEvpPkeyEnum, @@ -218,6 +219,7 @@ def test_get_verified_chain(self) -> None: # And when requesting the verified certificate chain, it returns it assert ssl_client.get_verified_chain() + finally: ssl_client.shutdown() @@ -361,16 +363,20 @@ def test_set_groups_curve_x448(self) -> None: assert len(dh_info.public_bytes) == 56 def test_get_extended_master_secret_not_used(self) -> None: + # Given a TLS server that does NOT support the Extended Master Secret extension with LegacyOpenSslServer() as server: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((server.hostname, server.port)) + # When a client connects to it ssl_client = SslClient( ssl_version=OpenSslVersionEnum.TLSV1_2, underlying_socket=sock, ssl_verify=OpenSslVerifyEnum.NONE, ) + + # Then, before the handshake, the client cannot tell if Extended Master Secret was used exms_support_before_handshake = ssl_client.get_extended_master_secret_support() assert exms_support_before_handshake == ExtendedMasterSecretSupportEnum.UNKNOWN @@ -379,29 +385,83 @@ def test_get_extended_master_secret_not_used(self) -> None: finally: ssl_client.shutdown() + # And after the handshake, the client can tell that Extended Master Secret was NOT used exms_support = ssl_client.get_extended_master_secret_support() assert exms_support == ExtendedMasterSecretSupportEnum.NOT_USED_IN_CURRENT_SESSION def test_get_extended_master_secret_used(self) -> None: + # Given a TLS server that DOES support the Extended Master Secret extension with ModernOpenSslServer() as server: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((server.hostname, server.port)) + # When a client connects to it ssl_client = SslClient( ssl_version=OpenSslVersionEnum.TLSV1_2, underlying_socket=sock, ssl_verify=OpenSslVerifyEnum.NONE, ) + # Then, before the handshake, the client cannot tell if Extended Master Secret was used + exms_support_before_handshake = ssl_client.get_extended_master_secret_support() + assert exms_support_before_handshake == ExtendedMasterSecretSupportEnum.UNKNOWN + try: ssl_client.do_handshake() finally: ssl_client.shutdown() + # And after the handshake, the client can tell that Extended Master Secret was used exms_support = ssl_client.get_extended_master_secret_support() assert exms_support == ExtendedMasterSecretSupportEnum.USED_IN_CURRENT_SESSION + def test_set_signature_algorithms(self) -> None: + # Given a TLS server + with ModernOpenSslServer() as server: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + sock.connect((server.hostname, server.port)) + + # And a client + ssl_client = SslClient( + ssl_version=OpenSslVersionEnum.TLSV1_2, + underlying_socket=sock, + ssl_verify=OpenSslVerifyEnum.NONE, + ) + # That's configured to use a specific signature algorithm + ssl_client.set_signature_algorithms([(OpenSslDigestNidEnum.SHA256, OpenSslEvpPkeyEnum.RSA)]) + + # When the client connects to the server, it succeeds + try: + ssl_client.do_handshake() + finally: + ssl_client.shutdown() + + # And the configured signature algorithm was used + assert ssl_client.get_peer_signature_nid() == OpenSslDigestNidEnum.SHA256 + + def test_set_signature_algorithms_but_not_supported(self) -> None: + # Given a TLS server + with ModernOpenSslServer() as server: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(5) + sock.connect((server.hostname, server.port)) + + # And a client + ssl_client = SslClient( + ssl_version=OpenSslVersionEnum.TLSV1_3, + underlying_socket=sock, + ssl_verify=OpenSslVerifyEnum.NONE, + ) + # That's configured to use signature algorithms that are NOT supported + ssl_client.set_signature_algorithms([(OpenSslDigestNidEnum.SHA512, OpenSslEvpPkeyEnum.EC)]) + + # Then, when the client connects to the server, the handshake fails + with pytest.raises(OpenSSLError, match="handshake failure"): + ssl_client.do_handshake() + ssl_client.shutdown() + class TestLegacySslClientOnline: def test_ssl_2(self) -> None: