Make engine.acquire() reentrant #128
Comments
@asvetlov What do you think about this new feature? |
How would you implement this feature without introducing some sort of global registry bound to the current task? |
Hi, this idea can be more harmful than helpful:

```python
async def in_real(event_loop):
    async with engine.acquire() as conn1:
        async with conn1.begin() as tr:
            await conn1.execute("INSERT....")
            await test_nested_acquire(conn1)

async def test_nested_acquire(outer_conn):
    async with engine.acquire() as conn2:
        assert outer_conn is conn2
        await conn2.execute("SELECT 1/0;")
```

The outer transaction will be corrupted. Implementing this (re-entrant acquire) will require writing complex code checking many possible states of connections / tasks / etc, and it will be hard to test (to instrument all possible usages and task states) |
@popravich the transaction will be marked as errored and no further SQL statements will be allowed before the `begin` block exits. I think this is just the way it should be. Maybe it is better if we add a… |
@jettify I would keep this registry somewhere near the… |
What I'm trying to say is that it's an anti-pattern -- when you're acquiring a connection you expect… |
@popravich the issue is: accessing the current connection for the current asyncio task inside a function, without passing it as an argument. I think it is quite a common problem. See for instance the aiohttp chat demo views: each view does its own acquire. If in the future you need to add another query somewhere in your views, you have the following options:

1. pass the already-acquired connection down the call stack as an argument;
2. do another `engine.acquire()` inside the nested code.
If you go for 2. you must be very, very brave: if some of the code you call inside your `acquire` block does another acquire, then you have a lurking deadlock that might block the entire application forever until kill. So, either we detect reentrant `acquire`… |
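To make the lurking deadlock concrete, here is a minimal sketch, assuming an aiopg.sa engine whose pool holds a single connection (the DSN is a placeholder): the inner `acquire()` waits forever for a connection its own task is already holding.

```python
import asyncio
from aiopg.sa import create_engine

async def inner(engine):
    # The pool is exhausted by the outer acquire(), so this waits forever.
    async with engine.acquire() as conn:
        await conn.execute("SELECT 1")

async def outer():
    # maxsize=1 makes the problem deterministic; with a bigger pool it
    # only shows up under load, which is the "lurking" part.
    engine = await create_engine(dsn='dbname=test user=test', maxsize=1)
    async with engine.acquire() as conn:
        await conn.execute("SELECT 1")
        await inner(engine)  # deadlock: never returns

asyncio.get_event_loop().run_until_complete(outer())
```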
In any case, if I have acquired a connection from the pool for serving a request, I have to be sure that I am using the same connection for all calls to the database while processing the current request. It may be implemented, as @mpaolini said, by storing the current connection in a task-local variable, but it seems to me this is quite a common problem. |
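For illustration, a rough sketch of the task-local registry idea both comments describe; the `reentrant_acquire` helper is hypothetical (not aiopg API) and assumes Python 3.7+ for `asyncio.current_task()`:

```python
import asyncio

_task_conns = {}  # task -> connection acquired by that task


class reentrant_acquire:
    """Return the task's existing connection, else acquire a new one."""

    def __init__(self, engine):
        self._engine = engine
        self._conn = None
        self._owner = False

    async def __aenter__(self):
        task = asyncio.current_task()
        if task in _task_conns:
            # Nested call in the same task: reuse the outer connection.
            self._conn = _task_conns[task]
        else:
            self._conn = await self._engine.acquire()
            _task_conns[task] = self._conn
            self._owner = True
        return self._conn

    async def __aexit__(self, exc_type, exc, tb):
        # Only the outermost context releases the connection.
        if self._owner:
            del _task_conns[asyncio.current_task()]
            self._engine.release(self._conn)
```

Note this sketch deliberately ignores the transaction-state corruption pointed out above -- that objection applies to any implementation of this idea.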
I am going to check what django and pymongo have in place. Will be back to you soon |
@mpaolini wrote: "If you go for 2. you must be very very brave: If some of the code you call inside your acquire block does another acquire, then you have a lurking deadlock, that might block the entire application forever until kill." That is a very good point. We can very easily block the entire application forever until kill; once we get something like I showed in #126, the entire application is blocked forever |
ok, sharing the first finding: pymongo. The low-level socket-acquiring API (`_socket_for_reads`) is private. In other words, there is no public way of acquiring a "socket" from the pool and using it yourself. Every time you call a query method, pymongo acquires and releases a socket internally. If we were to copy from them, aiopg should not expose its `acquire()` at all. The test script follows:

```python
import time
import threading
import logging

from pymongo import MongoClient

SLEEP_TIME = 0.1

logger = logging.getLogger(__name__)


def do_query(client, sleep):
    client.test.stuff.find_one({'test': 1})
    time.sleep(sleep)
    do_query_inner(client, sleep)


def do_query_inner(client, sleep):
    client.test.stuff_2.find_one({'test': 1})
    time.sleep(sleep)


def get_socket_nested(client, sleep):
    with client._socket_for_reads(None):
        logger.debug('acquired outer for task {}'.format(
            threading.current_thread().name))
        time.sleep(sleep)
        with client._socket_for_reads(None):
            logger.debug('acquired inner for task {}'.format(
                threading.current_thread().name))
            time.sleep(sleep)


def main(concurrency, iterations, pool_size, sleep, nested):
    client = MongoClient(maxPoolSize=pool_size)
    for i in range(iterations):
        threads = []
        for it in range(concurrency):
            thread = threading.Thread(
                target=get_socket_nested if nested else do_query,
                args=[client, sleep],
                name='{}-{}'.format(i, it),
                daemon=True,
            )
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--concurrency', type=int, default=1)
    parser.add_argument('-i', '--iterations', type=int, default=10)
    parser.add_argument('-n', '--nested', action='store_true')
    parser.add_argument('-p', '--pool-size', type=int, default=10)
    parser.add_argument('-s', '--sleep', type=float, default=0.1)
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG,
                        format='%(funcName)s %(lineno)d: %(message)s')
    main(args.concurrency, args.iterations, args.pool_size, args.sleep,
         args.nested)
```
|
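For reference, the nested path can be exercised with something like `python pool_test.py --nested --concurrency 20 --pool-size 10` (the script name is whatever you saved it as); the flags map directly onto the `argparse` options above.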
This is only possible if you're using global variables (`engine`); otherwise you need to pass either the engine (obtained from…) |
But it does. |
django situation: django uses a global var where it keeps all open connections as thread locals. Cursors are reentrant (I think) and all share the same connection. The connection pool implementation is very basic (it supports basically a single connection per thread) and is completely hidden from the public API. As in pymongo… |
@popravich I never suggested we maintain a global var in… |
Yep, that is the source of the problem -- "letting the user keep a global var engine" ) |
Agree with @popravich. On the other hand, aiopg is also used with tornado. |
Ok, so you suggest acquiring the connection once and passing it all the way down the call stack, making sure no code down the stack ever acquires a new one. But that brings us to the very subject of this issue: in case your code is deep within another framework, like in this example, and you can't control the arguments to a function, you (as aiopg) are forcing the user to implement a global variable holding the connection, right? I'm fine with the solution (a wontfix), but then let's at least issue a warning when a user is acquiring a connection in a nested manner, or at least write it in the docs. |
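If we went for the warning route, the detection could look roughly like the following sketch; `GuardedPool` is hypothetical (not aiopg internals) and assumes Python 3.7+ for `asyncio.current_task()` and `contextlib.asynccontextmanager`:

```python
import asyncio
import warnings
from contextlib import asynccontextmanager


class GuardedPool:
    """Wrap a pool and warn when the same task acquires twice."""

    def __init__(self, pool):
        self._pool = pool
        self._depth = {}  # task -> current acquire() nesting depth

    @asynccontextmanager
    async def acquire(self):
        task = asyncio.current_task()
        if self._depth.get(task, 0) > 0:
            warnings.warn('nested acquire() in the same task may deadlock',
                          RuntimeWarning)
        self._depth[task] = self._depth.get(task, 0) + 1
        try:
            async with self._pool.acquire() as conn:
                yield conn
        finally:
            self._depth[task] -= 1
            if not self._depth[task]:
                del self._depth[task]
```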
So, should I use the approach described by @mpaolini? But in this case we can always get a deadlock if inner code makes an `engine.acquire()` call |
Simple answer is yes. In the case of tornado, `RequestHandler` provides `initialize()`:

```python
class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...


app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
])
```

replace `database` with the engine… |
this line does not work, since tornado does not know anything about… |
And how should I access the connection in a framework like FormEncode, as in this example: #127? |
@jettify definitely, this was just an example, and by no means a full implementation |
it works as long as you are able to pass down the connection object |
So, the only way is to acquire a new connection inside the FormEncode validator, which leads to a deadlock |
Or I can set the pool size as large as possible :-) |
My reasoning goes like this: …

If we all agree on 1. and 2., let's move forward to the… Proposed solutions: … |
You'll need a bit of work: … |
…and still you will be suffering deadlocks when doing… |
ok, not… |
But then if some inner code (not my code) calls acquire, it leads to a deadlock anyway ) |
I'm +1 for 4a and 3, and -1 for 2. I'm not against 1, but the implementation can be very complex and will need a lot of tests |
I have always seen the engine as a singleton. If we now need to pass the connection itself, we cannot have singletons anymore. Are you sure @popravich this is the right way to go? |
It is very hard to pass the connection to all the places I need it. In my example I need the connection inside… |
@mpaolini you can use a middleware for that:

```python
async def middleware_factory(app, handler):
    async def middleware_handler(request):
        engine = app['engine']
        async with engine.acquire() as conn1:
            request['connection'] = conn1
            resp = await handler(request)
            return resp
    return middleware_handler


async def my_handler(request):
    conn = request['connection']
    return web.Response(body=b'text')
```
|
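For completeness, wiring the factory above into an app might look like this sketch (old-style aiohttp middleware factories of the `(app, handler)` form; the `engine` variable is assumed to be created at startup):

```python
from aiohttp import web

app = web.Application(middlewares=[middleware_factory])
app['engine'] = engine  # created earlier, e.g. at application startup
app.router.add_route('GET', '/', my_handler)
```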
There is no request object in a FormEncode validator |
Why not change the converter?

```python
class TransactionID(FancyValidator):

    class converter(int):

        # A plain method (not a property): it needs the caller's
        # connection as an explicit argument.
        async def transaction(self, conn):
            res = await conn.execute(
                m.transactions.select()
                .where(m.transactions.c.id == self))
            trans = await res.fetchone()
            if trans is None:
                raise exceptions.InvalidTransaction(self)
            return trans


class ReqHandler(tornado.web.RequestHandler):

    def initialize(self, engine):
        self.engine = engine

    async def post(self):
        async with self.engine.acquire() as conn:
            transaction_id = self.BODY.get('transaction')
            transaction = await transaction_id.transaction(conn)
```
|
Guys, it seems to me you are trying to solve my specific problem, but I think this is quite a common problem, as @mpaolini said |
@mpaolini @serg666 sorry guys, I support @popravich and @jettify |
Not an option. A connection is a stateful object. You cannot open a transaction in one task, change the state, and re-enter a clean state for the same connection from another task. It works for stateless connections like mongodb and redis (without MULTI statements), but RDBMS servers have a very rich state.
A good option, but it should be solved by high-level libraries like an ORM built on top of aiopg (not aiopg.sa).
You don't know the context. The code may rely on the task, but calling `acquire()` is totally correct. There is no way to check whether an acquire leads to a deadlock or not.
This is the best we can do. Any volunteers?
I support this way. If some library doesn't support passing the connection explicitly, that library cannot be used with aiopg, sorry. But I believe there is a solution for any concrete case.
No, not an option. Let's never use global objects at all. It's the easiest way to make a mess and break the Universe. |
OK, let's go for 4. then: document the proper use of the pool. From the aiohttp demos we see there is a singleton engine. The demo app does not trigger the deadlock, so copying from it we could say: … |
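A sketch of what that documented pattern could look like: one `acquire()` per task at the outermost level, with the connection passed down explicitly (the `users` table and function names here are illustrative, not from the demo):

```python
async def handle_request(engine, user_id):
    # The only acquire() in this task: done once, at the top level.
    async with engine.acquire() as conn:
        return await load_user(conn, user_id)


async def load_user(conn, user_id):
    # Helpers never touch the engine -- they take the caller's connection,
    # so no nested acquire() can sneak in.
    res = await conn.execute(
        users.select().where(users.c.id == user_id))
    return await res.fetchone()
```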
@asvetlov I don't entirely agree with this point. Using global objects is not only useful but mandatory for using aiopg correctly. The connection pool itself is only useful when shared by the whole running app, and thus it is used as a kind of global instance. The deadlock issue we are discussing happens exactly for this reason: we want a connection pool shared by all asyncio tasks in the running app. Having this singleton… |
@popravich in order to correctly use the pool… And yes, I agree that's the source of the problem, because having serialized access to shared resources is difficult and might lead to deadlocks. I just want to point out that the connection pool is a shared resource by design, and you have to keep it shared for it to be useful |
Ok, I have bothered you guys enough. Let me know if the main points I proposed are agreed upon, and I will try to cook a patch for the docs in the next few days to even things out ;) |
@mpaolini I'll rewrite the aiohttp demo to reflect your notes and provide better storage for the db pool (… |
Let me emphasize -- "Shared resource" != "global variable" |
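In aiohttp terms the distinction could look like this sketch: the engine is a shared resource scoped to the application object rather than a module-level variable (the DSN is a placeholder):

```python
from aiohttp import web
from aiopg.sa import create_engine


async def init(loop):
    app = web.Application(loop=loop)
    # A shared resource scoped to the app: handlers reach it through
    # request.app['engine'], so no module-level global is needed.
    app['engine'] = await create_engine(dsn='dbname=test user=test',
                                        loop=loop)
    return app
```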
Should I close this one and create a new issue "Document how to correctly use the connection pool"? |
Yes, please do. @mpaolini would you make a PR for aiopg doc update? |
Closing this for issue #129 |
Currently, acquiring a connection from the pool in a nested manner is deadlock-prone. See #113, #126 and #127. Sometimes (e.g. when implementing middlewares or form validators, or any other feature plugged into existing frameworks) it is just difficult to pass the `connection` instance down the call stack. I propose a new feature where `engine.acquire()` returns the same already-acquired connection if called again inside an `engine.acquire()` context by the same asyncio task. This would mean this test would pass:
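The test snippet itself was lost in formatting; judging from the `test_nested_acquire` example quoted earlier in the thread, it presumably looked something like this:

```python
async def test_reentrant_acquire(engine):
    async with engine.acquire() as conn1:
        async with engine.acquire() as conn2:
            # With the proposed feature, the nested acquire() in the
            # same task hands back the connection acquired above
            # instead of blocking on the pool.
            assert conn1 is conn2
```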