From a0391c254f18c643ff65ee5086ae8248eec2aef7 Mon Sep 17 00:00:00 2001 From: James Bowes Date: Sun, 9 Aug 2015 12:58:40 -0300 Subject: [PATCH] Wait for a pending connection when acquiring There are two cases when a caller might ask for a connection when the only available ones are in a pending state: - Upon initialization, if the caller hasn't waited for `connect()` to complete. - After all connections to the database are lost, and they are reconnecting. Instead of immediately returning an error to the caller, add them to the waiting queue, allowing them to either get a connection, or return an error if none of the pending connections succeed. --- momoko/connection.py | 13 ++++++--- tests.py | 64 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/momoko/connection.py b/momoko/connection.py index 2fe25f2..bbeb15c 100644 --- a/momoko/connection.py +++ b/momoko/connection.py @@ -71,6 +71,11 @@ def add_dead(self, conn): self.pending.discard(conn) self.dead.add(conn) + # If everything is dead, abort anything pending. + if not self.pending: + self.abort_waiting_queue(Pool.DatabaseNotAvailable( + "No database connection available")) + def acquire(self): """Occupy free connection""" future = Future() @@ -84,6 +89,10 @@ def acquire(self): log.debug("No free connections, and some are busy - put in waiting queue") self.waiting_queue.appendleft(future) return future + elif self.pending: + log.debug("No free connections, but some are pending - put in waiting queue") + self.waiting_queue.appendleft(future) + return future else: log.debug("All connections are dead") return None @@ -115,7 +124,7 @@ def shrink(self, target_size, delay_in_seconds): @property def all_dead(self): - return not (self.free or self.busy) + return not (self.free or self.busy or self.waiting_queue) @property def total(self): @@ -295,8 +304,6 @@ def getconn(self, ping=True): future = rv else: # Else, all connections are dead - assert len(self.conns.pending) == 0, "BUG! should be no pending connection" - future = Future() def on_reanimate_done(fut): diff --git a/tests.py b/tests.py index 37d91ae..26e2e3b 100644 --- a/tests.py +++ b/tests.py @@ -812,9 +812,71 @@ def test_abort_waiting_queue(self): pass self.assertEqual(len(db.conns.waiting_queue), 0) + @gen_test + def test_execute_can_start_before_connection_is_done(self): + db = momoko.Pool(dsn=self.good_dsn, size=1, ioloop=self.io_loop) + db.connect() + cursor = yield db.execute("SELECT 1") + self.assertEqual(cursor.fetchone()[0], 1) + + @gen_test + def test_execute_before_connection_is_done_will_error(self): + db = momoko.Pool(dsn=bad_dsn, size=1, ioloop=self.io_loop) + db.connect() + + try: + yield db.execute("SELECT 1") + + # This likely won't get hit; instead the test will timeout on + # failure. + self.fail("Exception should have been raised") + except psycopg2.DatabaseError: + pass + class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest): - pass + + @gen_test + def test_execute_can_wait_for_connection_after_disconnect(self): + db = yield self.build_pool(dsn=self.good_dsn, size=1) + + f1 = db.execute("SELECT 1") + self.terminate_proxy() + try: + yield f1 + except psycopg2.DatabaseError: + pass + self.start_proxy() + yield gen.sleep(db.reconnect_interval) + f2 = db.execute("SELECT 1") + f3 = db.execute("SELECT 1") + + cursors = yield [f2, f3] + self.assertEqual(cursors[0].fetchone()[0], 1) + self.assertEqual(cursors[1].fetchone()[0], 1) + + @gen_test + def test_execute_can_fail_after_disconnect_with_no_reconnect(self): + db = yield self.build_pool(dsn=self.good_dsn, size=1) + + f1 = db.execute("SELECT 1") + self.terminate_proxy() + try: + yield f1 + except psycopg2.DatabaseError: + pass + + # No start proxy here! + yield gen.sleep(db.reconnect_interval) + f2 = db.execute("SELECT 1") + f3 = db.execute("SELECT 1") + + try: + yield [f2, f3] + self.fail("Exception should have been raised") + except psycopg2.DatabaseError: + pass + self.assertEqual(len(db.conns.waiting_queue), 0) class MomokoPoolPartiallyConnectedTest(PoolBaseTest):