diff --git a/momoko/connection.py b/momoko/connection.py index 2fe25f2..bbeb15c 100644 --- a/momoko/connection.py +++ b/momoko/connection.py @@ -71,6 +71,11 @@ def add_dead(self, conn): self.pending.discard(conn) self.dead.add(conn) + # If everything is dead, abort anything pending. + if not self.pending: + self.abort_waiting_queue(Pool.DatabaseNotAvailable( + "No database connection available")) + def acquire(self): """Occupy free connection""" future = Future() @@ -84,6 +89,10 @@ def acquire(self): log.debug("No free connections, and some are busy - put in waiting queue") self.waiting_queue.appendleft(future) return future + elif self.pending: + log.debug("No free connections, but some are pending - put in waiting queue") + self.waiting_queue.appendleft(future) + return future else: log.debug("All connections are dead") return None @@ -115,7 +124,7 @@ def shrink(self, target_size, delay_in_seconds): @property def all_dead(self): - return not (self.free or self.busy) + return not (self.free or self.busy or self.waiting_queue) @property def total(self): @@ -295,8 +304,6 @@ def getconn(self, ping=True): future = rv else: # Else, all connections are dead - assert len(self.conns.pending) == 0, "BUG! should be no pending connection" - future = Future() def on_reanimate_done(fut): diff --git a/tests.py b/tests.py index 37d91ae..26e2e3b 100644 --- a/tests.py +++ b/tests.py @@ -812,9 +812,71 @@ def test_abort_waiting_queue(self): pass self.assertEqual(len(db.conns.waiting_queue), 0) + @gen_test + def test_execute_can_start_before_connection_is_done(self): + db = momoko.Pool(dsn=self.good_dsn, size=1, ioloop=self.io_loop) + db.connect() + cursor = yield db.execute("SELECT 1") + self.assertEqual(cursor.fetchone()[0], 1) + + @gen_test + def test_execute_before_connection_is_done_will_error(self): + db = momoko.Pool(dsn=bad_dsn, size=1, ioloop=self.io_loop) + db.connect() + + try: + yield db.execute("SELECT 1") + + # This likely won't get hit; instead the test will timeout on + # failure. + self.fail("Exception should have been raised") + except psycopg2.DatabaseError: + pass + class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest): - pass + + @gen_test + def test_execute_can_wait_for_connection_after_disconnect(self): + db = yield self.build_pool(dsn=self.good_dsn, size=1) + + f1 = db.execute("SELECT 1") + self.terminate_proxy() + try: + yield f1 + except psycopg2.DatabaseError: + pass + self.start_proxy() + yield gen.sleep(db.reconnect_interval) + f2 = db.execute("SELECT 1") + f3 = db.execute("SELECT 1") + + cursors = yield [f2, f3] + self.assertEqual(cursors[0].fetchone()[0], 1) + self.assertEqual(cursors[1].fetchone()[0], 1) + + @gen_test + def test_execute_can_fail_after_disconnect_with_no_reconnect(self): + db = yield self.build_pool(dsn=self.good_dsn, size=1) + + f1 = db.execute("SELECT 1") + self.terminate_proxy() + try: + yield f1 + except psycopg2.DatabaseError: + pass + + # No start proxy here! + yield gen.sleep(db.reconnect_interval) + f2 = db.execute("SELECT 1") + f3 = db.execute("SELECT 1") + + try: + yield [f2, f3] + self.fail("Exception should have been raised") + except psycopg2.DatabaseError: + pass + self.assertEqual(len(db.conns.waiting_queue), 0) class MomokoPoolPartiallyConnectedTest(PoolBaseTest):