Skip to content

Commit

Permalink
Fix SSLContext deprecation warnings
Browse files Browse the repository at this point in the history
`SSLContext(PROTOCOL_SSL...)` should not be used anymore.  Also, silence
the one test where we deliberately test TLS v1.1
  • Loading branch information
elprans committed Nov 7, 2021
1 parent 2f4fe53 commit 4d39a05
Showing 1 changed file with 40 additions and 31 deletions.
71 changes: 40 additions & 31 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import unittest
import unittest.mock
import urllib.parse
import warnings
import weakref

import asyncpg
Expand Down Expand Up @@ -1144,7 +1145,7 @@ def check():

@unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster')
async def test_connection_ssl_to_no_ssl_server(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(SSL_CA_CERT_FILE)

with self.assertRaisesRegex(ConnectionError, 'rejected SSL'):
Expand Down Expand Up @@ -1268,7 +1269,7 @@ def _add_hba_entry(self):
auth_method='trust')

async def test_ssl_connection_custom_context(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(SSL_CA_CERT_FILE)

con = await self.connect(
Expand Down Expand Up @@ -1360,7 +1361,7 @@ async def test_ssl_connection_default_context(self):
self.loop.set_exception_handler(old_handler)

async def test_ssl_connection_pool(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(SSL_CA_CERT_FILE)

pool = await self.create_pool(
Expand All @@ -1385,7 +1386,7 @@ async def worker():
await pool.close()

async def test_executemany_uvloop_ssl_issue_700(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(SSL_CA_CERT_FILE)

con = await self.connect(
Expand Down Expand Up @@ -1417,38 +1418,46 @@ async def test_tls_version(self):

# XXX: uvloop artifact
old_handler = self.loop.get_exception_handler()
try:
self.loop.set_exception_handler(lambda *args: None)
with self.assertRaisesRegex(ssl.SSLError, 'protocol version'):
await self.connect(
dsn='postgresql://ssl_user@localhost/postgres'
'?sslmode=require&ssl_min_protocol_version=TLSv1.3'
)
with self.assertRaises(ssl.SSLError):
await self.connect(
dsn='postgresql://ssl_user@localhost/postgres'
'?sslmode=require'
'&ssl_min_protocol_version=TLSv1.1'
'&ssl_max_protocol_version=TLSv1.1'
)
with self.assertRaisesRegex(ssl.SSLError, 'no protocols'):
await self.connect(

with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="ssl.TLSVersion.TLSv1_1 is deprecated",
category=DeprecationWarning
)
try:
self.loop.set_exception_handler(lambda *args: None)
with self.assertRaisesRegex(ssl.SSLError, 'protocol version'):
await self.connect(
dsn='postgresql://ssl_user@localhost/postgres'
'?sslmode=require&ssl_min_protocol_version=TLSv1.3'
)
with self.assertRaises(ssl.SSLError):
await self.connect(
dsn='postgresql://ssl_user@localhost/postgres'
'?sslmode=require'
'&ssl_min_protocol_version=TLSv1.1'
'&ssl_max_protocol_version=TLSv1.1'
)
with self.assertRaisesRegex(ssl.SSLError, 'no protocols'):
await self.connect(
dsn='postgresql://ssl_user@localhost/postgres'
'?sslmode=require'
'&ssl_min_protocol_version=TLSv1.2'
'&ssl_max_protocol_version=TLSv1.1'
)
con = await self.connect(
dsn='postgresql://ssl_user@localhost/postgres'
'?sslmode=require'
'&ssl_min_protocol_version=TLSv1.2'
'&ssl_max_protocol_version=TLSv1.1'
'&ssl_max_protocol_version=TLSv1.2'
)
con = await self.connect(
dsn='postgresql://ssl_user@localhost/postgres?sslmode=require'
'&ssl_min_protocol_version=TLSv1.2'
'&ssl_max_protocol_version=TLSv1.2'
)
try:
self.assertEqual(await con.fetchval('SELECT 42'), 42)
try:
self.assertEqual(await con.fetchval('SELECT 42'), 42)
finally:
await con.close()
finally:
await con.close()
finally:
self.loop.set_exception_handler(old_handler)
self.loop.set_exception_handler(old_handler)


@unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster')
Expand Down

0 comments on commit 4d39a05

Please sign in to comment.