From 34b018fd63edc3526f9108d871fc5d25d279690d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeremy=20Lain=C3=A9?= Date: Wed, 10 Jan 2024 22:46:32 +0100 Subject: [PATCH] Add Connection.get_selected_srtp_profile (#1278) (#1279) If an SRTP profile was negotiated as part of the handshake, make it possible to retrieve the name of the profile. This is needed to determine which profiles were offered using `Context.set_tlsext_use_srtp` was actually selected. --- CHANGELOG.rst | 2 ++ src/OpenSSL/SSL.py | 13 +++++++++++++ tests/test_ssl.py | 20 +++++++++++++++++++- 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d50449931..408e5c285 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -35,6 +35,8 @@ Changes: to the now deprecated ``OpenSSL.crypto.CRL`` arguments. - Fixed ``test_set_default_verify_paths`` test so that it is skipped if no network connection is available. +- Added ``OpenSSL.SSL.Connection.get_selected_srtp_profile`` to determine which SRTP profile was negotiated. + `#1279 `_. 23.2.0 (2023-05-30) ------------------- diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py index ae07442f5..4db5240e7 100644 --- a/src/OpenSSL/SSL.py +++ b/src/OpenSSL/SSL.py @@ -2858,6 +2858,19 @@ def get_alpn_proto_negotiated(self): return _ffi.buffer(data[0], data_len[0])[:] + def get_selected_srtp_profile(self): + """ + Get the SRTP protocol which was negotiated. + + :returns: A bytestring of the SRTP profile name. If no profile has been + negotiated yet, returns an empty bytestring. + """ + profile = _lib.SSL_get_selected_srtp_profile(self._ssl) + if not profile: + return b"" + + return _ffi.string(profile.name) + def request_ocsp(self): """ Called to request that the server sends stapled OCSP data, if diff --git a/tests/test_ssl.py b/tests/test_ssl.py index d667eec24..ca5bf836d 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -4380,7 +4380,7 @@ class TestDTLS: # Arbitrary number larger than any conceivable handshake volley. LARGE_BUFFER = 65536 - def test_it_works_at_all(self): + def _test_handshake_and_data(self, srtp_profile): s_ctx = Context(DTLS_METHOD) def generate_cookie(ssl): @@ -4394,11 +4394,15 @@ def verify_cookie(ssl, cookie): s_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) s_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) s_ctx.set_options(OP_NO_QUERY_MTU) + if srtp_profile is not None: + s_ctx.set_tlsext_use_srtp(srtp_profile) s = Connection(s_ctx) s.set_accept_state() c_ctx = Context(DTLS_METHOD) c_ctx.set_options(OP_NO_QUERY_MTU) + if srtp_profile is not None: + c_ctx.set_tlsext_use_srtp(srtp_profile) c = Connection(c_ctx) c.set_connect_state() @@ -4480,6 +4484,14 @@ def pump(): pump() assert s.read(100) == b"goodbye" + # Check whether SRTP was negotiated + if srtp_profile is not None: + assert s.get_selected_srtp_profile() == srtp_profile + assert c.get_selected_srtp_profile() == srtp_profile + else: + assert s.get_selected_srtp_profile() == b"" + assert c.get_selected_srtp_profile() == b"" + # Check that the MTU set/query functions are doing *something* c.set_ciphertext_mtu(1000) try: @@ -4492,6 +4504,12 @@ def pump(): except NotImplementedError: # OpenSSL 1.1.0 and earlier pass + def test_it_works_at_all(self): + self._test_handshake_and_data(srtp_profile=None) + + def test_it_works_with_srtp(self): + self._test_handshake_and_data(srtp_profile=b"SRTP_AES128_CM_SHA1_80") + def test_timeout(self, monkeypatch): c_ctx = Context(DTLS_METHOD) c = Connection(c_ctx)