Skip to content

Commit

Permalink
[tls] Add parsing and serialization for certificate requests
Browse files Browse the repository at this point in the history
In order to implement support for client certificates, we need to be
able to parse and serialize CERTIFICATE_REQUEST messages.

Co-authored-by: doronz88 <[email protected]>
  • Loading branch information
jlaine and doronz88 committed Nov 16, 2023
1 parent 6a777de commit 41fd641
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
49 changes: 49 additions & 0 deletions src/aioquic/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,55 @@ def push_certificate_entry(buf: Buffer, entry: CertificateEntry) -> None:
)


@dataclass
class CertificateRequest:
request_context: bytes = b""
signature_algorithms: Optional[List[int]] = None
other_extensions: List[Tuple[int, bytes]] = field(default_factory=list)


def pull_certificate_request(buf: Buffer) -> CertificateRequest:
certificate_request = CertificateRequest()

assert buf.pull_uint8() == HandshakeType.CERTIFICATE_REQUEST
with pull_block(buf, 3):
certificate_request.request_context = pull_opaque(buf, 1)

def pull_extension() -> None:
extension_type = buf.pull_uint16()
extension_length = buf.pull_uint16()
if extension_type == ExtensionType.SIGNATURE_ALGORITHMS:
certificate_request.signature_algorithms = pull_list(
buf, 2, buf.pull_uint16
)
else:
certificate_request.other_extensions.append(
(extension_type, buf.pull_bytes(extension_length))
)

pull_list(buf, 2, pull_extension)

return certificate_request


def push_certificate_request(
buf: Buffer, certificate_request: CertificateRequest
) -> None:
buf.push_uint8(HandshakeType.CERTIFICATE_REQUEST)
with push_block(buf, 3):
push_opaque(buf, 1, certificate_request.request_context)

with push_block(buf, 2):
with push_extension(buf, ExtensionType.SIGNATURE_ALGORITHMS):
push_list(
buf, 2, buf.push_uint16, certificate_request.signature_algorithms
)

for extension_type, extension_value in certificate_request.other_extensions:
with push_extension(buf, extension_type):
buf.push_bytes(extension_value)


@dataclass
class CertificateVerify:
algorithm: int
Expand Down
36 changes: 36 additions & 0 deletions tests/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from aioquic.quic.configuration import QuicConfiguration
from aioquic.tls import (
Certificate,
CertificateRequest,
CertificateVerify,
ClientHello,
Context,
Expand All @@ -20,13 +21,15 @@
load_pem_x509_certificates,
pull_block,
pull_certificate,
pull_certificate_request,
pull_certificate_verify,
pull_client_hello,
pull_encrypted_extensions,
pull_finished,
pull_new_session_ticket,
pull_server_hello,
push_certificate,
push_certificate_request,
push_certificate_verify,
push_client_hello,
push_encrypted_extensions,
Expand Down Expand Up @@ -1228,6 +1231,39 @@ def test_push_certificate(self):
push_certificate(buf, certificate)
self.assertEqual(buf.data, load("tls_certificate.bin"))

def test_pull_certificate_request(self):
buf = Buffer(data=load("tls_certificate_request.bin"))
certificate_request = pull_certificate_request(buf)
self.assertTrue(buf.eof())

self.assertEqual(certificate_request.request_context, b"")
self.assertEqual(
certificate_request.signature_algorithms,
[
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA256,
tls.SignatureAlgorithm.ECDSA_SECP256R1_SHA256,
tls.SignatureAlgorithm.RSA_PKCS1_SHA256,
tls.SignatureAlgorithm.RSA_PKCS1_SHA1,
],
)
self.assertEqual(certificate_request.other_extensions, [(12345, b"foo")])

def test_push_certificate_request(self):
certificate_request = CertificateRequest(
request_context=b"",
signature_algorithms=[
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA256,
tls.SignatureAlgorithm.ECDSA_SECP256R1_SHA256,
tls.SignatureAlgorithm.RSA_PKCS1_SHA256,
tls.SignatureAlgorithm.RSA_PKCS1_SHA1,
],
other_extensions=[(12345, b"foo")],
)

buf = Buffer(400)
push_certificate_request(buf, certificate_request)
self.assertEqual(buf.data, load("tls_certificate_request.bin"))

def test_pull_certificate_verify(self):
buf = Buffer(data=load("tls_certificate_verify.bin"))
verify = pull_certificate_verify(buf)
Expand Down
Binary file added tests/tls_certificate_request.bin
Binary file not shown.

0 comments on commit 41fd641

Please sign in to comment.