Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Wait for a pending connection when acquiring #122

Merged
merged 1 commit into from
Oct 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions momoko/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def add_dead(self, conn):
self.pending.discard(conn)
self.dead.add(conn)

# If everything is dead, abort anything pending.
if not self.pending:
self.abort_waiting_queue(Pool.DatabaseNotAvailable(
"No database connection available"))

def acquire(self):
"""Occupy free connection"""
future = Future()
Expand All @@ -84,6 +89,10 @@ def acquire(self):
log.debug("No free connections, and some are busy - put in waiting queue")
self.waiting_queue.appendleft(future)
return future
elif self.pending:
log.debug("No free connections, but some are pending - put in waiting queue")
self.waiting_queue.appendleft(future)
return future
else:
log.debug("All connections are dead")
return None
Expand Down Expand Up @@ -115,7 +124,7 @@ def shrink(self, target_size, delay_in_seconds):

@property
def all_dead(self):
return not (self.free or self.busy)
return not (self.free or self.busy or self.waiting_queue)

@property
def total(self):
Expand Down Expand Up @@ -295,8 +304,6 @@ def getconn(self, ping=True):
future = rv
else:
# Else, all connections are dead
assert len(self.conns.pending) == 0, "BUG! should be no pending connection"

future = Future()

def on_reanimate_done(fut):
Expand Down
64 changes: 63 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,71 @@ def test_abort_waiting_queue(self):
pass
self.assertEqual(len(db.conns.waiting_queue), 0)

@gen_test
def test_execute_can_start_before_connection_is_done(self):
db = momoko.Pool(dsn=self.good_dsn, size=1, ioloop=self.io_loop)
db.connect()
cursor = yield db.execute("SELECT 1")
self.assertEqual(cursor.fetchone()[0], 1)

@gen_test
def test_execute_before_connection_is_done_will_error(self):
db = momoko.Pool(dsn=bad_dsn, size=1, ioloop=self.io_loop)
db.connect()

try:
yield db.execute("SELECT 1")

# This likely won't get hit; instead the test will timeout on
# failure.
self.fail("Exception should have been raised")
except psycopg2.DatabaseError:
pass


class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest):
pass

@gen_test
def test_execute_can_wait_for_connection_after_disconnect(self):
db = yield self.build_pool(dsn=self.good_dsn, size=1)

f1 = db.execute("SELECT 1")
self.terminate_proxy()
try:
yield f1
except psycopg2.DatabaseError:
pass
self.start_proxy()
yield gen.sleep(db.reconnect_interval)
f2 = db.execute("SELECT 1")
f3 = db.execute("SELECT 1")

cursors = yield [f2, f3]
self.assertEqual(cursors[0].fetchone()[0], 1)
self.assertEqual(cursors[1].fetchone()[0], 1)

@gen_test
def test_execute_can_fail_after_disconnect_with_no_reconnect(self):
db = yield self.build_pool(dsn=self.good_dsn, size=1)

f1 = db.execute("SELECT 1")
self.terminate_proxy()
try:
yield f1
except psycopg2.DatabaseError:
pass

# No start proxy here!
yield gen.sleep(db.reconnect_interval)
f2 = db.execute("SELECT 1")
f3 = db.execute("SELECT 1")

try:
yield [f2, f3]
self.fail("Exception should have been raised")
except psycopg2.DatabaseError:
pass
self.assertEqual(len(db.conns.waiting_queue), 0)


class MomokoPoolPartiallyConnectedTest(PoolBaseTest):
Expand Down