Skip to content

Commit

Permalink
speed up connector limiting (#2937)
Browse files Browse the repository at this point in the history
  • Loading branch information
thehesiod authored and asvetlov committed Apr 27, 2018
1 parent c8483a0 commit 03d590e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES/2937.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up connector limiting
46 changes: 31 additions & 15 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
import traceback
import warnings
from collections import defaultdict
from collections import defaultdict, deque
from contextlib import suppress
from http.cookies import SimpleCookie
from itertools import cycle, islice
Expand Down Expand Up @@ -181,7 +181,9 @@ def __init__(self, *, keepalive_timeout=sentinel,
self._acquired_per_host = defaultdict(set)
self._keepalive_timeout = keepalive_timeout
self._force_close = force_close
self._waiters = defaultdict(list)

# {host_key: FIFO list of waiters}
self._waiters = defaultdict(deque)

self._loop = loop
self._factory = functools.partial(ResponseHandler, loop=loop)
Expand Down Expand Up @@ -392,12 +394,19 @@ async def connect(self, req, traces=None):

try:
await fut
finally:
# remove a waiter even if it was cancelled
waiters.remove(fut)
except BaseException:
# remove a waiter even if it was cancelled, normally it's
# removed when it's notified
try:
waiters.remove(fut)
except ValueError: # fut may no longer be in list
pass

if not waiters:
del self._waiters[key]

raise

if traces:
for trace in traces:
await trace.send_connection_queued_end()
Expand All @@ -423,10 +432,8 @@ async def connect(self, req, traces=None):
except BaseException:
# signal to waiter
if key in self._waiters:
for waiter in self._waiters[key]:
if not waiter.done():
waiter.set_result(None)
break
waiters = self._waiters[key]
self._release_key_waiter(key, waiters)
raise
finally:
if not self._closed:
Expand Down Expand Up @@ -470,25 +477,34 @@ def _get(self, key):
del self._conns[key]
return None

def _release_key_waiter(self, key, waiters):
if not waiters:
return False

waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)

if not waiters:
del self._waiters[key]

return True

def _release_waiter(self):
# always release only one waiter

if self._limit:
# if we have limit and we have available
if self._limit - len(self._acquired) > 0:
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():
waiters[0].set_result(None)
if self._release_key_waiter(key, waiters):
break

elif self._limit_per_host:
# if we have dont have limit but have limit per host
# then release first available
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():
waiters[0].set_result(None)
if self._release_key_waiter(key, waiters):
break

def _release_acquired(self, key, proto):
Expand Down
48 changes: 38 additions & 10 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import socket
import ssl
import uuid
from collections import deque
from unittest import mock

import pytest
Expand Down Expand Up @@ -40,8 +41,8 @@ def ssl_key():


@pytest.fixture
def unix_sockname(tmpdir):
sock_path = tmpdir / 'socket.sock'
def unix_sockname(shorttmpdir):
sock_path = shorttmpdir / 'socket.sock'
return str(sock_path)


Expand Down Expand Up @@ -90,7 +91,7 @@ async def test_del_with_scheduled_cleanup(loop):
loop.set_debug(True)
conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=0.01)
transp = mock.Mock()
conn._conns['a'] = [(transp, 'proto', 123)]
conn._conns['a'] = [(transp, 123)]

conns_impl = conn._conns
exc_handler = mock.Mock()
Expand All @@ -115,7 +116,7 @@ async def test_del_with_scheduled_cleanup(loop):
def test_del_with_closed_loop(loop):
conn = aiohttp.BaseConnector(loop=loop)
transp = mock.Mock()
conn._conns['a'] = [(transp, 'proto', 123)]
conn._conns['a'] = [(transp, 123)]

conns_impl = conn._conns
exc_handler = mock.Mock()
Expand Down Expand Up @@ -319,7 +320,7 @@ def test_release_waiter(loop, key, key2):
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = False
w2.done.return_value = False
conn._waiters[key] = [w1, w2]
conn._waiters[key] = deque([w1, w2])
conn._release_waiter()
assert w1.set_result.called
assert not w2.set_result.called
Expand All @@ -330,7 +331,7 @@ def test_release_waiter(loop, key, key2):
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = True
w2.done.return_value = False
conn._waiters[key] = [w1, w2]
conn._waiters[key] = deque([w1, w2])
conn._release_waiter()
assert not w1.set_result.called
assert not w2.set_result.called
Expand All @@ -343,8 +344,8 @@ def test_release_waiter_per_host(loop, key, key2):
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = False
w2.done.return_value = False
conn._waiters[key] = [w1]
conn._waiters[key2] = [w2]
conn._waiters[key] = deque([w1])
conn._waiters[key2] = deque([w2])
conn._release_waiter()
assert ((w1.set_result.called and not w2.set_result.called) or
(not w1.set_result.called and w2.set_result.called))
Expand Down Expand Up @@ -960,7 +961,9 @@ async def test_connect_tracing(loop):
conn._create_connection.return_value = loop.create_future()
conn._create_connection.return_value.set_result(proto)

await conn.connect(req, traces=traces)
conn2 = await conn.connect(req, traces=traces)
conn2.release()

on_connection_create_start.assert_called_with(
session,
trace_config_ctx,
Expand Down Expand Up @@ -1411,7 +1414,8 @@ async def test_connect_reuseconn_tracing(loop, key):

conn = aiohttp.BaseConnector(loop=loop, limit=1)
conn._conns[key] = [(proto, loop.time())]
await conn.connect(req, traces=traces)
conn2 = await conn.connect(req, traces=traces)
conn2.release()

on_connection_reuseconn.assert_called_with(
session,
Expand Down Expand Up @@ -1736,6 +1740,29 @@ async def create_connection(req, traces=None):
assert ret._key == 'key'
assert ret.protocol == proto
assert proto in conn._acquired
ret.release()


async def test_cancelled_waiter(loop):
conn = aiohttp.BaseConnector(limit=1, loop=loop)
req = mock.Mock()
req.connection_key = 'key'
proto = mock.Mock()

async def create_connection(req, traces=None):
await asyncio.sleep(1)
return proto

conn._create_connection = create_connection

conn._acquired.add(proto)

conn2 = loop.create_task(conn.connect(req))
await asyncio.sleep(0, loop=loop)
conn2.cancel()

with pytest.raises(asyncio.CancelledError):
await conn2


async def test_error_on_connection_with_cancelled_waiter(loop):
Expand Down Expand Up @@ -1785,6 +1812,7 @@ async def create_connection(req, traces=None):
assert ret._key == 'key'
assert ret.protocol == proto
assert proto in conn._acquired
ret.release()


async def test_tcp_connector(aiohttp_client, loop):
Expand Down

0 comments on commit 03d590e

Please sign in to comment.