Skip to content

Commit

Permalink
Add support for converting sessions to / from ASN1 representation
Browse files Browse the repository at this point in the history
  • Loading branch information
mdulaney committed Oct 21, 2024
1 parent 6502a0e commit 8d24c48
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Deprecations:
Changes:
^^^^^^^^

* Added ``OpenSSL.SSL.Session.i2d`` to convert session objects to ASN1. Updated ``OpenSSL.SSL.Session`` constructor to support conversion from ASN1. `#1373 <https://github.com/pyca/pyopenssl/pull/1373>`_.
* ``OpenSSL.SSL.Connection.get_certificate``, ``OpenSSL.SSL.Connection.get_peer_certificate``, ``OpenSSL.SSL.Connection.get_peer_cert_chain``, and ``OpenSSL.SSL.Connection.get_verified_chain`` now take an ``as_cryptography`` keyword-argument. When ``True`` is passed then ``cryptography.x509.Certificate`` are returned, instead of ``OpenSSL.crypto.X509``. In the future, passing ``False`` (the default) will be deprecated.


Expand Down
35 changes: 34 additions & 1 deletion src/OpenSSL/SSL.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,40 @@ class Session:
.. versionadded:: 0.14
"""

_session: Any
_session: Any = None

def __init__(self, data: Optional[bytes] = None) -> None:
if data is None:
return

p = _ffi.new("unsigned char[]", data)
pp = _ffi.new("unsigned char **")
pp[0] = p
length = _ffi.cast("long", len(data))

session = _lib.d2i_SSL_SESSION(_ffi.NULL, pp, length)
if session == _ffi.NULL:
_raise_current_error()

self._session = _ffi.gc(session, _lib.SSL_SESSION_free)

def i2d(self) -> bytes:
if self._session is None:
raise ValueError("Not a valid session")

length = _lib.i2d_SSL_SESSION(self._session, _ffi.NULL)
if length == 0:
raise ValueError("Not a valid session")

pp = _ffi.new("unsigned char **")
p = _ffi.new("unsigned char[]", length)
pp[0] = p

length = _lib.i2d_SSL_SESSION(self._session, pp)
if length == 0:
raise ValueError("Not a valid session")

return _ffi.buffer(p, length)[:]


class Context:
Expand Down
102 changes: 100 additions & 2 deletions tests/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,21 @@ def _create_certificate_chain():
return [(cakey, cacert), (ikey, icert), (skey, scert)]


def loopback_client_factory(socket, version=SSLv23_METHOD):
def loopback_client_factory(socket, version=SSLv23_METHOD, session_data=None):
client = Connection(Context(version), socket)
if session_data is not None:
client.set_session(Session(session_data))
client.set_connect_state()
return client


def loopback_server_factory(socket, version=SSLv23_METHOD):
def loopback_server_factory(socket, version=SSLv23_METHOD, session_data=None):
ctx = Context(version)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server = Connection(ctx, socket)
if session_data is not None:
server.set_session(Session(session_data))
server.set_accept_state()
return server

Expand Down Expand Up @@ -2154,6 +2158,100 @@ def test_construction(self):
new_session = Session()
assert isinstance(new_session, Session)

def test_d2i_fail(self):
with pytest.raises(Error) as e:
Session(b"abc" * 1000)

assert e.value.args[0][0] in [
# TODO
# 1.1.x
# (
# "SSL routines",
# "SSL_CTX_set_session_id_context",
# "ssl session id context too long",
# ),
# 3.0.x
(
"asn1 encoding routines",
"",
"wrong tag",
),
]

assert e.value.args[0][1] in [
# TODO
# 1.1.x
# (
# "SSL routines",
# "SSL_CTX_set_session_id_context",
# "ssl session id context too long",
# ),
# 3.0.x
(
"asn1 encoding routines",
"",
"nested asn1 error",
),
]

def test_session_success(self):
session_id = (
b"\x51\x6d\x1d\x18\xc3\xb5\x86\x81\xc6\x79\x89\x2c\x89\x3e\x56\x33"
b"\xa7\x9c\xcd\x9b\x87\xbb\xb3\xdc\xf6\x76\x70\xf9\xc0\xdd\xf4\xef"
)

master_key = (
b"\x0f\xb2\x51\xe3\x15\x60\x2d\xef\x6e\x6d\xd2\x94\x2d\xe5\x37\x96"
b"\x72\xfa\xce\xb0\x39\xcc\x8d\xdf\xab\x32\xcc\x75\x0c\x66\xf9\xfd"
b"\xef\xbc\xc6\x2a\x8f\x9c\x35\x16\xfd\x4d\x38\xd9\xf9\xeb\x1d\xe4"
)

session_data = (
# sequence length=0x71
b"\x30\x71"
# integer (version)
b"\x02\x01\x01"
# integer (SSL version)
b"\x02\x02\x03\x03"
# octet-string (cipher suite)
b"\x04\x02\xc0\x30"
# octet-string length=0x20 (session id)
b"\x04\x20"
+ session_id
+
# octet-string length=0x30 (master secret)
b"\x04\x30"
+ master_key
+
# application (1), integer (time)
b"\xa1\x06\x02\x04"
+ b"\x66\xec\x4c\x2d"
+
# application (2), integer (timeout)
b"\xa2\x04\x02\x02"
+ b"\x02\x58"
+
# application (4), octet-string (session id context)
b"\xa4\x02\x04"
+ b"\x00"
)
serverSocket, clientSocket = socket_pair()

client = loopback_client_factory(
clientSocket, session_data=session_data
)
server = loopback_server_factory(
serverSocket, session_data=session_data
)

assert client.master_key() == master_key
assert server.master_key() == master_key

handshake(client, server)

client.send(b"hello world")
assert b"hello world" == server.recv(len(b"hello world"))


@pytest.fixture(params=["context", "connection"])
def ctx_or_conn(request) -> Union[Context, Connection]:
Expand Down

0 comments on commit 8d24c48

Please sign in to comment.