speed up connector limiting #2937

Merged: 24 commits, Apr 27, 2018. Changes shown from 22 commits.
1 change: 1 addition & 0 deletions CHANGES/2937.feature
@@ -0,0 +1 @@
Speed up connector limiting
52 changes: 35 additions & 17 deletions aiohttp/connector.py
@@ -3,7 +3,7 @@
import sys
import traceback
import warnings
from collections import defaultdict
from collections import defaultdict, deque
from contextlib import suppress
from http.cookies import SimpleCookie
from itertools import cycle, islice
@@ -181,7 +181,11 @@ def __init__(self, *, keepalive_timeout=sentinel,
self._acquired_per_host = defaultdict(set)
self._keepalive_timeout = keepalive_timeout
self._force_close = force_close
self._waiters = defaultdict(list)

# {host_key: FIFO list of waiters}
# NOTE: this is not a true FIFO because the true order is lost amongst
# the dictionary keys

Contributor:
I would suggest using the deque [1] data structure, which is the one used by all of the implementations in asyncio.locks, most likely because lists have the following constraint:

lists incur O(n) memory movement costs for pop(0)

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L437

Contributor Author:
great idea, done, slight speed improvement :)
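
A minimal sketch of the constraint quoted above, assuming nothing beyond the standard library (illustrative only, not part of the PR):

from collections import deque

q_list = [1, 2, 3]
q_deque = deque([1, 2, 3])

q_list.pop(0)      # removes the head but has to shift every remaining element: O(n)
q_deque.popleft()  # removes the head in constant time: O(1)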

Contributor:
I guess that this comment can be removed; the FIFO is per dictionary key. Different keys mean different tuple values of (host, port), for which a FIFO across keys does not make sense.

Contributor Author (@thehesiod, Apr 19, 2018):
each deque is indeed a FIFO, as the first one in will be the first one out (amongst items in that list); however, across keys it's not a FIFO, because it currently iterates across keys (which theoretically can be in any order) when choosing which FIFO to release from.

Contributor:
Which deque to release is not a random choice; it's based on the hash of the host and port, so those connections that are waiting for a free connection and match the host and port will share the same deque in a FIFO way.

Yes, we are saying the same thing; the dictionary is just the structure that keeps all of the FIFO queues.

Let's save the comments for what is really not understandable.

Contributor Author:
I didn't say it was random, I said it wasn't a true FIFO queue, because it chooses which queue to release a connector from in dictionary key order, not in FIFO order. Anyway, I removed the comment and people will have to figure this out themselves now. If this were to be "correct" there would need to be a separate priority queue with pointers back to these queues... or perhaps a multi-index priority queue :)
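
A rough sketch of the waiter bookkeeping being discussed, with hypothetical host keys and a simplified release step (not the aiohttp implementation):

import asyncio
from collections import defaultdict, deque

async def demo():
    loop = asyncio.get_event_loop()
    # {(host, port): deque of waiter futures} -- FIFO within each key
    waiters = defaultdict(deque)
    waiters[('a.example', 80)].append(loop.create_future())
    waiters[('b.example', 443)].append(loop.create_future())

    # a release walks the dict keys in their insertion order and wakes the head
    # of the first non-empty deque, so ordering is per-key FIFO rather than one
    # global FIFO across all hosts
    for key, queue in waiters.items():
        if queue:
            waiter = queue.popleft()
            if not waiter.done():
                waiter.set_result(None)
            break

asyncio.get_event_loop().run_until_complete(demo())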

self._waiters = defaultdict(deque)

self._loop = loop
self._factory = functools.partial(ResponseHandler, loop=loop)
@@ -392,11 +396,18 @@ async def connect(self, req, traces=None):

try:
await fut
finally:
# remove a waiter even if it was cancelled
waiters.remove(fut)

Contributor Author:
this is slow because it's trying to find a future in a list for each request that's waiting on a connector

if not waiters:
del self._waiters[key]
except BaseException:

Member:
Sorry, I don't understand why the deletion is moved from finally to except.
Why shouldn't we remove the waiter if no exception was raised?

Contributor Author:
@asvetlov this is for two reasons: performance, and the fact that if no exception is thrown the removal has already been done by the release method.

# remove a waiter even if it was cancelled, normally it's

Contributor:
if the future has been canceled, we do need to wake up another waiter. Take a look at the semaphore implementation [1]

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L478

Contributor Author (@thehesiod, Apr 14, 2018):
this method created the future, so why would we need to wake up another waiter? That doesn't make sense, as it would imply yet another connector is available. This is 1-1: one waiter was added, one removed. Also note that the referenced code runs only if the future was not cancelled, whereas in this scenario it can only be cancelled.

Contributor:
My mistake, the wake-up will be done automatically by the exit of the context manager in either scenario. So forget about this.

# removed when it's notified
try:
waiters.remove(fut)

Contributor:
I just checked and there is no remove method in defaultdict.


if not waiters:
del self._waiters[key]
except ValueError: # fut may no longer be in list
pass

Contributor:
Be careful with that; in the case of a ValueError exception you will mask the original one:

try:
    val = 1 / 0
except Exception:
    try:
        a = {}
        a['foo']
    except KeyError:
        pass
    raise

Contributor:
I agree with @pfreixes, try to scope what you are catching as much as possible.

This is much better imo:

try:
    ...

    if not waiters:
        try:
            del self._waiters[key]
        except ValueError:
            pass

    ...

except:
    ...

Contributor Author:
I don't see what you're saying here, waiters.remove(fut) throws the ValueError, del self._waiters[key] could throw a KeyError, not another ValueError. Not going to change this unless it's really needed.

Contributor:
Read my first comment: unless you make it explicit with a raise ... from e, the second try/except masks the original exception in the case of a ValueError.
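
A small generic illustration of the masking behaviour described here, assuming plain CPython semantics (not the connector code): if the clean-up inside an except block raises an exception that is not caught, that new exception propagates and the original only survives as __context__.

try:
    1 / 0                  # original error: ZeroDivisionError
except Exception:
    try:
        {}['missing']      # clean-up goes wrong: KeyError
    except IndexError:     # too narrow, the KeyError escapes
        pass
    raise                  # never reached; the KeyError masks the ZeroDivisionError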

Contributor Author:
gotchya, thanks


raise

if traces:
for trace in traces:
@@ -423,10 +434,8 @@ async def connect(self, req, traces=None):
except BaseException:
# signal to waiter
if key in self._waiters:
for waiter in self._waiters[key]:
if not waiter.done():
waiter.set_result(None)
break
waiters = self._waiters[key]
self._release_key_waiter(key, waiters)
raise
finally:
if not self._closed:
@@ -470,25 +479,34 @@ def _get(self, key):
del self._conns[key]
return None

def _release_key_waiter(self, key, waiters):
if not waiters:
return False

waiter = waiters.popleft()

Contributor:
Umm, you have to keep going until you reach a waiter that is not done or you run out of waiters; see the CPython implementation [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450
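
A rough sketch of the "keep going until you find a waiter that is not done" pattern referenced above (a paraphrase for illustration, not the CPython or aiohttp code):

from collections import deque

def wake_up_next(waiters: deque) -> None:
    while waiters:
        waiter = waiters.popleft()
        if not waiter.done():       # skip waiters that were already cancelled
            waiter.set_result(None)
            return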

Contributor Author:
are you saying the old implementation was wrong: https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adL481 ? I don't believe this is the case. There are two ways a waiter can be removed:

  1. An exception happened while waiting (in exception handler)
    1. a release was dispatched for said waiter (someone will see a release)
  2. through this method

What you describe would create a double release for 1.i. This is in fact the scenario you alluded to before.

Contributor (@pfreixes, Apr 26, 2018):
Yes, the issue was already there; indeed I can see the following issues with the code that we have in master:

  • The iteration until reaching a non-cancelled waiter has to be done through all of the items of a list; right now it is only done on the head of each list.
  • Each time that we try to release a waiter we have to check whether the limit and the number of concurrent connections allow us to do it. This is done only when release_waiter is called explicitly, but not when we had an exception trying to make the connection.
  • The limit per host, TBH, I would say is not well calculated.

So we have to fix them, but it's true that they were already there, and it would be nice if we decouple both things.

Contributor Author:
ya I have a feeling this is the tip of the iceberg :) I have a strong suspicion there's a leak in aiohttp or something aiohttp uses, as right now we're leaking ~40MB/week in prod

if not waiter.done():
waiter.set_result(None)

if not waiters:
del self._waiters[key]

return True

Contributor:
If you don't do anything with the result, don't return one at all; it's like dead code. This is just a small remark

Contributor Author:
? result is used in _release_waiter

Contributor:
Ah I see sry


def _release_waiter(self):
# always release only one waiter

if self._limit:
# if we have limit and we have available
if self._limit - len(self._acquired) > 0:
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():

Contributor Author:
with the new model we can guarantee that there are only active waiters in the list, so we can greatly simplify this to always popping and notifying the first item in the list

waiters[0].set_result(None)
if self._release_key_waiter(key, waiters):
break

elif self._limit_per_host:
# if we have dont have limit but have limit per host
# then release first available
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():
waiters[0].set_result(None)
if self._release_key_waiter(key, waiters):
break

def _release_acquired(self, key, proto):
48 changes: 38 additions & 10 deletions tests/test_connector.py
@@ -8,6 +8,7 @@
import socket
import ssl
import uuid
from collections import deque
from unittest import mock

import pytest
@@ -40,8 +41,8 @@ def ssl_key():


@pytest.fixture
def unix_sockname(tmpdir):
sock_path = tmpdir / 'socket.sock'
def unix_sockname(shorttmpdir):
sock_path = shorttmpdir / 'socket.sock'
return str(sock_path)


@@ -90,7 +91,7 @@ async def test_del_with_scheduled_cleanup(loop):
loop.set_debug(True)
conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=0.01)
transp = mock.Mock()
conn._conns['a'] = [(transp, 'proto', 123)]
conn._conns['a'] = [(transp, 123)]

conns_impl = conn._conns
exc_handler = mock.Mock()
@@ -115,7 +116,7 @@ async def test_del_with_scheduled_cleanup(loop):
def test_del_with_closed_loop(loop):
conn = aiohttp.BaseConnector(loop=loop)
transp = mock.Mock()
conn._conns['a'] = [(transp, 'proto', 123)]
conn._conns['a'] = [(transp, 123)]

conns_impl = conn._conns
exc_handler = mock.Mock()
@@ -319,7 +320,7 @@ def test_release_waiter(loop, key, key2):
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = False
w2.done.return_value = False
conn._waiters[key] = [w1, w2]
conn._waiters[key] = deque([w1, w2])
conn._release_waiter()
assert w1.set_result.called
assert not w2.set_result.called
@@ -330,7 +331,7 @@ def test_release_waiter(loop, key, key2):
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = True
w2.done.return_value = False
conn._waiters[key] = [w1, w2]
conn._waiters[key] = deque([w1, w2])
conn._release_waiter()
assert not w1.set_result.called
assert not w2.set_result.called
@@ -343,8 +344,8 @@ def test_release_waiter_per_host(loop, key, key2):
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = False
w2.done.return_value = False
conn._waiters[key] = [w1]
conn._waiters[key2] = [w2]
conn._waiters[key] = deque([w1])
conn._waiters[key2] = deque([w2])
conn._release_waiter()
assert ((w1.set_result.called and not w2.set_result.called) or
(not w1.set_result.called and w2.set_result.called))
@@ -960,7 +961,9 @@ async def test_connect_tracing(loop):
conn._create_connection.return_value = loop.create_future()
conn._create_connection.return_value.set_result(proto)

await conn.connect(req, traces=traces)
conn2 = await conn.connect(req, traces=traces)
conn2.release()

on_connection_create_start.assert_called_with(
session,
trace_config_ctx,
@@ -1411,7 +1414,8 @@ async def test_connect_reuseconn_tracing(loop, key):

conn = aiohttp.BaseConnector(loop=loop, limit=1)
conn._conns[key] = [(proto, loop.time())]
await conn.connect(req, traces=traces)
conn2 = await conn.connect(req, traces=traces)
conn2.release()

on_connection_reuseconn.assert_called_with(
session,
@@ -1736,6 +1740,29 @@ async def create_connection(req, traces=None):
assert ret._key == 'key'
assert ret.protocol == proto
assert proto in conn._acquired
ret.release()


async def test_cancelled_waiter(loop):
conn = aiohttp.BaseConnector(limit=1, loop=loop)
req = mock.Mock()
req.connection_key = 'key'
proto = mock.Mock()

async def create_connection(req, traces=None):
await asyncio.sleep(1)
return proto

conn._create_connection = create_connection

conn._acquired.add(proto)

conn2 = loop.create_task(conn.connect(req))
await asyncio.sleep(0, loop=loop)
conn2.cancel()

with pytest.raises(asyncio.CancelledError):
await conn2


async def test_error_on_connection_with_cancelled_waiter(loop):
@@ -1785,6 +1812,7 @@ async def create_connection(req, traces=None):
assert ret._key == 'key'
assert ret.protocol == proto
assert proto in conn._acquired
ret.release()


async def test_tcp_connector(aiohttp_client, loop):