Skip to content

Commit

Permalink
Reuse the oldest keep-alive connection first (#9672)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Nov 5, 2024
1 parent 75ae623 commit afb5ebb
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGES/9672.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed the keep-alive connection pool to be FIFO instead of LIFO -- by :user:`bdraco`.

Keep-alive connections are more likely to be reused before they disconnect.
25 changes: 13 additions & 12 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
import traceback
import warnings
from collections import OrderedDict, defaultdict
from collections import OrderedDict, defaultdict, deque
from contextlib import suppress
from http import HTTPStatus
from itertools import chain, cycle, islice
Expand All @@ -18,6 +18,7 @@
Awaitable,
Callable,
DefaultDict,
Deque,
Dict,
Iterator,
List,
Expand Down Expand Up @@ -241,7 +242,12 @@ def __init__(
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
# Connection pool of reusable connections.
# We use a deque to store connections because it has O(1) popleft()
# and O(1) append() operations to implement a FIFO queue.
self._conns: DefaultDict[
ConnectionKey, Deque[Tuple[ResponseHandler, float]]
] = defaultdict(deque)
self._limit = limit
self._limit_per_host = limit_per_host
self._acquired: Set[ResponseHandler] = set()
Expand Down Expand Up @@ -341,10 +347,10 @@ def _cleanup(self) -> None:
timeout = self._keepalive_timeout

if self._conns:
connections = {}
connections = defaultdict(deque)
deadline = now - timeout
for key, conns in self._conns.items():
alive: List[Tuple[ResponseHandler, float]] = []
alive: Deque[Tuple[ResponseHandler, float]] = deque()
for proto, use_time in conns:
if proto.is_connected() and use_time - deadline >= 0:
alive.append((proto, use_time))
Expand Down Expand Up @@ -583,14 +589,12 @@ async def _get(
The connection will be marked as acquired.
"""
try:
conns = self._conns[key]
except KeyError:
if (conns := self._conns.get(key)) is None:
return None

t1 = monotonic()
while conns:
proto, t0 = conns.pop()
proto, t0 = conns.popleft()
# We will we reuse the connection if its connected and
# the keepalive timeout has not been exceeded
if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
Expand Down Expand Up @@ -685,10 +689,7 @@ def _release(
self._cleanup_closed_transports.append(transport)
return

conns = self._conns.get(key)
if conns is None:
conns = self._conns[key] = []
conns.append((protocol, monotonic()))
self._conns[key].append((protocol, monotonic()))

if self._cleanup_handle is None:
self._cleanup_handle = helpers.weakref_handle(
Expand Down
3 changes: 2 additions & 1 deletion tests/test_client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import gc
import io
import json
from collections import deque
from http.cookies import SimpleCookie
from typing import (
Any,
Expand Down Expand Up @@ -55,7 +56,7 @@ async def make_conn() -> BaseConnector:
key = ConnectionKey("localhost", 80, False, True, None, None, None)
conn = loop.run_until_complete(make_conn())
proto = create_mocked_conn()
conn._conns[key] = [(proto, 123)]
conn._conns[key] = deque([(proto, 123)])
yield conn
loop.run_until_complete(conn.close())

Expand Down
81 changes: 43 additions & 38 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import ssl
import sys
import uuid
from collections import defaultdict, deque
from concurrent import futures
from contextlib import closing, suppress
from typing import (
Awaitable,
Callable,
Dict,
DefaultDict,
Deque,
Iterator,
List,
Literal,
Expand Down Expand Up @@ -225,7 +227,7 @@ async def test_del_with_scheduled_cleanup(
loop.set_debug(True)
conn = aiohttp.BaseConnector(keepalive_timeout=0.01)
transp = create_mocked_conn(loop)
conn._conns[key] = [(transp, 123)]
conn._conns[key] = deque([(transp, 123)])

conns_impl = conn._conns
exc_handler = mock.Mock()
Expand Down Expand Up @@ -257,7 +259,7 @@ async def make_conn() -> aiohttp.BaseConnector:

conn = loop.run_until_complete(make_conn())
transp = create_mocked_conn(loop)
conn._conns[key] = [(transp, 123)]
conn._conns[key] = deque([(transp, 123)])

conns_impl = conn._conns
exc_handler = mock.Mock()
Expand Down Expand Up @@ -304,7 +306,7 @@ async def test_close(key: ConnectionKey) -> None:

conn = aiohttp.BaseConnector()
assert not conn.closed
conn._conns[key] = [(proto, 0)]
conn._conns[key] = deque([(proto, 0)])
await conn.close()

assert not conn._conns
Expand All @@ -317,7 +319,7 @@ async def test_get(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
connection = await conn._get(key, [])
assert connection is not None
assert connection.protocol == proto
Expand All @@ -331,14 +333,14 @@ async def test_get_unconnected_proto(loop: asyncio.AbstractEventLoop) -> None:
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
connection = await conn._get(key, [])
assert connection is not None
assert connection.protocol == proto
connection.close()

assert await conn._get(key, []) is None
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
proto.is_connected = lambda *args: False
assert await conn._get(key, []) is None
await conn.close()
Expand All @@ -350,14 +352,14 @@ async def test_get_unconnected_proto_ssl(loop: asyncio.AbstractEventLoop) -> Non
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
connection = await conn._get(key, [])
assert connection is not None
assert connection.protocol == proto
connection.close()

assert await conn._get(key, []) is None
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
proto.is_connected = lambda *args: False
assert await conn._get(key, []) is None
await conn.close()
Expand All @@ -369,7 +371,7 @@ async def test_get_expired(loop: asyncio.AbstractEventLoop) -> None:
assert await conn._get(key, []) is None

proto = create_mocked_conn(loop)
conn._conns[key] = [(proto, loop.time() - 1000)]
conn._conns[key] = deque([(proto, loop.time() - 1000)])
assert await conn._get(key, []) is None
assert not conn._conns
await conn.close()
Expand All @@ -382,7 +384,7 @@ async def test_get_expired_ssl(loop: asyncio.AbstractEventLoop) -> None:

proto = create_mocked_conn(loop)
transport = proto.transport
conn._conns[key] = [(proto, loop.time() - 1000)]
conn._conns[key] = deque([(proto, loop.time() - 1000)])
assert await conn._get(key, []) is None
assert not conn._conns
assert conn._cleanup_closed_transports == [transport]
Expand Down Expand Up @@ -1506,7 +1508,6 @@ async def test_get_pop_empty_conns(
) -> None:
# see issue #473
conn = aiohttp.BaseConnector()
conn._conns[key] = []
assert await conn._get(key, []) is None
assert not conn._conns

Expand All @@ -1530,12 +1531,12 @@ async def test_release_close_do_not_delete_existing_connections(
proto1 = create_mocked_conn(loop)

conn = aiohttp.BaseConnector()
conn._conns[key] = [(proto1, 1)]
conn._conns[key] = deque([(proto1, 1)])

proto = create_mocked_conn(loop, should_close=True)
conn._acquired.add(proto)
conn._release(key, proto)
assert conn._conns[key] == [(proto1, 1)]
assert conn._conns[key] == deque([(proto1, 1)])
assert proto.close.called
await conn.close()

Expand Down Expand Up @@ -1573,7 +1574,7 @@ async def test_connect(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> N
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)

conn = aiohttp.BaseConnector()
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(conn, "_create_connection", create_mocked_conn(loop)) as m:
m.return_value = loop.create_future()
m.return_value.set_result(proto)
Expand Down Expand Up @@ -1809,9 +1810,10 @@ async def test_cleanup(key: ConnectionKey) -> None:
m2 = mock.Mock()
m1.is_connected.return_value = True
m2.is_connected.return_value = False
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
key: [(m1, 10), (m2, 300)],
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[key] = deque([(m1, 10), (m2, 300)])

loop = mock.Mock()
loop.time.return_value = 300
Expand All @@ -1831,9 +1833,10 @@ async def test_cleanup_close_ssl_transport(
) -> None:
proto = create_mocked_conn(loop)
transport = proto.transport
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
ssl_key: [(proto, 10)]
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[ssl_key] = deque([(proto, 10)])

loop = mock.Mock()
new_time = asyncio.get_event_loop().time() + 300
Expand All @@ -1853,9 +1856,10 @@ async def test_cleanup_close_ssl_transport(
async def test_cleanup2(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
m = create_mocked_conn()
m.is_connected.return_value = True
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
key: [(m, 300)]
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[key] = deque([(m, 300)])

conn = aiohttp.BaseConnector(keepalive_timeout=10)
conn._loop = mock.Mock()
Expand All @@ -1873,9 +1877,10 @@ async def test_cleanup2(loop: asyncio.AbstractEventLoop, key: ConnectionKey) ->
async def test_cleanup3(loop: asyncio.AbstractEventLoop, key: ConnectionKey) -> None:
m = create_mocked_conn(loop)
m.is_connected.return_value = True
testset: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {
key: [(m, 290.1), (create_mocked_conn(loop), 305.1)]
}
testset: DefaultDict[ConnectionKey, Deque[Tuple[ResponseHandler, float]]] = (
defaultdict(deque)
)
testset[key] = deque([(m, 290.1), (create_mocked_conn(loop), 305.1)])

conn = aiohttp.BaseConnector(keepalive_timeout=10)
conn._loop = mock.Mock()
Expand All @@ -1885,7 +1890,7 @@ async def test_cleanup3(loop: asyncio.AbstractEventLoop, key: ConnectionKey) ->
with mock.patch("aiohttp.connector.monotonic", return_value=308.5):
conn._cleanup()

assert conn._conns == {key: [testset[key][1]]}
assert conn._conns == {key: deque([testset[key][1]])}

assert conn._cleanup_handle is not None
conn._loop.call_at.assert_called_with(319, mock.ANY, mock.ANY)
Expand Down Expand Up @@ -2073,7 +2078,7 @@ async def test_close_twice(loop: asyncio.AbstractEventLoop, key: ConnectionKey)
proto: ResponseHandler = create_mocked_conn(loop)

conn = aiohttp.BaseConnector()
conn._conns[key] = [(proto, 0)]
conn._conns[key] = deque([(proto, 0)])
await conn.close()

assert not conn._conns
Expand Down Expand Up @@ -2473,7 +2478,7 @@ async def test_connect_with_limit(
)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2530,7 +2535,7 @@ async def test_connect_queued_operation_tracing(
)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2575,7 +2580,7 @@ async def test_connect_reuseconn_tracing(
)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
conn2 = await conn.connect(req, traces, ClientTimeout())
conn2.release()

Expand All @@ -2594,7 +2599,7 @@ async def test_connect_with_limit_and_limit_per_host(
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=1000, limit_per_host=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2629,7 +2634,7 @@ async def test_connect_with_no_limit_and_limit_per_host(
req = ClientRequest("GET", URL("http://localhost1:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=0, limit_per_host=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2662,7 +2667,7 @@ async def test_connect_with_no_limits(
req = ClientRequest("GET", URL("http://localhost:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=0, limit_per_host=0)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2695,7 +2700,7 @@ async def test_connect_with_limit_cancelled(
req = ClientRequest("GET", URL("http://host:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -2845,7 +2850,7 @@ async def test_close_with_acquired_connection(
req = ClientRequest("GET", URL("http://host:80"), loop=loop)

conn = aiohttp.BaseConnector(limit=1)
conn._conns[key] = [(proto, loop.time())]
conn._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
conn, "_create_connection", autospec=True, spec_set=True, return_value=proto
):
Expand Down Expand Up @@ -3513,7 +3518,7 @@ async def allow_connection_and_add_dummy_waiter() -> None:
spec_set=True,
side_effect=[0, 1, 1, 1],
):
connector._conns[key] = [(proto, loop.time())]
connector._conns[key] = deque([(proto, loop.time())])
with mock.patch.object(
connector,
"_create_connection",
Expand Down

0 comments on commit afb5ebb

Please sign in to comment.