speed up connector limiting #2937
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
import sys | ||
import traceback | ||
import warnings | ||
from collections import defaultdict | ||
from collections import defaultdict, deque | ||
from contextlib import suppress | ||
from http.cookies import SimpleCookie | ||
from itertools import cycle, islice | ||
|
@@ -181,7 +181,11 @@ def __init__(self, *, keepalive_timeout=sentinel, | |
self._acquired_per_host = defaultdict(set) | ||
self._keepalive_timeout = keepalive_timeout | ||
self._force_close = force_close | ||
self._waiters = defaultdict(list) | ||
|
||
# {host_key: FIFO list of waiters} | ||
# NOTE: this is not a true FIFO because the true order is lost amongst | ||
# the dictionary keys | ||
I guess that this can be removed, the FIFO is for each dictionary key. Different keys mean different tuple values of

Each deque is indeed a FIFO, as the first one in will be the first one out (amongst items in that list). Across keys, however, it's not a FIFO, because it currently iterates across keys (which theoretically can be in any order) when choosing which FIFO to release from.

Which deque to release is not a random choice; it's based on the hash of the host and port, so connections that are waiting for a free connection and match the host and port share the same deque in a FIFO way. Yes, we are saying the same thing: the dictionary is just the structure that keeps all of the FIFO queues. Let's save the comments for what is really not understandable.

I didn't say it was random, I said it wasn't a true FIFO queue, because it's choosing which queue to release a connector from in dictionary key order and not in FIFO order. Anyway, I removed the comment and people will have to figure this out themselves now. If this were to be "correct" there would need to be a separate priority queue with pointers back to these queues... or perhaps a multi-index priority queue :)
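To make the ordering point in this thread concrete, here is a minimal sketch. It assumes waiters are stored per key in a `defaultdict(deque)` as in the diff, and that release walks the dictionary and pops from the first non-empty deque. The key names, the waiter labels, and the `release_one` helper are hypothetical, not aiohttp code.

```python
from collections import defaultdict, deque

# Hypothetical stand-in for the connector's waiter table:
# {host_key: deque of waiters in arrival order}
waiters = defaultdict(deque)

# Global arrival order: a1, b1, a2
waiters["host-a"].append("a1")
waiters["host-b"].append("b1")
waiters["host-a"].append("a2")


def release_one(table):
    """Pop the oldest waiter of the first key that has any (illustrative only)."""
    for key, queue in table.items():
        if queue:
            waiter = queue.popleft()
            if not queue:
                del table[key]  # safe: we return right after mutating
            return waiter
    return None


released = [release_one(waiters) for _ in range(3)]
print(released)
```

Each deque is FIFO on its own, but `b1` arrived before `a2` and is released after it, which is the cross-key behaviour being debated.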
||
self._waiters = defaultdict(deque) | ||
|
||
self._loop = loop | ||
self._factory = functools.partial(ResponseHandler, loop=loop) | ||
|
@@ -392,11 +396,18 @@ async def connect(self, req, traces=None): | |
|
||
try: | ||
await fut | ||
finally: | ||
# remove a waiter even if it was cancelled | ||
waiters.remove(fut) | ||
this is slow because it's trying to find a future in a list for each request that's waiting on a connector
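A rough way to see the cost mentioned here, as a sketch under stated assumptions: the `Waiter` class below is a hypothetical stand-in for a future that counts equality checks, and the counts describe CPython's behaviour. `remove()` has to search from the front, while `popleft()` on a deque does no search at all.

```python
from collections import deque


class Waiter:
    """Hypothetical stand-in for a future; counts equality comparisons."""

    comparisons = 0

    def __eq__(self, other):
        Waiter.comparisons += 1
        return self is other


n = 1000

# Removing the most recently added waiter from a list scans every
# earlier element first.
as_list = [Waiter() for _ in range(n)]
as_list.remove(as_list[-1])
list_comparisons = Waiter.comparisons

# Taking the oldest waiter from a deque needs no comparisons.
Waiter.comparisons = 0
as_deque = deque(Waiter() for _ in range(n))
as_deque.popleft()
deque_comparisons = Waiter.comparisons

print(list_comparisons, deque_comparisons)
```

Note that `deque.remove()` also searches linearly, so the gain comes from taking waiters off the front with `popleft()` on the normal path and leaving `remove()` for the exceptional path.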
||
if not waiters: | ||
del self._waiters[key] | ||
except BaseException: | ||
Sorry, I don't understand why the deletion is moved from

@asvetlov this is for two reasons: performance, and because if no exception is thrown the removal has already been done by the release method.
||
# remove a waiter even if it was cancelled, normally it's | ||
If the future has been cancelled, we do need to wake up another waiter. Take a look at the semaphore implementation [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L478

This method created the future, so why would we need to wake up another waiter? That doesn't make sense, as it would imply yet another connector is available. This is 1-1: one waiter was added, one removed. Also note that that code handles the case where the future was not cancelled; in this scenario it can only be cancelled.

My mistake, the wake-up will be done automatically by the exit of the context manager in whatever scenario. So forget about this.
||
# removed when it's notified | ||
try: | ||
waiters.remove(fut) | ||
I just checked and there is no
||
|
||
if not waiters: | ||
del self._waiters[key] | ||
except ValueError: # fut may no longer be in list | ||
pass | ||
Be careful with that, in case a

    try:
        val = 1 / 0
    except Exception:
        try:
            a = {}
            a['foo']
        except KeyError:
            pass
        raise

I agree with @pfreixes, try to scope what you are catching as much as possible. This is much better imo:

    try:
        ...
        if not waiters:
            try:
                del self._waiters[key]
            except ValueError:
                pass
        ...
    except:
        ...

I don't see what you're saying here,

Read my first comment, unless you make it explicit with a

gotcha, thanks
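On the bare `raise` in the first snippet of this thread: in Python 3 a bare `raise` re-raises the exception currently being handled, and an inner exception that is fully caught and handled does not replace it. A small check that reuses the shape of that snippet (the `probe` function name is hypothetical):

```python
def probe():
    """Return the name of the exception that the bare `raise` propagates."""
    try:
        try:
            1 / 0
        except Exception:
            try:
                {}["foo"]  # raises KeyError, handled just below
            except KeyError:
                pass
            raise  # re-raises the exception of the enclosing handler
    except Exception as exc:
        return type(exc).__name__


print(probe())
```

Narrowing the inner `try` to the single statement that may fail is still worthwhile, since it keeps an unrelated error from being swallowed by the inner handler.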
||
|
||
raise | ||
|
||
if traces: | ||
for trace in traces: | ||
|
@@ -423,10 +434,8 @@ async def connect(self, req, traces=None): | |
except BaseException: | ||
# signal to waiter | ||
if key in self._waiters: | ||
for waiter in self._waiters[key]: | ||
if not waiter.done(): | ||
waiter.set_result(None) | ||
break | ||
waiters = self._waiters[key] | ||
self._release_key_waiter(key, waiters) | ||
raise | ||
finally: | ||
if not self._closed: | ||
|
@@ -470,25 +479,33 @@ def _get(self, key): | |
del self._conns[key] | ||
return None | ||
|
||
def _release_key_waiter(self, key, waiters): | ||
if not waiters: | ||
return False | ||
|
||
waiter = waiters.popleft() | ||
Umm, you have to keep going until you reach a waiter that is not done or you run out of waiters; see the CPython implementation [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450

Are you saying the old implementation was wrong: https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adL481 ? I don't believe this is the case. There are two ways a waiter can be removed:
What you describe would create a double release for 1.i. This is in fact the scenario you alluded to before.

Yes, the issue was already there. Indeed, I can see the following issues with the code that we have in master:
So we have to fix them, but it's true that they were already there, and it would be nice if we decoupled both things.

ya I have a feeling this is the tip of the iceberg :) I have a strong suspicion there's a leak in aiohttp or something aiohttp uses, as right now we're leaking ~40MB / week in prod
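A minimal sketch of the approach suggested in this thread, modelled loosely on the wake-up loops in `asyncio.locks`: keep popping until a waiter that is not yet done is found, or the deque is empty. The `wake_next` name and the demo are hypothetical, not the code that was merged.

```python
import asyncio
from collections import deque


def wake_next(waiters):
    """Wake the oldest waiter that is still pending; skip finished ones."""
    while waiters:
        waiter = waiters.popleft()
        if not waiter.done():
            waiter.set_result(None)
            return True
    return False


async def demo():
    loop = asyncio.get_running_loop()
    first = loop.create_future()
    second = loop.create_future()
    waiters = deque([first, second])

    # Simulate a waiter that was cancelled but not yet removed from the deque.
    first.cancel()

    woke = wake_next(waiters)
    return woke, first.cancelled(), second.done(), len(waiters)


result = asyncio.run(demo())
print(result)
```

Skipping finished waiters avoids calling `set_result()` on a cancelled future and avoids spending the wake-up on a waiter that will never use it.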
||
waiter.set_result(None) | ||
You cannot guarantee that the element is a non-cancelled future, even though you are removing it from the list here [1]. Once the

[1] https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adR399

I will mimic this code [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450

I don't see how this can happen, because the async stack ends on the waiter for

Think of how this would work: currently no one sets an exception on this future besides cancelling the waiter at [1]. We can verify this by seeing that the future is only available from

So, if you cancel the task at [1], you have two options:
Let me know if I'm missing something in my logic, I know it can get complex.

The task only finishes once the reactor schedules it again; the result handed back to the task depends on the future that the task yielded on. The cancel is only a signal that wakes up the task with a CancelledError exception. So the task might still be there, just waiting for its turn in the reactor/loop. While that is happening there is a chance of hitting the situation I described. Please review the asyncio lock code; all of the different implementations take this into consideration.

Thanks @pfreixes for that example, so it seems it depends on the ordering of the tasks called during cancellation. Here's a question: can you have an outstanding release for a single connector? It seems like you could end up releasing two connectors for a single cancel, no, if the except clause executes before

Not sure about the question, but once the race condition is considered and mitigated the code should work as expected.

@pfreixes my "picture" :)

    import asyncio

    loop = asyncio.get_event_loop()
    waiters = []


    async def task(waiter):
        try:
            await waiter
        except:
            print("task finalized")
            if waiter in waiters:
                print("waiter removed")
                try:
                    waiters.remove(waiter)
                except ValueError:
                    print("Waiter already removed")
            else:
                print("waiter not present")
            raise


    def wakeup_next_waiter():
        if not waiters:
            return
        waiter = waiters.pop(0)
        try:
            if not waiter.done():
                waiter.set_result(None)
        except Exception as e:
            print(f"Exception calling set_result {e!r}")
            raise


    async def main(loop):
        # We add two waiters
        waiter = loop.create_future()
        waiters.append(waiter)
        waiter = loop.create_future()
        waiters.append(waiter)
        # create the task that will wait till either the waiter
        # is finished or the task is cancelled.
        t = asyncio.ensure_future(task(waiter))
        # make a loop iteration to allow the task to reach the
        # await waiter
        await asyncio.sleep(0)
        # put in the loop a callback to wake up the waiter.
        loop.call_later(0.1, wakeup_next_waiter)
        # cancel the task, this will mark the task as cancelled
        # but will be pending a loop iteration to wake up the
        # task, having as a result a CancelledError exception.
        # This implicitly will schedule the Task._wakeup function
        # to be executed in the next loop iteration.
        t.cancel()
        try:
            await t
        except asyncio.CancelledError:
            pass
        # wait for the release to run
        await asyncio.sleep(1)
        # now we have zero waiters even though only one was cancelled
        print(len(waiters))


    loop.run_until_complete(main(loop))

Note that in this example I add two waiters and cancel one, but at the end none are left because I ensure that the

I don't see the issue in the code - be careful with extra ifs that are not needed in the The first waiter that is created is removed through the happy path of the Just to put all of us on the same page, the release of a used connection is done automatically by the context manager of a request calling the

[1] https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_reqrep.py#L786

ok I think ya the way to think about this is that there are N waiters and M connectors, while they're represented in
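The timing described in this thread can be observed directly. A minimal sketch, assuming only standard `asyncio` behaviour: `Task.cancel()` cancels the future the task is waiting on right away, but the task's own `except` block runs only when the loop schedules the task again. The `demo` and `waiter_task` names are hypothetical.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    events = []
    fut = loop.create_future()

    async def waiter_task():
        try:
            await fut
        except asyncio.CancelledError:
            # Cleanup (e.g. removing fut from a waiters deque) would go here.
            events.append("handler ran")
            raise

    task = asyncio.ensure_future(waiter_task())
    await asyncio.sleep(0)  # let the task reach `await fut`

    task.cancel()
    # The awaited future is already cancelled, yet the handler has not run.
    events.append(f"after cancel(): fut.cancelled()={fut.cancelled()}")

    try:
        await task
    except asyncio.CancelledError:
        pass
    return events


events = asyncio.run(demo())
print(events)
```

Between `cancel()` returning and the handler running, a cancelled future can still sit in the waiters collection, which is the window in which a release could pick it up.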
||
|
||
if not waiters: | ||
del self._waiters[key] | ||
|
||
return True | ||
If you don't do anything with the result, don't implement a result at all, it's like dead code. This is just a small remark.

? result is used in

Ah I see, sorry
||
|
||
def _release_waiter(self): | ||
# always release only one waiter | ||
|
||
if self._limit: | ||
# if we have limit and we have available | ||
if self._limit - len(self._acquired) > 0: | ||
for key, waiters in self._waiters.items(): | ||
if waiters: | ||
if not waiters[0].done(): | ||
with the new model we can guarantee that there are only active waiters in the list, so we can greatly simplify this to always popping and notifying the first item in the list
||
waiters[0].set_result(None) | ||
if self._release_key_waiter(key, waiters): | ||
break | ||
|
||
elif self._limit_per_host: | ||
# if we have dont have limit but have limit per host | ||
# then release first available | ||
for key, waiters in self._waiters.items(): | ||
if waiters: | ||
if not waiters[0].done(): | ||
waiters[0].set_result(None) | ||
if self._release_key_waiter(key, waiters): | ||
break | ||
|
||
def _release_acquired(self, key, proto): | ||
|
I would suggest using the deque [1] data structure, which is the one used by all of the implementations of asyncio.locks. More likely because lists have the following constraint:

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L437

great idea, done, slight speed improvement :)