Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Fixed serious flaw with connection retrials
Browse files Browse the repository at this point in the history
There was bug in reconnect logic. Retries were only performed
if operation attempt failes synchrnously. However typically
this is not the case. The most obvouis case for reconnects
is when database server restarts. However client does not detects
that server went await up until `connection.poll()`. I.e.
`cursor.execute` finishes successfully.

Unfortunately in such cases Momoko simply progated the error to
the caller, practically rendering reconnection code useless.

The bug existed primarily to the fact, that I did not find a good
and simple way to simulate sever going away and coming back in my
unitests. Now I've integrated third-party tcproxy package that
I launch and restart when needed from unittest code. So finally,
I have all the tools to fix and verify all reconnection logic.
  • Loading branch information
haizaar committed Aug 26, 2015
1 parent 78175bc commit 85183f5
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 44 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Testing
Set the following environment variables with your own values before running the
unit tests::

make -C tcpproxy
export MOMOKO_TEST_DB='your_db'
export MOMOKO_TEST_USER='your_user'
export MOMOKO_TEST_PASSWORD='your_password'
Expand Down
10 changes: 8 additions & 2 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ can also be used by setting the ``MOMOKO_PSYCOPG2_IMPL`` environment variable to
# 'psycopg2' or 'psycopg2cffi'
export MOMOKO_PSYCOPG2_IMPL='psycopg2cffi'

The unit tests als use this variable. It needs to be set if something else is used
The unit tests all use this variable. It needs to be set if something else is used
instead of Psycopg2 when running the unit tests. Besides ``MOMOKO_PSYCOPG2_IMPL``
there are also other variables that need to be set for the unit tests.

Expand All @@ -37,11 +37,17 @@ Here's an example for the environment variables::
# Set to '0' if hstore extension isn't enabled
export MOMOKO_TEST_HSTORE='1' # Default: 0

And running the tests is easy::
Momoko tests use tcproxy_ for simulating Postgres server unavailablity. The copy
of tcproxy is bundled with Momoko, but you need to build it first::

make -C tcproxy

Finally, running the tests is easy::

python setup.py test


.. _tcproxy: https://github.com/dccmx/tcproxy
.. _psycopg2cffi: http://pypi.python.org/pypi/psycopg2cffi
.. _Tornado: http://www.tornadoweb.org/
.. _Psycopg2: http://initd.org/psycopg/
Expand Down
71 changes: 48 additions & 23 deletions momoko/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class Pool(object):
:param list setsession:
List of intial sql commands to be executed once connection is established.
If any of the commands failes, the connection will be closed.
If any of the commands fails, the connection will be closed.
**NOTE:** The commands will be executed as one transaction block.
:param bool auto_shrink:
Expand All @@ -195,6 +195,10 @@ class Pool(object):
.. _psycopg2.extensions.connection: http://initd.org/psycopg/docs/connection.html#connection
.. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor
"""

class DatabaseNotAvailable(psycopg2.DatabaseError):
"""Raised when Pool can not connect to database server"""

def __init__(self,
dsn,
connection_factory=None,
Expand Down Expand Up @@ -232,7 +236,7 @@ def __init__(self,
self.conns = ConnectionContainer()

self._last_connect_time = 0
self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available")
self._no_conn_availble_error = self.DatabaseNotAvailable("No database connection available")
self.shrink_period = shrink_period
self.shrink_delay = shrink_delay
self.auto_shrink = auto_shrink
Expand Down Expand Up @@ -440,36 +444,35 @@ def when_available(fut):
conn = fut.result()
except psycopg2.Error as error:
future.set_exc_info(sys.exc_info())
if retry:
if retry and not keep:
self.putconn(retry[0])
return

log.debug("Obtained connection: %s", conn.fileno)
try:
future_or_result = method(conn, *args, **kwargs)
except psycopg2.Error as error:
if conn.closed:
if not retry:
retry.append(conn)
self.ioloop.add_future(conn.connect(), when_available)
return
else:
future.set_exception(self._no_conn_availble_error)
else:
future.set_exc_info(sys.exc_info())
log.debug(2)
self.putconn(conn)
return
log.debug("Method failed synchronously")
return self._retry(retry, when_available, conn, keep, future)

if not async:
if not keep:
self.putconn(conn)
future.set_result(future_or_result)
log.debug(3)
self.putconn(conn)
return

chain_future(future_or_result, future)
if not keep:
future.add_done_callback(lambda f: self.putconn(conn))
def when_done(rfut):
try:
result = rfut.result()
except psycopg2.Error as error:
log.debug("Method failed Asynchronously")
return self._retry(retry, when_available, conn, keep, future)

if not keep:
self.putconn(conn)
future.set_result(result)

self.ioloop.add_future(future_or_result, when_done)

if not connection:
self.ioloop.add_future(self.getconn(ping=False), when_available)
Expand All @@ -479,6 +482,20 @@ def when_available(fut):
when_available(f)
return future

def _retry(self, retry, what, conn, keep, future):
if conn.closed:
if not retry:
retry.append(conn)
self.ioloop.add_future(conn.connect(), what)
return
else:
future.set_exception(self._no_conn_availble_error)
else:
future.set_exc_info(sys.exc_info())
if not keep:
self.putconn(conn)
return

def _reanimate(self):
assert self.conns.dead, "BUG: don't call reanimate when there is no one to reanimate"

Expand Down Expand Up @@ -601,7 +618,7 @@ class Connection(object):
:param list setsession:
List of intial sql commands to be executed once connection is established.
If any of the commands failes, the connection will be closed.
If any of the commands fails, the connection will be closed.
**NOTE:** The commands will be executed as one transaction block.
.. _Data Source Name: http://en.wikipedia.org/wiki/Data_Source_Name
Expand Down Expand Up @@ -639,6 +656,7 @@ def connect(self):
try:
self.connection = psycopg2.connect(self.dsn, **kwargs)
except psycopg2.Error as error:
self.connection = None
future.set_exc_info(sys.exc_info())
return future

Expand All @@ -657,6 +675,7 @@ def on_connect(on_connect_future):

self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE)
self.ioloop.add_future(future, self._set_server_version)
self.ioloop.add_future(future, self._close_on_fail)

return future

Expand All @@ -665,6 +684,12 @@ def _set_server_version(self, future):
return
self.server_version = self.connection.server_version

def _close_on_fail(self, future):
# If connection attempt evetually fails - marks connection as closed by ourselves
# since psycopg2 does not do that for us (on connection attempts)
if future.exception():
self.connection = None

def _io_callback(self, future, result, fd=None, events=None):
try:
state = self.connection.poll()
Expand All @@ -680,7 +705,7 @@ def _io_callback(self, future, result, fd=None, events=None):
elif state == POLL_WRITE:
self.ioloop.update_handler(self.fileno, IOLoop.WRITE)
else:
future.set_exception(psycopg2.OperationalError('poll() returned {0}'.format(state)))
future.set_exception(psycopg2.OperationalError("poll() returned %s" % state))

def ping(self):
"""
Expand Down Expand Up @@ -800,7 +825,7 @@ def transaction(self,
The class returned must be a subclass of `psycopg2.extensions.cursor`_.
See `Connection and cursor factories`_ for details. Defaults to ``None``.
:param bool auto_rollback:
If one of the transaction statements failes, try to automatically
If one of the transaction statements fails, try to automatically
execute ROLLBACK to abort the transaction. If ROLLBACK fails, it would
not be raised, but only logged.
Expand Down
Loading

0 comments on commit 85183f5

Please sign in to comment.