From 85183f5370181f75a29e876f5211d99c40b4ba5e Mon Sep 17 00:00:00 2001
From: Zaar Hai
Date: Wed, 26 Aug 2015 21:39:56 +0300
Subject: [PATCH] Fixed serious flaw in connection retries

There was a bug in the reconnect logic: retries were only performed if the
operation attempt failed synchronously. Typically, however, this is not the
case. The most obvious case for reconnects is the database server
restarting, where the client does not detect that the server went away
until `connection.poll()`, i.e. `cursor.execute` itself finishes
successfully. Unfortunately, in such cases Momoko simply propagated the
error to the caller, practically rendering the reconnection code useless.

The bug existed primarily due to the fact that I did not find a good and
simple way to simulate the server going away and coming back in my unit
tests. I have now integrated the third-party tcproxy package, which I
launch and restart when needed from the unit test code. So finally, I have
all the tools to fix and verify the reconnection logic.
---
 README.rst            |   1 +
 docs/installation.rst |  10 +++-
 momoko/connection.py  |  71 ++++++++++++++++---------
 tests.py              | 117 +++++++++++++++++++++++++++++++++++-------
 4 files changed, 155 insertions(+), 44 deletions(-)

diff --git a/README.rst b/README.rst
index ffe4943..17ff8a7 100644
--- a/README.rst
+++ b/README.rst
@@ -40,6 +40,7 @@ Testing
 Set the following environment variables with your own values before running the
 unit tests::
 
+    make -C tcproxy
     export MOMOKO_TEST_DB='your_db'
     export MOMOKO_TEST_USER='your_user'
     export MOMOKO_TEST_PASSWORD='your_password'
diff --git a/docs/installation.rst b/docs/installation.rst
index ee4e039..aaf2a40 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -22,7 +22,7 @@ can also be used by setting the ``MOMOKO_PSYCOPG2_IMPL`` environment variable to
     # 'psycopg2' or 'psycopg2cffi'
     export MOMOKO_PSYCOPG2_IMPL='psycopg2cffi'
 
-The unit tests als use this variable. It needs to be set if something else is used
+The unit tests also use this variable. It needs to be set if something else is used
 instead of Psycopg2 when running the unit tests. Besides ``MOMOKO_PSYCOPG2_IMPL``
 there are also other variables that need to be set for the unit tests.
 
@@ -37,11 +37,17 @@ Here's an example for the environment variables::
     # Set to '0' if hstore extension isn't enabled
     export MOMOKO_TEST_HSTORE='1' # Default: 0
 
-And running the tests is easy::
+Momoko tests use tcproxy_ for simulating Postgres server unavailability. A copy
+of tcproxy is bundled with Momoko, but you need to build it first::
+
+    make -C tcproxy
+
+Finally, running the tests is easy::
 
     python setup.py test
 
 
+.. _tcproxy: https://github.com/dccmx/tcproxy
 .. _psycopg2cffi: http://pypi.python.org/pypi/psycopg2cffi
 .. _Tornado: http://www.tornadoweb.org/
 .. _Psycopg2: http://initd.org/psycopg/
diff --git a/momoko/connection.py b/momoko/connection.py
index 6598eb4..2fe25f2 100644
--- a/momoko/connection.py
+++ b/momoko/connection.py
@@ -173,7 +173,7 @@ class Pool(object):
 
     :param list setsession:
         List of intial sql commands to be executed once connection is established.
-        If any of the commands failes, the connection will be closed.
+        If any of the commands fails, the connection will be closed.
         **NOTE:** The commands will be executed as one transaction block.
 
     :param bool auto_shrink:
@@ -195,6 +195,10 @@ class Pool(object):
     .. _psycopg2.extensions.connection: http://initd.org/psycopg/docs/connection.html#connection
     .. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor
     """
+
+    class DatabaseNotAvailable(psycopg2.DatabaseError):
+        """Raised when the Pool cannot connect to the database server"""
+
     def __init__(self,
                  dsn,
                  connection_factory=None,
@@ -232,7 +236,7 @@ def __init__(self,
 
         self.conns = ConnectionContainer()
         self._last_connect_time = 0
-        self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available")
+        self._no_conn_availble_error = self.DatabaseNotAvailable("No database connection available")
         self.shrink_period = shrink_period
         self.shrink_delay = shrink_delay
         self.auto_shrink = auto_shrink
@@ -440,7 +444,7 @@ def when_available(fut):
                 conn = fut.result()
             except psycopg2.Error as error:
                 future.set_exc_info(sys.exc_info())
-                if retry:
+                if retry and not keep:
                     self.putconn(retry[0])
                 return
 
@@ -448,28 +452,27 @@ def when_available(fut):
             try:
                 future_or_result = method(conn, *args, **kwargs)
             except psycopg2.Error as error:
-                if conn.closed:
-                    if not retry:
-                        retry.append(conn)
-                        self.ioloop.add_future(conn.connect(), when_available)
-                        return
-                    else:
-                        future.set_exception(self._no_conn_availble_error)
-                else:
-                    future.set_exc_info(sys.exc_info())
-                log.debug(2)
-                self.putconn(conn)
-                return
+                log.debug("Method failed synchronously")
+                return self._retry(retry, when_available, conn, keep, future)
 
             if not async:
+                if not keep:
+                    self.putconn(conn)
                 future.set_result(future_or_result)
-                log.debug(3)
-                self.putconn(conn)
                 return
 
-            chain_future(future_or_result, future)
-            if not keep:
-                future.add_done_callback(lambda f: self.putconn(conn))
+            def when_done(rfut):
+                try:
+                    result = rfut.result()
+                except psycopg2.Error as error:
+                    log.debug("Method failed asynchronously")
+                    return self._retry(retry, when_available, conn, keep, future)
+
+                if not keep:
+                    self.putconn(conn)
+                future.set_result(result)
+
+            self.ioloop.add_future(future_or_result, when_done)
 
         if not connection:
             self.ioloop.add_future(self.getconn(ping=False), when_available)
@@ -479,6 +482,20 @@ def when_available(fut):
             when_available(f)
         return future
 
+    def _retry(self, retry, what, conn, keep, future):
+        if conn.closed:
+            if not retry:
+                retry.append(conn)
+                self.ioloop.add_future(conn.connect(), what)
+                return
+            else:
+                future.set_exception(self._no_conn_availble_error)
+        else:
+            future.set_exc_info(sys.exc_info())
+        if not keep:
+            self.putconn(conn)
+        return
+
     def _reanimate(self):
         assert self.conns.dead, "BUG: don't call reanimate when there is no one to reanimate"
 
@@ -601,7 +618,7 @@ class Connection(object):
 
     :param list setsession:
         List of intial sql commands to be executed once connection is established.
-        If any of the commands failes, the connection will be closed.
+        If any of the commands fails, the connection will be closed.
         **NOTE:** The commands will be executed as one transaction block.
 
     .. _Data Source Name: http://en.wikipedia.org/wiki/Data_Source_Name
@@ -639,6 +656,7 @@ def connect(self):
         try:
             self.connection = psycopg2.connect(self.dsn, **kwargs)
         except psycopg2.Error as error:
+            self.connection = None
             future.set_exc_info(sys.exc_info())
             return future
 
@@ -657,6 +675,7 @@ def on_connect(on_connect_future):
 
         self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE)
         self.ioloop.add_future(future, self._set_server_version)
+        self.ioloop.add_future(future, self._close_on_fail)
 
         return future
 
@@ -665,6 +684,12 @@ def _set_server_version(self, future):
             return
         self.server_version = self.connection.server_version
 
+    def _close_on_fail(self, future):
+        # If the connection attempt eventually fails, mark the connection as closed
+        # ourselves, since psycopg2 does not do that for us (on connection attempts)
+        if future.exception():
+            self.connection = None
+
     def _io_callback(self, future, result, fd=None, events=None):
         try:
             state = self.connection.poll()
@@ -680,7 +705,7 @@ def _io_callback(self, future, result, fd=None, events=None):
         elif state == POLL_WRITE:
             self.ioloop.update_handler(self.fileno, IOLoop.WRITE)
         else:
-            future.set_exception(psycopg2.OperationalError('poll() returned {0}'.format(state)))
+            future.set_exception(psycopg2.OperationalError("poll() returned %s" % state))
 
     def ping(self):
         """
@@ -800,7 +825,7 @@ def transaction(self,
             The class returned must be a subclass of `psycopg2.extensions.cursor`_.
             See `Connection and cursor factories`_ for details. Defaults to ``None``.
         :param bool auto_rollback:
-            If one of the transaction statements failes, try to automatically
+            If one of the transaction statements fails, try to automatically
             execute ROLLBACK to abort the transaction. If ROLLBACK fails, it would
             not be raised, but only logged.
 
diff --git a/tests.py b/tests.py
index f1a36ff..39a3773 100644
--- a/tests.py
+++ b/tests.py
@@ -10,7 +10,9 @@
 import logging
 import datetime
 import threading
+import socket
 from tornado.concurrent import Future
+import subprocess
 
 from tornado import gen
 from tornado.testing import unittest, AsyncTestCase, gen_test
@@ -27,14 +29,19 @@
 db_password = os.environ.get('MOMOKO_TEST_PASSWORD', '')
 db_host = os.environ.get('MOMOKO_TEST_HOST', '')
 db_port = os.environ.get('MOMOKO_TEST_PORT', 5432)
+db_proxy_port = os.environ.get('MOMOKO_TEST_PROXY_PORT', 15432)
 test_hstore = True if os.environ.get('MOMOKO_TEST_HSTORE', False) == '1' else False
 good_dsn = 'dbname=%s user=%s password=%s host=%s port=%s' % (
     db_database, db_user, db_password, db_host, db_port)
+good_proxy_dsn = 'dbname=%s user=%s password=%s host=%s port=%s' % (
+    db_database, db_user, db_password, db_host, db_proxy_port)
 bad_dsn = 'dbname=%s user=%s password=xx%s host=%s port=%s' % (
     'db', 'user', 'password', "127.0.0.127", 11111)
 local_bad_dsn = 'dbname=%s user=%s password=xx%s' % (
     'db', 'user', 'password')
 
+TCPPROXY_PATH = os.environ.get('MOMOKO_TCPPROXY_PATH', "./tcproxy/src/tcproxy")
+
 assert (db_database or db_user or db_password or db_host or db_port) is not None, (
     'Environment variables for the unit tests are not set. Please set the following '
     'variables: MOMOKO_TEST_DB, MOMOKO_TEST_USER, MOMOKO_TEST_PASSWORD, '
@@ -61,6 +68,7 @@
 
 class BaseTest(AsyncTestCase):
     dsn = good_dsn
+    good_dsn = good_dsn
 
     # This is a hack to overcome lack of "yield from" in Python < 3.3.
     # The goal is to support several set_up methods in inheriatnace chain
@@ -342,6 +350,8 @@ def test_ping_with_named_cursor(self):
         """Test whether Connection.ping works fine with named cursors. Issue #74"""
         conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, cursor_factory=RealDictCursor)
         yield conn.ping()
+
+
 #
 # Pool tests
 #
@@ -388,6 +398,13 @@ def close_connections(self, db, amount=None):
             conn.close()
             amount -= 1
 
+    shutter = close_connections
+
+    def total_close(self, db):
+        self.shutter(db)
+        for conn in db.conns.busy.union(db.conns.free).union(db.conns.dead):
+            conn.dsn = bad_dsn
+
     @gen_test
     def run_and_check_query(self, db):
         cursor = yield db.execute("SELECT 6, 19, 24;")
@@ -406,6 +423,34 @@ class PoolBaseDataTest(PoolBaseTest, BaseDataTest):
     pass
 
 
+class ProxyMixIn(object):
+    dsn = good_proxy_dsn
+    good_dsn = good_proxy_dsn
+
+    def start_proxy(self):
+        # Dirty way to make sure there are no leftover proxies
+        subprocess.call(("killall", TCPPROXY_PATH), stderr=open("/dev/null", "w"))
+
+        proxy_conf = "127.0.0.1:%s -> %s:%s" % (db_proxy_port, db_host, db_port)
+        self.proxy = subprocess.Popen((TCPPROXY_PATH, proxy_conf,))
+        time.sleep(0.1)
+
+    def terminate_proxy(self):
+        self.proxy.terminate()
+
+    def kill_connections(self, db, amount=None):
+        self.terminate_proxy()
+        self.start_proxy()
+
+    def set_up_00(self):
+        self.start_proxy()
+
+    def tear_down_00(self):
+        self.terminate_proxy()
+
+    shutter = kill_connections
+
+
 class MomokoPoolTest(PoolBaseTest):
     @gen_test
     def test_connect(self):
@@ -462,11 +507,11 @@ def test_mogrify_error(self):
 
     @gen_test
     def test_transaction_with_reconnect(self):
-        """Test whether transaction works after reconnect"""
+        """Test whether transaction works after connections were closed"""
         # Added result counting, since there was a bug in retry mechanism that caused
         # double-execution of query after reconnect
 
-        self.close_connections(self.db)
+        self.shutter(self.db)
         yield self.db.transaction(("INSERT INTO unit_test_int_table VALUES (1)",))
         cursor = yield self.db.execute("SELECT COUNT(1) FROM unit_test_int_table")
         self.assertEqual(cursor.fetchall(), [(1,)])
@@ -484,10 +529,10 @@ def test_getconn_putconn(self):
 
     @gen_test
     def test_getconn_putconn_with_reconnect(self):
-        """Testing getconn/putconn functionality with reconnect"""
+        """Testing getconn/putconn functionality with reconnect after closing connections"""
         for i in range(self.pool_size * 5):
             # Run many times to check that connections get recycled properly
-            self.close_connections(self.db)
+            self.shutter(self.db)
             conn = yield self.db.getconn()
             for j in range(10):
                 cursor = yield conn.execute("SELECT %s", (j,))
@@ -496,9 +541,21 @@
     def test_getconn_manage(self):
-        """Testing getcontest_getconn_putconn_with_reconnectn + context manager functionality"""
+        """Testing getconn + context manager functionality"""
         for i in range(self.pool_size * 5):
             # Run many times to check that connections get recycled properly
             conn = yield self.db.getconn()
             with self.db.manage(conn):
                 for j in range(10):
                     cursor = yield conn.execute("SELECT %s", (j,))
                     self.assertEqual(cursor.fetchall(), [(j, )])
+
+    @gen_test
+    def test_getconn_manage_with_reconnect(self):
+        """Testing getconn with reconnect + context manager functionality"""
+        for i in range(self.pool_size * 5):
+            # Run many times to check that connections get recycled properly
+            self.shutter(self.db)
+            conn = yield self.db.getconn()
+            with self.db.manage(conn):
+                for j in range(10):
+                    cursor = yield conn.execute("SELECT %s", (j,))
+                    self.assertEqual(cursor.fetchall(), [(j, )])
@@ -508,7 +565,7 @@
     @gen_test
     def test_getconn_manage_with_exception(self):
         """Testing getconn + context manager functionality + deliberate exception"""
-        self.close_connections(self.db)
+        self.shutter(self.db)
         conn = yield self.db.getconn(ping=False)
         with self.db.manage(conn):
             try:
@@ -518,6 +575,10 @@ def test_getconn_manage_with_exception(self):
         self.assertEqual(len(self.db.conns.busy), 0, msg="Some connections were not recycled")
 
 
+class MomokoPoolDataTestProxy(ProxyMixIn, MomokoPoolDataTest):
+    pass
+
+
 class MomokoPoolServerSideCursorTest(PoolBaseDataTest):
     @gen_test
     def test_server_side_cursor(self):
@@ -601,15 +662,19 @@ def test_request_queueing(self):
 
     def test_parallel_queries_after_reconnect_all(self):
         """Testing that pool still queries database in parallel after ALL connections were closeded"""
-        self.close_connections(self.db)
+        self.shutter(self.db)
         self.run_parallel_queries()
 
     def test_parallel_queries_after_reconnect_some(self):
         """Testing that pool still queries database in parallel after SOME connections were closed"""
-        self.close_connections(self.db, amount=self.pool_size/2)
+        self.shutter(self.db, amount=self.pool_size/2)
         self.run_parallel_queries()
 
 
+class MomokoPoolParallelTestProxy(ProxyMixIn, MomokoPoolParallelTest):
+    pass
+
+
 class MomokoPoolStretchTest(MomokoPoolParallelTest):
     pool_size = 1
     max_size = 5
@@ -629,12 +694,12 @@ def test_dont_stretch(self):
 
     def test_dont_stretch_after_reconnect(self):
         """Testing that reconnecting dead connection does not trigger pool stretch"""
-        self.close_connections(self.db)
+        self.shutter(self.db)
         self.test_dont_stretch()
 
     def test_stretch_after_disonnect(self):
         """Testing that stretch works after disconnect"""
-        self.close_connections(self.db)
+        self.shutter(self.db)
         self.test_parallel_queries()
 
     @gen_test
@@ -669,6 +734,10 @@ def test_stretch_genconn(self):
         self.assertEqual(self.db.conns.total, 3)
 
 
+class MomokoPoolStretchTestProxy(ProxyMixIn, MomokoPoolStretchTest):
+    pass
+
+
 class MomokoPoolVolatileDbTest(PoolBaseTest):
     pool_size = 3
 
@@ -686,8 +755,9 @@ def test_startup_local(self):
 
     def test_reconnect(self):
         """Testing if we can reconnect if connections dies"""
-        db = self.build_pool_sync(dsn=good_dsn)
-        self.close_connections(db)
+        db = self.build_pool_sync(dsn=self.good_dsn)
+        log.debug("Killing connections")
+        self.shutter(db)
         self.run_and_check_query(db)
 
     def test_reconnect_interval_good_path(self):
@@ -710,21 +780,26 @@ def test_reconnect_interval_bad_path(self):
         except psycopg2.DatabaseError:
             pass
 
+    @gen_test
+    def test_ping_error(self):
+        """Test that getconn uses ping properly to detect database unavailability"""
+        db = yield self.build_pool(dsn=self.good_dsn, size=3)
+        self.total_close(db)
+        try:
+            yield db.getconn()
+        except db.DatabaseNotAvailable as err:
+            pass
+
     @gen_test
     def test_abort_waiting_queue(self):
         """Testing that waiting queue is aborted properly when all connections are dead"""
-        db = yield self.build_pool(dsn=good_dsn, size=1)
+        db = yield self.build_pool(dsn=self.good_dsn, size=1)
         f1 = db.execute("SELECT 1")
         f2 = db.execute("SELECT 1")
         self.assertEqual(len(db.conns.waiting_queue), 1)
 
-        def total_close(f):
-            self.close_connections(db)
-            for conn in db.conns.dead:
-                conn.dsn = bad_dsn
-
-        f1.add_done_callback(total_close)
+        f1.add_done_callback(lambda f: self.total_close(db))
 
         try:
             yield [f1, f2]
@@ -733,6 +808,10 @@ def total_close(f):
         self.assertEqual(len(db.conns.waiting_queue), 0)
 
 
+class MomokoPoolVolatileDbTestProxy(ProxyMixIn, MomokoPoolVolatileDbTest):
+    pass
+
+
 class MomokoPoolPartiallyConnectedTest(PoolBaseTest):
     raise_connect_errors = True
     pool_size = 3