Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse the oldest keep-alive connection first #9672

Merged
merged 13 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
import traceback
import warnings
from collections import OrderedDict, defaultdict
from collections import OrderedDict, defaultdict, deque
from contextlib import suppress
from http import HTTPStatus
from itertools import chain, cycle, islice
Expand All @@ -18,6 +18,7 @@
Awaitable,
Callable,
DefaultDict,
Deque,
Dict,
Iterator,
List,
Expand Down Expand Up @@ -241,7 +242,9 @@ def __init__(
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
self._conns: DefaultDict[
bdraco marked this conversation as resolved.
Show resolved Hide resolved
ConnectionKey, Deque[Tuple[ResponseHandler, float]]
] = defaultdict(deque)
self._limit = limit
self._limit_per_host = limit_per_host
self._acquired: Set[ResponseHandler] = set()
Expand Down Expand Up @@ -341,10 +344,10 @@ def _cleanup(self) -> None:
timeout = self._keepalive_timeout

if self._conns:
connections = {}
connections = defaultdict(deque)
deadline = now - timeout
for key, conns in self._conns.items():
alive: List[Tuple[ResponseHandler, float]] = []
alive: Deque[Tuple[ResponseHandler, float]] = deque()
for proto, use_time in conns:
if proto.is_connected() and use_time - deadline >= 0:
alive.append((proto, use_time))
Expand Down Expand Up @@ -583,14 +586,12 @@ async def _get(

The connection will be marked as acquired.
"""
try:
conns = self._conns[key]
except KeyError:
if (conns := self._conns.get(key)) is None:
return None

t1 = monotonic()
while conns:
proto, t0 = conns.pop()
proto, t0 = conns.popleft()
# We will we reuse the connection if its connected and
# the keepalive timeout has not been exceeded
if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
Expand Down Expand Up @@ -685,10 +686,7 @@ def _release(
self._cleanup_closed_transports.append(transport)
return

conns = self._conns.get(key)
if conns is None:
conns = self._conns[key] = []
conns.append((protocol, monotonic()))
self._conns[key].append((protocol, monotonic()))

if self._cleanup_handle is None:
self._cleanup_handle = helpers.weakref_handle(
Expand Down
3 changes: 2 additions & 1 deletion tests/test_client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import gc
import io
import json
from collections import deque
from http.cookies import SimpleCookie
from typing import (
Any,
Expand Down Expand Up @@ -55,7 +56,7 @@ async def make_conn() -> BaseConnector:
key = ConnectionKey("localhost", 80, False, True, None, None, None)
conn = loop.run_until_complete(make_conn())
proto = create_mocked_conn()
conn._conns[key] = [(proto, 123)]
conn._conns[key] = deque([(proto, 123)])
yield conn
loop.run_until_complete(conn.close())

Expand Down
81 changes: 43 additions & 38 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import ssl
import sys
import uuid
from collections import defaultdict, deque
from concurrent import futures
from contextlib import closing, suppress
from typing import (
Awaitable,
Callable,
Dict,
DefaultDict,
Deque,
Iterator,
List,
Literal,
Expand Down Expand Up @@ -225,7 +227,7 @@ async def test_del_with_scheduled_cleanup(
loop.set_debug(True)
conn = aiohttp.BaseConnector(keepalive_timeout=0.01)
transp = create_mocked_conn(loop)
conn._conns[key] = [(transp, 123)]
conn._conns[key] = deque([(transp, 123)])

conns_impl = conn._conns
exc_handler = mock.Mock()
Expand Down Expand Up @@ -257,7 +259,7 @@ async def make_conn() -> aiohttp.BaseConnector:

conn = loop.run_until_complete(make_conn())
transp = create_mocked_conn(loop)
conn._conns[key] = [(transp, 123)]
conn._conns[key] = deque([(transp, 123)])

conns_impl = conn._conns
exc_handler = mock.Mock()
Expand Down Expand Up @@ -304,7 +306,7 @@ async def test_close(key: ConnectionKey) -> None:

conn = aiohttp.BaseConnector()
assert not conn.closed
conn._conns[key] = [(proto, 0)]
conn._conns[key] = deque([(proto, 0)])
await conn.close()

assert not conn._conns
Expand All @@ -317,7 +319,7 @@ async def test_get(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
connection = await conn._get(key, [])
assert connection is not None
assert connection.protocol == proto
Expand All @@ -331,14 +333,14 @@ async def test_get_unconnected_proto(loop: asyncio.AbstractEventLoop) -> None:
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
connection = await conn._get(key, [])
assert connection is not None
assert connection.protocol == proto
connection.close()

assert await conn._get(key, []) is None
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
proto.is_connected = lambda *args: False
assert await conn._get(key, []) is None
await conn.close()
Expand All @@ -350,14 +352,14 @@ async def test_get_unconnected_proto_ssl(loop: asyncio.AbstractEventLoop) -> Non
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
connection = await conn._get(key, [])
assert connection is not None
assert connection.protocol == proto
connection.close()

assert await conn._get(key, []) is None
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
proto.is_connected = lambda *args: False
assert await conn._get(key, []) is None
await conn.close()
Expand All @@ -369,7 +371,7 @@ async def test_get_expired(loop: asyncio.AbstractEventLoop) -> None:
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time() - 1000)]
conn._conns[key] = deque([(proto, loop.time() - 1000)])
assert await conn._get(key, []) is None
assert not conn._conns
await conn.close()
Expand All @@ -382,7 +384,7 @@ async def test_get_expired_ssl(loop: asyncio.AbstractEventLoop) -> None:

proto = create_mocked_conn(loop)
transport = proto.transport
conn._conns[key] = [(proto, loop.time() - 1000)]
conn._conns[key] = deque([(proto, loop.time() - 1000)])
assert await conn._get(key, []) is None
assert not conn._conns
assert conn._cleanup_closed_transports == [transport]
Expand Down Expand Up @@ -1506,7 +1508,6 @@ async def test_get_pop_empty_conns(
) -> None:
# see issue #473
conn = aiohttp.BaseConnector()
conn._conns[key] = []
assert await conn._get(key, []) is None
assert not conn._conns

Expand All @@ -1530,12 +1531,12 @@ async def test_release_close_do_not_delete_existing_connections(
proto1 = create_mocked_conn(loop)

conn = aiohttp.BaseConnector()
conn._conns[key] = [(proto1, 1)]
conn._conns[key] = deque([(proto1, 1)])

proto = create_mocked_conn(loop, should_close=True)
conn._acquired.add(proto)
conn._release(key, proto)
assert conn._conns[key] == [(proto1, 1)]
assert conn._conns[key] == deque([(proto1, 1)])
assert proto.close.called
await conn.close()

Expand Down Expand Up @@ -1573,7 +1574,7 @@ async def test_connect(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> N
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)

conn = aiohttp.BaseConnector()
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(conn, "_create_connection", create_mocked_conn(loop)) as m:
m.return_value = loop.create_future()
m.return_value.set_result(proto)
Expand Down Expand Up @@ -1809,9 +1810,10 @@ async def test_cleanup(key: ConnectionKey) -> None:
m2 = mock.Mock()
m1.is_connected.return_value = True
m2.is_connected.return_value = False
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
key: [(m1, 10), (m2, 300)],
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[key] = deque([(m1, 10), (m2, 300)])

loop = mock.Mock()
loop.time.return_value = 300
Expand All @@ -1831,9 +1833,10 @@ async def test_cleanup_close_ssl_transport(
) -> None:
proto = create_mocked_conn(loop)
transport = proto.transport
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
ssl_key: [(proto, 10)]
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[ssl_key] = deque([(proto, 10)])

loop = mock.Mock()
new_time = asyncio.get_event_loop().time() + 300
Expand All @@ -1853,9 +1856,10 @@ async def test_cleanup_close_ssl_transport(
async def test_cleanup2(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
m = create_mocked_conn()
m.is_connected.return_value = True
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
key: [(m, 300)]
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[key] = deque([(m, 300)])

conn = aiohttp.BaseConnector(keepalive_timeout=10)
conn._loop = mock.Mock()
Expand All @@ -1873,9 +1877,10 @@ async def test_cleanup2(loop: asyncio.AbstractEventLoop, key: ConnectionKey) ->
async def test_cleanup3(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
m = create_mocked_conn(loop)
m.is_connected.return_value = True
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
key: [(m, 290.1), (create_mocked_conn(loop), 305.1)]
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[key] = deque([(m, 290.1), (create_mocked_conn(loop), 305.1)])

conn = aiohttp.BaseConnector(keepalive_timeout=10)
conn._loop = mock.Mock()
Expand All @@ -1885,7 +1890,7 @@ async def test_cleanup3(loop: asyncio.AbstractEventLoop, key: ConnectionKey) ->
with mock.patch("aiohttp.connector.monotonic", return_value=308.5):
conn._cleanup()

assert conn._conns == {key: [testset[key][1]]}
assert conn._conns == {key: deque([testset[key][1]])}

assert conn._cleanup_handle is not None
conn._loop.call_at.assert_called_with(319, mock.ANY, mock.ANY)
Expand Down Expand Up @@ -2073,7 +2078,7 @@ async def test_close_twice(loop: asyncio.AbstractEventLoop, key: ConnectionKey)
proto: ResponseHandler = create_mocked_conn(loop)

conn = aiohttp.BaseConnector()
conn._conns[key] = [(proto, 0)]
conn._conns[key] = deque([(proto, 0)])
await conn.close()

assert not conn._conns
Expand Down Expand Up @@ -2473,7 +2478,7 @@ async def test_connect_with_limit(
)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2530,7 +2535,7 @@ async def test_connect_queued_operation_tracing(
)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2575,7 +2580,7 @@ async def test_connect_reuseconn_tracing(
)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
conn2 = await conn.connect(req, traces, ClientTimeout())
conn2.release()

Expand All @@ -2594,7 +2599,7 @@ async def test_connect_with_limit_and_limit_per_host(
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=1000, limit_per_host=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2629,7 +2634,7 @@ async def test_connect_with_no_limit_and_limit_per_host(
req = ClientRequest("GET", URL("http://localhost1:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=0, limit_per_host=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2662,7 +2667,7 @@ async def test_connect_with_no_limits(
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=0, limit_per_host=0)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2695,7 +2700,7 @@ async def test_connect_with_limit_cancelled(
req = ClientRequest("GET", URL("http://host:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2845,7 +2850,7 @@ async def test_close_with_acquired_connection(
req = ClientRequest("GET", URL("http://host:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -3513,7 +3518,7 @@ async def allow_connection_and_add_dummy_waiter() -> None:
spec_set=True,
side_effect=[0, 1, 1, 1],
):
connector._conns[key] = [(proto, loop.time())]
connector._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
connector,
"_create_connection",
Expand Down
Loading