Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #122 from jbowes/wait-on-pending
Browse files Browse the repository at this point in the history
Wait for a pending connection when acquiring
  • Loading branch information
haizaar committed Oct 13, 2015
2 parents 193f309 + a0391c2 commit 3671d84
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
13 changes: 10 additions & 3 deletions momoko/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def add_dead(self, conn):
self.pending.discard(conn)
self.dead.add(conn)

# If everything is dead, abort anything pending.
if not self.pending:
self.abort_waiting_queue(Pool.DatabaseNotAvailable(
"No database connection available"))

def acquire(self):
"""Occupy free connection"""
future = Future()
Expand All @@ -84,6 +89,10 @@ def acquire(self):
log.debug("No free connections, and some are busy - put in waiting queue")
self.waiting_queue.appendleft(future)
return future
elif self.pending:
log.debug("No free connections, but some are pending - put in waiting queue")
self.waiting_queue.appendleft(future)
return future
else:
log.debug("All connections are dead")
return None
Expand Down Expand Up @@ -115,7 +124,7 @@ def shrink(self, target_size, delay_in_seconds):

@property
def all_dead(self):
return not (self.free or self.busy)
return not (self.free or self.busy or self.waiting_queue)

@property
def total(self):
Expand Down Expand Up @@ -295,8 +304,6 @@ def getconn(self, ping=True):
future = rv
else:
# Else, all connections are dead
assert len(self.conns.pending) == 0, "BUG! should be no pending connection"

future = Future()

def on_reanimate_done(fut):
Expand Down
64 changes: 63 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,71 @@ def test_abort_waiting_queue(self):
pass
self.assertEqual(len(db.conns.waiting_queue), 0)

@gen_test
def test_execute_can_start_before_connection_is_done(self):
db = momoko.Pool(dsn=self.good_dsn, size=1, ioloop=self.io_loop)
db.connect()
cursor = yield db.execute("SELECT 1")
self.assertEqual(cursor.fetchone()[0], 1)

@gen_test
def test_execute_before_connection_is_done_will_error(self):
db = momoko.Pool(dsn=bad_dsn, size=1, ioloop=self.io_loop)
db.connect()

try:
yield db.execute("SELECT 1")

# This likely won't get hit; instead the test will timeout on
# failure.
self.fail("Exception should have been raised")
except psycopg2.DatabaseError:
pass


class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest):
pass

@gen_test
def test_execute_can_wait_for_connection_after_disconnect(self):
db = yield self.build_pool(dsn=self.good_dsn, size=1)

f1 = db.execute("SELECT 1")
self.terminate_proxy()
try:
yield f1
except psycopg2.DatabaseError:
pass
self.start_proxy()
yield gen.sleep(db.reconnect_interval)
f2 = db.execute("SELECT 1")
f3 = db.execute("SELECT 1")

cursors = yield [f2, f3]
self.assertEqual(cursors[0].fetchone()[0], 1)
self.assertEqual(cursors[1].fetchone()[0], 1)

@gen_test
def test_execute_can_fail_after_disconnect_with_no_reconnect(self):
db = yield self.build_pool(dsn=self.good_dsn, size=1)

f1 = db.execute("SELECT 1")
self.terminate_proxy()
try:
yield f1
except psycopg2.DatabaseError:
pass

# No start proxy here!
yield gen.sleep(db.reconnect_interval)
f2 = db.execute("SELECT 1")
f3 = db.execute("SELECT 1")

try:
yield [f2, f3]
self.fail("Exception should have been raised")
except psycopg2.DatabaseError:
pass
self.assertEqual(len(db.conns.waiting_queue), 0)


class MomokoPoolPartiallyConnectedTest(PoolBaseTest):
Expand Down

0 comments on commit 3671d84

Please sign in to comment.