From a0391c254f18c643ff65ee5086ae8248eec2aef7 Mon Sep 17 00:00:00 2001 From: James Bowes Date: Sun, 9 Aug 2015 12:58:40 -0300 Subject: [PATCH 1/4] Wait for a pending connection when acquiring There are two cases when a caller might ask for a connection when the only available ones are in a pending state: - Upon initialization, if the caller hasn't waited for `connect()` to complete. - After all connections to the database are lost, and they are reconnecting. Instead of immediately returning an error to the caller, add them to the waiting queue, allowing them to either get a connection, or return an error if none of the pending connections succeed. --- momoko/connection.py | 13 ++++++--- tests.py | 64 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/momoko/connection.py b/momoko/connection.py index 2fe25f2..bbeb15c 100644 --- a/momoko/connection.py +++ b/momoko/connection.py @@ -71,6 +71,11 @@ def add_dead(self, conn): self.pending.discard(conn) self.dead.add(conn) + # If everything is dead, abort anything pending. + if not self.pending: + self.abort_waiting_queue(Pool.DatabaseNotAvailable( + "No database connection available")) + def acquire(self): """Occupy free connection""" future = Future() @@ -84,6 +89,10 @@ def acquire(self): log.debug("No free connections, and some are busy - put in waiting queue") self.waiting_queue.appendleft(future) return future + elif self.pending: + log.debug("No free connections, but some are pending - put in waiting queue") + self.waiting_queue.appendleft(future) + return future else: log.debug("All connections are dead") return None @@ -115,7 +124,7 @@ def shrink(self, target_size, delay_in_seconds): @property def all_dead(self): - return not (self.free or self.busy) + return not (self.free or self.busy or self.waiting_queue) @property def total(self): @@ -295,8 +304,6 @@ def getconn(self, ping=True): future = rv else: # Else, all connections are dead - assert len(self.conns.pending) == 0, "BUG! should be no pending connection" - future = Future() def on_reanimate_done(fut): diff --git a/tests.py b/tests.py index 37d91ae..26e2e3b 100644 --- a/tests.py +++ b/tests.py @@ -812,9 +812,71 @@ def test_abort_waiting_queue(self): pass self.assertEqual(len(db.conns.waiting_queue), 0) + @gen_test + def test_execute_can_start_before_connection_is_done(self): + db = momoko.Pool(dsn=self.good_dsn, size=1, ioloop=self.io_loop) + db.connect() + cursor = yield db.execute("SELECT 1") + self.assertEqual(cursor.fetchone()[0], 1) + + @gen_test + def test_execute_before_connection_is_done_will_error(self): + db = momoko.Pool(dsn=bad_dsn, size=1, ioloop=self.io_loop) + db.connect() + + try: + yield db.execute("SELECT 1") + + # This likely won't get hit; instead the test will timeout on + # failure. + self.fail("Exception should have been raised") + except psycopg2.DatabaseError: + pass + class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest): - pass + + @gen_test + def test_execute_can_wait_for_connection_after_disconnect(self): + db = yield self.build_pool(dsn=self.good_dsn, size=1) + + f1 = db.execute("SELECT 1") + self.terminate_proxy() + try: + yield f1 + except psycopg2.DatabaseError: + pass + self.start_proxy() + yield gen.sleep(db.reconnect_interval) + f2 = db.execute("SELECT 1") + f3 = db.execute("SELECT 1") + + cursors = yield [f2, f3] + self.assertEqual(cursors[0].fetchone()[0], 1) + self.assertEqual(cursors[1].fetchone()[0], 1) + + @gen_test + def test_execute_can_fail_after_disconnect_with_no_reconnect(self): + db = yield self.build_pool(dsn=self.good_dsn, size=1) + + f1 = db.execute("SELECT 1") + self.terminate_proxy() + try: + yield f1 + except psycopg2.DatabaseError: + pass + + # No start proxy here! + yield gen.sleep(db.reconnect_interval) + f2 = db.execute("SELECT 1") + f3 = db.execute("SELECT 1") + + try: + yield [f2, f3] + self.fail("Exception should have been raised") + except psycopg2.DatabaseError: + pass + self.assertEqual(len(db.conns.waiting_queue), 0) class MomokoPoolPartiallyConnectedTest(PoolBaseTest): From a5e31bf479ce654a2f63601a64008d70aa6fdccf Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Tue, 13 Oct 2015 11:02:55 +0300 Subject: [PATCH 2/4] Style fix --- momoko/connection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/momoko/connection.py b/momoko/connection.py index bbeb15c..9e7607c 100644 --- a/momoko/connection.py +++ b/momoko/connection.py @@ -73,8 +73,7 @@ def add_dead(self, conn): # If everything is dead, abort anything pending. if not self.pending: - self.abort_waiting_queue(Pool.DatabaseNotAvailable( - "No database connection available")) + self.abort_waiting_queue(Pool.DatabaseNotAvailable("No database connection available")) def acquire(self): """Occupy free connection""" From aa0d486cfe14084e698d3d5b7c9dee402e329f7c Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Tue, 13 Oct 2015 11:10:24 +0300 Subject: [PATCH 3/4] Going 2.2.1 --- changelog.rst | 7 +++++++ docs/conf.py | 4 ++-- setup.py | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/changelog.rst b/changelog.rst index 6557c1f..1f1bbe5 100644 --- a/changelog.rst +++ b/changelog.rst @@ -1,6 +1,13 @@ Changelog ========= +2.2.1 (2015-10-13) +------------------ +* Wait for pending connections during connection acquiring (`issue 121`_). Thanks to jbowes_. + +.. _issue 121: https://github.com/FSX/momoko/issues/121 +.. _jbowes: https://github.com/jbowes + 2.2.0 (2015-09-20) ------------------ * Fixed serious flaw with connection retrials. `More details`_. diff --git a/docs/conf.py b/docs/conf.py index 1d93810..78dd77e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -49,9 +49,9 @@ # built documents. # # The short X.Y version. -version = '2.2.0' +version = '2.2.1' # The full version, including alpha/beta/rc tags. -release = '2.2.0' +release = '2.2.1' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.py b/setup.py index cf967a2..ead7814 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ setup( name='Momoko', - version='2.2.0', + version='2.2.1', description="Momoko wraps Psycopg2's functionality for use in Tornado.", long_description=open('README.rst').read(), author='Frank Smit & Zaar Hai', From 83ff0d0f492dbebfe0cac4d63726cccfe51f978f Mon Sep 17 00:00:00 2001 From: Zaar Hai Date: Tue, 13 Oct 2015 11:12:49 +0300 Subject: [PATCH 4/4] Fixed wrong issue reference --- changelog.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog.rst b/changelog.rst index 1f1bbe5..98eeeda 100644 --- a/changelog.rst +++ b/changelog.rst @@ -3,9 +3,9 @@ Changelog 2.2.1 (2015-10-13) ------------------ -* Wait for pending connections during connection acquiring (`issue 121`_). Thanks to jbowes_. +* Wait for pending connections during connection acquiring (`issue 122`_). Thanks to jbowes_. -.. _issue 121: https://github.com/FSX/momoko/issues/121 +.. _issue 122: https://github.com/FSX/momoko/issues/122 .. _jbowes: https://github.com/jbowes 2.2.0 (2015-09-20)