Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Wait for a pending connection when acquiring
Browse files Browse the repository at this point in the history
There are two cases when a caller might ask for a connection when the
only available ones are in a pending state:
- Upon initialization, if the caller hasn't waited for `connect()` to
  complete.
- After all connections to the database are lost, and they are
  reconnecting.

Instead of immediately returning an error to the caller, add them to the
waiting queue, allowing them to either get a connection, or return an
error if none of the pending connections succeed.
  • Loading branch information
jbowes committed Oct 7, 2015
1 parent 193f309 commit a0391c2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
13 changes: 10 additions & 3 deletions momoko/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def add_dead(self, conn):
self.pending.discard(conn)
self.dead.add(conn)

# If everything is dead, abort anything pending.
if not self.pending:
self.abort_waiting_queue(Pool.DatabaseNotAvailable(
"No database connection available"))

def acquire(self):
"""Occupy free connection"""
future = Future()
Expand All @@ -84,6 +89,10 @@ def acquire(self):
log.debug("No free connections, and some are busy - put in waiting queue")
self.waiting_queue.appendleft(future)
return future
elif self.pending:
log.debug("No free connections, but some are pending - put in waiting queue")
self.waiting_queue.appendleft(future)
return future
else:
log.debug("All connections are dead")
return None
Expand Down Expand Up @@ -115,7 +124,7 @@ def shrink(self, target_size, delay_in_seconds):

@property
def all_dead(self):
return not (self.free or self.busy)
return not (self.free or self.busy or self.waiting_queue)

@property
def total(self):
Expand Down Expand Up @@ -295,8 +304,6 @@ def getconn(self, ping=True):
future = rv
else:
# Else, all connections are dead
assert len(self.conns.pending) == 0, "BUG! should be no pending connection"

future = Future()

def on_reanimate_done(fut):
Expand Down
64 changes: 63 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,71 @@ def test_abort_waiting_queue(self):
pass
self.assertEqual(len(db.conns.waiting_queue), 0)

@gen_test
def test_execute_can_start_before_connection_is_done(self):
db = momoko.Pool(dsn=self.good_dsn, size=1, ioloop=self.io_loop)
db.connect()
cursor = yield db.execute("SELECT 1")
self.assertEqual(cursor.fetchone()[0], 1)

@gen_test
def test_execute_before_connection_is_done_will_error(self):
db = momoko.Pool(dsn=bad_dsn, size=1, ioloop=self.io_loop)
db.connect()

try:
yield db.execute("SELECT 1")

# This likely won't get hit; instead the test will timeout on
# failure.
self.fail("Exception should have been raised")
except psycopg2.DatabaseError:
pass


class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest):
pass

@gen_test
def test_execute_can_wait_for_connection_after_disconnect(self):
db = yield self.build_pool(dsn=self.good_dsn, size=1)

f1 = db.execute("SELECT 1")
self.terminate_proxy()
try:
yield f1
except psycopg2.DatabaseError:
pass
self.start_proxy()
yield gen.sleep(db.reconnect_interval)
f2 = db.execute("SELECT 1")
f3 = db.execute("SELECT 1")

cursors = yield [f2, f3]
self.assertEqual(cursors[0].fetchone()[0], 1)
self.assertEqual(cursors[1].fetchone()[0], 1)

@gen_test
def test_execute_can_fail_after_disconnect_with_no_reconnect(self):
db = yield self.build_pool(dsn=self.good_dsn, size=1)

f1 = db.execute("SELECT 1")
self.terminate_proxy()
try:
yield f1
except psycopg2.DatabaseError:
pass

# No start proxy here!
yield gen.sleep(db.reconnect_interval)
f2 = db.execute("SELECT 1")
f3 = db.execute("SELECT 1")

try:
yield [f2, f3]
self.fail("Exception should have been raised")
except psycopg2.DatabaseError:
pass
self.assertEqual(len(db.conns.waiting_queue), 0)


class MomokoPoolPartiallyConnectedTest(PoolBaseTest):
Expand Down

0 comments on commit a0391c2

Please sign in to comment.